🌙

Subscribe to the Taegis™ XDR Documentation RSS Feed at .

Learn more about RSS readers or RSS browser extensions.

Making Queries with the XDR Python SDK

Secureworks® Taegis™ XDR Python SDK is set up to query alerts, events, and investigations out of the box. Following are a few examples. For a more extensive list of examples, see XDR Python SDK Examples.

Query Alerts

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput

service = GraphQLService()
results = service.alerts.query.alerts_service_search(SearchRequestInput(
    cql_query="FROM alert WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
    limit=10000,
    offset=0,
))

Alerts Pagination

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput, PollRequestInput, AlertsResponse

service = GraphQLService()
results = service.alerts.query.alerts_service_search(SearchRequestInput(
    cql_query="FROM alert WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
    limit=1000000,
    offset=0,
))

poll_responses = [results]
search_id = results.search_id
total_parts = results.alerts.total_parts

if search_id:
    for part in range(2, total_parts + 1):
        results = None
        try:
            results = service.alerts.query.alerts_service_poll(
                PollRequestInput(
                    search_id=search_id,
                    part_id=part,
                )
            )
        except Exception as exc:
            if "not found" in str(exc):
                break
            raise exc

        if (
            isinstance(results, AlertsResponse)
            and results.alerts is not None
        ):
            poll_responses.append(results)

print(sum(
    len(response.alerts.list)
    for response in poll_responses
))

Query Events

from taegis_sdk_python.services import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryOptions

service = GraphQLService()
options = EventQueryOptions(
    timestamp_ascending=True,
    page_size=1000,
    max_rows=1000,
    skip_cache=True,
    aggregation_off=False,
)

results = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)

Events Pagination

from taegis_sdk_python.services import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryResults, EventQueryOptions
from typing import List, Optional

def get_next_page(events_results: List[EventQueryResults]) -> Optional[str]:
    """Retrieve events  next page indicator."""
    try:
        # the next page could be found in any of the result pages,
        # but we cannot garuntee which result it will be found in
        return next(
            iter({result.next for result in events_results if result.next is not None})
        )
    except StopIteration:
        return None

service = GraphQLService()
options = EventQueryOptions(
    timestamp_ascending=True,
    page_size=1000,
    max_rows=100000,
    skip_cache=True,
    aggregation_off=False,
)
results = []

result = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)
results.extend(result)
next_page = get_next_page(result)

while next_page:
    result = service.events.subscription.event_page(next_page)
    results.extend(result)
    next_page = get_next_page(result)

Query Investigations

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.investigations2.types import InvestigationsV2Arguments

page = 1
per_page = 30
cql = "WHERE deleted_at IS NULL EARLIEST=-30d | sort updated_at asc"

service = GraphQLService()

investigation_output = service.investigations2.query.investigations_v2(
    InvestigationsV2Arguments(
        page=page,
        per_page=per_page,
        cql=cql,
    )
)
investigation_output

Investigations Pagination

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.investigations2.types import InvestigationsV2Arguments

page = 1
per_page = 30
cql = "WHERE deleted_at IS NULL EARLIEST=-30d | sort updated_at asc"

results = []

service = GraphQLService()

investigation_output = service.investigations2.query.investigations_v2(
    InvestigationsV2Arguments(
        page=page,
        per_page=per_page,
        cql=cql,
    )
)
results.append(investigation_output)

total_count = investigation_output.total_count

while (
    sum_total := sum(len(result.investigations) for result in results)
) < total_count:
    page += 1
    investigation_output = service.investigations2.query.investigations_v2(
        InvestigationsV2Arguments(
            page=page,
            per_page=per_page,
            cql=cql,
        )
    )
    results.append(investigation_output)

investigations = [
    investigation for result in results for investigation in result.investigations
]

More

 

On this page: