Conversation
…ort_by_time_range
…vel concerns, update tests
…updated unit tests
nathan-sift left a comment
This looks good, thanks for the changes! Approving, but it looks like you might need to re-update the stubs
# Run the synchronous request in a thread pool to avoid blocking the event loop
loop = asyncio.get_event_loop()
extracted_files = await loop.run_in_executor(
is this necessary and does it even work in the sync context? ExportsAPIAsync functions are already async.
From Claude:
wait_until_complete is not safe for sync generation
This is the most serious issue. The async wait_until_complete
does:
loop = asyncio.get_event_loop()
extracted_files = await loop.run_in_executor(
    None, self._download_and_extract, ...)
When the sync wrapper calls this, it's already running in a
dedicated event loop thread (per the architecture in
README.md). asyncio.get_event_loop() inside that context may
return a different loop than the one actually running the
coroutine, especially in Python 3.10+ where this is
deprecated. Even with get_running_loop(), the run_in_executor
call spawns a thread from within the dedicated gRPC thread —
this works but is fragile and untested.
Compare with how other resources handle blocking I/O: they
don't. Every other resource is pure async gRPC calls. Exports
is unique in mixing async gRPC with sync HTTP (requests.get)
via run_in_executor.
run_in_executor is needed because requests.get() is synchronous (even when using rest_client) and would block the event loop during download. Switching to get_running_loop() handles the issue (in sync mode as well) since it always returns the loop that is actually running the coroutine.
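The fix described here can be sketched in isolation. blocking_download below stands in for the synchronous requests.get call; the names are illustrative, not the actual sift_client code:

```python
import asyncio
import time

def blocking_download(url: str) -> str:
    # Stand-in for a synchronous call like requests.get(url);
    # time.sleep simulates the blocking network I/O.
    time.sleep(0.1)
    return f"contents of {url}"

async def download(url: str) -> str:
    # get_running_loop() always returns the loop executing this coroutine,
    # unlike get_event_loop(), whose implicit-loop behavior is deprecated
    # in Python 3.10+ and can return a different loop in threaded setups.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_download, url)

result = asyncio.run(download("https://example.com/export.zip"))
print(result)
```

Because the blocking call runs on the default thread-pool executor, other tasks on the loop keep making progress during the download.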
should we make a util/wrapper for REST calls such that we can handle this the same anytime we need to use REST? For example with file attachments
That's a good idea, added a util function wrapping the executor for blocking calls.
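A minimal version of such a utility could look like the sketch below (run_blocking is a hypothetical name; the actual helper added in the PR may differ):

```python
import asyncio
from typing import Callable, TypeVar

T = TypeVar("T")

async def run_blocking(func: Callable[..., T], *args, **kwargs) -> T:
    # Offload any blocking callable (e.g. a synchronous REST call) to the
    # default thread-pool executor so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

# Usage: wrap a blocking call made from async code.
async def main() -> int:
    return await run_blocking(sum, [1, 2, 3])

print(asyncio.run(main()))  # 6
```

Centralizing the executor call in one helper means every future REST-backed feature (e.g. file attachments) gets the correct loop handling for free.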
Resolved
from sift_client.transport.rest_transport import RestClient

def download_and_extract_zip(
can this be broken into two separate utils? a download (that we can use for any downloads such as file attachments, check what we do there) and the zip extraction? These are separate concerns
resolved, broken into two separate functions and renamed the file to be more generic (_internal/util/file.py)
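The extraction half of that split might look like this; a sketch only, assuming the real _internal/util/file.py uses the stdlib zipfile module (names and signature are illustrative):

```python
import tempfile
import zipfile
from pathlib import Path

def extract_zip(zip_path: Path, delete: bool = False) -> list[Path]:
    # Extract all members next to the archive and return their paths;
    # optionally delete the archive once extraction succeeds.
    dest = zip_path.parent
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(dest)
        extracted = [dest / name for name in zf.namelist()]
    if delete:
        zip_path.unlink()
    return extracted

# Demo: build a small archive, then extract it.
tmp = Path(tempfile.mkdtemp())
archive = tmp / "export.zip"
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("data.csv", "t,v\n0,1\n")
files = extract_zip(archive, delete=True)
print([f.name for f in files])  # ['data.csv']
```

Keeping the download util free of any zip knowledge lets the same function serve file-attachment downloads unchanged.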
),
)

return extracted_files
this is confusing naming since the files may or may not be extracted.
Once the concerns are separated a bit, it may be a bit more clear to:
zip_file_path = download_presigned_file(...)
if not extract:
return [zip_file_path]
return extract_zip(zip_file_path, delete=True, ...)
resolved, renamed to extract_zip to be more specific
async def test_export_by_run(self, exports_api_async, sift_client):
    runs = await sift_client.async_.runs.list_(limit=1)
    assert runs, "No runs available"
    job = await exports_api_async.export(runs=[runs[0]], output_format=CSV)
be careful with this, the run could have a lot of data, can we scope this in a safer way? Or perhaps we link this to an ingestion test and we then have full control of what data exists and are exporting
Resolved, added an upper-bound time stamp (only 10s of data gets exported)
assets = await sift_client.async_.assets.list_(limit=1)
assert assets, "No assets available"
now = datetime.now(timezone.utc)
job = await exports_api_async.export(
same comment as for the run export
Resolved, added an upper-bound time stamp (only 10s of data gets exported)
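The bounded window can be computed with a plain timedelta; a sketch of the approach, not the exact test code:

```python
from datetime import datetime, timedelta, timezone

# Scope the export to a fixed 10-second window ending now, so the
# integration test never pulls an unbounded amount of run data.
end = datetime.now(timezone.utc)
start = end - timedelta(seconds=10)
print(start.isoformat(), end.isoformat())
```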
assert len(files) > 0
assert all(f.exists() for f in files)

def test_sync_export_by_run(self, exports_api_sync, sift_client):
Since we are adding some unique loop handling (outside of what we are already doing in the sync wrapper), we should make sure we are sufficiently testing the following scenarios:
- Plain sync — no event loop active, user calls sync API directly (already tested)
- User's own event loop — user has their own loop and calls sync API from within it
- Sync from async — user calls sync API from inside a running async def
While this was tested extensively during development on the sync_wrapper, could you also please add these tests to the test_sync_wrapper for completeness?
Resolved, added tests for all three scenarios (plain sync, user's own loop, sync from async) in test_sync_wrapper.py
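The three scenarios can be exercised against a toy wrapper built on the same dedicated-loop architecture the README describes. SyncWrapper here is illustrative, not the real sift_client wrapper:

```python
import asyncio
import threading

class SyncWrapper:
    # Minimal stand-in for the client's sync facade: coroutines are
    # submitted to a dedicated event loop running in its own thread,
    # so calls never depend on the caller's loop (or lack of one).
    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def call(self, coro):
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result(timeout=5)

async def fetch() -> str:
    await asyncio.sleep(0)
    return "ok"

wrapper = SyncWrapper()

# 1. Plain sync: no event loop active in the caller's thread.
assert wrapper.call(fetch()) == "ok"

# 2. User's own event loop: the sync API is invoked from a callback
#    scheduled on a loop the user controls.
user_loop = asyncio.new_event_loop()
results = []
user_loop.call_soon(lambda: (results.append(wrapper.call(fetch())), user_loop.stop()))
user_loop.run_forever()
user_loop.close()
assert results == ["ok"]

# 3. Sync from async: the caller is itself inside a running coroutine;
#    the dedicated thread keeps the blocking future.result from deadlocking.
async def caller():
    return wrapper.call(fetch())

assert asyncio.run(caller()) == "ok"
print("all three scenarios passed")
```

run_coroutine_threadsafe is what makes scenario 3 safe: the coroutine runs on the wrapper's own loop thread, so blocking on its result from another loop's thread cannot deadlock.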
…med the file to be more appropriate
…alls off the event loop
…d on the job object itself
What was changed
Adds a high-level data export API to sift_client to reach parity with sift_py (soon to be deprecated).
- DataExportsAPI with a single export method that accepts domain objects or raw string IDs for runs, assets, or time-range-based exports and returns a Job handle
- wait_and_download polls until complete, then downloads and extracts the exported files
- _resolve_calculated_channels helper that resolves name-based channel identifiers to UUIDs via the channels API (the export API requires UUIDs but CalculatedChannel objects store names)
- ExportsLowLevelClient that wraps the gRPC protobuf calls; the high-level client has zero proto imports
- Exposed as client.exports and client.async_.exports

Verification
wait_and_download status handling, and calculated channel resolution