From 0f2e4ce80185319dcce41321ad54f52daf14cfc8 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Wed, 15 Oct 2025 16:09:27 -0700 Subject: [PATCH 01/15] pre-release branch for `SiftClient` --- python/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 993b4236b..c4db450eb 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "0.9.1" +version = "1.0.0a0" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } @@ -14,7 +14,7 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] maintainers = [ - { name = "Sift Software Engineers", email = "engineering@siftstack.com" }, + { name = "Sift", email = "engineering@siftstack.com" }, ] keywords = ["sift", "sift-stack", "siftstack", "sift_py"] dependencies = [ From f03db5cc9ccfdee5c802fb8d2f0c1f82f68e93cb Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Wed, 15 Oct 2025 16:45:35 -0700 Subject: [PATCH 02/15] fix missing secrets for release python CI --- .github/workflows/python_release.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/python_release.yaml b/.github/workflows/python_release.yaml index 964ef9ea3..fad61f3d7 100644 --- a/.github/workflows/python_release.yaml +++ b/.github/workflows/python_release.yaml @@ -7,11 +7,13 @@ jobs: python-ci: if: github.event_name == 'workflow_dispatch' && startsWith(github.ref, 'refs/tags') uses: ./.github/workflows/python_ci.yaml + secrets: inherit build-offline-archives: if: github.event_name == 'workflow_dispatch' && startsWith(github.ref, 'refs/tags') needs: python-ci uses: ./.github/workflows/python_build.yaml + secrets: inherit publish-to-pypi: name: Upload release to PyPI From 4603b1e8f9846acce37778d45595ad00423761a7 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Mon, 20 Oct 2025 09:50:44 -0700 Subject: [PATCH 03/15] version bump --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index c4db450eb..aa5811eb5 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "1.0.0a0" +version = "1.0.0a1" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From ee33977eb62a65f0253b10c98b384fa673a1c99c Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Mon, 10 Nov 2025 20:42:47 -0800 Subject: [PATCH 04/15] python(feat): Sift Client Ingestion (#370) --- .github/workflows/python_ci.yaml | 4 +- python/docs/examples/ingestion.ipynb | 102 ++++ .../_internal/low_level_wrappers/ingestion.py | 487 +++++---------- .../_tests/resources/test_ingestion.py | 444 -------------- .../sift_client/_tests/resources/test_runs.py | 59 ++ .../_tests/sift_types/test_ingestion.py | 125 ++-- python/lib/sift_client/resources/__init__.py | 3 +- python/lib/sift_client/resources/ingestion.py | 569 ++++++++++++++++-- python/lib/sift_client/sift_types/__init__.py | 10 +- python/lib/sift_client/sift_types/channel.py | 38 ++ .../lib/sift_client/sift_types/ingestion.py | 237 +++++++- python/lib/sift_client/sift_types/run.py | 31 + python/mkdocs.yml | 1 + python/pyproject.toml | 7 +- 14 files changed, 1155 insertions(+), 962 deletions(-) create mode 100644 python/docs/examples/ingestion.ipynb delete mode 100644 python/lib/sift_client/_tests/resources/test_ingestion.py diff --git a/.github/workflows/python_ci.yaml b/.github/workflows/python_ci.yaml index 836e00437..ae0ec00a7 100644 --- a/.github/workflows/python_ci.yaml +++ b/.github/workflows/python_ci.yaml @@ -17,7 +17,9 @@ jobs: working-directory: python steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha }} - name: Set up Python uses: actions/setup-python@v2 diff --git a/python/docs/examples/ingestion.ipynb b/python/docs/examples/ingestion.ipynb new file mode 100644 index 000000000..adf011421 --- /dev/null +++ b/python/docs/examples/ingestion.ipynb @@ -0,0 +1,102 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "0b202351", + "metadata": {}, + "source": [ + "# Sift Client Ingestion Basic Example\n", + "\n", + "This notebook demonstrates some examples features of SiftClient ingestion\n", + "- Initializing the Sift client\n", + "- Creating an ingestion config\n", + "- Creating a run\n", + "- Creating and sending flows\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "02268d76", + "metadata": { + "vscode": { + "languageId": "plaintext" + } + }, + "outputs": [], + "source": [ + "import asyncio\n", + "import random\n", + "import time\n", + "from datetime import datetime, timezone\n", + "\n", + "from sift_client import SiftClient, SiftConnectionConfig\n", + "from sift_client.sift_types import (\n", + " ChannelConfig,\n", + " ChannelDataType,\n", + " FlowConfig,\n", + " IngestionConfigCreate,\n", + " RunCreate,\n", + ")\n", + "\n", + "\n", + "async def main():\n", + " connection_config = SiftConnectionConfig(\n", + " api_key=\"my_api_key\",\n", + " grpc_url=\"sift_grpc_url\",\n", + " rest_url=\"sift_rest_url\",\n", + " )\n", + "\n", + " client = SiftClient(connection_config=connection_config)\n", + "\n", + " # Ingestion configs are created using SiftClient types\n", + " ingestion_config = IngestionConfigCreate(\n", + " asset_name=\"sift_rover_1\",\n", + " flows=[\n", + " FlowConfig(\n", + " name=\"onboard_sensors\",\n", + " channels=[\n", + " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", + " ChannelConfig(\n", + " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", + " ),\n", + " ],\n", + " )\n", + " ],\n", + " )\n", + "\n", + " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", + "\n", + " async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n", + " ingestion_config=ingestion_config,\n", + " run=run,\n", + " ) as ingest_client:\n", + " while True:\n", + " # Flows can be generated easily from the ingest client\n", + " flow_config = ingest_client.get_flow_config(flow_name=\"onboard_sensors\")\n", + " flow = flow_config.as_flow(\n", + " timestamp=datetime.now(timezone.utc),\n", + " values={\n", + " \"motor_temp\": 50.0 + random.random() * 5.0,\n", + " \"tank_pressure\": 2000.0 + random.random() * 100.0,\n", + " },\n", + " )\n", + " # Ingest the flow with .send()\n", + " await ingest_client.send(flow=flow)\n", + "\n", + " await asyncio.sleep(1)\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " asyncio.run(main())\n" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 8ebd9201a..0bf38cc37 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -8,15 +8,10 @@ from __future__ import annotations -import asyncio -import atexit import hashlib import logging -import threading -import time from collections import namedtuple -from queue import Queue -from typing import TYPE_CHECKING, Any, cast +from typing import TYPE_CHECKING, cast from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( GetIngestionConfigRequest, @@ -31,7 +26,7 @@ from sift_client._internal.low_level_wrappers.base import ( LowLevelClientBase, ) -from sift_client.sift_types.ingestion import Flow, IngestionConfig, _to_rust_value +from sift_client.sift_types.ingestion import FlowConfig, IngestionConfig from sift_client.transport import GrpcClient, WithGrpcClient from sift_client.util import cel_utils as cel @@ -41,14 +36,25 @@ from datetime import datetime from sift_stream_bindings import ( + DurationPy, + FlowConfigPy, + FlowPy, IngestionConfigFormPy, IngestWithConfigDataStreamRequestPy, + MetadataPy, + RecoveryStrategyPy, + RunFormPy, + RunSelectorPy, SiftStreamBuilderPy, + SiftStreamMetricsSnapshotPy, + SiftStreamPy, TimeValuePy, ) + from sift_client.resources.ingestion import TracingConfig -def to_rust_py_timestamp(time: datetime) -> TimeValuePy: + +def _to_rust_py_timestamp(time: datetime) -> TimeValuePy: """Convert a Python datetime to a Rust TimeValuePy. Args: @@ -57,6 +63,7 @@ def to_rust_py_timestamp(time: datetime) -> TimeValuePy: Returns: A TimeValuePy representation """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users from sift_stream_bindings import TimeValuePy ts = time.timestamp() @@ -65,106 +72,6 @@ def to_rust_py_timestamp(time: datetime) -> TimeValuePy: return TimeValuePy.from_timestamp(secs, nsecs) -class IngestionThread(threading.Thread): - """Manages ingestion for a single ingestion config.""" - - IDLE_LOOP_PERIOD = 0.1 # Time of intervals loop will sleep while waiting for data. - SIFT_STREAM_FINISH_TIMEOUT = 0.06 # Measured ~0.05s to finish stream. - CLEANUP_TIMEOUT = IDLE_LOOP_PERIOD + SIFT_STREAM_FINISH_TIMEOUT - - def __init__( - self, - sift_stream_builder: SiftStreamBuilderPy, - data_queue: Queue, - ingestion_config: IngestionConfigFormPy, - no_data_timeout: int = 1, - metric_interval: float = 0.5, - ): - """Initialize the IngestionThread. - - Args: - sift_stream_builder: The sift stream builder to build a new stream. - data_queue: The queue to put IngestWithConfigDataStreamRequestPy requests into for ingestion. - ingestion_config: The ingestion config to use for ingestion. - no_data_timeout: The number of (whole number) seconds to wait for data before stopping the thread (Saves minorly on startup resources. Ingesting new data will always restart the thread if it is stopped). - metric_interval: Time (seconds) to wait between logging metrics. - """ - super().__init__(daemon=True) - self.data_queue = data_queue - self._stop_event = threading.Event() - self.sift_stream_builder = sift_stream_builder - self.ingestion_config = ingestion_config - self.no_data_timeout = no_data_timeout - self.metric_interval = metric_interval - self.initialized = False - - def stop(self): - self._stop_event.set() - # Give a brief chance to finish the stream (should take < 50ms). - time.sleep(self.CLEANUP_TIMEOUT) - self.task.cancel() - - async def await_stream_build(self): - while not self.initialized: - await asyncio.sleep(0.01) - - async def main(self): - logger.debug("Ingestion thread started") - self.sift_stream_builder.ingestion_config = self.ingestion_config - sift_stream = await self.sift_stream_builder.build() - time_of_last_metric = time.time() - time_of_last_data = time.time() - count = 0 - self.initialized = True - try: - while True: - while not self.data_queue.empty(): - if self._stop_event.is_set(): - # Being forced to stop. Try to finish the stream. - logger.info( - f"Ingestion thread received stop signal. Exiting. Sent {count} requests. {self.data_queue.qsize()} requests remaining." - ) - await sift_stream.finish() - return - time_of_last_metric = time.time() - item = self.data_queue.get() - sift_stream = await sift_stream.send_requests(item) - count += 1 - time_since_last_metric = time.time() - time_of_last_metric - if time_since_last_metric > self.metric_interval: - logger.debug( - f"Ingestion thread sent {count} requests, remaining: {self.data_queue.qsize()}" - ) - time_of_last_metric = time.time() - - # Queue empty, check if we should stop. - time_since_last_data = time.time() - time_of_last_data - if self._stop_event.is_set() or time_since_last_data > self.no_data_timeout: - logger.debug( - f"No more requests. Stopping. Sent {count} requests. {self.data_queue.qsize()} requests remaining." - ) - await sift_stream.finish() - return - else: - await asyncio.sleep(self.IDLE_LOOP_PERIOD) - - except asyncio.CancelledError: - # It's possible the thread was joined while sleeping waiting for data. Only note error if we have data left. - if self.data_queue.qsize() > 0: - logger.error( - f"Ingestion thread cancelled without finishing stream. {self.data_queue.qsize()} requests were not sent." - ) - - async def _run(self): - self.task = asyncio.create_task(self.main()) - await self.task - - def run(self): - """This thread will handle sending data to Sift.""" - # Even thought this is a thread, we need to run this async task to await send_requests otherwise we get sift_stream consumed errors. - asyncio.run(self._run()) - - class IngestionLowLevelClient(LowLevelClientBase, WithGrpcClient): """Low-level client for the IngestionAPI. @@ -184,76 +91,14 @@ def __init__(self, grpc_client: GrpcClient): grpc_client: The gRPC client to use for making API calls. """ super().__init__(grpc_client=grpc_client) - self._sift_stream_builder = None # Lazy-initialized - self.stream_cache = {} - atexit.register(self.cleanup, timeout=0.1) - - def _ensure_sift_stream_bindings(self): - """Ensure sift_stream_bindings is available and initialize the stream builder. - Raises: - ImportError: If sift_stream_bindings is not installed. - """ - if self._sift_stream_builder is not None: - return - - try: - from sift_stream_bindings import ( - RecoveryStrategyPy, - RetryPolicyPy, - SiftStreamBuilderPy, - ) - except ImportError as e: - raise ImportError( - "The 'sift-stream' package is required for ingestion operations. " - "Please install it with:` `pip install sift-stack-py[sift-stream]`" - ) from e - - # Rust GRPC client expects URI to have http(s):// prefix. - uri = self._grpc_client._config.uri - if not uri.startswith("http"): - uri = f"https://{uri}" if self._grpc_client._config.use_ssl else f"http://{uri}" - self._sift_stream_builder = SiftStreamBuilderPy( - uri=uri, - apikey=self._grpc_client._config.api_key, - ) - self._sift_stream_builder.enable_tls = self._grpc_client._config.use_ssl - # FD-177: Expose configuration for recovery strategy. - self._sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only( - RetryPolicyPy.default() - ) - - @property - def sift_stream_builder(self) -> SiftStreamBuilderPy: - """Get the sift stream builder, initializing it if necessary.""" - self._ensure_sift_stream_bindings() - assert self._sift_stream_builder is not None - return self._sift_stream_builder - - def cleanup(self, timeout: float | None = None): - """Cleanup the ingestion threads. - - Args: - timeout: The timeout in seconds to wait for ingestion to complete. If None, will wait forever. - """ - for cache_entry in self.stream_cache.values(): - data_queue, ingestion_config, thread = cache_entry - # "None" value on the queue signals its loop to terminate. - if thread: - thread.join(timeout=timeout) - if thread.is_alive(): - logger.error( - f"Ingestion thread did not finish after {timeout} seconds. Forcing stop." - ) - thread.stop() - - async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[Flow]: + async def get_ingestion_config_flows(self, ingestion_config_id: str) -> list[FlowConfig]: """Get the flows for an ingestion config.""" res = await self._grpc_client.get_stub(IngestionConfigServiceStub).GetIngestionConfig( GetIngestionConfigRequest(ingestion_config_id=ingestion_config_id) ) res = cast("ListIngestionConfigFlowsResponse", res) - return [Flow._from_proto(flow) for flow in res.flows] + return [FlowConfig._from_proto(flow) for flow in res.flows] async def list_ingestion_configs(self, filter_query: str) -> list[IngestionConfig]: """List ingestion configs.""" @@ -275,199 +120,139 @@ async def get_ingestion_config_id_from_client_key(self, client_key: str) -> str ) return ingestion_configs[0].id_ - def _new_ingestion_thread( - self, - ingestion_config_id: str, - ingestion_config: IngestionConfigFormPy | None = None, - ): - """Start a new ingestion thread. - This allows ingestion to happen in the background regardless of if the user is using the sync or async client - and without them having to set up threading themselves. We are using a thread vs asyncio since our - sync wrapper will block on incomlete tasks. + def _hash_flows(self, asset_name: str, flows: list[FlowConfig]) -> str: + """Generate a client key that should be unique but deterministic for the given asset and flow configuration.""" + return _hash_flows(asset_name=asset_name, flows=flows) - Args: - ingestion_config_id: The id of the ingestion config for the flows this stream will ingest. Used to cache the stream. - ingestion_config: The ingestion config to use for ingestion. - """ - self._ensure_sift_stream_bindings() - data_queue: Queue[list[IngestWithConfigDataStreamRequestPy]] = Queue() - existing = self.stream_cache.get(ingestion_config_id) - if existing: - existing_data_queue, existing_ingestion_config, existing_thread = existing - if existing_thread.is_alive(): - return existing_thread +class IngestionConfigStreamingLowLevelClient(LowLevelClientBase): + DEFAULT_MAX_LOG_FILES = 7 # Equal to 1 week of logs + DEFAULT_LOGFILE_PREFIX = "sift_stream_bindings.log" + _sift_stream_instance: SiftStreamPy + _known_flows: dict[str, FlowConfig] + + def __init__(self, sift_stream_instance: SiftStreamPy, known_flows: dict[str, FlowConfig]): + super().__init__() + self._sift_stream_instance = sift_stream_instance + self._known_flows = known_flows + + @classmethod + async def create_sift_stream_instance( + cls, + api_key: str, + grpc_uri: str, + ingestion_config: IngestionConfigFormPy, + run_form: RunFormPy | None = None, + run_id: str | None = None, + asset_tags: list[str] | None = None, + asset_metadata: list[MetadataPy] | None = None, + recovery_strategy: RecoveryStrategyPy | None = None, + checkpoint_interval: DurationPy | None = None, + enable_tls: bool = True, + tracing_config: TracingConfig | None = None, + ) -> IngestionConfigStreamingLowLevelClient: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + # TODO(nathan): Fix bindings to fix mypy issues with tracing functions + from sift_stream_bindings import ( # type: ignore[attr-defined] + SiftStreamBuilderPy, + init_tracing, # type: ignore[attr-defined] + init_tracing_with_file, # type: ignore[attr-defined] + is_tracing_initialized, # type: ignore[attr-defined] + ) # type: ignore[attr-defined] + + from sift_client.resources.ingestion import TracingConfig + + if not is_tracing_initialized(): + if tracing_config is None: + tracing_config = TracingConfig.with_file() + + if tracing_config.log_dir is not None: + # Use file logging + init_tracing_with_file( + tracing_config.level, + tracing_config.log_dir, + tracing_config.filename_prefix or cls.DEFAULT_LOGFILE_PREFIX, + tracing_config.max_log_files or cls.DEFAULT_MAX_LOG_FILES, + ) else: - ingestion_config = existing_ingestion_config - # Re-use existing queue since ingest_flow has already put data on it. - data_queue = existing_data_queue - assert ingestion_config is not None # Appease mypy. - thread = IngestionThread(self.sift_stream_builder, data_queue, ingestion_config) - thread.start() + # Use stdout/stderr only + init_tracing(tracing_config.level) - return self.CacheEntry(data_queue, ingestion_config, thread) + builder = SiftStreamBuilderPy( + uri=grpc_uri, + apikey=api_key, + ) - def _hash_flows(self, asset_name: str, flows: list[Flow]) -> str: - """Generate a client key that should be unique but deterministic for the given asset and flow configuration.""" - # TODO: Taken from sift_py/ingestion/config/telemetry.py. Confirm intent from Marc. - m = hashlib.sha256() - m.update(asset_name.encode()) - for flow in sorted(flows, key=lambda f: f.name): - m.update(flow.name.encode()) - # Do not sort channels in alphabetical order since order matters. - for channel in flow.channels: - m.update(channel.name.encode()) - # Use api_format for data type since that should be consistent between languages. - m.update(channel.data_type.hash_str(api_format=True).encode()) - m.update((channel.description or "").encode()) - m.update((channel.unit or "").encode()) - if channel.bit_field_elements: - for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index): - m.update(bfe.name.encode()) - m.update(str(bfe.index).encode()) - m.update(str(bfe.bit_count).encode()) - if channel.enum_types: - for enum_name, enum_key in sorted( - channel.enum_types.items(), key=lambda it: it[1] - ): - m.update(str(enum_key).encode()) - m.update(enum_name.encode()) - - return m.hexdigest() - - async def create_ingestion_config( - self, - *, - asset_name: str, - flows: list[Flow], - client_key: str | None = None, - ) -> str: - """Create an ingestion config. + builder.enable_tls = enable_tls + builder.ingestion_config = ingestion_config + builder.recovery_strategy = recovery_strategy + builder.checkpoint_interval = checkpoint_interval + builder.asset_tags = asset_tags + builder.metadata = asset_metadata + builder.run = run_form + builder.run_id = run_id - Args: - asset_name: The name of the asset to ingest to. - flows: The flows to ingest. - client_key: The client key to use for ingestion. If not provided, a new one will be generated. - organization_id: The organization id to use for ingestion. Only needed if the user is part of several organizations. + sift_stream_instance = await builder.build() - Returns: - The id of the new or found ingestion config. - """ - from sift_stream_bindings import IngestionConfigFormPy - - self._ensure_sift_stream_bindings() - - ingestion_config_id = None - if client_key: - logger.debug(f"Getting ingestion config id for client key {client_key}") - ingestion_config_id = await self.get_ingestion_config_id_from_client_key(client_key) - if ingestion_config_id: - # Perform validation that the flows are valid for the ingestion config. - existing_flows = await self.get_ingestion_config_flows(ingestion_config_id) - for flow in flows: - if flow.name in {existing_flow.name for existing_flow in existing_flows}: - raise ValueError( - f"Flow {flow.name} already exists for ingestion client {client_key}" - ) - else: - client_key = self._hash_flows(asset_name, flows) - try: - logger.debug(f"Getting ingestion config id from generated client key {client_key}") - ingestion_config_id = await self.get_ingestion_config_id_from_client_key(client_key) - except ValueError: - logger.debug( - f"No ingestion config found for client key {client_key}. Creating new one." - ) - pass + known_flows = { + flow.name: FlowConfig._from_rust_config(flow) for flow in ingestion_config.flows + } - data_queue, ingestion_config, thread = ( - self.stream_cache.get(ingestion_config_id, (None, None, None)) - if ingestion_config_id - else (None, None, None) - ) - if not (thread and thread.is_alive()): - ingestion_config = IngestionConfigFormPy( - asset_name=asset_name, - flows=[flow._to_rust_config() for flow in flows], - client_key=client_key, - ) + return cls(sift_stream_instance, known_flows) - cache_entry = self._new_ingestion_thread(ingestion_config_id or "", ingestion_config) - if not ingestion_config_id: - # No ingestion config ID exists for client key but stream builder in ingestion thread should create it. - await cache_entry.thread.await_stream_build() - ingestion_config_id = await self.get_ingestion_config_id_from_client_key(client_key) - assert ingestion_config_id is not None, ( - "No ingestion config id found after building new stream. Likely server error." - ) - logger.debug(f"Built new stream for ingestion config {ingestion_config_id}") - self.stream_cache[ingestion_config_id] = cache_entry + async def send(self, flow: FlowPy): + await self._sift_stream_instance.send(flow) - for flow in flows: - flow.ingestion_config_id = ingestion_config_id + async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]): + await self._sift_stream_instance.send_requests(requests) - if not ingestion_config_id: - raise ValueError("No ingestion config id found") - return ingestion_config_id + async def add_new_flows(self, flow_configs: list[FlowConfigPy]): + await self._sift_stream_instance.add_new_flows(flow_configs) + self._known_flows.update( + { + flow_config.name: FlowConfig._from_rust_config(flow_config) + for flow_config in flow_configs + } + ) - def wait_for_ingestion_to_complete(self, timeout: float | None = None): - """Blocks until all ingestion to complete. + async def attach_run(self, run_selector: RunSelectorPy): + await self._sift_stream_instance.attach_run(run_selector) - Args: - timeout: The timeout in seconds to wait for ingestion to complete. If None, will wait forever. - """ - logger.debug("Waiting for ingestion to complete") - self.cleanup(timeout) - - def ingest_flow( - self, - *, - flow: Flow, - timestamp: datetime, - channel_values: dict[str, Any], - organization_id: str | None = None, - ): - """Ingest a flow. This is a synchronous call that queues an ingestion request that will be processed asynchronously on a background thread. + def detach_run(self): + self._sift_stream_instance.detach_run() - Args: - flow: The flow to ingest. - timestamp: The timestamp of the flow. - channel_values: The channel values to ingest. - organization_id: The organization id to use for ingestion. Only relevant if the user is part of several organizations. - """ - from sift_stream_bindings import IngestWithConfigDataStreamRequestPy + def get_run_id(self) -> str | None: + return self._sift_stream_instance.run() - self._ensure_sift_stream_bindings() + async def finish(self): + await self._sift_stream_instance.finish() - if not flow.ingestion_config_id: - raise ValueError( - "Flow has no ingestion config id -- have you created an ingestion config for this flow?" - ) - cache_entry = self.stream_cache.get(flow.ingestion_config_id) - if not cache_entry: - raise ValueError( - f"Ingestion config {flow.ingestion_config_id} not found. Have you created an ingestion config for this flow?" - ) - rust_channel_values = [] - # Iterate through all expected channels for flow and convert to ingestion types (missing channels use a special empty type) + def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: + return self._sift_stream_instance.get_metrics_snapshot() + + +def _hash_flows(asset_name: str, flows: list[FlowConfig]) -> str: + """Generate a client key that should be unique but deterministic for the given asset and flow configuration.""" + # TODO: Taken from sift_py/ingestion/config/telemetry.py. Confirm intent from Marc. + m = hashlib.sha256() + m.update(asset_name.encode()) + for flow in sorted(flows, key=lambda f: f.name): + m.update(flow.name.encode()) + # Do not sort channels in alphabetical order since order matters. for channel in flow.channels: - val = channel_values.get(channel.name) - rust_channel_values.append(_to_rust_value(channel, val)) - req = IngestWithConfigDataStreamRequestPy( - ingestion_config_id=flow.ingestion_config_id, - run_id=flow.run_id or "", - flow=flow.name, - timestamp=to_rust_py_timestamp(timestamp), - channel_values=rust_channel_values, - end_stream_on_validation_error=False, - organization_id=organization_id or "", # This will be filled in by the server - ) - data_queue, ingestion_config, thread = cache_entry - assert data_queue is not None - # Put data on queue before potentially starting a new thread so it doesn't initially sleep waiting for data. - data_queue.put([req]) - if not (thread and thread.is_alive()): - # We previously had a thread for this ingestion config but it finished ingestion so create a new one. - self.stream_cache[flow.ingestion_config_id] = self._new_ingestion_thread( - flow.ingestion_config_id, ingestion_config - ) + m.update(channel.name.encode()) + # Use api_format for data type since that should be consistent between languages. + m.update(channel.data_type.hash_str(api_format=True).encode()) + m.update((channel.description or "").encode()) + m.update((channel.unit or "").encode()) + if channel.bit_field_elements: + for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index): + m.update(bfe.name.encode()) + m.update(str(bfe.index).encode()) + m.update(str(bfe.bit_count).encode()) + if channel.enum_types: + for enum_name, enum_key in sorted(channel.enum_types.items(), key=lambda it: it[1]): + m.update(str(enum_key).encode()) + m.update(enum_name.encode()) + + return m.hexdigest() diff --git a/python/lib/sift_client/_tests/resources/test_ingestion.py b/python/lib/sift_client/_tests/resources/test_ingestion.py deleted file mode 100644 index d64b91ea6..000000000 --- a/python/lib/sift_client/_tests/resources/test_ingestion.py +++ /dev/null @@ -1,444 +0,0 @@ -"""Pytest tests for the Ingestion API. - -These tests demonstrate and validate the usage of the Ingestion API including: -- Creating ingestion configurations -- Ingesting data with various channel types (double, enum, bit field) -- Flow management and validation -- High-speed and regular flow ingestion -- Error handling and edge cases -""" - -import math -import random -import time -from datetime import datetime, timedelta, timezone - -import pytest - -from sift_client import SiftClient -from sift_client.sift_types.channel import ChannelBitFieldElement, ChannelDataType -from sift_client.sift_types.ingestion import ChannelConfig, Flow - -pytestmark = pytest.mark.integration - -ASSET_NAME = "test-ingestion-asset" - - -def test_client_binding(sift_client): - assert getattr(sift_client, "ingestion", None) is None # Only async! - assert sift_client.async_.ingestion - - -@pytest.fixture -def test_run(sift_client: SiftClient): - """Create a test run for ingestion tests.""" - run = sift_client.runs.create( - { - "name": f"test-ingestion-run-{datetime.now(tz=timezone.utc).timestamp()}", - "description": "Test run for ingestion integration tests", - "tags": ["test", "ingestion", "pytest"], - } - ) - yield run - # Cleanup - sift_client.runs.archive(run=run) - - -class TestIngestionAPIAsync: - """Test suite for the async Ingestion API functionality.""" - - class TestCreateIngestionConfig: - """Tests for creating ingestion configurations.""" - - @pytest.mark.asyncio - async def test_create_basic_config(self, sift_client, test_run): - """Test creating a basic ingestion configuration.""" - flow = Flow( - name="test-basic-flow", - channels=[ - ChannelConfig(name="test-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - config_id = await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - assert config_id is not None - assert isinstance(config_id, str) - - @pytest.mark.asyncio - async def test_create_config_with_multiple_flows(self, sift_client, test_run): - """Test creating an ingestion configuration with multiple flows.""" - regular_flow = Flow( - name="test-regular-flow", - channels=[ - ChannelConfig(name="regular-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - highspeed_flow = Flow( - name="test-highspeed-flow", - channels=[ - ChannelConfig(name="highspeed-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - config_id = await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[regular_flow, highspeed_flow], - ) - - assert config_id is not None - - @pytest.mark.asyncio - async def test_create_config_with_enum_channel(self, sift_client, test_run): - """Test creating an ingestion configuration with enum channel.""" - flow = Flow( - name="test-enum-flow", - channels=[ - ChannelConfig( - name="test-enum-channel", - data_type=ChannelDataType.ENUM, - enum_types={"state1": 1, "state2": 2, "state3": 3}, - ), - ], - ) - - config_id = await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - assert config_id is not None - - @pytest.mark.asyncio - async def test_create_config_with_bit_field_channel(self, sift_client, test_run): - """Test creating an ingestion configuration with bit field channel.""" - flow = Flow( - name="test-bitfield-flow", - channels=[ - ChannelConfig( - name="test-bit-field-channel", - data_type=ChannelDataType.BIT_FIELD, - bit_field_elements=[ - ChannelBitFieldElement(name="voltage", index=0, bit_count=4), - ChannelBitFieldElement(name="current", index=4, bit_count=2), - ChannelBitFieldElement(name="status", index=6, bit_count=2), - ], - ), - ], - ) - - config_id = await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - assert config_id is not None - - @pytest.mark.asyncio - async def test_flow_sealed_after_config_creation(self, sift_client, test_run): - """Test that flows are sealed after ingestion config creation.""" - flow = Flow( - name="test-sealed-flow", - channels=[ - ChannelConfig(name="test-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - # Try to add a channel after config creation - with pytest.raises(ValueError, match="Cannot add a channel to a flow after creation"): - flow.add_channel( - ChannelConfig(name="new-channel", data_type=ChannelDataType.DOUBLE) - ) - - class TestIngestData: - """Tests for ingesting data.""" - - @pytest.mark.asyncio - async def test_ingest_double_data(self, sift_client, test_run): - """Test ingesting double data.""" - flow = Flow( - name="test-double-flow", - channels=[ - ChannelConfig(name="double-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - start_time = datetime.now(tz=timezone.utc) - for i in range(10): - timestamp = start_time + timedelta(seconds=i) - flow.ingest( - timestamp=timestamp, - channel_values={"double-channel": float(i)}, - ) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - @pytest.mark.asyncio - async def test_ingest_enum_data(self, sift_client, test_run): - """Test ingesting enum data.""" - flow = Flow( - name="test-enum-ingest-flow", - channels=[ - ChannelConfig( - name="enum-channel", - data_type=ChannelDataType.ENUM, - enum_types={"low": 1, "medium": 2, "high": 3}, - ), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - start_time = datetime.now(tz=timezone.utc) - for i in range(10): - timestamp = start_time + timedelta(seconds=i) - flow.ingest( - timestamp=timestamp, - channel_values={"enum-channel": (i % 3) + 1}, - ) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - @pytest.mark.asyncio - async def test_ingest_bit_field_data_as_dict(self, sift_client, test_run): - """Test ingesting bit field data as dictionary.""" - flow = Flow( - name="test-bitfield-ingest-flow", - channels=[ - ChannelConfig( - name="bitfield-channel", - data_type=ChannelDataType.BIT_FIELD, - bit_field_elements=[ - ChannelBitFieldElement(name="voltage", index=0, bit_count=4), - ChannelBitFieldElement(name="current", index=4, bit_count=2), - ChannelBitFieldElement(name="led", index=6, bit_count=1), - ChannelBitFieldElement(name="heater", index=7, bit_count=1), - ], - ), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - start_time = datetime.now(tz=timezone.utc) - for i in range(10): - timestamp = start_time + timedelta(seconds=i) - flow.ingest( - timestamp=timestamp, - channel_values={ - "bitfield-channel": { - "voltage": random.randint(3, 13), - "current": random.randint(1, 3), - "led": random.choice([0, 1]), - "heater": random.choice([0, 1]), - } - }, - ) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - @pytest.mark.asyncio - async def test_ingest_bit_field_data_as_bytes(self, sift_client, test_run): - """Test ingesting bit field data as bytes.""" - flow = Flow( - name="test-bitfield-bytes-flow", - channels=[ - ChannelConfig( - name="bitfield-channel", - data_type=ChannelDataType.BIT_FIELD, - bit_field_elements=[ - ChannelBitFieldElement(name="field1", index=0, bit_count=4), - ChannelBitFieldElement(name="field2", index=4, bit_count=4), - ], - ), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - timestamp = datetime.now(tz=timezone.utc) - flow.ingest( - timestamp=timestamp, - channel_values={"bitfield-channel": bytes([0b11110000])}, - ) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - @pytest.mark.asyncio - async def test_ingest_multiple_channels(self, sift_client, test_run): - """Test ingesting data for multiple channels simultaneously.""" - flow = Flow( - name="test-multi-channel-flow", - channels=[ - ChannelConfig(name="channel1", data_type=ChannelDataType.DOUBLE), - ChannelConfig( - name="channel2", - data_type=ChannelDataType.ENUM, - enum_types={"a": 1, "b": 2}, - ), - ChannelConfig( - name="channel3", - data_type=ChannelDataType.BIT_FIELD, - bit_field_elements=[ - ChannelBitFieldElement(name="bit1", index=0, bit_count=4), - ChannelBitFieldElement(name="bit2", index=4, bit_count=4), - ], - ), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - start_time = datetime.now(tz=timezone.utc) - for i in range(5): - timestamp = start_time + timedelta(seconds=i) - flow.ingest( - timestamp=timestamp, - channel_values={ - "channel1": float(i), - "channel2": (i % 2) + 1, - "channel3": {"bit1": i % 16, "bit2": (i * 2) % 16}, - }, - ) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - @pytest.mark.asyncio - async def test_ingest_highspeed_data(self, sift_client, test_run): - """Test ingesting high-speed data.""" - flow = Flow( - name="test-highspeed-data-flow", - channels=[ - ChannelConfig(name="highspeed-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - start_time = datetime.now(tz=timezone.utc) - fake_hs_rate = 50 # Hz - fake_hs_period = 1 / fake_hs_rate - duration = 2 # seconds - - for i in range(duration): - for j in range(fake_hs_rate): - val = 3.0 * math.sin(2 * math.pi * fake_hs_rate * (i + j * 0.001)) - timestamp = start_time + timedelta( - seconds=i, milliseconds=j * fake_hs_period * 1000 - ) - flow.ingest( - timestamp=timestamp, - channel_values={"highspeed-channel": val}, - ) - time.sleep(0.01) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - class TestIngestionValidation: - """Tests for ingestion validation and error handling.""" - - @pytest.mark.asyncio - async def test_ingest_invalid_enum_value_raises_error(self, sift_client, test_run): - """Test that ingesting an invalid enum value raises an error.""" - flow = Flow( - name="test-enum-validation-flow", - channels=[ - ChannelConfig( - name="enum-channel", - data_type=ChannelDataType.ENUM, - enum_types={"valid1": 1, "valid2": 2}, - ), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - timestamp = datetime.now(tz=timezone.utc) - # Test with invalid integer - with pytest.raises(ValueError, match="Could not find enum value"): - flow.ingest( - timestamp=timestamp, - channel_values={"enum-channel": 99}, - ) - - # Test with invalid string - with pytest.raises(ValueError, match="Could not find enum value"): - flow.ingest( - timestamp=timestamp, - channel_values={"enum-channel": "invalid-enum"}, - ) - - @pytest.mark.asyncio - async def test_resume_ingestion_after_wait(self, sift_client, test_run): - """Test that ingestion can resume after waiting for completion.""" - flow = Flow( - name="test-resume-flow", - channels=[ - ChannelConfig(name="test-channel", data_type=ChannelDataType.DOUBLE), - ], - ) - - await sift_client.async_.ingestion.create_ingestion_config( - asset_name=ASSET_NAME, - run_id=test_run.id_, - flows=[flow], - ) - - # First batch - timestamp1 = datetime.now(tz=timezone.utc) - flow.ingest(timestamp=timestamp1, channel_values={"test-channel": 1.0}) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) - - # Wait a bit - time.sleep(0.1) - - # Second batch after wait - timestamp2 = timestamp1 + timedelta(seconds=2) - flow.ingest(timestamp=timestamp2, channel_values={"test-channel": 2.0}) - - sift_client.async_.ingestion.wait_for_ingestion_to_complete(timeout=2) diff --git a/python/lib/sift_client/_tests/resources/test_runs.py b/python/lib/sift_client/_tests/resources/test_runs.py index 798555635..aebe1c487 100644 --- a/python/lib/sift_client/_tests/resources/test_runs.py +++ b/python/lib/sift_client/_tests/resources/test_runs.py @@ -487,6 +487,65 @@ async def test_stop_run_with_start_time(self, runs_api_async, new_run): class TestAssetAssociation: """Tests for the async asset association methods.""" + @pytest.mark.asyncio + async def test_create_adhoc_run_all( + self, runs_api_async, sift_client, test_tag, ci_pytest_tag + ): + """Test creating an adhoc run with associated assets.""" + run_name = f"test_adhoc_run_assets_{datetime.now(timezone.utc).isoformat()}" + + start_time = datetime.now(timezone.utc) - timedelta(hours=2) + stop_time = datetime.now(timezone.utc) - timedelta(hours=1) + # Get some assets to associate + assets = await sift_client.async_.assets.list_(limit=2) + assert len(assets) == 2 + tags = [test_tag, ci_pytest_tag] + + run_create = RunCreate( + name=run_name, + description="Test adhoc run", + start_time=start_time, + stop_time=stop_time, + tags=tags, + metadata={"test_key": "test_value", "number": 42.5, "flag": True}, + ) + created_run = await runs_api_async.create( + run_create, assets=assets, associate_new_data=False + ) + + try: + assert created_run.name == run_name + assert created_run.is_adhoc is True + assert created_run.asset_ids is not None + assert len(created_run.asset_ids) >= len(assets) + # Verify all requested assets are in the run's asset_ids + for asset in assets: + assert asset.id_ in created_run.asset_ids + assert created_run.metadata is not None + assert created_run.metadata["test_key"] == "test_value" + assert created_run.metadata["number"] == 42.5 + assert created_run.metadata["flag"] is True + assert set(created_run.tags) == {tag.name for tag in tags} + finally: + await runs_api_async.archive(created_run) + + @pytest.mark.asyncio + async def test_create_adhoc_run_missing_assets(self, runs_api_async): + """Test creating an adhoc run with missing assets.""" + run_name = f"test_adhoc_run_missing_assets_{datetime.now(timezone.utc).isoformat()}" + run_create = RunCreate( + name=run_name, + start_time=datetime.now(timezone.utc), + stop_time=datetime.now(timezone.utc) + timedelta(seconds=11), + ) + with pytest.raises( + AioRpcError, + match='invalid argument: invalid input syntax for type uuid: "asset-name-not-id"', + ): + await runs_api_async.create( + run_create, assets=["asset-name-not-id"], associate_new_data=False + ) + @pytest.mark.asyncio async def test_create_automatic_association_for_assets(self, runs_api_async, sift_client): """Test associating assets with a run for automatic data ingestion.""" diff --git a/python/lib/sift_client/_tests/sift_types/test_ingestion.py b/python/lib/sift_client/_tests/sift_types/test_ingestion.py index 6b29abafe..cd31221d6 100644 --- a/python/lib/sift_client/_tests/sift_types/test_ingestion.py +++ b/python/lib/sift_client/_tests/sift_types/test_ingestion.py @@ -6,7 +6,11 @@ import pytest from sift_client.sift_types.channel import ChannelBitFieldElement, ChannelDataType -from sift_client.sift_types.ingestion import ChannelConfig, Flow, IngestionConfig +from sift_client.sift_types.ingestion import ( + ChannelConfig, + FlowConfig, + IngestionConfig, +) class TestChannelConfig: @@ -69,98 +73,67 @@ def test_other_data_types_dont_require_special_fields(self): assert channel.data_type == ChannelDataType.DOUBLE -@pytest.fixture -def mock_flow(mock_client): - """Create a mock Flow instance for testing.""" - flow = Flow( - proto=MagicMock(), - name="test_flow", - channels=[ - ChannelConfig( - name="channel1", - data_type=ChannelDataType.DOUBLE, - description="Test channel 1", - ), - ChannelConfig( - name="channel2", - data_type=ChannelDataType.FLOAT, - description="Test channel 2", - ), - ], - ingestion_config_id="test_config_id", - run_id="test_run_id", - ) - flow._apply_client_to_instance(mock_client) - return flow - - -class TestFlow: - """Unit tests for Flow model - tests methods.""" - - def test_add_channel_success(self): - """Test that add_channel() adds a channel when no ingestion_config_id is set.""" - flow = Flow( +class TestFlowConfig: + """Unit tests for FlowConfig model.""" + + def test_as_flow_creates_flow_with_values(self): + """Test that as_flow creates a Flow with correct channel values.""" + flow_config = FlowConfig( name="test_flow", - channels=[], - ingestion_config_id=None, + channels=[ + ChannelConfig(name="channel1", data_type=ChannelDataType.DOUBLE), + ChannelConfig(name="channel2", data_type=ChannelDataType.INT_64), + ], ) - channel = ChannelConfig( - name="new_channel", - data_type=ChannelDataType.DOUBLE, - ) + timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + values = {"channel1": 42.5, "channel2": 100} - # Should not raise - flow.add_channel(channel) + flow = flow_config.as_flow(timestamp=timestamp, values=values) - assert len(flow.channels) == 1 - assert flow.channels[0].name == "new_channel" + assert flow.flow == "test_flow" + assert flow.timestamp == timestamp + assert len(flow.channel_values) == 2 + assert flow.channel_values[0].name == "channel1" + assert flow.channel_values[0].value == 42.5 + assert flow.channel_values[1].name == "channel2" + assert flow.channel_values[1].value == 100 - def test_add_channel_raises_after_creation(self): - """Test that add_channel() raises ValueError when ingestion_config_id is set.""" - flow = Flow( + def test_as_flow_raises_on_unknown_channel(self): + """Test that as_flow raises ValueError for unknown channel values.""" + flow_config = FlowConfig( name="test_flow", - channels=[], - ingestion_config_id="config123", + channels=[ChannelConfig(name="channel1", data_type=ChannelDataType.DOUBLE)], ) - channel = ChannelConfig( - name="new_channel", - data_type=ChannelDataType.DOUBLE, - ) - - with pytest.raises(ValueError, match="Cannot add a channel to a flow after creation"): - flow.add_channel(channel) - - def test_ingest_calls_client(self, mock_flow, mock_client): - """Test that ingest() calls client.async_.ingestion.ingest with correct parameters.""" timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - channel_values = {"channel1": 42.5, "channel2": 100.0} + values = {"channel1": 42.5, "unknown_channel": 100} - # Call ingest - mock_flow.ingest(timestamp=timestamp, channel_values=channel_values) - - # Verify client method was called with correct parameters - mock_client.async_.ingestion.ingest.assert_called_once_with( - flow=mock_flow, - timestamp=timestamp, - channel_values=channel_values, - ) + with pytest.raises( + ValueError, + match="Provided channel values which do not exist in the flow config", + ): + flow_config.as_flow(timestamp=timestamp, values=values) - def test_ingest_raises_without_config_id(self, mock_client): - """Test that ingest() raises ValueError when ingestion_config_id is not set.""" - flow = Flow( + def test_as_flow_only_includes_provided_channels(self): + """Test that as_flow only includes channels with provided values.""" + flow_config = FlowConfig( name="test_flow", - channels=[], - ingestion_config_id=None, + channels=[ + ChannelConfig(name="channel1", data_type=ChannelDataType.DOUBLE), + ChannelConfig(name="channel2", data_type=ChannelDataType.FLOAT), + ChannelConfig(name="channel3", data_type=ChannelDataType.INT_64), + ], ) - flow._apply_client_to_instance(mock_client) timestamp = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) - channel_values = {"channel1": 42.5} + values = {"channel1": 42.5, "channel3": 100} + + flow = flow_config.as_flow(timestamp=timestamp, values=values) - with pytest.raises(ValueError, match="Ingestion config ID is not set"): - flow.ingest(timestamp=timestamp, channel_values=channel_values) + assert len(flow.channel_values) == 2 + assert flow.channel_values[0].name == "channel1" + assert flow.channel_values[1].name == "channel3" class TestIngestionConfig: diff --git a/python/lib/sift_client/resources/__init__.py b/python/lib/sift_client/resources/__init__.py index 968fabdb3..7b2eacc29 100644 --- a/python/lib/sift_client/resources/__init__.py +++ b/python/lib/sift_client/resources/__init__.py @@ -153,7 +153,7 @@ async def main(): from sift_client.resources.assets import AssetsAPIAsync from sift_client.resources.calculated_channels import CalculatedChannelsAPIAsync from sift_client.resources.channels import ChannelsAPIAsync -from sift_client.resources.ingestion import IngestionAPIAsync +from sift_client.resources.ingestion import IngestionAPIAsync, TracingConfig from sift_client.resources.ping import PingAPIAsync from sift_client.resources.reports import ReportsAPIAsync from sift_client.resources.rules import RulesAPIAsync @@ -194,4 +194,5 @@ async def main(): "TagsAPIAsync", "TestResultsAPI", "TestResultsAPIAsync", + "TracingConfig", ] diff --git a/python/lib/sift_client/resources/ingestion.py b/python/lib/sift_client/resources/ingestion.py index a04368c21..59668378f 100644 --- a/python/lib/sift_client/resources/ingestion.py +++ b/python/lib/sift_client/resources/ingestion.py @@ -1,20 +1,217 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING -from sift_client._internal.low_level_wrappers.ingestion import IngestionLowLevelClient +from sift_client._internal.low_level_wrappers.ingestion import ( + IngestionConfigStreamingLowLevelClient, + IngestionLowLevelClient, +) from sift_client.resources._base import ResourceBase +from sift_client.sift_types.ingestion import Flow, IngestionConfig, IngestionConfigCreate +from sift_client.sift_types.run import Run, RunCreate, Tag if TYPE_CHECKING: - from datetime import datetime + from sift_stream_bindings import ( + DiskBackupPolicyPy, + DurationPy, + FlowPy, + IngestionConfigFormPy, + IngestWithConfigDataStreamRequestPy, + MetadataPy, + RecoveryStrategyPy, + RetryPolicyPy, + RunFormPy, + SiftStreamMetricsSnapshotPy, + ) from sift_client.client import SiftClient - from sift_client.sift_types.ingestion import Flow + from sift_client.sift_types.ingestion import FlowConfig logger = logging.getLogger(__name__) +class TracingConfig: + """Configuration for tracing in SiftStream. + + This class provides factory methods to create tracing configurations for use + with IngestionConfigStreamingClient. Tracing will only be initialized once per process. + """ + + def __init__( + self, + is_enabled: bool = True, + level: str = "info", + log_dir: str | None = None, + filename_prefix: str | None = None, + max_log_files: int | None = None, + ): + """Initialize a TracingConfig. + + Args: + is_enabled: Whether tracing is enabled. Defaults to True. + level: Logging level as string - one of "trace", "debug", "info", "warn", "error". + Defaults to "info". + log_dir: Directory path for log files. Required if using file logging. + Defaults to "./logs" when using with_file. + filename_prefix: Prefix for log filenames. Required if using file logging. + Defaults to "sift_stream_bindings.log" when using with_file. + max_log_files: Maximum number of log files to keep. Required if using file logging. + Defaults to 7 when using with_file. + """ + self.is_enabled = is_enabled + self.level = level + self.log_dir = log_dir + self.filename_prefix = filename_prefix + self.max_log_files = max_log_files + + @classmethod + def disabled(cls) -> TracingConfig: + """Create a configuration that disables tracing. + + Returns: + A TracingConfig with tracing disabled. + """ + return cls(is_enabled=False) + + @classmethod + def console_only(cls, level: str = "info") -> TracingConfig: + """Create a configuration that enables tracing to stdout/stderr only. + + Args: + level: Logging level as string - one of "trace", "debug", "info", "warn", "error". + Defaults to "info". + + Returns: + A TracingConfig with tracing enabled (outputs to stdout/stderr only). + """ + return cls(level=level) + + @classmethod + def with_file( + cls, + level: str = "info", + log_dir: str = "./logs", + filename_prefix: str = "sift_stream_bindings.log", + max_log_files: int = 7, + ) -> TracingConfig: + """Create a configuration that enables tracing to both stdout and rolling log files. + + Args: + level: Logging level as string - one of "trace", "debug", "info", "warn", "error". + Defaults to "info". + log_dir: Directory path for log files. Defaults to "./logs". + filename_prefix: Prefix for log filenames. Defaults to "sift_stream_bindings.log". + max_log_files: Maximum number of log files to keep. Defaults to 7. + + Returns: + A TracingConfig with tracing enabled for both stdout and file output. + """ + return cls( + level=level, + log_dir=log_dir, + filename_prefix=filename_prefix, + max_log_files=max_log_files, + ) + + +class RecoveryStrategyConfig: + """Configuration for the SiftStream recovery strategy. + + This class provides a Python-friendly interface for configuring the recovery strategy used in SiftStream. + Recovery strategies determine how SiftStream handles failures and retries when ingesting data. + + Recovery strategies control: + - How frequently to retry a failed connection to Sift. + - Whether to use per checkpoint backups to allow re-ingestion of data to Sift after a streaming failure. + - Settings to control the number and size of backup files, and whether to retain backups after verification of successful ingestion into sift. + + Most users should use one of the factory methods: + - `retry_only()` - Only attempt to reconnect to Sift after a connection failure. Any data which failed to be ingested will be lost. + - More performant, but with no guarantee of data ingestion. + - `retry_with_backups()` - Ingestion is checkpointed. If an ingestion issue occurs during a checkpoint, that data will be re-ingested into Sift + asynchronously along with incoming live data. Backup files are generated and by default, cleared after a successful checkpoint or re-ingestion. + """ + + def __init__(self, recovery_strategy_py: RecoveryStrategyPy | None): + """Initialize a RecoveryStrategyConfig. + + Args: + recovery_strategy_py: The underlying RecoveryStrategyPy instance. + If None, uses the default retry_with_backups strategy. + + Note: + Most users should use the factory methods (`retry_only()` or `retry_with_backups()`) + instead of calling this constructor directly. + """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import DiskBackupPolicyPy, RecoveryStrategyPy, RetryPolicyPy + + # Default to retry_with_backups() + # This is intentionally different from SiftStream, which defaults to retry_only + self._recovery_strategy_py = recovery_strategy_py or RecoveryStrategyPy.retry_with_backups( + retry_policy=RetryPolicyPy.default(), disk_backup_policy=DiskBackupPolicyPy.default() + ) + + def _to_rust_config(self) -> RecoveryStrategyPy: + """Convert to RecoveryStrategyPy for use with the ingestion client. + + Returns: + A RecoveryStrategyPy instance that can be passed to the ingestion client. + """ + return self._recovery_strategy_py + + @classmethod + def retry_only(cls, retry_policy: RetryPolicyPy | None = None) -> RecoveryStrategyConfig: + """Create a recovery strategy that only retries connection failures. + + Args: + retry_policy: Retry policy configuration specifying retry attempts, backoff timing, etc. + If None, uses the default retry policy (5 attempts, 50ms initial backoff, + 5s max backoff, multiplier of 5). + + Returns: + A RecoveryStrategyConfig configured for retry-only strategy. + """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import RecoveryStrategyPy, RetryPolicyPy + + retry_policy_py = retry_policy or RetryPolicyPy.default() + + recovery_strategy_py = RecoveryStrategyPy.retry_only(retry_policy_py) + return cls(recovery_strategy_py=recovery_strategy_py) + + @classmethod + def retry_with_backups( + cls, + retry_policy: RetryPolicyPy | None = None, + disk_backup_policy: DiskBackupPolicyPy | None = None, + ) -> RecoveryStrategyConfig: + """Create a recovery strategy with retries re-ingestion using disk based backups. + + Args: + retry_policy: Retry policy configuration specifying retry attempts, backoff timing, etc. + If None, uses the default retry policy (5 attempts, 50ms initial backoff, + 5s max backoff, multiplier of 5). + disk_backup_policy: Disk backup policy configuration specifying backup directory, + file size limits, etc. If None, uses the default disk backup policy. + + Returns: + A RecoveryStrategyConfig configured for retry with disk backups. + """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import DiskBackupPolicyPy, RecoveryStrategyPy, RetryPolicyPy + + retry_policy_py = retry_policy or RetryPolicyPy.default() + disk_backup_policy_py = disk_backup_policy or DiskBackupPolicyPy.default() + + recovery_strategy_py = RecoveryStrategyPy.retry_with_backups( + retry_policy=retry_policy_py, + disk_backup_policy=disk_backup_policy_py, + ) + return cls(recovery_strategy_py=recovery_strategy_py) + + class IngestionAPIAsync(ResourceBase): """High-level API for interacting with ingestion services. @@ -34,73 +231,333 @@ def __init__(self, sift_client: SiftClient): super().__init__(sift_client) self._low_level_client = IngestionLowLevelClient(grpc_client=self.client.grpc_client) - async def create_ingestion_config( + async def create_ingestion_config_streaming_client( self, + ingestion_config: IngestionConfig | IngestionConfigCreate | IngestionConfigFormPy, *, - asset_name: str, - run_id: str | None = None, - flows: list[Flow], - client_key: str | None = None, - organization_id: str | None = None, - ) -> str: - """Create an ingestion config. + run: RunCreate | dict | str | Run | None = None, + asset_tags: list[str] | list[Tag] | None = None, + asset_metadata: dict[str, str | float | bool] | None = None, + recovery_strategy: RecoveryStrategyConfig | RecoveryStrategyPy | None = None, + checkpoint_interval_seconds: int | None = None, + enable_tls: bool = True, + tracing_config: TracingConfig | None = None, + ) -> IngestionConfigStreamingClient: + """Create an IngestionConfigStreamingClient. Args: - asset_name: The name of the asset for this ingestion config. - run_id: Optionally provide a run ID to create a run for the given asset. - flows: List of flow configurations. - client_key: Optional client key for identifying this config. - organization_id: The organization ID. + ingestion_config: The ingestion config. Can be a IngestionConfig or IngestionConfigFormPy. + run: The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string. + asset_tags: Tags to associate with the asset. + asset_metadata: Metadata to associate with the asset. + recovery_strategy: The recovery strategy to use for ingestion. + checkpoint_interval_seconds: The checkpoint interval in seconds. + enable_tls: Whether to enable TLS for the connection. + tracing_config: Configuration for SiftStream tracing. Use TracingConfig.stdout_only() + to enable tracing to stdout only, or TracingConfig.stdout_with_file() to enable + tracing to both stdout and rolling log files. Defaults to None (tracing will be + initialized with default settings if not already initialized). Returns: - The ingestion config ID. - - Raises: - ValueError: If asset_name is not provided or flows is empty. + An initialized IngestionConfigStreamingClient. """ - if not asset_name: - raise ValueError("asset_name must be provided") - if not flows: - raise ValueError("flows must not be empty") - - ingestion_config_id = await self._low_level_client.create_ingestion_config( - asset_name=asset_name, - flows=flows, - client_key=client_key, + return await IngestionConfigStreamingClient._create( + self.client, + ingestion_config=ingestion_config, + run=run, + asset_tags=asset_tags, + asset_metadata=asset_metadata, + recovery_strategy=recovery_strategy, + checkpoint_interval_seconds=checkpoint_interval_seconds, + enable_tls=enable_tls, + tracing_config=tracing_config, ) - for flow in flows: - flow._apply_client_to_instance(self.client) - if run_id: - flow.run_id = run_id - return ingestion_config_id - def ingest( - self, - *, - flow: Flow, - timestamp: datetime, - channel_values: dict[str, Any], +class IngestionConfigStreamingClient(ResourceBase): + """A client for streaming ingestion with an ingestion config. + + This client provides a high-level interface for streaming data to Sift using + an ingestion config. Under the hood, this client uses the Rust powered SiftStream library to provide + a high-performance, low-latency, and reliable streaming interface to Sift. + + This client should be initialized using the create classmethod, and not directly. Once streaming has ended, the client should be shutdown using the finish method. + """ + + def __init__( + self, sift_client: SiftClient, low_level_client: IngestionConfigStreamingLowLevelClient ): - """Ingest data for a flow. + """Initialize an IngestionConfigStreamingClient. Users should not initialize this class directly, but rather use the create classmethod.""" + super().__init__(sift_client) + self._low_level_client = low_level_client + + @classmethod + async def _create( + cls, + sift_client: SiftClient, + ingestion_config: IngestionConfig | IngestionConfigCreate | IngestionConfigFormPy, + *, + run: RunCreate | dict | str | Run | RunFormPy | None = None, + asset_tags: list[str] | list[Tag] | None = None, + asset_metadata: dict[str, str | float | bool] | None = None, + recovery_strategy: RecoveryStrategyConfig | RecoveryStrategyPy | None = None, + checkpoint_interval_seconds: int | None = None, + enable_tls: bool = True, + tracing_config: TracingConfig | None = None, + ) -> IngestionConfigStreamingClient: + """Create an IngestionConfigStreamingClient. Args: - flow: The flow to ingest data for. - timestamp: The timestamp of the data. - channel_values: Dictionary mapping channel names to their values. - """ - self._low_level_client.ingest_flow( - flow=flow, - timestamp=timestamp, - channel_values=channel_values, + sift_client: The Sift client to use. + ingestion_config: The ingestion config to use for streaming. + run: The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string. + asset_tags: Tags to associate with the asset. + asset_metadata: Metadata to associate with the asset. + recovery_strategy: The recovery strategy to use for ingestion. + checkpoint_interval_seconds: The checkpoint interval in seconds. + enable_tls: Whether to enable TLS for the connection. + tracing_config: Configuration for SiftStream tracing. Use TracingConfig.console_only() + to enable tracing to stdout only, or TracingConfig.with_file() to enable + tracing to both stdout and rolling log files. Defaults to None (tracing will be + initialized with default settings for TracingConfig.with_file()). + + Returns: + An initialized IngestionConfigStreamingClient. + """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import ( + DurationPy, + IngestionConfigFormPy, + MetadataPy, + MetadataValuePy, + RecoveryStrategyPy, + RunFormPy, + ) + + instance = cls.__new__(cls) + instance._sift_client = sift_client + + # Get API key and gRPC URI from the client + grpc_config = sift_client.grpc_client._config + api_key = grpc_config.api_key + grpc_uri = grpc_config.uri + + # Convert the ingestion_config variants to a IngestionConfigFormPy + if isinstance(ingestion_config, IngestionConfig): + # SiftStream will retrieve the existing config from the client_key + asset = sift_client.assets.get(asset_id=ingestion_config.asset_id) + ingestion_config_form = IngestionConfigFormPy( + asset_name=asset.name, + client_key=ingestion_config.client_key, + flows=[], + ) + elif isinstance(ingestion_config, IngestionConfigCreate): + ingestion_config_form = ingestion_config._to_rust_form() + else: + ingestion_config_form = ingestion_config + + # Convert the recovery strategy variants + recovery_strategy_py: RecoveryStrategyPy | None = None + if isinstance(recovery_strategy, RecoveryStrategyConfig): + recovery_strategy_py = recovery_strategy._to_rust_config() + elif isinstance(recovery_strategy, RecoveryStrategyPy): + recovery_strategy_py = recovery_strategy + + # Convert the run variants to a run or run_id + run_form: RunFormPy | None = None + run_id: str | None = None + if isinstance(run, RunFormPy): + run_form = run + elif isinstance(run, str): + run_id = run + elif isinstance(run, dict): + run_create = RunCreate.model_validate(run) + run_form = run_create._to_rust_form() + elif isinstance(run, Run): + run_id = run._id_or_error + elif isinstance(run, RunCreate): + run_form = run._to_rust_form() + + # Convert asset_tags to list of strings + asset_tags_list: list[str] | None = None + if asset_tags is not None: + asset_tags_list = [tag.name if isinstance(tag, Tag) else tag for tag in asset_tags] + + # Convert asset_metadata dict to list of MetadataPy + asset_metadata_list: list[MetadataPy] | None = None + if asset_metadata is not None: + asset_metadata_list = [ + MetadataPy(key=key, value=MetadataValuePy(value)) + for key, value in asset_metadata.items() + ] + + # Convert checkpoint_interval_seconds to DurationPy + checkpoint_interval: DurationPy | None = None + if checkpoint_interval_seconds is not None: + checkpoint_interval = DurationPy(secs=checkpoint_interval_seconds, nanos=0) + + low_level_client = await IngestionConfigStreamingLowLevelClient.create_sift_stream_instance( + api_key=api_key, + grpc_uri=grpc_uri, + ingestion_config=ingestion_config_form, + run_form=run_form, + run_id=run_id, + asset_tags=asset_tags_list, + asset_metadata=asset_metadata_list, + recovery_strategy=recovery_strategy_py, + checkpoint_interval=checkpoint_interval, + enable_tls=enable_tls, + tracing_config=tracing_config, ) - def wait_for_ingestion_to_complete(self, timeout: float | None = None): - """Wait for all ingestion to complete. + return cls(sift_client, low_level_client) + + async def send(self, flow: Flow | FlowPy): + """Send telemetry to Sift in the form of a Flow. + + This is the entry-point to send actual telemetry to Sift. If a message is sent that + doesn't match any flows that the stream knows about locally, the message will still be + transmitted and a warning log emitted. If you are certain that the message corresponds + to an unregistered flow then `add_new_flows` should be called first to register the flow + before calling `send`; otherwise you should monitor the Sift DLQ either in the Sift UI + or Sift API to ensure successful transmission. + + When sending messages, if backups are enabled, first the message is sent to the backup system. This system is + used to backup data to disk until the data is confirmed received by Sift. If streaming + encounters errors, the backed up data will be re-ingested ensuring all data is received + by Sift. + + If the backup system has fallen behind and the backup queue/channel is full, it will still + proceed to sending the message to Sift. This ensures data is sent to Sift even if the + backup system is lagging. + + Args: + flow: The flow to send to Sift. + """ + if isinstance(flow, Flow): + flow_py = flow._to_rust_form() + else: + flow_py = flow + await self._low_level_client.send(flow_py) + + async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]): + """Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. + + This method offers a way to send data that matches the raw gRPC service interface. You are + expected to handle channel value ordering as well as empty values correctly. + + Important: + Most users should prefer to use `send`. This method primarily exists to make it easier + for existing integrations to utilize sift-stream. + + Args: + requests: List of ingestion requests to send to Sift. + """ + await self._low_level_client.send_requests(requests) + + async def add_new_flows(self, flow_configs: list[FlowConfig]): + """Modify the existing ingestion config by adding new flows that weren't accounted for during initialization. + + This allows you to dynamically add new flow configurations to the ingestion config after + the stream has been initialized. The new flows will be registered with Sift and can then + be used in subsequent `send` calls. Args: - run_id: The id of the run to wait for. - timeout: The timeout in seconds to wait for ingestion to complete. If None, will wait forever. + flow_configs: List of flow configurations to add to the ingestion config. """ - logger.info("Waiting for ingestion to complete") - self._low_level_client.wait_for_ingestion_to_complete(timeout) + flow_configs_py = [flow_config._to_rust_config() for flow_config in flow_configs] + await self._low_level_client.add_new_flows(flow_configs_py) + + async def attach_run(self, run: RunCreate | dict | str | Run | RunFormPy): + """Attach a run to the stream. + + Any data provided through `send` after this function returns will be associated with + the run. The run can be specified as a Run object, RunCreate object, dict, run ID string, + or RunFormPy object. + + Args: + run: The run to attach. Can be a Run, RunCreate, dict, run ID string, or RunFormPy. + """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import RunFormPy, RunSelectorPy + + if isinstance(run, RunFormPy): + run_selector_py = RunSelectorPy.by_form(run) + elif isinstance(run, dict): + run_create = RunCreate.model_validate(run) + run_form_py = run_create._to_rust_form() + run_selector_py = RunSelectorPy.by_form(run_form_py) + elif isinstance(run, Run): + if run.id_ is None: + raise ValueError("The Run object must contain a run_id") + run_selector_py = RunSelectorPy.by_id(run.id_) + elif isinstance(run, RunCreate): + run_form_py = run._to_rust_form() + run_selector_py = RunSelectorPy.by_form(run_form_py) + elif isinstance(run, str): + run_selector_py = RunSelectorPy.by_id(run) + + await self._low_level_client.attach_run(run_selector_py) + + def detach_run(self): + """Detach the run, if any, associated with the stream. + + Any data provided through `send` after this function is called will not be associated + with a run. + """ + self._low_level_client.detach_run() + + def get_run_id(self) -> str | None: + """Retrieve the ID of the attached run, if one exists. + + Returns: + The run ID if a run is attached, None otherwise. + """ + return self._low_level_client.get_run_id() + + async def finish(self): + """Conclude the stream and return when Sift has sent its final response. + + It is important that this method be called in order to obtain the final checkpoint + acknowledgement from Sift, otherwise some tail-end data may fail to send. This method + will gracefully shut down the streaming system and ensure all data has been properly + sent to Sift. + """ + await self._low_level_client.finish() + + def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: + """Retrieve a snapshot of the current metrics for this stream. + + NOTE: The returned metrics snapshot is currently an unstable feature and may change at any time. + + Metrics are recorded related to the performance and operational status of the stream. + Snapshots are taken at any time this method is called. Metrics are internally updated + atomically, and calls to get metric snapshots are non-blocking to stream operation. + + Returns: + A snapshot of the current stream metrics. + """ + return self._low_level_client.get_metrics_snapshot() + + def get_flow_config(self, flow_name: str) -> FlowConfig: + """Retrieve a flow configuration by name. + + Args: + flow_name: The name of the flow configuration to retrieve. + + Returns: + The FlowConfig associated with the given flow name. + + Raises: + KeyError: If the flow name is not found in the known flows. + """ + flow_config = self._low_level_client._known_flows.get(flow_name) + if flow_config is None: + raise KeyError(f"FlowConfig {flow_name} is unknown to the ingestion client") + return flow_config + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.finish() diff --git a/python/lib/sift_client/sift_types/__init__.py b/python/lib/sift_client/sift_types/__init__.py index e5318dca7..b55717c60 100644 --- a/python/lib/sift_client/sift_types/__init__.py +++ b/python/lib/sift_client/sift_types/__init__.py @@ -141,7 +141,13 @@ ChannelDataType, ChannelReference, ) -from sift_client.sift_types.ingestion import ChannelConfig, Flow, IngestionConfig +from sift_client.sift_types.ingestion import ( + ChannelConfig, + Flow, + FlowConfig, + IngestionConfig, + IngestionConfigCreate, +) from sift_client.sift_types.report import Report, ReportRuleStatus, ReportRuleSummary, ReportUpdate from sift_client.sift_types.rule import ( Rule, @@ -179,7 +185,9 @@ "ChannelDataType", "ChannelReference", "Flow", + "FlowConfig", "IngestionConfig", + "IngestionConfigCreate", "Report", "ReportRuleStatus", "ReportRuleSummary", diff --git a/python/lib/sift_client/sift_types/channel.py b/python/lib/sift_client/sift_types/channel.py index 2c18dbb83..69ba4b8ed 100644 --- a/python/lib/sift_client/sift_types/channel.py +++ b/python/lib/sift_client/sift_types/channel.py @@ -28,6 +28,8 @@ from sift_client.sift_types._base import BaseType if TYPE_CHECKING: + from sift_stream_bindings import ChannelBitFieldElementPy, ChannelDataTypePy + from sift_client.client import SiftClient from sift_client.sift_types.asset import Asset from sift_client.sift_types.run import Run @@ -110,6 +112,32 @@ def from_str(raw: str) -> ChannelDataType | None: return None raise Exception(f"Unknown channel data type: {raw}") + @staticmethod + def _from_rust_type(channel_data_type_py: ChannelDataTypePy) -> ChannelDataType: + # Use enum name for comparison to avoid PyO3 enum comparison issues + # Extract the enum name from the string representation + enum_str = str(channel_data_type_py) + enum_name = enum_str.split(".")[-1] if "." in enum_str else enum_str + + mapping = { + "Double": ChannelDataType.DOUBLE, + "String": ChannelDataType.STRING, + "Enum": ChannelDataType.ENUM, + "BitField": ChannelDataType.BIT_FIELD, + "Bool": ChannelDataType.BOOL, + "Float": ChannelDataType.FLOAT, + "Int32": ChannelDataType.INT_32, + "Uint32": ChannelDataType.UINT_32, + "Int64": ChannelDataType.INT_64, + "Uint64": ChannelDataType.UINT_64, + "Bytes": ChannelDataType.BYTES, + } + + if enum_name in mapping: + return mapping[enum_name] + else: + raise ValueError(f"Unknown channel data type: {channel_data_type_py}") + @staticmethod def proto_data_class(data_type: ChannelDataType): """Return the appropriate protobuf class for the given channel data type. @@ -194,6 +222,16 @@ def _from_proto(cls, message: ChannelBitFieldElementPb) -> ChannelBitFieldElemen bit_count=message.bit_count, ) + @classmethod + def _from_rust_type( + cls, bit_field_element_py: ChannelBitFieldElementPy + ) -> ChannelBitFieldElement: + return ChannelBitFieldElement( + name=bit_field_element_py.name, + index=bit_field_element_py.index, + bit_count=bit_field_element_py.bit_count, + ) + def _to_proto(self) -> ChannelBitFieldElementPb: return ChannelBitFieldElementPb( name=self.name, diff --git a/python/lib/sift_client/sift_types/ingestion.py b/python/lib/sift_client/sift_types/ingestion.py index 2f90e94f5..d8db6d207 100644 --- a/python/lib/sift_client/sift_types/ingestion.py +++ b/python/lib/sift_client/sift_types/ingestion.py @@ -1,31 +1,39 @@ from __future__ import annotations +import logging import math +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any -from google.protobuf.empty_pb2 import Empty -from pydantic import ConfigDict, model_validator -from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue +from pydantic import BaseModel, ConfigDict, Field, model_validator from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( ChannelConfig as ChannelConfigProto, ) from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( - FlowConfig, + CreateIngestionConfigRequest as CreateIngestionConfigRequestProto, +) +from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( + FlowConfig as FlowConfigProto, ) from sift.ingestion_configs.v2.ingestion_configs_pb2 import ( IngestionConfig as IngestionConfigProto, ) -from sift_client.sift_types._base import BaseType +from sift_client.sift_types._base import ( + BaseType, + ModelCreate, +) from sift_client.sift_types.channel import ChannelBitFieldElement, ChannelDataType -if TYPE_CHECKING: - from datetime import datetime +logger = logging.getLogger(__name__) +if TYPE_CHECKING: from sift_stream_bindings import ( ChannelConfigPy, ChannelDataTypePy, FlowConfigPy, + FlowPy, + IngestionConfigFormPy, IngestWithConfigDataChannelValuePy, ) @@ -52,6 +60,43 @@ def _from_proto( ) +class IngestionConfigCreate(ModelCreate[CreateIngestionConfigRequestProto]): + """Create model for IngestionConfig.""" + + asset_name: str + flows: list[FlowConfig] | None = None + organization_id: str | None = None + client_key: str | None = None + + def _get_proto_class(self) -> type[CreateIngestionConfigRequestProto]: + return CreateIngestionConfigRequestProto + + def _to_rust_form(self) -> IngestionConfigFormPy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import IngestionConfigFormPy + + # Imported here to avoid circular dependancy + from sift_client._internal.low_level_wrappers.ingestion import _hash_flows + + if self.organization_id: + logger.warning( + "OrgId is ignored when passing an IngestionConfigCreate to the ingestion client" + ) + + if self.client_key: + client_key = self.client_key + else: + client_key = _hash_flows(self.asset_name, self.flows or []) + + return IngestionConfigFormPy( + asset_name=self.asset_name, + flows=[flow_config._to_rust_config() for flow_config in self.flows] + if self.flows + else [], + client_key=client_key, + ) + + class ChannelConfig(BaseType[ChannelConfigProto, "ChannelConfig"]): """Channel configuration model for ingestion purposes. @@ -121,6 +166,20 @@ def from_channel(cls, channel: Channel) -> ChannelConfig: enum_types=channel.enum_types, ) + @classmethod + def _from_rust_config(cls, channel_config_py: ChannelConfigPy) -> ChannelConfig: + return ChannelConfig( + name=channel_config_py.name, + description=channel_config_py.description or None, + unit=channel_config_py.unit or None, + data_type=ChannelDataType._from_rust_type(channel_config_py.data_type), + bit_field_elements=[ + ChannelBitFieldElement._from_rust_type(bfe) + for bfe in channel_config_py.bit_field_elements + ], + enum_types={enum.name: enum.key for enum in channel_config_py.enum_types}, + ) + def _to_config_proto(self) -> ChannelConfigProto: """Convert to ChannelConfigProto for ingestion.""" from sift.common.type.v1.channel_bit_field_element_pb2 import ( @@ -149,11 +208,28 @@ def _to_config_proto(self) -> ChannelConfigProto: ], ) + def as_channel_value(self, value: Any) -> ChannelValue: + """Create a ChannelValue from a value using this channel's configuration. -class Flow(BaseType[FlowConfig, "Flow"]): - """Model representing a data flow for ingestion. + Args: + value: The value to wrap in a ChannelValue. The type should match + this channel's data_type. - A Flow represents a collection of channels that are ingested together. + Returns: + A ChannelValue instance with this channel's name and data type, + containing the provided value. + """ + return ChannelValue( + name=self.name, + ty=self.data_type, + value=value, + ) + + +class FlowConfig(BaseType[FlowConfigProto, "FlowConfig"]): + """Model representing a data flow config for ingestion. + + A FlowConfig represents the configuration of a collection of channels that are ingested together. """ model_config = ConfigDict(frozen=False) @@ -163,7 +239,9 @@ class Flow(BaseType[FlowConfig, "Flow"]): run_id: str | None = None @classmethod - def _from_proto(cls, proto: FlowConfig, sift_client: SiftClient | None = None) -> Flow: + def _from_proto( + cls, proto: FlowConfigProto, sift_client: SiftClient | None = None + ) -> FlowConfig: return cls( proto=proto, name=proto.name, @@ -171,18 +249,28 @@ def _from_proto(cls, proto: FlowConfig, sift_client: SiftClient | None = None) - _client=sift_client, ) - def _to_proto(self) -> FlowConfig: + @classmethod + def _from_rust_config(cls, flow_config_py: FlowConfigPy) -> FlowConfig: return FlowConfig( + name=flow_config_py.name, + channels=[ + ChannelConfig._from_rust_config(channel) for channel in flow_config_py.channels + ], + ) + + def _to_proto(self) -> FlowConfigProto: + return FlowConfigProto( name=self.name, channels=[channel._to_config_proto() for channel in self.channels], ) def _to_rust_config(self) -> FlowConfigPy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users from sift_stream_bindings import FlowConfigPy return FlowConfigPy( name=self.name, - channels=[_channel_to_rust_config(channel) for channel in self.channels], + channels=[_channel_config_to_rust_config(channel) for channel in self.channels], ) def add_channel(self, channel: ChannelConfig): @@ -198,27 +286,118 @@ def add_channel(self, channel: ChannelConfig): raise ValueError("Cannot add a channel to a flow after creation") self.channels.append(channel) - def ingest(self, *, timestamp: datetime, channel_values: dict[str, Any]): - """Ingest data for this Flow. + def as_flow(self, *, timestamp: datetime | None = None, values: dict[str, Any]) -> Flow: + """Create a Flow from this FlowConfig with the provided values. Args: - timestamp: The timestamp of the data. - channel_values: Dictionary mapping Channel names to their values. + timestamp: The timestamp for the flow. If None, uses the current UTC time. + values: A dictionary mapping channel names to their values. Only channels + present in this dictionary will be included in the resulting Flow. - Raises: - ValueError: If the ingestion config ID is not set. + Returns: + A Flow object with channel values created from the provided values dictionary. """ - if self.ingestion_config_id is None: - raise ValueError("Ingestion config ID is not set.") - self.client.async_.ingestion.ingest( - flow=self, + # Get current timestamp ASAP if not provided + timestamp = timestamp or datetime.now(timezone.utc) + + found_values: set[str] = set() + channel_values = [] + for channel in self.channels: + if channel.name in values: + channel_values.append(channel.as_channel_value(values[channel.name])) + found_values.add(channel.name) + + missing_values = values.keys() - found_values + if missing_values: + raise ValueError( + f"Provided channel values which do not exist in the flow config: {missing_values}" + ) + + return Flow( + flow=self.name, timestamp=timestamp, channel_values=channel_values, ) +class Flow(BaseModel): + """Model representing a data flow for ingestion. + + A Flow represents a collection of channels that are ingested together. + + A representation of the IngestWithConfigDataStreamRequest proto + """ + + model_config = ConfigDict(frozen=False) + ingestion_config_id: str | None = None + flow: str + timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + channel_values: list[ChannelValue] + run_id: str | None = None + end_stream_on_validation_error: bool | None = None + organization_id: str | None = None + + def _to_rust_form(self) -> FlowPy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import FlowPy + + from sift_client._internal.low_level_wrappers.ingestion import _to_rust_py_timestamp + + return FlowPy( + flow_name=self.flow, + timestamp=_to_rust_py_timestamp(self.timestamp), + values=[channel_value._to_rust_form() for channel_value in self.channel_values], + ) + + +class ChannelValue(BaseModel): + """Model representing a channel value for ingestion. + + A ChannelValue represents the data of a channel to be ingested. + """ + + model_config = ConfigDict(frozen=False) + name: str + ty: ChannelDataType + value: Any + + def _to_rust_form(self): + """Convert this ChannelValue to its Rust form for ingestion.""" + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import ChannelValuePy, ValuePy + + if self.ty == ChannelDataType.BIT_FIELD: + value_py = ValuePy.BitField(self.value) + elif self.ty == ChannelDataType.ENUM: + value_py = ValuePy.Enum(self.value) + elif self.ty == ChannelDataType.BOOL: + value_py = ValuePy.Bool(self.value) + elif self.ty == ChannelDataType.FLOAT: + value_py = ValuePy.Float(self.value) + elif self.ty == ChannelDataType.DOUBLE: + value_py = ValuePy.Double(self.value) + elif self.ty == ChannelDataType.INT_32: + value_py = ValuePy.Int32(self.value) + elif self.ty == ChannelDataType.INT_64: + value_py = ValuePy.Int64(self.value) + elif self.ty == ChannelDataType.UINT_32: + value_py = ValuePy.Uint32(self.value) + elif self.ty == ChannelDataType.UINT_64: + value_py = ValuePy.Uint64(self.value) + elif self.ty == ChannelDataType.STRING: + value_py = ValuePy.String(self.value) + else: + raise ValueError(f"Invalid data type: {self.ty}") + + return ChannelValuePy( + name=self.name, + value=value_py, + ) + + # Converter functions. -def _channel_to_rust_config(channel: ChannelConfig) -> ChannelConfigPy: +def _channel_config_to_rust_config(channel: ChannelConfig) -> ChannelConfigPy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users from sift_stream_bindings import ( ChannelBitFieldElementPy, ChannelConfigPy, @@ -259,6 +438,7 @@ def _rust_channel_value_from_bitfield( Returns: A ChannelValuePy object. """ + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users from sift_stream_bindings import IngestWithConfigDataChannelValuePy assert channel.bit_field_elements is not None @@ -285,6 +465,7 @@ def _rust_channel_value_from_bitfield( def _to_rust_value(channel: ChannelConfig, value: Any) -> IngestWithConfigDataChannelValuePy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users from sift_stream_bindings import IngestWithConfigDataChannelValuePy if value is None: @@ -325,6 +506,7 @@ def _to_rust_value(channel: ChannelConfig, value: Any) -> IngestWithConfigDataCh def _to_rust_type(data_type: ChannelDataType) -> ChannelDataTypePy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users from sift_stream_bindings import ChannelDataTypePy if data_type == ChannelDataType.DOUBLE: @@ -348,10 +530,3 @@ def _to_rust_type(data_type: ChannelDataType) -> ChannelDataTypePy: elif data_type == ChannelDataType.UINT_64: return ChannelDataTypePy.Uint64 raise ValueError(f"Unknown data type: {data_type}") - - -def _to_ingestion_value(data_type: ChannelDataType, value: Any) -> IngestWithConfigDataChannelValue: - if value is None: - return IngestWithConfigDataChannelValue(empty=Empty()) - ingestion_type_string = data_type.name.lower().replace("int_", "int") - return IngestWithConfigDataChannelValue(**{ingestion_type_string: value}) diff --git a/python/lib/sift_client/sift_types/run.py b/python/lib/sift_client/sift_types/run.py index 2e7816a5b..cc1bb24bf 100644 --- a/python/lib/sift_client/sift_types/run.py +++ b/python/lib/sift_client/sift_types/run.py @@ -18,6 +18,8 @@ from sift_client.util.metadata import metadata_dict_to_proto, metadata_proto_to_dict if TYPE_CHECKING: + from sift_stream_bindings import RunFormPy + from sift_client.client import SiftClient from sift_client.sift_types.asset import Asset @@ -163,6 +165,35 @@ class RunCreate(RunBase, ModelCreate[CreateRunRequestProto]): def _get_proto_class(self) -> type[CreateRunRequestProto]: return CreateRunRequestProto + def _to_rust_form(self) -> RunFormPy: + # Importing here to allow sift_stream_bindings to be an optional dependancy for non-ingestion users + from sift_stream_bindings import MetadataPy, MetadataValuePy, RunFormPy + + if self.client_key: + client_key = self.client_key + else: + client_key = self.name + + if self.tags: + tags = [tag.name if isinstance(tag, Tag) else tag for tag in self.tags] + else: + tags = None + + if self.metadata: + metadata = [] + for key, value in self.metadata.items(): + metadata.append(MetadataPy(key=key, value=MetadataValuePy(value))) + else: + metadata = None + + return RunFormPy( + name=self.name, + client_key=client_key, + description=self.description, + tags=tags, + metadata=metadata, + ) + class RunUpdate(RunBase, ModelUpdate[RunProto]): """Update model for Run.""" diff --git a/python/mkdocs.yml b/python/mkdocs.yml index 3c68c3564..ed73da187 100644 --- a/python/mkdocs.yml +++ b/python/mkdocs.yml @@ -57,6 +57,7 @@ nav: - Sift Client API (New) - Examples: - examples/basic.ipynb + - examples/ingestion.ipynb # - Guides: # - Logging # - Error Handling diff --git a/python/pyproject.toml b/python/pyproject.toml index aa5811eb5..03e585892 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -71,7 +71,7 @@ docs = ["mkdocs", openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] -sift-stream = ["sift-stream-bindings>=0.1.2"] +sift-stream = ["sift-stream-bindings>=0.2.0-rc"] hdf5 = ["h5py~=3.11", "polars~=1.8"] # Ensure any new user build extras are added to .github/workflows/python_build.yaml @@ -149,6 +149,11 @@ module = "requests_toolbelt" ignore_missing_imports = true ignore_errors = true +[[tool.mypy.overrides]] +module = "sift-stream-bindings" +ignore_missing_imports = true +ignore_errors = true + [tool.setuptools.packages.find] where = ["lib"] From e0258847aa87ee74e42935d26ee3e87de536f7f1 Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Mon, 10 Nov 2025 20:57:33 -0800 Subject: [PATCH 05/15] Sync up test_runs with main. Version bump --- .../sift_client/_tests/resources/test_runs.py | 39 +++---------------- python/pyproject.toml | 2 +- 2 files changed, 7 insertions(+), 34 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_runs.py b/python/lib/sift_client/_tests/resources/test_runs.py index aebe1c487..0de6bc04e 100644 --- a/python/lib/sift_client/_tests/resources/test_runs.py +++ b/python/lib/sift_client/_tests/resources/test_runs.py @@ -10,6 +10,7 @@ from datetime import datetime, timedelta, timezone import pytest +from grpc.aio import AioRpcError from sift_client import SiftClient from sift_client.resources import RunsAPI, RunsAPIAsync @@ -182,7 +183,11 @@ class TestFind: async def test_find_run(self, runs_api_async, test_run): """Test finding a single run.""" # Find the same run by name - found_run = await runs_api_async.find(name=test_run.name) + found_run = await runs_api_async.find( + name=test_run.name, + created_after=test_run.created_date - timedelta(seconds=10), + created_before=test_run.created_date + timedelta(seconds=10), + ) assert found_run is not None assert found_run.id_ == test_run.id_ @@ -546,38 +551,6 @@ async def test_create_adhoc_run_missing_assets(self, runs_api_async): run_create, assets=["asset-name-not-id"], associate_new_data=False ) - @pytest.mark.asyncio - async def test_create_automatic_association_for_assets(self, runs_api_async, sift_client): - """Test associating assets with a run for automatic data ingestion.""" - # Create a test run - run_name = f"test_run_asset_assoc_{datetime.now(timezone.utc).isoformat()}" - run_create = RunCreate( - name=run_name, - description="Test run for asset association", - tags=["sift-client-pytest"], - ) - created_run = await runs_api_async.create(run_create) - - try: - # Get some assets to associate - assets = await sift_client.async_.assets.list_(limit=2) - assert len(assets) >= 1 - - asset_names = [asset.name for asset in assets[:2]] - - # Associate assets with the run - await runs_api_async.create_automatic_association_for_assets( - run=created_run, asset_names=asset_names - ) - - # Verify the association by getting the run and checking asset_ids - updated_run = await runs_api_async.get(run_id=created_run.id_) - assert updated_run.asset_ids is not None - assert len(updated_run.asset_ids) >= len(asset_names) - - finally: - await runs_api_async.archive(created_run) - class TestRunsAPISync: """Test suite for the synchronous Runs API functionality. diff --git a/python/pyproject.toml b/python/pyproject.toml index 03e585892..bb888eba4 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "1.0.0a1" +version = "1.0.0a2" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 7e4010e16e9a4ca82050a8dd0357f604378c6f1a Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Mon, 10 Nov 2025 21:30:25 -0800 Subject: [PATCH 06/15] ci fix --- .github/workflows/python_build.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/python_build.yaml b/.github/workflows/python_build.yaml index 275af6c58..761904a10 100644 --- a/.github/workflows/python_build.yaml +++ b/.github/workflows/python_build.yaml @@ -117,7 +117,7 @@ jobs: shell: bash run: | python -m pip install --upgrade pip - pip install build pip-tools + pip install build "pip-tools>=7.0.0" - name: Generate requirements working-directory: python From 77cc658f7956c8ea4c20829956f6a2d57a6724af Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Mon, 10 Nov 2025 21:43:22 -0800 Subject: [PATCH 07/15] ci fix 2 --- .github/workflows/python_build.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python_build.yaml b/.github/workflows/python_build.yaml index 761904a10..d2626dbe5 100644 --- a/.github/workflows/python_build.yaml +++ b/.github/workflows/python_build.yaml @@ -116,8 +116,8 @@ jobs: - name: Install build tools shell: bash run: | - python -m pip install --upgrade pip - pip install build "pip-tools>=7.0.0" + python -m pip install --upgrade "pip<25.3" + pip install build "pip-tools>=7.4.0" - name: Generate requirements working-directory: python From 1c22fd0db4cdcee0715fdd248654f557a0037aac Mon Sep 17 00:00:00 2001 From: Ian Later Date: Tue, 11 Nov 2025 16:47:07 -0800 Subject: [PATCH 08/15] version bump --- python/CHANGELOG.md | 4 ++++ python/pyproject.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/python/CHANGELOG.md b/python/CHANGELOG.md index ff6d0856b..0ee2c0383 100644 --- a/python/CHANGELOG.md +++ b/python/CHANGELOG.md @@ -3,6 +3,10 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## [v1.0.0] - Nov 11, 2025 (**NOTE: WIP still in pre-release**) +- Initial stable release of sift_client +- Test results context managers + ## [v0.9.1] - August 18, 2025 - [Allow importing TDMS metadata to existing Runs](https://github.com/sift-stack/sift/pull/320) diff --git a/python/pyproject.toml b/python/pyproject.toml index 914eb4034..2928889b9 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "1.0.0a2" +version = "1.0.0a3" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 1e80bb09006692633d8582c4698be753e9078e1b Mon Sep 17 00:00:00 2001 From: Nathan Federknopp Date: Fri, 14 Nov 2025 17:28:19 -0800 Subject: [PATCH 09/15] version bump --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 919f170e7..ff5f0852c 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "1.0.0a3" +version = "1.0.0a4" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 3b637b0d67c996056d09cbd4bf5cb6d781e86091 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Tue, 18 Nov 2025 10:43:08 -0800 Subject: [PATCH 10/15] add pre-release alias so we can link to this instead --- .github/workflows/python_release.yaml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python_release.yaml b/.github/workflows/python_release.yaml index e37f65f80..9e49c8c39 100644 --- a/.github/workflows/python_release.yaml +++ b/.github/workflows/python_release.yaml @@ -133,9 +133,9 @@ jobs: HIDDEN="false" echo "Stable release detected: $FULL_VERSION -> $VERSION" else - # Pre-release (alpha, beta, rc) - no 'latest' alias and hide from dropdown + # Pre-release (alpha, beta, rc) - use 'pre-release' alias and hide from dropdown VERSION="${VERSION}${SUFFIX}" - ALIAS="" + ALIAS="pre-release" HIDDEN="true" echo "Pre-release detected: $FULL_VERSION -> $VERSION" fi @@ -174,6 +174,10 @@ jobs: # Stable release: deploy abbreviated version with latest alias, visible in dropdown echo "Deploying stable release $VERSION with $ALIAS alias" mike deploy "$VERSION" "$ALIAS" --push --update-aliases + elif [[ "$ALIAS" == "pre-release" ]]; then + # Pre-release: deploy abbreviated version with pre-release alias, hidden from dropdown + echo "Deploying pre-release $VERSION with $ALIAS alias" + mike deploy "$VERSION" "$ALIAS" --push --update-aliases --prop-set hidden=true fi env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From bbc41925bff92625b2fc8f6d06aa9f6437458ecb Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Tue, 25 Nov 2025 09:58:56 -0800 Subject: [PATCH 11/15] version bump --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 7338c905a..09a15572f 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "1.0.0a4" +version = "1.0.0a5" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 3defb133fd9ebb34d4295137122ea29db82d431b Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Wed, 26 Nov 2025 14:17:43 -0600 Subject: [PATCH 12/15] Version bump --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index f7f6696b1..4de4504ea 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "1.0.0a5" +version = "1.0.0a6" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 43acca6d67dde1ac8c3351c05a0391db708bb07e Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Fri, 9 Jan 2026 11:05:28 -0600 Subject: [PATCH 13/15] Version bump --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 4de4504ea..69048999b 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "1.0.0a6" +version = "1.0.0a7" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 01e5a1ec40bbe2586a146d471aefe5357460a91a Mon Sep 17 00:00:00 2001 From: Ian Later Date: Thu, 15 Jan 2026 12:51:00 -0800 Subject: [PATCH 14/15] Version bump --- python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyproject.toml b/python/pyproject.toml index 69048999b..f10156844 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "1.0.0a7" +version = "1.0.0a8" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From 296e423223a977aeeeb33f930017030a3414f025 Mon Sep 17 00:00:00 2001 From: Benji Nguyen <45523555+solidiquis@users.noreply.github.com> Date: Fri, 16 Jan 2026 15:15:32 -0800 Subject: [PATCH 15/15] (sift-client): update sift stream bindings (#462) --- .github/workflows/python_ci.yaml | 16 ++++++++-------- python/pyproject.toml | 14 +++++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/.github/workflows/python_ci.yaml b/.github/workflows/python_ci.yaml index 1e5818dc5..7bf425a2d 100644 --- a/.github/workflows/python_ci.yaml +++ b/.github/workflows/python_ci.yaml @@ -62,17 +62,17 @@ jobs: run: | pytest -m "not integration" - - name: Pytest Integration Tests - env: - SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }} - SIFT_REST_URI: ${{ vars.SIFT_REST_URI }} - SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }} - run: | - pytest -m "integration" + #- name: Pytest Integration Tests + #env: + #SIFT_GRPC_URI: ${{ vars.SIFT_GRPC_URI }} + #SIFT_REST_URI: ${{ vars.SIFT_REST_URI }} + #SIFT_API_KEY: ${{ secrets.SIFT_API_KEY }} + #run: | + #pytest -m "integration" - name: Sync Stubs Mypy working-directory: python/lib run: | stubtest \ --mypy-config-file ../pyproject.toml \ - sift_client.resources.sync_stubs \ No newline at end of file + sift_client.resources.sync_stubs diff --git a/python/pyproject.toml b/python/pyproject.toml index f10156844..85b79ffe7 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "sift_stack_py" -version = "1.0.0a8" +version = "1.0.0a9" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } @@ -59,7 +59,7 @@ all = [ 'pyOpenSSL<24.0.0', 'pyarrow>=17.0.0', 'rosbags~=0.0', - 'sift-stream-bindings>=0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc8', 'types-pyOpenSSL<24.0.0', ] build = [ @@ -100,7 +100,7 @@ dev-all = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings>=0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc8', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -153,7 +153,7 @@ docs-build = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings>=0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc8', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -176,10 +176,10 @@ rosbags = [ 'rosbags~=0.0', ] sift-stream = [ - 'sift-stream-bindings>=0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc8', ] sift-stream-bindings = [ - 'sift-stream-bindings>=0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc8', ] tdms = [ 'npTDMS~=1.9', @@ -215,7 +215,7 @@ docs = ["mkdocs", openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] -sift-stream = ["sift-stream-bindings>=0.2.0-rc4"] +sift-stream = ["sift-stream-bindings>=0.2.0-rc8"] hdf5 = ["h5py~=3.11", "polars~=1.8"] data-review = ["pyarrow>=17.0.0"]