diff --git a/docs/api_reference.md b/docs/api_reference.md index 288039a..25becec 100644 --- a/docs/api_reference.md +++ b/docs/api_reference.md @@ -216,6 +216,66 @@ class CustomSource(Source): yield dataset_resource ``` +### BatchLoader (batching file loads) + +When the underlying data source is more efficient with batched requests (e.g. an API that accepts many items per call), wrap your loader function in a `BatchLoader` and share the instance across the `DatasetResource`s that should be batched together. + +Ingestify groups resources by shared `BatchLoader` instance, chunks them into groups of `batch_size`, and calls the wrapped loader once per chunk. + +```python +from ingestify import BatchLoader, DatasetResource, Source +from ingestify.domain import DraftFile + + +def load_metrics(file_resources, current_files, dataset_resources): + """Called once per batch. Each argument is a list of up to batch_size items. + + current_files may contain None (for new datasets) or a File (for existing + ones). Return a list of DraftFile / NotModifiedFile in the same order. + """ + identifiers = [dr.dataset_resource_id for dr in dataset_resources] + results = fetch_batch_from_api(identifiers) # single API call + return [ + DraftFile.from_input( + file_=json.dumps(result), + data_serialization_format="json", + data_feed_key=fr.data_feed_key, + data_spec_version=fr.data_spec_version, + modified_at=fr.last_modified, + ) + for fr, result in zip(file_resources, results) + ] + + +class MySource(Source): + provider = "my_provider" + + def find_datasets(self, dataset_type, data_spec_versions, **selector): + # Share one BatchLoader instance across all DatasetResources that + # should be batched together. + batch_loader = BatchLoader(load_metrics, batch_size=20) + + for item_id in items: + resource = DatasetResource( + dataset_resource_id={"item_id": item_id}, + dataset_type=dataset_type, + provider=self.provider, + name=str(item_id), + ) + resource.add_file( + last_modified=last_modified, + data_feed_key="metrics", + data_spec_version="v1", + file_loader=batch_loader, + data_serialization_format="json", + ) + yield resource +``` + +Notes: +- Only resources that actually need loading (i.e. not skipped by `FetchPolicy`) are passed to the loader. +- The last batch of a group may contain fewer than `batch_size` items if the number of pending items is not a multiple of `batch_size`, or if the group crosses an internal chunk boundary. + ### Custom Event Subscriber To create a custom event subscriber, extend the `Subscriber` class: diff --git a/ingestify/__init__.py b/ingestify/__init__.py index 6a90f7c..e4e6d3d 100644 --- a/ingestify/__init__.py +++ b/ingestify/__init__.py @@ -7,6 +7,7 @@ if not __INGESTIFY_SETUP__: from .infra import retrieve_http from .source_base import Source, DatasetResource + from .domain.models.resources.batch_loader import BatchLoader from .main import debug_source __version__ = "0.13.0" diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index 325ce5e..32df9f1 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -1,8 +1,10 @@ +import inspect import itertools import json import logging import uuid from enum import Enum +from functools import lru_cache from typing import Optional, Iterator, Union from pydantic import ValidationError @@ -21,6 +23,7 @@ FileResource, DatasetResource, ) +from ingestify.domain.models.resources.batch_loader import BatchLoader from ingestify.domain.models.task.task_summary import TaskSummary from ingestify.exceptions import SaveError, IngestifyError from ingestify.utils import TaskExecutor, chunker @@ -57,7 +60,9 @@ def to_batches(input_): def load_file( - file_resource: FileResource, dataset: Optional[Dataset] = None + file_resource: FileResource, + dataset: Optional[Dataset] = None, + dataset_resource: Optional[DatasetResource] = None, ) -> Union[DraftFile, NotModifiedFile]: current_file = None if dataset: @@ -99,14 +104,35 @@ def load_file( **file_resource.loader_kwargs, ) else: + extra_kwargs = {} + if _loader_accepts_dataset_resource(file_resource.file_loader): + extra_kwargs["dataset_resource"] = dataset_resource return file_resource.file_loader( file_resource, current_file, - # TODO: check how to fix this with typehints + **extra_kwargs, **file_resource.loader_kwargs, ) +@lru_cache(maxsize=None) +def _loader_accepts_dataset_resource(loader) -> bool: + """Return True if loader accepts a `dataset_resource` keyword argument. + + BatchLoader instances always do. Plain functions are introspected once. + """ + if isinstance(loader, BatchLoader): + return True + try: + sig = inspect.signature(loader) + except (TypeError, ValueError): + return False + params = sig.parameters + if "dataset_resource" in params: + return True + return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()) + + class UpdateDatasetTask(Task): def __init__( self, @@ -131,7 +157,11 @@ def run(self): ) as task_summary: files = { file_id: task_summary.record_load_file( - lambda: load_file(file_resource, dataset=self.dataset), + lambda: load_file( + file_resource, + dataset=self.dataset, + dataset_resource=self.dataset_resource, + ), metadata={"file_id": file_id}, ) for file_id, file_resource in self.dataset_resource.files.items() @@ -177,7 +207,11 @@ def run(self): with TaskSummary.create(self.task_id, dataset_identifier) as task_summary: files = { file_id: task_summary.record_load_file( - lambda: load_file(file_resource, dataset=None), + lambda: load_file( + file_resource, + dataset=None, + dataset_resource=self.dataset_resource, + ), metadata={"file_id": file_id}, ) for file_id, file_resource in self.dataset_resource.files.items() @@ -207,6 +241,96 @@ def __repr__(self): return f"CreateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})" +class BatchTask(Task): + """Wraps a group of inner tasks that share a BatchLoader instance. + + On run(), invokes the shared loader_fn once with all items in the batch, + caches the results, then runs each inner task sequentially. + """ + + def __init__(self, inner_tasks: list, loader: BatchLoader): + self.inner_tasks = inner_tasks + self.loader = loader + + def run(self): + # Collect items for the shared loader across all inner tasks. + file_resources, current_files, dataset_resources = [], [], [] + for task in self.inner_tasks: + dataset = getattr(task, "dataset", None) + for file_id, file_resource in task.dataset_resource.files.items(): + # A DatasetResource can have multiple files, each potentially + # using a different file_loader (e.g. a plain loader for one + # file and a BatchLoader for another, or multiple BatchLoaders). + # We only want files whose loader is this BatchTask's loader. + if file_resource.file_loader is not self.loader: + continue + current_file = None + if dataset is not None: + current_file = dataset.current_revision.modified_files_map.get( + file_id + ) + file_resources.append(file_resource) + current_files.append(current_file) + dataset_resources.append(task.dataset_resource) + + results = self.loader.loader_fn( + file_resources, current_files, dataset_resources + ) + if len(results) != len(file_resources): + raise RuntimeError( + f"BatchLoader expected {len(file_resources)} results, got {len(results)}" + ) + self.loader._store_results(file_resources, results) + + # Run the wrapped inner tasks — they pick up cached results via + # BatchLoader.__call__ inside load_file(). + return [task.run() for task in self.inner_tasks] + + def __repr__(self): + return f"BatchTask(n={len(self.inner_tasks)})" + + +def _wrap_batch_tasks(task_set: "TaskSet") -> "TaskSet": + """Rebuild a TaskSet, wrapping tasks that share a BatchLoader instance + into BatchTasks chunked by the loader's batch_size. + + Tasks with no BatchLoader in any of their files remain unchanged. + """ + loose: list = [] + grouped: dict = {} # id(loader) -> (loader, [tasks]) + + for task in task_set: + loader = _find_first_batch_loader(task) + if loader is None: + loose.append(task) + else: + grouped.setdefault(id(loader), (loader, []))[1].append(task) + + new_task_set = TaskSet() + for task in loose: + new_task_set.add(task) + + for loader, tasks in grouped.values(): + batch_size = loader.batch_size + for i in range(0, len(tasks), batch_size): + new_task_set.add( + BatchTask(inner_tasks=tasks[i : i + batch_size], loader=loader) + ) + + return new_task_set + + +def _find_first_batch_loader(task) -> "Optional[BatchLoader]": + """Return the first BatchLoader encountered in the task's file resources.""" + dataset_resource = getattr(task, "dataset_resource", None) + if dataset_resource is None: + return None + for file_resource in dataset_resource.files.values(): + if isinstance(file_resource.file_loader, BatchLoader): + return file_resource.file_loader + return None + + MAX_TASKS_PER_CHUNK = 10_000 @@ -365,13 +489,23 @@ def execute( with ingestion_job_summary.record_timing("tasks"): if task_set: + original_task_count = len(task_set) + task_set = _wrap_batch_tasks(task_set) logger.info( f"Discovered {len(dataset_identifiers)} datasets from {self.ingestion_plan.source.__class__.__name__} " - f"using selector {self.selector} => {len(task_set)} tasks. {skipped_tasks} skipped." + f"using selector {self.selector} => {original_task_count} tasks. {skipped_tasks} skipped." ) logger.info(f"Running {len(task_set)} tasks") - task_summaries = task_executor.run(run_task, task_set) + results = task_executor.run(run_task, task_set) + + # BatchTasks return a list of TaskSummary; flatten. + task_summaries = [] + for result in results: + if isinstance(result, list): + task_summaries.extend(result) + else: + task_summaries.append(result) ingestion_job_summary.add_task_summaries(task_summaries) else: diff --git a/ingestify/domain/models/resources/batch_loader.py b/ingestify/domain/models/resources/batch_loader.py new file mode 100644 index 0000000..6971df5 --- /dev/null +++ b/ingestify/domain/models/resources/batch_loader.py @@ -0,0 +1,43 @@ +"""BatchLoader wraps a file loader so multiple files are fetched in one call. + +Use this when the data source is more efficient with batched requests. Create +one BatchLoader instance and share it across the DatasetResources that should +be batched together. Ingestify groups those resources by shared BatchLoader +instance, chunks them into groups of `batch_size`, and calls the wrapped +loader_fn once per chunk. + +The wrapped loader_fn receives lists instead of single items and must return +a list of results in the same order: + + def load(file_resources, current_files, dataset_resources): + ... + return [DraftFile.from_input(...) for _ in file_resources] + + batch_loader = BatchLoader(load, batch_size=20) + resource.add_file(file_loader=batch_loader, ...) + +current_files may contain None entries (for create tasks) or a File (for +update tasks) — the loader_fn handles both. +""" +from typing import Callable, List + + +class BatchLoader: + def __init__(self, loader_fn: Callable, batch_size: int): + self.loader_fn = loader_fn + self.batch_size = batch_size + self._results: dict = {} + + def __call__(self, file_resource, current_file, dataset_resource=None, **kwargs): + key = id(file_resource) + if key not in self._results: + raise RuntimeError( + "BatchLoader result not precomputed. A BatchTask must " + "populate the cache before inner tasks execute." + ) + return self._results.pop(key) + + def _store_results(self, file_resources: List, results: List): + """Store batch results so they can be retrieved via __call__.""" + for file_resource, result in zip(file_resources, results): + self._results[id(file_resource)] = result diff --git a/ingestify/domain/services/identifier_key_transformer.py b/ingestify/domain/services/identifier_key_transformer.py index 0aa86b3..6d8eb07 100644 --- a/ingestify/domain/services/identifier_key_transformer.py +++ b/ingestify/domain/services/identifier_key_transformer.py @@ -1,4 +1,8 @@ +import hashlib +import re +import unicodedata from abc import ABC, abstractmethod +from urllib.parse import quote from enum import Enum from typing import Callable, Optional, Union @@ -8,6 +12,7 @@ class TransformationType(Enum): IDENTITY = "IDENTITY" BUCKET = "BUCKET" + PREFIX = "PREFIX" RANGE = "RANGE" CUSTOM = "CUSTOM" @@ -30,6 +35,8 @@ def from_dict(cls, config: dict) -> "Transformation": type_ = config.pop("type") if type_ == "bucket": return BucketTransformation(**config) + elif type_ == "prefix": + return PrefixTransformation(**config) else: raise IngestifyError(f"Cannot build Transformation from {config}") @@ -51,7 +58,12 @@ def __init__(self, bucket_size: int = None, bucket_count: int = None): def __call__(self, id_key_value: Union[str, int]) -> str: if self.bucket_count: - return str(int(id_key_value) % self.bucket_count) + try: + value = int(id_key_value) + except (ValueError, TypeError): + # String keys: use stable hash to distribute across buckets + value = int(hashlib.md5(str(id_key_value).encode()).hexdigest(), 16) + return str(value % self.bucket_count) elif self.bucket_size: bucket_start = int(id_key_value) // self.bucket_size * self.bucket_size bucket_end = bucket_start + self.bucket_size - 1 @@ -60,6 +72,19 @@ def __call__(self, id_key_value: Union[str, int]) -> str: raise IngestifyError("Invalid BucketTransformation") +class PrefixTransformation(Transformation): + transformation_type = TransformationType.PREFIX + + def __init__(self, length: int = 1): + self.length = length + + def __call__(self, id_key_value: Union[str, int]) -> str: + # Transliterate unicode (ü→u, é→e) then strip non-alphanumeric + text = unicodedata.normalize("NFKD", str(id_key_value).lower()) + cleaned = re.sub(r"[^a-z0-9]", "", text) + return cleaned[: self.length] if cleaned else "_" + + class IdentifierTransformer: def __init__(self): # Mapping of (provider, dataset_type, id_key) to the transformation @@ -119,8 +144,9 @@ def to_path(self, provider: str, dataset_type: str, identifier: dict) -> str: suffix = transformation.transformation_type.value.lower() path_parts.append(f"{key}_{suffix}={transformed_value}") - # Append the original value (either standalone for identity or alongside transformed) - path_parts.append(f"{key}={value}") + # Append the original value (either standalone for identity or alongside transformed). + # URL-encode the value so special characters, spaces, etc. are safe in paths. + path_parts.append(f"{key}={quote(str(value), safe='')}") # Join the parts with `/` to form the full path return "/".join(path_parts) diff --git a/ingestify/tests/test_batch_loader.py b/ingestify/tests/test_batch_loader.py new file mode 100644 index 0000000..c7aef4d --- /dev/null +++ b/ingestify/tests/test_batch_loader.py @@ -0,0 +1,23 @@ +"""Tests for BatchLoader.""" +import pytest + +from ingestify.domain.models.resources.batch_loader import BatchLoader + + +def test_batch_loader_returns_cached_result(): + """__call__ returns the result stored for each file_resource.""" + batch_loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5) + fr1, fr2 = object(), object() + + batch_loader._store_results([fr1, fr2], ["result_1", "result_2"]) + + assert batch_loader(fr1, current_file=None) == "result_1" + assert batch_loader(fr2, current_file=None) == "result_2" + + +def test_batch_loader_raises_if_not_precomputed(): + """__call__ raises when the cache has no entry for this file_resource.""" + batch_loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5) + + with pytest.raises(RuntimeError, match="not precomputed"): + batch_loader(object(), current_file=None)