Making Queries with the XDR Python SDK
Secureworks® Taegis™ XDR Python SDK is set up to query alerts, events, and investigations out of the box. Following are a few examples. For a more extensive list of examples, see XDR Python SDK Examples.
Query Alerts ⫘
from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput
service = GraphQLService()
results = service.alerts.query.alerts_service_search(SearchRequestInput(
cql_query="FROM alert WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
limit=10000,
offset=0,
))
Alerts Pagination ⫘
from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput, PollRequestInput, AlertsResponse
service = GraphQLService()
results = service.alerts.query.alerts_service_search(SearchRequestInput(
cql_query="FROM alert WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
limit=1000000,
offset=0,
))
poll_responses = [results]
search_id = results.search_id
total_parts = results.alerts.total_parts
if search_id:
for part in range(2, total_parts + 1):
results = None
try:
results = service.alerts.query.alerts_service_poll(
PollRequestInput(
search_id=search_id,
part_id=part,
)
)
except Exception as exc:
if "not found" in str(exc):
break
raise exc
if (
isinstance(results, AlertsResponse)
and results.alerts is not None
):
poll_responses.append(results)
print(sum(
len(response.alerts.list)
for response in poll_responses
))
Query Events ⫘
from taegis_sdk_python.services import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryOptions
service = GraphQLService()
options = EventQueryOptions(
timestamp_ascending=True,
page_size=1000,
max_rows=1000,
skip_cache=True,
aggregation_off=False,
)
results = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)
Events Pagination ⫘
from taegis_sdk_python.services import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryResults, EventQueryOptions
from typing import List, Optional
def get_next_page(events_results: List[EventQueryResults]) -> Optional[str]:
"""Retrieve events next page indicator."""
try:
# the next page could be found in any of the result pages,
# but we cannot garuntee which result it will be found in
return next(
iter({result.next for result in events_results if result.next is not None})
)
except StopIteration:
return None
service = GraphQLService()
options = EventQueryOptions(
timestamp_ascending=True,
page_size=1000,
max_rows=100000,
skip_cache=True,
aggregation_off=False,
)
results = []
result = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)
results.extend(result)
next_page = get_next_page(result)
while next_page:
result = service.events.subscription.event_page(next_page)
results.extend(result)
next_page = get_next_page(result)
Query Investigations ⫘
from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.investigations2.types import InvestigationsV2Arguments
page = 1
per_page = 30
cql = "WHERE deleted_at IS NULL EARLIEST=-30d | sort updated_at asc"
service = GraphQLService()
investigation_output = service.investigations2.query.investigations_v2(
InvestigationsV2Arguments(
page=page,
per_page=per_page,
cql=cql,
)
)
investigation_output
Investigations Pagination ⫘
from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.investigations2.types import InvestigationsV2Arguments
page = 1
per_page = 30
cql = "WHERE deleted_at IS NULL EARLIEST=-30d | sort updated_at asc"
results = []
service = GraphQLService()
investigation_output = service.investigations2.query.investigations_v2(
InvestigationsV2Arguments(
page=page,
per_page=per_page,
cql=cql,
)
)
results.append(investigation_output)
total_count = investigation_output.total_count
while (
sum_total := sum(len(result.investigations) for result in results)
) < total_count:
page += 1
investigation_output = service.investigations2.query.investigations_v2(
InvestigationsV2Arguments(
page=page,
per_page=per_page,
cql=cql,
)
)
results.append(investigation_output)
investigations = [
investigation for result in results for investigation in result.investigations
]
More ⫘