Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
5ca890a
python(feature): sift_client low level wrapper for exports
wei-qlu Mar 7, 2026
a083f1d
python(feat): data export api for sift_client
wei-qlu Mar 10, 2026
7da8381
python(fix): updated sift_type export docstrings to handle channel_re…
wei-qlu Mar 10, 2026
07ea61d
python(fix): enforce channel_ids or calculated_channel_config for exp…
wei-qlu Mar 11, 2026
7327208
python(fix): rename internal grpc module to _grpc to avoid shadowing …
wei-qlu Mar 12, 2026
8748f8c
python(fix): add input validation and job status checks to export met…
wei-qlu Mar 12, 2026
e04bfae
python(fix): added timestamp checks and unit tests for exports
wei-qlu Mar 12, 2026
db372c6
python(fix): updated docstring args to match export options in UI
wei-qlu Mar 12, 2026
891fe87
python(fix): added assert for calc configs result
wei-qlu Mar 12, 2026
37810c2
python(fix): updated sync stubs
wei-qlu Mar 12, 2026
9ae44c6
python(refactor): exports API accepts domain objects alongside raw IDs
wei-qlu Mar 15, 2026
67dc30c
python(refactor): exports API returns job, separate high-level/low-le…
wei-qlu Mar 15, 2026
5a54678
python(fix): low-level-client accepts ExportOutputFormat enum
wei-qlu Mar 15, 2026
ca1caf0
python(fix): linting
wei-qlu Mar 15, 2026
dae2ae5
python(refactor): merge low-level export methods into single method, …
wei-qlu Mar 16, 2026
60f2cff
python(fix): add assertions for datetime to resolve mypy errors
wei-qlu Mar 16, 2026
3ef402f
linting
wei-qlu Mar 16, 2026
1420dda
python(refactor): remove redundant code
wei-qlu Mar 16, 2026
fc844e8
python(refactor): removed use_legacy_format as a possible field for e…
wei-qlu Mar 16, 2026
6787655
python(feat): return file path from export job, update test
wei-qlu Mar 17, 2026
753cd67
python(fix): resolve calc channel name-based identifiers to uuid for …
wei-qlu Mar 17, 2026
efcbf84
python(fix): add assertions for export tests
wei-qlu Mar 17, 2026
0f853c7
linting
wei-qlu Mar 17, 2026
66e29ca
python(refactor): simplified resolve_calculated_channel logic
wei-qlu Mar 17, 2026
60a1942
mypy fix
wei-qlu Mar 17, 2026
bda329b
python(fix): update export tests to check equality
wei-qlu Mar 18, 2026
f42704a
pyright
wei-qlu Mar 18, 2026
3d7d7c0
python(fix): return paths to extracted_files in wait_until_complete
wei-qlu Mar 18, 2026
7c1e13c
python(refactor): rename wait_until_complete to download_when_complet…
wei-qlu Mar 18, 2026
2455fd1
python(refactor): rename to wait_and_download and regenerate stubs
wei-qlu Mar 18, 2026
837d8fa
python(refactor): revert _grpc to grpc and updated stubs
wei-qlu Mar 18, 2026
535335a
python(fix): updated unit tests to use the renamed function
wei-qlu Mar 18, 2026
ca7dd63
python(refactor): shared _export helper to deduplicate export methods
wei-qlu Mar 18, 2026
6e78e03
python(fix): use asyncio.get_running_loop() instead of deprecated get…
wei-qlu Mar 18, 2026
61a355c
python(fix): add integration tests and combined duplicate unit tests
wei-qlu Mar 19, 2026
3fb4557
pyright fix
wei-qlu Mar 19, 2026
faf4af8
move _resolve_calculated_channels and download_and_extract_zip to _in…
wei-qlu Mar 19, 2026
edca0ad
add extract parameter to wait_and_download so callers can keep the zip without e…
wei-qlu Mar 19, 2026
4194dd0
refactored export methods to a single entry point
wei-qlu Mar 19, 2026
35fd172
mypy fix
wei-qlu Mar 19, 2026
75f0d1d
python(refactor): use rest_client instead of raw request
wei-qlu Mar 19, 2026
28d353c
python(refactor): added dict support for calc channels
wei-qlu Mar 19, 2026
c971119
python(refactor): class rename to DataExportsAPI
wei-qlu Mar 19, 2026
ab35764
mypy fix
wei-qlu Mar 19, 2026
f8c9caf
pyright fix
wei-qlu Mar 19, 2026
9e32d81
timeout increased
wei-qlu Mar 19, 2026
05a4ce1
python(refactor): split up the download_and_extract function and rena…
wei-qlu Mar 20, 2026
e8ab290
python(fix): scoped the integration export jobs to 10s
wei-qlu Mar 20, 2026
95df192
python(fix): add sync wrapper tests for run_in_executor loop scenarios
wei-qlu Mar 20, 2026
45f32a8
python(refactor): add a run_sync_function util for running blocking c…
wei-qlu Mar 20, 2026
af904bb
python(refactor): move wait_and_download to JobsAPI and add the metho…
wei-qlu Mar 20, 2026
1cfc7ec
python(fix): scoped the export_by_asset test to use one channel
wei-qlu Mar 21, 2026
3924682
python(fix): updated export_by_asset integration tests with ingested …
wei-qlu Mar 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 154 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/exports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from __future__ import annotations

from typing import TYPE_CHECKING, cast

from sift.calculated_channels.v2.calculated_channels_pb2 import (
CalculatedChannelAbstractChannelReference,
)
from sift.exports.v1.exports_pb2 import (
AssetsAndTimeRange,
CalculatedChannelConfig,
ExportDataRequest,
ExportDataResponse,
ExportOptions,
GetDownloadUrlRequest,
GetDownloadUrlResponse,
RunsAndTimeRange,
TimeRange,
)
from sift.exports.v1.exports_pb2_grpc import ExportServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client._internal.util.timestamp import to_pb_timestamp
from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate
from sift_client.transport import WithGrpcClient

if TYPE_CHECKING:
from datetime import datetime

from sift_client.sift_types.export import ExportOutputFormat
from sift_client.transport.grpc_transport import GrpcClient


def _build_calc_channel_configs(
    calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None,
) -> list[CalculatedChannelConfig]:
    """Convert high-level calculated channel objects to proto CalculatedChannelConfig messages.

    Returns an empty list when no calculated channels are given.
    """
    if not calculated_channels:
        return []

    def _references(channel):
        # CalculatedChannelCreate keeps references on a different attribute
        # than a persisted CalculatedChannel.
        if isinstance(channel, CalculatedChannelCreate):
            return channel.expression_channel_references or []
        return channel.channel_references

    return [
        CalculatedChannelConfig(
            name=channel.name,
            expression=channel.expression or "",
            channel_references=[
                CalculatedChannelAbstractChannelReference(
                    channel_reference=reference.channel_reference,
                    channel_identifier=reference.channel_identifier,
                )
                for reference in _references(channel)
            ],
            units=channel.units,
        )
        for channel in calculated_channels
    ]


class ExportsLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Low-level client for the DataExportAPI.

    This class provides a thin wrapper around the autogenerated gRPC bindings for the DataExportAPI.
    """

    def __init__(self, grpc_client: GrpcClient):
        """Initialize the ExportsLowLevelClient.

        Args:
            grpc_client: The gRPC client to use for making API calls.
        """
        super().__init__(grpc_client)

    async def export_data(
        self,
        *,
        output_format: ExportOutputFormat,
        run_ids: list[str] | None = None,
        asset_ids: list[str] | None = None,
        start_time: datetime | None = None,
        stop_time: datetime | None = None,
        channel_ids: list[str] | None = None,
        calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None = None,
        simplify_channel_names: bool = False,
        combine_runs: bool = False,
        split_export_by_asset: bool = False,
        split_export_by_run: bool = False,
    ) -> str:
        """Initiate a data export.

        Builds the ExportDataRequest proto and makes the gRPC call.
        Sets whichever time_selection oneof fields are provided
        (run_ids, asset_ids, or time range); the server validates
        the request.

        Returns:
            The job ID for the background export.
        """
        request = ExportDataRequest(
            output_format=output_format.value,
            export_options=ExportOptions(
                use_legacy_format=False,
                simplify_channel_names=simplify_channel_names,
                combine_runs=combine_runs,
                split_export_by_asset=split_export_by_asset,
                split_export_by_run=split_export_by_run,
            ),
            channel_ids=channel_ids or [],
            calculated_channel_configs=_build_calc_channel_configs(calculated_channels),
        )

        def _copy_bounds(selection) -> None:
            # Both timestamps are optional; only set the ones that were given.
            if start_time:
                selection.start_time.CopyFrom(to_pb_timestamp(start_time))
            if stop_time:
                selection.stop_time.CopyFrom(to_pb_timestamp(stop_time))

        # NOTE: if both run_ids and asset_ids are passed, assets_and_time_range is
        # assigned last and wins the oneof, matching the original assignment order.
        if run_ids is not None:
            selection = RunsAndTimeRange(run_ids=run_ids)
            _copy_bounds(selection)
            request.runs_and_time_range.CopyFrom(selection)

        if asset_ids is not None:
            selection = AssetsAndTimeRange(asset_ids=asset_ids)
            _copy_bounds(selection)
            request.assets_and_time_range.CopyFrom(selection)

        if run_ids is None and asset_ids is None:
            selection = TimeRange()
            _copy_bounds(selection)
            request.time_range.CopyFrom(selection)

        raw = await self._grpc_client.get_stub(ExportServiceStub).ExportData(request)
        return cast("ExportDataResponse", raw).job_id

    async def get_download_url(self, job_id: str) -> str:
        """Get the download URL for a background export job.

        Args:
            job_id: The job ID returned from export_data.

        Returns:
            The presigned URL to download the exported zip file.
        """
        raw = await self._grpc_client.get_stub(ExportServiceStub).GetDownloadUrl(
            GetDownloadUrlRequest(job_id=job_id)
        )
        return cast("GetDownloadUrlResponse", raw).presigned_url
54 changes: 54 additions & 0 deletions python/lib/sift_client/_internal/util/channels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from sift_client.sift_types.calculated_channel import CalculatedChannel, CalculatedChannelCreate
from sift_client.sift_types.channel import ChannelReference

if TYPE_CHECKING:
from sift_client.resources.channels import ChannelsAPIAsync


async def resolve_calculated_channels(
    calculated_channels: list[CalculatedChannel | CalculatedChannelCreate] | None,
    channels_api: ChannelsAPIAsync,
) -> list[CalculatedChannel | CalculatedChannelCreate] | None:
    """Resolve channel reference identifiers from names to UUIDs.

    Each reference identifier is first looked up as a channel name via the
    channels API. On a hit it is replaced with the channel's UUID; on a miss
    the identifier is assumed to already be a UUID and is kept as-is.
    """
    if not calculated_channels:
        return None

    resolved: list[CalculatedChannel | CalculatedChannelCreate] = []
    for calc in calculated_channels:
        if isinstance(calc, CalculatedChannelCreate):
            references = calc.expression_channel_references or []
        else:
            references = calc.channel_references

        new_refs: list[ChannelReference] = []
        for reference in references:
            # Treat the identifier as a name first; None means "not a known name".
            match = await channels_api.find(
                name=reference.channel_identifier,
                assets=calc.asset_ids,
            )
            if match is None:
                new_refs.append(reference)
            else:
                new_refs.append(
                    ChannelReference(
                        channel_reference=reference.channel_reference,
                        channel_identifier=match._id_or_error,
                    )
                )

        resolved.append(
            CalculatedChannelCreate(
                name=calc.name,
                expression=calc.expression,
                expression_channel_references=new_refs,
                units=calc.units or None,
            )
        )
    return resolved
10 changes: 10 additions & 0 deletions python/lib/sift_client/_internal/util/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from __future__ import annotations

import asyncio
from typing import Any, Callable


async def run_sync_function(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a synchronous function in a thread pool to avoid blocking the event loop.

    Args:
        fn: The blocking callable to execute.
        *args: Positional arguments forwarded to ``fn``.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        Whatever ``fn`` returns.
    """
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional arguments, so bind everything
    # (including keyword arguments, which the old signature could not accept)
    # in a closure before handing it off.
    return await loop.run_in_executor(None, lambda: fn(*args, **kwargs))
57 changes: 57 additions & 0 deletions python/lib/sift_client/_internal/util/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from __future__ import annotations

import zipfile
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from pathlib import Path

from sift_client.transport.rest_transport import RestClient


def download_file(url: str, dest: Path, *, rest_client: RestClient) -> Path:
    """Download a file from a URL in streaming 4 MiB chunks.

    Args:
        url: The URL to download from.
        dest: Path where the file will be saved. Parent directories are created if needed.
        rest_client: The SDK rest client to use for the download.

    Returns:
        The path to the downloaded file.

    Raises:
        requests.HTTPError: If the download request fails.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    chunk_size = 4 * 1024 * 1024  # 4 MiB
    # Strip the session's default Authorization header, presigned URLs carry their own auth
    with rest_client.get(url, stream=True, headers={"Authorization": None}) as response:
        response.raise_for_status()
        with dest.open("wb") as out:
            for chunk in response.iter_content(chunk_size=chunk_size):
                # Skip keep-alive chunks, which arrive as empty bytes.
                if chunk:
                    out.write(chunk)
    return dest


def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) -> list[Path]:
    """Extract a zip file to a directory.

    Args:
        zip_path: Path to the zip file.
        output_dir: Directory to extract contents into. Created if it doesn't exist.
        delete_zip: If True (default), delete the zip file after extraction.

    Returns:
        List of paths to the extracted files (excludes directories).

    Raises:
        zipfile.BadZipFile: If the file is not a valid zip.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        members = archive.namelist()
        archive.extractall(output_dir)
    # Directory entries end with "/" in zip archives; report only real files.
    extracted = [output_dir / member for member in members if not member.endswith("/")]
    if delete_zip:
        zip_path.unlink()
    return extracted
48 changes: 48 additions & 0 deletions python/lib/sift_client/_tests/_internal/test_sync_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ async def async_method_with_exception(self) -> None:
await asyncio.sleep(0.01)
raise ValueError("Test exception")

async def async_method_with_executor(self) -> str:
    """Test async method that uses run_in_executor, like wait_and_download."""
    self._record_call("async_method_with_executor")
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: "executor_result")

async def async_method_with_complex_args(
self, arg1: str, arg2: dict[str, Any] | None = None, *args, **kwargs
) -> dict[str, Any]:
Expand Down Expand Up @@ -183,3 +190,44 @@ def test_complex_arguments(self, mock_resource_sync):
assert result["args"] == ("extra_arg",)
assert result["kwargs"] == {"keyword": "keyword_value"}
assert mock_resource_sync._async_impl.get_call_count("async_method_with_complex_args") == 1


class TestSyncWrapperEventLoopScenarios:
    """Test sync wrapper with run_in_executor under different event loop scenarios."""

    @pytest.fixture
    def mock_resource_sync(self):
        # Build a fresh sync-wrapped resource for each test.
        MockResource = generate_sync_api(MockResourceAsync, "MockResource")  # noqa: N806
        return MockResource(MockClient(), value="testVal")

    def test_sync_no_event_loop(self, mock_resource_sync):
        """Plain sync call with no active event loop in the calling thread."""
        assert mock_resource_sync.async_method_with_executor() == "executor_result"

    def test_with_user_event_loop(self, mock_resource_sync):
        """User has their own event loop running in another thread."""
        foreign_loop = asyncio.new_event_loop()
        foreign_thread = threading.Thread(target=foreign_loop.run_forever, daemon=True)
        foreign_thread.start()
        try:
            assert mock_resource_sync.async_method_with_executor() == "executor_result"
        finally:
            # Stop the loop from its own thread, then tear everything down.
            foreign_loop.call_soon_threadsafe(foreign_loop.stop)
            foreign_thread.join(timeout=1.0)
            foreign_loop.close()

    def test_sync_from_async(self, mock_resource_sync):
        """Sync API called from inside a running async function."""

        async def invoke():
            return mock_resource_sync.async_method_with_executor()

        loop = asyncio.new_event_loop()
        try:
            assert loop.run_until_complete(invoke()) == "executor_result"
        finally:
            loop.close()
Loading
Loading