diff --git a/python/lib/sift_client/_internal/low_level_wrappers/exports.py b/python/lib/sift_client/_internal/low_level_wrappers/exports.py new file mode 100644 index 000000000..63aa200cd --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/exports.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from sift.calculated_channels.v2.calculated_channels_pb2 import ( + CalculatedChannelAbstractChannelReference, +) +from sift.exports.v1.exports_pb2 import ( + AssetsAndTimeRange, + CalculatedChannelConfig, + ExportDataRequest, + ExportDataResponse, + ExportOptions, + GetDownloadUrlRequest, + GetDownloadUrlResponse, + RunsAndTimeRange, + TimeRange, +) +from sift.exports.v1.exports_pb2_grpc import ExportServiceStub + +from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client._internal.util.timestamp import to_pb_timestamp +from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate +from sift_client.transport import WithGrpcClient + +if TYPE_CHECKING: + from datetime import datetime + + from sift_client.sift_types.export import ExportOutputFormat + from sift_client.transport.grpc_transport import GrpcClient + + +def _build_calc_channel_configs( + calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None, +) -> list[CalculatedChannelConfig]: + """Convert high-level calculated channel objects to proto CalculatedChannelConfig messages.""" + if not calculated_channels: + return [] + configs = [] + for cc in calculated_channels: + if isinstance(cc, CalculatedChannelCreate): + refs = cc.expression_channel_references or [] + else: + refs = cc.channel_references + configs.append( + CalculatedChannelConfig( + name=cc.name, + expression=cc.expression or "", + channel_references=[ + CalculatedChannelAbstractChannelReference( + channel_reference=ref.channel_reference, + channel_identifier=ref.channel_identifier, + ) + for 
ref in refs + ], + units=cc.units, + ) + ) + return configs + + +class ExportsLowLevelClient(LowLevelClientBase, WithGrpcClient): + """Low-level client for the DataExportAPI. + + This class provides a thin wrapper around the autogenerated gRPC bindings for the DataExportAPI. + """ + + def __init__(self, grpc_client: GrpcClient): + """Initialize the ExportsLowLevelClient. + + Args: + grpc_client: The gRPC client to use for making API calls. + """ + super().__init__(grpc_client) + + async def export_data( + self, + *, + output_format: ExportOutputFormat, + run_ids: list[str] | None = None, + asset_ids: list[str] | None = None, + start_time: datetime | None = None, + stop_time: datetime | None = None, + channel_ids: list[str] | None = None, + calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None = None, + simplify_channel_names: bool = False, + combine_runs: bool = False, + split_export_by_asset: bool = False, + split_export_by_run: bool = False, + ) -> str: + """Initiate a data export. + + Builds the ExportDataRequest proto and makes the gRPC call. + Sets whichever time_selection oneof fields are provided + (run_ids, asset_ids, or time range); the server validates + the request. + + Returns: + The job ID for the background export. 
+ """ + request = ExportDataRequest( + output_format=output_format.value, + export_options=ExportOptions( + use_legacy_format=False, + simplify_channel_names=simplify_channel_names, + combine_runs=combine_runs, + split_export_by_asset=split_export_by_asset, + split_export_by_run=split_export_by_run, + ), + channel_ids=channel_ids or [], + calculated_channel_configs=_build_calc_channel_configs(calculated_channels), + ) + + if run_ids is not None: + runs_and_time_range = RunsAndTimeRange(run_ids=run_ids) + if start_time: + runs_and_time_range.start_time.CopyFrom(to_pb_timestamp(start_time)) + if stop_time: + runs_and_time_range.stop_time.CopyFrom(to_pb_timestamp(stop_time)) + request.runs_and_time_range.CopyFrom(runs_and_time_range) + + if asset_ids is not None: + assets_and_time_range = AssetsAndTimeRange(asset_ids=asset_ids) + if start_time: + assets_and_time_range.start_time.CopyFrom(to_pb_timestamp(start_time)) + if stop_time: + assets_and_time_range.stop_time.CopyFrom(to_pb_timestamp(stop_time)) + request.assets_and_time_range.CopyFrom(assets_and_time_range) + + if run_ids is None and asset_ids is None: + time_range = TimeRange() + if start_time: + time_range.start_time.CopyFrom(to_pb_timestamp(start_time)) + if stop_time: + time_range.stop_time.CopyFrom(to_pb_timestamp(stop_time)) + request.time_range.CopyFrom(time_range) + + response = await self._grpc_client.get_stub(ExportServiceStub).ExportData(request) + response = cast("ExportDataResponse", response) + return response.job_id + + async def get_download_url(self, job_id: str) -> str: + """Get the download URL for a background export job. + + Args: + job_id: The job ID returned from export_data. + + Returns: + The presigned URL to download the exported zip file. 
+ """ + request = GetDownloadUrlRequest(job_id=job_id) + response = await self._grpc_client.get_stub(ExportServiceStub).GetDownloadUrl(request) + response = cast("GetDownloadUrlResponse", response) + return response.presigned_url diff --git a/python/lib/sift_client/_internal/util/channels.py b/python/lib/sift_client/_internal/util/channels.py new file mode 100644 index 000000000..8c3d39d82 --- /dev/null +++ b/python/lib/sift_client/_internal/util/channels.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate +from sift_client.sift_types.channel import ChannelReference + +if TYPE_CHECKING: + from sift_client.resources.channels import ChannelsAPIAsync + + +async def resolve_calculated_channels( + calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None, + channels_api: ChannelsAPIAsync, +) -> list[CalculatedChannel | CalculatedChannelCreate] | None: + """Resolve channel reference identifiers from names to UUIDs. + + For each channel reference, looks up the identifier as a channel name. + If found, replaces it with the channel's UUID. If not found, assumes + the identifier is already a UUID and keeps it as-is. 
+ """ + if not calculated_channels: + return None + + resolved: list[CalculatedChannel | CalculatedChannelCreate] = [] + for cc in calculated_channels: + refs = ( + (cc.expression_channel_references or []) + if isinstance(cc, CalculatedChannelCreate) + else cc.channel_references + ) + + resolved_refs: list[ChannelReference] = [] + for ref in refs: + channel = await channels_api.find( + name=ref.channel_identifier, + assets=cc.asset_ids, + ) + if channel is not None: + ref = ChannelReference( + channel_reference=ref.channel_reference, + channel_identifier=channel._id_or_error, + ) + resolved_refs.append(ref) + + resolved.append( + CalculatedChannelCreate( + name=cc.name, + expression=cc.expression, + expression_channel_references=resolved_refs, + units=cc.units or None, + ) + ) + return resolved diff --git a/python/lib/sift_client/_internal/util/executor.py b/python/lib/sift_client/_internal/util/executor.py new file mode 100644 index 000000000..87525cce0 --- /dev/null +++ b/python/lib/sift_client/_internal/util/executor.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +import asyncio +from typing import Any, Callable + + +async def run_sync_function(fn: Callable[..., Any], *args: Any) -> Any: + """Run a synchronous function in a thread pool to avoid blocking the event loop.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, fn, *args) diff --git a/python/lib/sift_client/_internal/util/file.py b/python/lib/sift_client/_internal/util/file.py new file mode 100644 index 000000000..76ee0e8db --- /dev/null +++ b/python/lib/sift_client/_internal/util/file.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import zipfile +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + + from sift_client.transport.rest_transport import RestClient + + +def download_file(url: str, dest: Path, *, rest_client: RestClient) -> Path: + """Download a file from a URL in streaming 4 MiB chunks. 
+ + Args: + url: The URL to download from. + dest: Path where the file will be saved. Parent directories are created if needed. + rest_client: The SDK rest client to use for the download. + + Returns: + The path to the downloaded file. + + Raises: + requests.HTTPError: If the download request fails. + """ + dest.parent.mkdir(parents=True, exist_ok=True) + # Strip the session's default Authorization header, presigned URLs carry their own auth + with rest_client.get(url, stream=True, headers={"Authorization": None}) as response: + response.raise_for_status() + with dest.open("wb") as file: + for chunk in response.iter_content(chunk_size=4194304): # 4 MiB + if chunk: + file.write(chunk) + return dest + + +def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) -> list[Path]: + """Extract a zip file to a directory. + + Args: + zip_path: Path to the zip file. + output_dir: Directory to extract contents into. Created if it doesn't exist. + delete_zip: If True (default), delete the zip file after extraction. + + Returns: + List of paths to the extracted files (excludes directories). + + Raises: + zipfile.BadZipFile: If the file is not a valid zip. 
+ """ + output_dir.mkdir(parents=True, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as zip_file: + names = zip_file.namelist() + zip_file.extractall(output_dir) + if delete_zip: + zip_path.unlink() + return [output_dir / name for name in names if not name.endswith("/")] diff --git a/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py b/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py index 86841f2b6..9e4973324 100644 --- a/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py +++ b/python/lib/sift_client/_tests/_internal/test_sync_wrapper.py @@ -82,6 +82,13 @@ async def async_method_with_exception(self) -> None: await asyncio.sleep(0.01) raise ValueError("Test exception") + async def async_method_with_executor(self) -> str: + """Test async method that uses run_in_executor, like wait_and_download.""" + self._record_call("async_method_with_executor") + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, lambda: "executor_result") + return result + async def async_method_with_complex_args( self, arg1: str, arg2: dict[str, Any] | None = None, *args, **kwargs ) -> dict[str, Any]: @@ -183,3 +190,44 @@ def test_complex_arguments(self, mock_resource_sync): assert result["args"] == ("extra_arg",) assert result["kwargs"] == {"keyword": "keyword_value"} assert mock_resource_sync._async_impl.get_call_count("async_method_with_complex_args") == 1 + + +class TestSyncWrapperEventLoopScenarios: + """Test sync wrapper with run_in_executor under different event loop scenarios.""" + + @pytest.fixture + def mock_resource_sync(self): + mock_client = MockClient() + MockResource = generate_sync_api(MockResourceAsync, "MockResource") # noqa: N806 + return MockResource(mock_client, value="testVal") + + def test_sync_no_event_loop(self, mock_resource_sync): + """Plain sync call with no active event loop in the calling thread.""" + result = mock_resource_sync.async_method_with_executor() + assert result == "executor_result" + + def 
test_with_user_event_loop(self, mock_resource_sync): + """User has their own event loop running in another thread.""" + user_loop = asyncio.new_event_loop() + user_thread = threading.Thread(target=user_loop.run_forever, daemon=True) + user_thread.start() + try: + result = mock_resource_sync.async_method_with_executor() + assert result == "executor_result" + finally: + user_loop.call_soon_threadsafe(user_loop.stop) + user_thread.join(timeout=1.0) + user_loop.close() + + def test_sync_from_async(self, mock_resource_sync): + """Sync API called from inside a running async function.""" + + async def caller(): + return mock_resource_sync.async_method_with_executor() + + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(caller()) + assert result == "executor_result" + finally: + loop.close() diff --git a/python/lib/sift_client/_tests/resources/test_exports.py b/python/lib/sift_client/_tests/resources/test_exports.py new file mode 100644 index 000000000..35224f5d4 --- /dev/null +++ b/python/lib/sift_client/_tests/resources/test_exports.py @@ -0,0 +1,534 @@ +"""Tests for the Exports API.""" + +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock, patch +from urllib.parse import urljoin + +import pytest +import requests + +from sift_client._internal.low_level_wrappers.exports import _build_calc_channel_configs +from sift_client._internal.util.channels import resolve_calculated_channels + +if TYPE_CHECKING: + from sift_client import SiftClient +from sift_client.resources import DataExportAPI +from sift_client.resources.exports import DataExportAPIAsync +from sift_client.resources.jobs import JobsAPIAsync +from sift_client.sift_types.asset import Asset +from sift_client.sift_types.calculated_channel import ( + CalculatedChannel, + CalculatedChannelCreate, + ChannelReference, +) +from sift_client.sift_types.channel import 
Channel +from sift_client.sift_types.export import ExportOutputFormat +from sift_client.sift_types.job import DataExportStatusDetails, Job, JobStatus +from sift_client.sift_types.run import Run + +START = datetime(2025, 1, 1, tzinfo=timezone.utc) +STOP = datetime(2025, 1, 2, tzinfo=timezone.utc) +CSV = ExportOutputFormat.CSV + + +@pytest.fixture +def exports_api_async(sift_client: SiftClient): + return sift_client.async_.data_export + + +@pytest.fixture +def exports_api_sync(sift_client: SiftClient): + return sift_client.data_export + + +@pytest.fixture +def mock_client(): + client = MagicMock() + client.grpc_client = MagicMock() + client.async_ = MagicMock() + client.async_.jobs = MagicMock() + client.async_.channels = MagicMock() + client.async_.channels.find = AsyncMock(return_value=None) + return client + + +@pytest.fixture +def mock_job(): + job = MagicMock(spec=Job) + job._id_or_error = "job-123" + job.job_status = JobStatus.FINISHED + return job + + +@pytest.fixture +def exports_api(mock_client, mock_job): + with patch("sift_client.resources.exports.ExportsLowLevelClient", autospec=True) as mock_ll: + api = DataExportAPIAsync(mock_client) + api._low_level_client = mock_ll.return_value + api._low_level_client.export_data = AsyncMock(return_value="job-123") + mock_client.async_.jobs.get = AsyncMock(return_value=mock_job) + return api + + +@pytest.mark.integration +def test_client_binding(sift_client): + assert isinstance(sift_client.data_export, DataExportAPI) + assert isinstance(sift_client.async_.data_export, DataExportAPIAsync) + + +INGEST_TIMESTAMP = datetime(2025, 6, 1, tzinfo=timezone.utc) + + +@pytest.fixture(scope="session") +def ingested_export_channel(sift_client, nostromo_asset): + """Ingest a single data point into a unique channel on the nostromo asset for export tests.""" + import time + + channel_name = f"export-test-{uuid.uuid4().hex[:8]}" + rest_client = sift_client.rest_client + ingest_url = urljoin(rest_client.base_url, "api/v2/ingest") + 
api_key = rest_client._config.api_key + + payload = { + "asset_name": nostromo_asset.name, + "data": [ + { + "timestamp": INGEST_TIMESTAMP.isoformat(), + "values": [{"channel": channel_name, "value": 42}], + } + ], + } + resp = requests.post( + ingest_url, + headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, + json=payload, + timeout=30, + ) + resp.raise_for_status() + + channel = None + for _ in range(20): + channel = sift_client.channels.find(name=channel_name, asset=nostromo_asset._id_or_error) + if channel is not None: + break + time.sleep(0.5) + assert channel is not None, f"Channel {channel_name} did not appear after ingest" + + yield channel + + sift_client.channels.archive([channel]) + + +@pytest.mark.integration +class TestExportsIntegration: + @pytest.mark.asyncio + async def test_export_by_run(self, exports_api_async, nostromo_run): + start = nostromo_run.start_time + job = await exports_api_async.export( + runs=[nostromo_run], + start_time=start, + stop_time=start + timedelta(seconds=10), + output_format=CSV, + ) + assert isinstance(job, Job) + assert job.id_ is not None + + @pytest.mark.asyncio + async def test_export_by_asset( + self, exports_api_async, nostromo_asset, ingested_export_channel + ): + job = await exports_api_async.export( + assets=[nostromo_asset], + start_time=INGEST_TIMESTAMP - timedelta(seconds=1), + stop_time=INGEST_TIMESTAMP + timedelta(seconds=1), + channels=[ingested_export_channel], + output_format=CSV, + ) + assert isinstance(job, Job) + + @pytest.mark.asyncio + async def test_export_by_time_range(self, exports_api_async, sift_client, nostromo_run): + channels = await sift_client.async_.channels.list_(limit=1) + assert channels, "No channels available" + start = nostromo_run.start_time + job = await exports_api_async.export( + start_time=start, + stop_time=start + timedelta(seconds=10), + channels=[channels[0]], + output_format=CSV, + ) + assert isinstance(job, Job) + + @pytest.mark.asyncio + 
async def test_wait_and_download(self, exports_api_async, nostromo_run, tmp_path): + start = nostromo_run.start_time + job = await exports_api_async.export( + runs=[nostromo_run], + start_time=start, + stop_time=start + timedelta(seconds=10), + output_format=CSV, + ) + files = job.wait_and_download(output_dir=tmp_path, timeout_secs=300) + assert len(files) > 0 + assert all(f.exists() for f in files) + + def test_sync_export_by_run(self, exports_api_sync, nostromo_run): + start = nostromo_run.start_time + job = exports_api_sync.export( + runs=[nostromo_run], + start_time=start, + stop_time=start + timedelta(seconds=10), + output_format=CSV, + ) + assert isinstance(job, Job) + + def test_sync_export_by_asset(self, exports_api_sync, nostromo_asset, ingested_export_channel): + job = exports_api_sync.export( + assets=[nostromo_asset], + start_time=INGEST_TIMESTAMP - timedelta(seconds=1), + stop_time=INGEST_TIMESTAMP + timedelta(seconds=1), + channels=[ingested_export_channel], + output_format=CSV, + ) + assert isinstance(job, Job) + + def test_sync_export_by_time_range(self, exports_api_sync, sift_client, nostromo_run): + channels = sift_client.channels.list_(limit=1) + assert channels, "No channels available" + start = nostromo_run.start_time + job = exports_api_sync.export( + start_time=start, + stop_time=start + timedelta(seconds=10), + channels=[channels[0]], + output_format=CSV, + ) + assert isinstance(job, Job) + + +class TestExportDelegation: + """Verify each mode correctly delegates to the low-level client.""" + + @pytest.mark.asyncio + async def test_by_runs(self, exports_api): + await exports_api.export( + runs=["run-1", "run-2"], + output_format=CSV, + start_time=START, + stop_time=STOP, + channels=["ch-1"], + simplify_channel_names=True, + combine_runs=True, + split_export_by_asset=True, + ) + exports_api._low_level_client.export_data.assert_awaited_once_with( + run_ids=["run-1", "run-2"], + asset_ids=None, + output_format=CSV, + start_time=START, + 
stop_time=STOP, + channel_ids=["ch-1"], + calculated_channels=None, + simplify_channel_names=True, + combine_runs=True, + split_export_by_asset=True, + split_export_by_run=False, + ) + + @pytest.mark.asyncio + async def test_by_assets(self, exports_api): + await exports_api.export( + assets=["asset-1"], + start_time=START, + stop_time=STOP, + output_format=CSV, + channels=["ch-1", "ch-2"], + ) + exports_api._low_level_client.export_data.assert_awaited_once_with( + run_ids=None, + asset_ids=["asset-1"], + start_time=START, + stop_time=STOP, + output_format=CSV, + channel_ids=["ch-1", "ch-2"], + calculated_channels=None, + simplify_channel_names=False, + combine_runs=False, + split_export_by_asset=False, + split_export_by_run=False, + ) + + @pytest.mark.asyncio + async def test_by_time_range(self, exports_api): + await exports_api.export( + start_time=START, + stop_time=STOP, + output_format=ExportOutputFormat.SUN, + channels=["ch-1"], + ) + exports_api._low_level_client.export_data.assert_awaited_once_with( + run_ids=None, + asset_ids=None, + start_time=START, + stop_time=STOP, + output_format=ExportOutputFormat.SUN, + channel_ids=["ch-1"], + calculated_channels=None, + simplify_channel_names=False, + combine_runs=False, + split_export_by_asset=False, + split_export_by_run=False, + ) + + +class TestDomainObjectResolution: + @pytest.mark.asyncio + async def test_run_objects_to_ids(self, exports_api): + mock_run = MagicMock(spec=Run) + mock_run._id_or_error = "resolved-run-id" + await exports_api.export(runs=[mock_run, "raw-id"], output_format=CSV) + assert exports_api._low_level_client.export_data.call_args.kwargs["run_ids"] == [ + "resolved-run-id", + "raw-id", + ] + + @pytest.mark.asyncio + async def test_asset_objects_to_ids(self, exports_api): + mock_asset = MagicMock(spec=Asset) + mock_asset._id_or_error = "resolved-asset-id" + await exports_api.export( + assets=[mock_asset, "raw-id"], start_time=START, stop_time=STOP, output_format=CSV + ) + assert 
exports_api._low_level_client.export_data.call_args.kwargs["asset_ids"] == [ + "resolved-asset-id", + "raw-id", + ] + + @pytest.mark.asyncio + async def test_channel_objects_to_ids(self, exports_api): + mock_ch = MagicMock(spec=Channel) + mock_ch._id_or_error = "resolved-ch-id" + await exports_api.export(runs=["run-1"], output_format=CSV, channels=[mock_ch, "raw-ch-id"]) + assert exports_api._low_level_client.export_data.call_args.kwargs["channel_ids"] == [ + "resolved-ch-id", + "raw-ch-id", + ] + + +class TestDictConversion: + @pytest.mark.asyncio + async def test_calculated_channel_dict_converted(self, exports_api): + await exports_api.export( + runs=["run-1"], + output_format=CSV, + calculated_channels=[ + { + "name": "calc", + "expression": "$1 + 1", + "expression_channel_references": [ + {"channel_reference": "$1", "channel_identifier": "ch-1"} + ], + } + ], + ) + cc = exports_api._low_level_client.export_data.call_args.kwargs["calculated_channels"] + assert cc is not None + assert len(cc) == 1 + assert isinstance(cc[0], CalculatedChannelCreate) + assert cc[0].name == "calc" + + +class TestExportValidation: + @pytest.mark.asyncio + async def test_runs_and_assets_raises(self, exports_api): + with pytest.raises(ValueError, match="not both"): + await exports_api.export( + runs=["r"], assets=["a"], start_time=START, stop_time=STOP, output_format=CSV + ) + + @pytest.mark.asyncio + async def test_nothing_provided_raises(self, exports_api): + with pytest.raises(ValueError, match="At least one"): + await exports_api.export(output_format=CSV) + + +class TestBuildCalcChannelConfigs: + @pytest.mark.parametrize("input_val", [None, []]) + def test_empty_input(self, input_val): + assert _build_calc_channel_configs(input_val) == [] + + def test_create_objects(self): + ccs = [ + CalculatedChannelCreate( + name="speed_doubled", + expression="$1 * 2", + units="m/s", + expression_channel_references=[ + ChannelReference(channel_reference="$1", channel_identifier="ch-1") + ], + ), 
+ CalculatedChannelCreate( + name="no_units", + expression="$1 + $2", + expression_channel_references=[ + ChannelReference(channel_reference="$1", channel_identifier="ch-1"), + ChannelReference(channel_reference="$2", channel_identifier="ch-2"), + ], + ), + ] + result = _build_calc_channel_configs(ccs) + assert len(result) == 2 + assert result[0].name == "speed_doubled" + assert result[0].units == "m/s" + assert result[1].name == "no_units" + assert result[1].units == "" + assert len(result[1].channel_references) == 2 + + def test_existing_calculated_channel(self): + cc = MagicMock(spec=CalculatedChannel) + cc.name, cc.expression, cc.units = "derived", "$1 / $2", "m/s" + cc.channel_references = [ + ChannelReference(channel_reference="$1", channel_identifier="ch-dist"), + ChannelReference(channel_reference="$2", channel_identifier="ch-time"), + ] + result = _build_calc_channel_configs([cc]) + assert len(result) == 1 + assert result[0].name == "derived" + assert [r.channel_identifier for r in result[0].channel_references] == [ + "ch-dist", + "ch-time", + ] + + +class TestResolveCalculatedChannels: + @pytest.mark.asyncio + async def test_none_passthrough(self): + api = MagicMock() + api.find = AsyncMock(return_value=None) + assert await resolve_calculated_channels(None, channels_api=api) is None + + @pytest.mark.asyncio + async def test_resolves_name_to_uuid(self): + mock_ch = MagicMock(spec=Channel) + mock_ch._id_or_error = "resolved-uuid" + api = MagicMock() + api.find = AsyncMock(return_value=mock_ch) + + cc = MagicMock(spec=CalculatedChannel) + cc.name, cc.expression, cc.units = "calc", "$1 + 10", "m/s" + cc.asset_ids = ["asset-1"] + cc.channel_references = [ + ChannelReference(channel_reference="$1", channel_identifier="sensor.vel") + ] + + result = await resolve_calculated_channels([cc], channels_api=api) + assert result is not None + assert len(result) == 1 + refs = result[0].expression_channel_references + assert refs is not None + assert 
refs[0].channel_identifier == "resolved-uuid" + + @pytest.mark.asyncio + async def test_keeps_identifier_when_not_found(self): + api = MagicMock() + api.find = AsyncMock(return_value=None) + cc = CalculatedChannelCreate( + name="x", + expression="$1", + units="m", + expression_channel_references=[ + ChannelReference(channel_reference="$1", channel_identifier="ch-1") + ], + ) + result = await resolve_calculated_channels([cc], channels_api=api) + assert result is not None + assert result[0] == cc + + +@pytest.fixture +def download_setup(mock_client, tmp_path): + completed_job = MagicMock(spec=Job) + completed_job.job_status = JobStatus.FINISHED + + jobs_api = JobsAPIAsync(mock_client) + jobs_api.wait_until_complete = AsyncMock(return_value=completed_job) + mock_client.async_.data_export = MagicMock() + mock_client.async_.data_export._low_level_client = MagicMock() + mock_client.async_.data_export._low_level_client.get_download_url = AsyncMock( + return_value="https://dl.test/export.zip" + ) + + fake_file = tmp_path / "data.csv" + fake_file.write_text("col1,col2\n1,2") + mock_loop = MagicMock() + mock_loop.run_in_executor = AsyncMock(return_value=None) + + return { + "api": jobs_api, + "client": mock_client, + "tmp_path": tmp_path, + "fake_file": fake_file, + "loop": mock_loop, + } + + +class TestWaitAndDownload: + @pytest.mark.asyncio + async def test_success(self, download_setup): + s = download_setup + job = MagicMock(spec=Job) + job._id_or_error = "job-123" + with patch("asyncio.get_running_loop", return_value=s["loop"]): + with patch("sift_client.resources.jobs.extract_zip", return_value=[s["fake_file"]]): + result = await s["api"].wait_and_download(job=job, output_dir=s["tmp_path"]) + assert result == [s["fake_file"]] + s["api"].wait_until_complete.assert_awaited_once_with( + job="job-123", polling_interval_secs=5, timeout_secs=None + ) + + @pytest.mark.asyncio + async def test_job_id_string(self, download_setup): + s = download_setup + with 
patch("asyncio.get_running_loop", return_value=s["loop"]): + with patch("sift_client.resources.jobs.extract_zip", return_value=[s["fake_file"]]): + result = await s["api"].wait_and_download(job="job-456", output_dir=s["tmp_path"]) + assert result == [s["fake_file"]] + + @pytest.mark.asyncio + async def test_custom_polling_and_timeout(self, download_setup): + s = download_setup + job = MagicMock(spec=Job) + job._id_or_error = "job-123" + with patch("asyncio.get_running_loop", return_value=s["loop"]): + with patch("sift_client.resources.jobs.extract_zip", return_value=[s["fake_file"]]): + await s["api"].wait_and_download( + job=job, polling_interval_secs=1, timeout_secs=10, output_dir=s["tmp_path"] + ) + s["api"].wait_until_complete.assert_awaited_once_with( + job="job-123", polling_interval_secs=1, timeout_secs=10 + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("status", "details", "match"), + [ + ( + JobStatus.FAILED, + DataExportStatusDetails(error_message="out of memory"), + r"failed.*out of memory", + ), + (JobStatus.FAILED, None, "failed"), + (JobStatus.CANCELLED, None, "cancelled"), + ], + ) + async def test_terminal_status_raises(self, mock_client, status, details, match): + jobs_api = JobsAPIAsync(mock_client) + completed = MagicMock(spec=Job) + completed.job_status = status + completed.job_status_details = details + jobs_api.wait_until_complete = AsyncMock(return_value=completed) + with pytest.raises(RuntimeError, match=match): + await jobs_api.wait_and_download(job="job-err") diff --git a/python/lib/sift_client/client.py b/python/lib/sift_client/client.py index ae3302673..ed7aeba9a 100644 --- a/python/lib/sift_client/client.py +++ b/python/lib/sift_client/client.py @@ -7,6 +7,8 @@ CalculatedChannelsAPIAsync, ChannelsAPI, ChannelsAPIAsync, + DataExportAPI, + DataExportAPIAsync, FileAttachmentsAPI, FileAttachmentsAPIAsync, IngestionAPIAsync, @@ -101,9 +103,13 @@ class SiftClient( tags: TagsAPI """Instance of the Tags API for making synchronous 
requests.""" + test_results: TestResultsAPI """Instance of the Test Results API for making synchronous requests.""" + data_export: DataExportAPI + """Instance of the Data Export API for making synchronous requests.""" + async_: AsyncAPIs """Accessor for the asynchronous APIs. All asynchronous APIs are available as attributes on this accessor.""" @@ -152,6 +158,7 @@ def __init__( self.runs = RunsAPI(self) self.tags = TagsAPI(self) self.test_results = TestResultsAPI(self) + self.data_export = DataExportAPI(self) # Accessor for the asynchronous APIs self.async_ = AsyncAPIs( @@ -167,6 +174,7 @@ def __init__( runs=RunsAPIAsync(self), tags=TagsAPIAsync(self), test_results=TestResultsAPIAsync(self), + data_export=DataExportAPIAsync(self), ) @property diff --git a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index af9fe5e31..78b3b4eba 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -162,6 +162,7 @@ async def main(): from sift_client.resources.runs import RunsAPIAsync from sift_client.resources.tags import TagsAPIAsync from sift_client.resources.test_results import TestResultsAPIAsync +from sift_client.resources.exports import DataExportAPIAsync # ruff: noqa All imports needs to be imported before sync_stubs to avoid circular import from sift_client.resources.sync_stubs import ( @@ -176,6 +177,7 @@ async def main(): TagsAPI, TestResultsAPI, FileAttachmentsAPI, + DataExportAPI, ) import sys @@ -211,4 +213,6 @@ async def main(): "TestResultsAPI", "TestResultsAPIAsync", "TracingConfig", + "DataExportAPI", + "DataExportAPIAsync", ] diff --git a/python/lib/sift_client/resources/exports.py b/python/lib/sift_client/resources/exports.py new file mode 100644 index 000000000..ed8676960 --- /dev/null +++ b/python/lib/sift_client/resources/exports.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from 
class DataExportAPIAsync(ResourceBase):
    """High-level API for exporting data from Sift."""

    def __init__(self, sift_client: SiftClient):
        """Initialize the DataExportAPI.

        Args:
            sift_client: The Sift client to use.
        """
        super().__init__(sift_client)
        self._low_level_client = ExportsLowLevelClient(grpc_client=self.client.grpc_client)

    async def export(
        self,
        *,
        output_format: ExportOutputFormat,
        runs: list[str | Run] | None = None,
        assets: list[str | Asset] | None = None,
        start_time: datetime | None = None,
        stop_time: datetime | None = None,
        channels: list[str | Channel] | None = None,
        calculated_channels: list[CalculatedChannel | CalculatedChannelCreate | dict] | None = None,
        simplify_channel_names: bool = False,
        combine_runs: bool = False,
        split_export_by_asset: bool = False,
        split_export_by_run: bool = False,
    ) -> Job:
        """Export data from Sift.

        Initiates an export on the server and returns a Job handle. Use
        ``job.wait_and_download()`` to poll for completion and download the files.

        There are three ways to scope the export, determined by which arguments
        are provided:

        1. **By runs** — provide ``runs``. The ``start_time``/``stop_time`` are
           optional (if omitted, the full time range of each run is used). If no
           ``channels`` or ``calculated_channels`` are provided, all channels
           from the runs' assets are included.

        2. **By assets** — provide ``assets``. Both ``start_time`` and
           ``stop_time`` are **required**. If no ``channels`` or
           ``calculated_channels`` are provided, all channels from the assets
           are included.

        3. **By time range only** — provide ``start_time`` and ``stop_time``
           without ``runs`` or ``assets``. At least one of ``channels`` or
           ``calculated_channels`` **must** be provided to scope the data.

        You cannot provide both ``runs`` and ``assets`` at the same time.

        Args:
            output_format: The file format for the export (CSV or Sun/WinPlot).
            runs: One or more Run objects or run IDs to export data from.
            assets: One or more Asset objects or asset IDs to export data from.
            start_time: Start of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            stop_time: End of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            channels: Channel objects or channel IDs to include. If omitted and
                runs or assets are provided, all channels are exported. Required
                (along with ``calculated_channels``) in time-range-only mode.
            calculated_channels: Calculated channels to include in the export.
                Accepts existing CalculatedChannel objects,
                CalculatedChannelCreate definitions, or dictionaries that
                will be converted to CalculatedChannelCreate via model_validate.
            simplify_channel_names: Remove text preceding last period in channel
                names, only if the resulting simplified name is unique.
            combine_runs: Identical channels within the same asset across
                multiple runs will be combined into a single column.
            split_export_by_asset: Split each asset into a separate file, with
                asset name removed from channel name display.
            split_export_by_run: Split each run into a separate file, with run
                name removed from channel name display.

        Returns:
            A Job handle for the pending export.

        Raises:
            ValueError: If both ``runs`` and ``assets`` are provided, or if the
                arguments do not satisfy one of the three scoping modes above.
        """
        if runs and assets:
            raise ValueError("Provide either 'runs' or 'assets', not both.")
        if not runs and not assets and not start_time and not stop_time:
            raise ValueError("At least one of 'runs', 'assets', or a time range must be provided.")
        # Enforce the documented contract up front so callers get a clear,
        # immediate ValueError instead of an opaque server-side failure.
        if assets and (start_time is None or stop_time is None):
            raise ValueError(
                "Both 'start_time' and 'stop_time' are required when exporting by assets."
            )
        if not runs and not assets:
            if start_time is None or stop_time is None:
                raise ValueError(
                    "Both 'start_time' and 'stop_time' are required for a time-range-only export."
                )
            if not channels and not calculated_channels:
                raise ValueError(
                    "At least one of 'channels' or 'calculated_channels' must be provided "
                    "for a time-range-only export."
                )

        run_ids = [r._id_or_error if isinstance(r, Run) else r for r in runs] if runs else None
        asset_ids = (
            [a._id_or_error if isinstance(a, Asset) else a for a in assets] if assets else None
        )
        channel_ids = (
            [c._id_or_error if isinstance(c, Channel) else c for c in channels] if channels else []
        )
        # Dicts are accepted for convenience and normalized into
        # CalculatedChannelCreate models before resolution.
        normalized_calc_channels: list[CalculatedChannel | CalculatedChannelCreate] | None = (
            [
                CalculatedChannelCreate.model_validate(cc) if isinstance(cc, dict) else cc
                for cc in calculated_channels
            ]
            if calculated_channels
            else None
        )
        resolved_calc_channels = await resolve_calculated_channels(
            normalized_calc_channels,
            channels_api=self.client.async_.channels,
        )

        job_id = await self._low_level_client.export_data(
            run_ids=run_ids,
            asset_ids=asset_ids,
            output_format=output_format,
            start_time=start_time,
            stop_time=stop_time,
            channel_ids=channel_ids,
            calculated_channels=resolved_calc_channels,
            simplify_channel_names=simplify_channel_names,
            combine_runs=combine_runs,
            split_export_by_asset=split_export_by_asset,
            split_export_by_run=split_export_by_run,
        )

        return await self.client.async_.jobs.get(job_id=job_id)
sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient +from sift_client._internal.util.executor import run_sync_function +from sift_client._internal.util.file import download_file, extract_zip from sift_client.resources._base import ResourceBase -from sift_client.sift_types.job import Job, JobStatus, JobType +from sift_client.sift_types.job import DataExportStatusDetails, Job, JobStatus, JobType from sift_client.util import cel_utils as cel if TYPE_CHECKING: @@ -189,3 +193,74 @@ async def wait_until_complete( if timeout_secs is not None and (time.monotonic() - start) >= timeout_secs: raise TimeoutError(f"Job {job_id} did not complete within {timeout_secs} seconds") await asyncio.sleep(polling_interval_secs) + + async def wait_and_download( + self, + *, + job: Job | str, + polling_interval_secs: int = 5, + timeout_secs: int | None = None, + output_dir: str | Path | None = None, + extract: bool = True, + ) -> list[Path]: + """Wait for an export job to complete and download the exported files. + + Polls the job status at the given interval until the job is FINISHED, + FAILED, or CANCELLED, then downloads and extracts the exported data files. + + Args: + job: The export Job or job ID to wait for. + polling_interval_secs: Seconds between status polls. Defaults to 5. + timeout_secs: Maximum seconds to wait. If None, polls indefinitely. + output_dir: Directory to save the extracted files. If omitted, a + temporary directory is created automatically. + extract: If True (default), extract the zip and delete it, + returning paths to the extracted files. If False, keep the + zip file and return its path. + + Returns: + List of paths to the extracted data files, or a single-element + list containing the zip path if extract is False. + + Raises: + RuntimeError: If the export job fails or is cancelled. + TimeoutError: If the export job does not complete within timeout_secs. 
+ """ + job_id = job._id_or_error if isinstance(job, Job) else job + + completed_job = await self.wait_until_complete( + job=job_id, + polling_interval_secs=polling_interval_secs, + timeout_secs=timeout_secs, + ) + if completed_job.job_status == JobStatus.FAILED: + if ( + isinstance(completed_job.job_status_details, DataExportStatusDetails) + and completed_job.job_status_details.error_message + ): + raise RuntimeError( + f"Export job '{job_id}' failed. {completed_job.job_status_details.error_message}" + ) + raise RuntimeError(f"Export job '{job_id}' failed.") + if completed_job.job_status == JobStatus.CANCELLED: + raise RuntimeError(f"Export job '{job_id}' was cancelled.") + + presigned_url = await self.client.async_.data_export._low_level_client.get_download_url( + job_id=job_id + ) + output_dir = ( + Path(output_dir) + if output_dir is not None + else Path(tempfile.mkdtemp(prefix="sift_export_")) + ) + zip_file_path = output_dir / f"{job_id}.zip" + + # Run the synchronous download in a thread pool to avoid blocking the event loop + rest_client = self.client.rest_client + await run_sync_function( + lambda: download_file(presigned_url, zip_file_path, rest_client=rest_client) + ) + + if not extract: + return [zip_file_path] + return extract_zip(zip_file_path, output_dir) diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.py b/python/lib/sift_client/resources/sync_stubs/__init__.py index 3f6cc427c..acd73755e 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.py +++ b/python/lib/sift_client/resources/sync_stubs/__init__.py @@ -7,6 +7,7 @@ AssetsAPIAsync, CalculatedChannelsAPIAsync, ChannelsAPIAsync, + DataExportAPIAsync, FileAttachmentsAPIAsync, JobsAPIAsync, PingAPIAsync, @@ -28,11 +29,13 @@ ReportsAPI = generate_sync_api(ReportsAPIAsync, "ReportsAPI") TagsAPI = generate_sync_api(TagsAPIAsync, "TagsAPI") TestResultsAPI = generate_sync_api(TestResultsAPIAsync, "TestResultsAPI") +DataExportAPI = generate_sync_api(DataExportAPIAsync, 
class DataExportAPI:
    """Sync counterpart to `DataExportAPIAsync`.

    High-level API for exporting data from Sift.
    """

    def __init__(self, sift_client: SiftClient):
        """Initialize the DataExportAPI.

        Args:
            sift_client: The Sift client to use.
        """
        ...

    def _run(self, coro): ...
    def export(
        self,
        *,
        output_format: ExportOutputFormat,
        runs: list[str | Run] | None = None,
        assets: list[str | Asset] | None = None,
        start_time: datetime | None = None,
        stop_time: datetime | None = None,
        channels: list[str | Channel] | None = None,
        calculated_channels: list[CalculatedChannel | CalculatedChannelCreate | dict] | None = None,
        simplify_channel_names: bool = False,
        combine_runs: bool = False,
        split_export_by_asset: bool = False,
        split_export_by_run: bool = False,
    ) -> Job:
        """Export data from Sift.

        Initiates an export on the server and returns a Job handle. Use
        ``job.wait_and_download()`` to poll for completion and download the files.

        There are three ways to scope the export, determined by which arguments
        are provided:

        1. **By runs** — provide ``runs``. The ``start_time``/``stop_time`` are
           optional (if omitted, the full time range of each run is used). If no
           ``channels`` or ``calculated_channels`` are provided, all channels
           from the runs' assets are included.

        2. **By assets** — provide ``assets``. Both ``start_time`` and
           ``stop_time`` are **required**. If no ``channels`` or
           ``calculated_channels`` are provided, all channels from the assets
           are included.

        3. **By time range only** — provide ``start_time`` and ``stop_time``
           without ``runs`` or ``assets``. At least one of ``channels`` or
           ``calculated_channels`` **must** be provided to scope the data.

        You cannot provide both ``runs`` and ``assets`` at the same time.

        Args:
            output_format: The file format for the export (CSV or Sun/WinPlot).
            runs: One or more Run objects or run IDs to export data from.
            assets: One or more Asset objects or asset IDs to export data from.
            start_time: Start of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            stop_time: End of the time range to export. Required when using
                assets or time-range-only mode; optional when using runs.
            channels: Channel objects or channel IDs to include. If omitted and
                runs or assets are provided, all channels are exported. Required
                (along with ``calculated_channels``) in time-range-only mode.
            calculated_channels: Calculated channels to include in the export.
                Accepts existing CalculatedChannel objects,
                CalculatedChannelCreate definitions, or dictionaries that
                will be converted to CalculatedChannelCreate via model_validate.
            simplify_channel_names: Remove text preceding last period in channel
                names, only if the resulting simplified name is unique.
            combine_runs: Identical channels within the same asset across
                multiple runs will be combined into a single column.
            split_export_by_asset: Split each asset into a separate file, with
                asset name removed from channel name display.
            split_export_by_run: Split each run into a separate file, with run
                name removed from channel name display.

        Returns:
            A Job handle for the pending export.
        """
        ...
class ExportOutputFormat(Enum):
    """Supported output formats for data exports.

    Values mirror the proto ``ExportOutputFormat`` enum constants.

    Attributes:
        CSV: Comma-separated values format.
        SUN: Sun (WinPlot) format (not used in certain environments).
    """

    CSV = ExportOutputFormatProto.EXPORT_OUTPUT_FORMAT_CSV
    SUN = ExportOutputFormatProto.EXPORT_OUTPUT_FORMAT_SUN
    def wait_and_download(
        self,
        *,
        polling_interval_secs: int = 5,
        timeout_secs: int | None = None,
        output_dir: str | Path | None = None,
        extract: bool = True,
    ) -> list[Path]:
        """Wait for an export job to complete and download the exported files.

        Polls the job status at the given interval until the job is FINISHED,
        FAILED, or CANCELLED, then downloads and extracts the exported data files.

        Args:
            polling_interval_secs: Seconds between status polls. Defaults to 5.
            timeout_secs: Maximum seconds to wait. If None, polls indefinitely.
            output_dir: Directory to save the extracted files. If omitted, a
                temporary directory is created automatically.
            extract: If True (default), extract the zip and delete it,
                returning paths to the extracted files. If False, keep the
                zip file and return its path.

        Returns:
            List of paths to the extracted data files, or a single-element
            list containing the zip path if extract is False.

        Raises:
            RuntimeError: If the export job fails or is cancelled.
            TimeoutError: If the export job does not complete within timeout_secs.
        """
        # Delegate to the synchronous JobsAPI on the attached client, passing
        # this Job instance as the job to wait for.
        return self.client.jobs.wait_and_download(
            job=self,
            polling_interval_secs=polling_interval_secs,
            timeout_secs=timeout_secs,
            output_dir=output_dir,
            extract=extract,
        )