🌙
 

Subscribe to the Taegis™ XDR Documentation RSS Feed.

Learn more about RSS readers or RSS browser extensions.

Making Queries with the XDR Python SDK

The Secureworks® Taegis™ XDR Python SDK is set up to query alerts, events, and investigations out of the box. The following are a few examples. For a more extensive list of examples, see XDR Python SDK Examples.

Query Alerts

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput

service = GraphQLService()

# Describe the search up front, then hand the request object to the
# alerts search endpoint in a separate step.
search_request = SearchRequestInput(
    cql_query="FROM alert WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
    limit=10000,
    offset=0,
)
results = service.alerts.query.alerts_service_search(search_request)

Alerts Pagination

from taegis_sdk_python import GraphQLService
from taegis_sdk_python.services.alerts.types import SearchRequestInput, PollRequestInput, AlertsResponse

service = GraphQLService()

# Start the search. The first response is kept as the first collected part
# and supplies the search_id / total_parts used to poll for the rest.
results = service.alerts.query.alerts_service_search(SearchRequestInput(
    cql_query="FROM alert WHERE severity >= 0.6 AND status = 'OPEN' EARLIEST=-3d",
    limit=1000000,
    offset=0,
))

poll_responses = []
search_id = results.search_id
total_parts = 0

# Apply the same None guard to the first response that polled responses get
# below; without it a response with no alerts payload would fail here and
# again in the final count.
if results.alerts is not None:
    poll_responses.append(results)
    total_parts = results.alerts.total_parts

if search_id:
    # Parts are polled starting at 2 because the initial search response
    # has already been collected above.
    for part in range(2, total_parts + 1):
        try:
            results = service.alerts.query.alerts_service_poll(
                PollRequestInput(
                    search_id=search_id,
                    part_id=part,
                )
            )
        except Exception as exc:
            # A "not found" error ends polling early (presumably the search
            # has no further parts available -- confirm against the API).
            if "not found" in str(exc):
                break
            # Anything else is unexpected: re-raise with the original traceback.
            raise

        if (
            isinstance(results, AlertsResponse)
            and results.alerts is not None
        ):
            poll_responses.append(results)

# Total number of alerts across every collected part.
print(sum(
    len(response.alerts.list)
    for response in poll_responses
))

Query Events

from taegis_sdk_python.services import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryOptions

service = GraphQLService()

# Options for the event query, collected in one place and expanded into
# EventQueryOptions.
option_values = {
    "timestamp_ascending": True,
    "page_size": 1000,
    "max_rows": 1000,
    "skip_cache": True,
    "aggregation_off": False,
}
options = EventQueryOptions(**option_values)

cql_query = "FROM process EARLIEST=-1d | head 10"
results = service.events.subscription.event_query(cql_query, options=options)

Events Pagination

from taegis_sdk_python.services import GraphQLService
from taegis_sdk_python.services.events.types import EventQueryResults, EventQueryOptions
from typing import List, Optional

def get_next_page(events_results: List[EventQueryResults]) -> Optional[str]:
    """Return the next-page indicator from a batch of event query results.

    Args:
        events_results: The results returned by a single event query or
            event page call.

    Returns:
        The ``next`` value of the first result whose ``next`` is not
        ``None``, or ``None`` when no result carries one.
    """
    # The next page could be found in any of the results, and we cannot
    # guarantee which one, so scan them all. Taking the first match in
    # input order keeps the choice deterministic if several results carry
    # a value (a set would pick one arbitrarily).
    return next(
        (result.next for result in events_results if result.next is not None),
        None,
    )

service = GraphQLService()
options = EventQueryOptions(
    timestamp_ascending=True,
    page_size=1000,
    max_rows=100000,
    skip_cache=True,
    aggregation_off=False,
)
results = []

# Run the initial query, then keep requesting pages for as long as the
# latest batch yields a next-page indicator.
result = service.events.subscription.event_query("FROM process EARLIEST=-1d | head 10", options=options)
while True:
    results.extend(result)
    next_page = get_next_page(result)
    if not next_page:
        break
    result = service.events.subscription.event_page(next_page)

Query Investigations

from taegis_sdk_python import GraphQLService

service = GraphQLService()

# Search arguments gathered in one mapping and expanded into the call.
search_arguments = {
    "page": 1,
    "per_page": 500,
    "query": "WHERE deleted_at IS NULL EARLIEST=-30d",
    "filter_text": None,
    "order_by_field": "updated_at",
    "order_direction": "asc",
}
results = service.investigations.query.investigations_search(**search_arguments)

Investigations Pagination

from taegis_sdk_python import GraphQLService

service = GraphQLService()

investigations = []
page = 1
per_page = 500

# Fetch page after page; a page holding a number of investigations other
# than per_page ends the loop.
while True:
    results = service.investigations.query.investigations_search(
        page=page,
        per_page=per_page,
        query="WHERE deleted_at IS NULL EARLIEST=-365d",
        filter_text=None,
        order_by_field="updated_at",
        order_direction="asc",
    )
    investigations.extend(results.investigations)

    if len(results.investigations) != per_page:
        break
    page += 1

More

 

On this page: