From 6dc42244e5712bbf5fd38416c96aa0003ff93007 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Sun, 5 Apr 2026 21:22:32 +0200 Subject: [PATCH 1/4] Add BatchLoader + BatchTask for batched file loading Sources can now wrap a loader function in BatchLoader(loader_fn, batch_size) and share the instance across DatasetResources that should be batched together. Ingestify groups those resources, chunks them into groups of batch_size, wraps each chunk in a BatchTask, and calls the loader_fn once per batch with lists of file_resources / current_files / dataset_resources. load_file() now passes dataset_resource to loaders that accept it (signature introspection with lru_cache, so existing loaders continue to work without changes). --- docs/api_reference.md | 60 +++++++ ingestify/__init__.py | 1 + .../domain/models/ingestion/ingestion_job.py | 146 +++++++++++++++++- .../domain/models/resources/batch_loader.py | 46 ++++++ ingestify/tests/test_batch_loader.py | 23 +++ 5 files changed, 270 insertions(+), 6 deletions(-) create mode 100644 ingestify/domain/models/resources/batch_loader.py create mode 100644 ingestify/tests/test_batch_loader.py diff --git a/docs/api_reference.md b/docs/api_reference.md index 288039a..25becec 100644 --- a/docs/api_reference.md +++ b/docs/api_reference.md @@ -216,6 +216,66 @@ class CustomSource(Source): yield dataset_resource ``` +### BatchLoader (batching file loads) + +When the underlying data source is more efficient with batched requests (e.g. an API that accepts many items per call), wrap your loader function in a `BatchLoader` and share the instance across the `DatasetResource`s that should be batched together. + +Ingestify groups resources by shared `BatchLoader` instance, chunks them into groups of `batch_size`, and calls the wrapped loader once per chunk. + +```python +from ingestify import BatchLoader, DatasetResource, Source +from ingestify.domain import DraftFile + + +def load_metrics(file_resources, current_files, dataset_resources): + """Called once per batch. Each argument is a list of up to batch_size items. + + current_files may contain None (for new datasets) or a File (for existing + ones). Return a list of DraftFile / NotModifiedFile in the same order. + """ + identifiers = [dr.dataset_resource_id for dr in dataset_resources] + results = fetch_batch_from_api(identifiers) # single API call + return [ + DraftFile.from_input( + file_=json.dumps(result), + data_serialization_format="json", + data_feed_key=fr.data_feed_key, + data_spec_version=fr.data_spec_version, + modified_at=fr.last_modified, + ) + for fr, result in zip(file_resources, results) + ] + + +class MySource(Source): + provider = "my_provider" + + def find_datasets(self, dataset_type, data_spec_versions, **selector): + # Share one BatchLoader instance across all DatasetResources that + # should be batched together. + batch_loader = BatchLoader(load_metrics, batch_size=20) + + for item_id in items: + resource = DatasetResource( + dataset_resource_id={"item_id": item_id}, + dataset_type=dataset_type, + provider=self.provider, + name=str(item_id), + ) + resource.add_file( + last_modified=last_modified, + data_feed_key="metrics", + data_spec_version="v1", + file_loader=batch_loader, + data_serialization_format="json", + ) + yield resource +``` + +Notes: +- Only resources that actually need loading (i.e. not skipped by `FetchPolicy`) are passed to the loader. +- The last batch of a group may contain fewer than `batch_size` items if the number of pending items is not a multiple of `batch_size`, or if the group crosses an internal chunk boundary. + ### Custom Event Subscriber To create a custom event subscriber, extend the `Subscriber` class: diff --git a/ingestify/__init__.py b/ingestify/__init__.py index 6a90f7c..e4e6d3d 100644 --- a/ingestify/__init__.py +++ b/ingestify/__init__.py @@ -7,6 +7,7 @@ if not __INGESTIFY_SETUP__: from .infra import retrieve_http from .source_base import Source, DatasetResource + from .domain.models.resources.batch_loader import BatchLoader from .main import debug_source __version__ = "0.13.0" diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index 325ce5e..e9a42c2 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -1,8 +1,10 @@ +import inspect import itertools import json import logging import uuid from enum import Enum +from functools import lru_cache from typing import Optional, Iterator, Union from pydantic import ValidationError @@ -21,6 +23,7 @@ FileResource, DatasetResource, ) +from ingestify.domain.models.resources.batch_loader import BatchLoader from ingestify.domain.models.task.task_summary import TaskSummary from ingestify.exceptions import SaveError, IngestifyError from ingestify.utils import TaskExecutor, chunker @@ -57,7 +60,9 @@ def to_batches(input_): def load_file( - file_resource: FileResource, dataset: Optional[Dataset] = None + file_resource: FileResource, + dataset: Optional[Dataset] = None, + dataset_resource: Optional[DatasetResource] = None, ) -> Union[DraftFile, NotModifiedFile]: current_file = None if dataset: @@ -99,14 +104,35 @@ def load_file( **file_resource.loader_kwargs, ) else: + extra_kwargs = {} + if _loader_accepts_dataset_resource(file_resource.file_loader): + extra_kwargs["dataset_resource"] = dataset_resource return file_resource.file_loader( file_resource, current_file, - # TODO: check how to fix this with typehints + **extra_kwargs, **file_resource.loader_kwargs, ) +@lru_cache(maxsize=None) +def _loader_accepts_dataset_resource(loader) -> bool: + """Return True if loader accepts a `dataset_resource` keyword argument. + + BatchLoader instances always do. Plain functions are introspected once. + """ + if isinstance(loader, BatchLoader): + return True + try: + sig = inspect.signature(loader) + except (TypeError, ValueError): + return False + params = sig.parameters + if "dataset_resource" in params: + return True + return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()) + + class UpdateDatasetTask(Task): def __init__( self, @@ -131,7 +157,11 @@ def run(self): ) as task_summary: files = { file_id: task_summary.record_load_file( - lambda: load_file(file_resource, dataset=self.dataset), + lambda: load_file( + file_resource, + dataset=self.dataset, + dataset_resource=self.dataset_resource, + ), metadata={"file_id": file_id}, ) for file_id, file_resource in self.dataset_resource.files.items() @@ -177,7 +207,11 @@ def run(self): with TaskSummary.create(self.task_id, dataset_identifier) as task_summary: files = { file_id: task_summary.record_load_file( - lambda: load_file(file_resource, dataset=None), + lambda: load_file( + file_resource, + dataset=None, + dataset_resource=self.dataset_resource, + ), metadata={"file_id": file_id}, ) for file_id, file_resource in self.dataset_resource.files.items() @@ -207,6 +241,96 @@ def __repr__(self): return f"CreateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})" +class BatchTask(Task): + """Wraps a group of inner tasks whose DatasetResources share a BatchLoader. + + On run(), invokes the shared BatchLoader's loader_fn once with all items in + the batch, caches the results, then runs each inner task sequentially. + """ + + def __init__(self, inner_tasks: list): + self.inner_tasks = inner_tasks + + def run(self): + # Collect items per BatchLoader instance found across the inner tasks. + # A dataset_resource may have multiple files, each with its own loader; + # we group so a single BatchLoader receives all its pending items. + items_by_loader: dict = {} + + for task in self.inner_tasks: + dataset = getattr(task, "dataset", None) + for file_id, file_resource in task.dataset_resource.files.items(): + loader = file_resource.file_loader + if not isinstance(loader, BatchLoader): + continue + current_file = None + if dataset is not None: + current_file = dataset.current_revision.modified_files_map.get( + file_id + ) + items_by_loader.setdefault(id(loader), (loader, []))[1].append( + (file_resource, current_file, task.dataset_resource) + ) + + # Execute each BatchLoader's loader_fn with its collected items + for loader, items in items_by_loader.values(): + file_resources = [item[0] for item in items] + current_files = [item[1] for item in items] + dataset_resources = [item[2] for item in items] + results = loader.loader_fn(file_resources, current_files, dataset_resources) + if len(results) != len(items): + raise RuntimeError( + f"BatchLoader expected {len(items)} results, got {len(results)}" + ) + loader._store_results(file_resources, results) + + # Run the wrapped inner tasks — they will pick up cached results via + # BatchLoader.__call__ inside load_file(). + return [task.run() for task in self.inner_tasks] + + def __repr__(self): + return f"BatchTask(n={len(self.inner_tasks)})" + + +def _wrap_batch_tasks(task_set: "TaskSet") -> "TaskSet": + """Rebuild a TaskSet, wrapping tasks that share a BatchLoader instance + into BatchTasks chunked by the loader's batch_size. + + Tasks with no BatchLoader in any of their files remain unchanged. + """ + loose: list = [] + grouped: dict = {} # id(loader) -> (loader, [tasks]) + + for task in task_set: + loader = _find_first_batch_loader(task) + if loader is None: + loose.append(task) + else: + grouped.setdefault(id(loader), (loader, []))[1].append(task) + + new_task_set = TaskSet() + for task in loose: + new_task_set.add(task) + + for loader, tasks in grouped.values(): + batch_size = loader.batch_size + for i in range(0, len(tasks), batch_size): + new_task_set.add(BatchTask(inner_tasks=tasks[i : i + batch_size])) + + return new_task_set + + +def _find_first_batch_loader(task) -> "Optional[BatchLoader]": + """Return the first BatchLoader encountered in the task's file resources.""" + dataset_resource = getattr(task, "dataset_resource", None) + if dataset_resource is None: + return None + for file_resource in dataset_resource.files.values(): + if isinstance(file_resource.file_loader, BatchLoader): + return file_resource.file_loader + return None + + MAX_TASKS_PER_CHUNK = 10_000 @@ -365,13 +489,23 @@ def execute( with ingestion_job_summary.record_timing("tasks"): if task_set: + original_task_count = len(task_set) + task_set = _wrap_batch_tasks(task_set) logger.info( f"Discovered {len(dataset_identifiers)} datasets from {self.ingestion_plan.source.__class__.__name__} " - f"using selector {self.selector} => {len(task_set)} tasks. {skipped_tasks} skipped." + f"using selector {self.selector} => {original_task_count} tasks. {skipped_tasks} skipped." ) logger.info(f"Running {len(task_set)} tasks") - task_summaries = task_executor.run(run_task, task_set) + results = task_executor.run(run_task, task_set) + + # BatchTasks return a list of TaskSummary; flatten. + task_summaries = [] + for result in results: + if isinstance(result, list): + task_summaries.extend(result) + else: + task_summaries.append(result) ingestion_job_summary.add_task_summaries(task_summaries) else: diff --git a/ingestify/domain/models/resources/batch_loader.py b/ingestify/domain/models/resources/batch_loader.py new file mode 100644 index 0000000..861c56e --- /dev/null +++ b/ingestify/domain/models/resources/batch_loader.py @@ -0,0 +1,46 @@ +"""BatchLoader wraps a file loader so multiple files are fetched in one call. + +Use this when the data source is more efficient with batched requests. Create +one BatchLoader instance and share it across the DatasetResources that should +be batched together. Ingestify groups those resources by shared BatchLoader +instance, chunks them into groups of `batch_size`, and calls the wrapped +loader_fn once per chunk. + +The wrapped loader_fn receives lists instead of single items and must return +a list of results in the same order: + + def load(file_resources, current_files, dataset_resources): + ... + return [DraftFile.from_input(...) for _ in file_resources] + + batch_loader = BatchLoader(load, batch_size=20) + resource.add_file(file_loader=batch_loader, ...) + +current_files may contain None entries (for create tasks) or a File (for +update tasks) — the loader_fn handles both. +""" +import threading +from typing import Callable, List + + +class BatchLoader: + def __init__(self, loader_fn: Callable, batch_size: int): + self.loader_fn = loader_fn + self.batch_size = batch_size + self._results: dict = {} + self._lock = threading.Lock() + + def __call__(self, file_resource, current_file, dataset_resource=None, **kwargs): + with self._lock: + if id(file_resource) not in self._results: + raise RuntimeError( + "BatchLoader result not precomputed. A BatchTask must " + "populate the cache before inner tasks execute." + ) + return self._results.pop(id(file_resource)) + + def _store_results(self, file_resources: List, results: List): + """Store batch results so they can be retrieved via __call__.""" + with self._lock: + for file_resource, result in zip(file_resources, results): + self._results[id(file_resource)] = result diff --git a/ingestify/tests/test_batch_loader.py b/ingestify/tests/test_batch_loader.py new file mode 100644 index 0000000..c7aef4d --- /dev/null +++ b/ingestify/tests/test_batch_loader.py @@ -0,0 +1,23 @@ +"""Tests for BatchLoader.""" +import pytest + +from ingestify.domain.models.resources.batch_loader import BatchLoader + + +def test_batch_loader_returns_cached_result(): + """__call__ returns the result stored for each file_resource.""" + batch_loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5) + fr1, fr2 = object(), object() + + batch_loader._store_results([fr1, fr2], ["result_1", "result_2"]) + + assert batch_loader(fr1, current_file=None) == "result_1" + assert batch_loader(fr2, current_file=None) == "result_2" + + +def test_batch_loader_raises_if_not_precomputed(): + """__call__ raises when the cache has no entry for this file_resource.""" + batch_loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5) + + with pytest.raises(RuntimeError, match="not precomputed"): + batch_loader(object(), current_file=None) From 6cb99f703ae6bd24572273ee227be5846c865e26 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Sun, 5 Apr 2026 21:32:16 +0200 Subject: [PATCH 2/4] Pass loader into BatchTask instead of rediscovering it --- .../domain/models/ingestion/ingestion_job.py | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index e9a42c2..f3866f2 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -242,49 +242,45 @@ def __repr__(self): class BatchTask(Task): - """Wraps a group of inner tasks whose DatasetResources share a BatchLoader. + """Wraps a group of inner tasks that share a BatchLoader instance. - On run(), invokes the shared BatchLoader's loader_fn once with all items in - the batch, caches the results, then runs each inner task sequentially. + On run(), invokes the shared loader_fn once with all items in the batch, + caches the results, then runs each inner task sequentially. """ - def __init__(self, inner_tasks: list): + def __init__(self, inner_tasks: list, loader: BatchLoader): self.inner_tasks = inner_tasks + self.loader = loader def run(self): - # Collect items per BatchLoader instance found across the inner tasks. - # A dataset_resource may have multiple files, each with its own loader; - # we group so a single BatchLoader receives all its pending items. - items_by_loader: dict = {} - + # Collect items for the shared loader across all inner tasks. A + # dataset_resource may have multiple files, so we match by loader + # identity to pick up every FileResource using this loader. + file_resources, current_files, dataset_resources = [], [], [] for task in self.inner_tasks: dataset = getattr(task, "dataset", None) for file_id, file_resource in task.dataset_resource.files.items(): - loader = file_resource.file_loader - if not isinstance(loader, BatchLoader): + if file_resource.file_loader is not self.loader: continue current_file = None if dataset is not None: current_file = dataset.current_revision.modified_files_map.get( file_id ) - items_by_loader.setdefault(id(loader), (loader, []))[1].append( - (file_resource, current_file, task.dataset_resource) - ) + file_resources.append(file_resource) + current_files.append(current_file) + dataset_resources.append(task.dataset_resource) - # Execute each BatchLoader's loader_fn with its collected items - for loader, items in items_by_loader.values(): - file_resources = [item[0] for item in items] - current_files = [item[1] for item in items] - dataset_resources = [item[2] for item in items] - results = loader.loader_fn(file_resources, current_files, dataset_resources) - if len(results) != len(items): - raise RuntimeError( - f"BatchLoader expected {len(items)} results, got {len(results)}" - ) - loader._store_results(file_resources, results) + results = self.loader.loader_fn( + file_resources, current_files, dataset_resources + ) + if len(results) != len(file_resources): + raise RuntimeError( + f"BatchLoader expected {len(file_resources)} results, got {len(results)}" + ) + self.loader._store_results(file_resources, results) - # Run the wrapped inner tasks — they will pick up cached results via + # Run the wrapped inner tasks — they pick up cached results via # BatchLoader.__call__ inside load_file(). return [task.run() for task in self.inner_tasks] @@ -315,7 +311,9 @@ def _wrap_batch_tasks(task_set: "TaskSet") -> "TaskSet": for loader, tasks in grouped.values(): batch_size = loader.batch_size for i in range(0, len(tasks), batch_size): - new_task_set.add(BatchTask(inner_tasks=tasks[i : i + batch_size])) + new_task_set.add( + BatchTask(inner_tasks=tasks[i : i + batch_size], loader=loader) + ) return new_task_set From 18ce4a70d931a2f46513be12c87f1d6e23e5c2fe Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Sun, 5 Apr 2026 21:51:23 +0200 Subject: [PATCH 3/4] Remove thread lock from BatchLoader Different BatchTasks sharing the same loader write/read different keys (id(file_resource)); CPython dict operations on distinct keys are atomic under the GIL. --- .../domain/models/resources/batch_loader.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/ingestify/domain/models/resources/batch_loader.py b/ingestify/domain/models/resources/batch_loader.py index 861c56e..6971df5 100644 --- a/ingestify/domain/models/resources/batch_loader.py +++ b/ingestify/domain/models/resources/batch_loader.py @@ -19,7 +19,6 @@ def load(file_resources, current_files, dataset_resources): current_files may contain None entries (for create tasks) or a File (for update tasks) — the loader_fn handles both. """ -import threading from typing import Callable, List @@ -28,19 +27,17 @@ def __init__(self, loader_fn: Callable, batch_size: int): self.loader_fn = loader_fn self.batch_size = batch_size self._results: dict = {} - self._lock = threading.Lock() def __call__(self, file_resource, current_file, dataset_resource=None, **kwargs): - with self._lock: - if id(file_resource) not in self._results: - raise RuntimeError( - "BatchLoader result not precomputed. A BatchTask must " - "populate the cache before inner tasks execute." - ) - return self._results.pop(id(file_resource)) + key = id(file_resource) + if key not in self._results: + raise RuntimeError( + "BatchLoader result not precomputed. A BatchTask must " + "populate the cache before inner tasks execute." + ) + return self._results.pop(key) def _store_results(self, file_resources: List, results: List): """Store batch results so they can be retrieved via __call__.""" - with self._lock: - for file_resource, result in zip(file_resources, results): - self._results[id(file_resource)] = result + for file_resource, result in zip(file_resources, results): + self._results[id(file_resource)] = result From 1c67f3d6ff04265a01083e006c3ddbfac3db3a23 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Sun, 5 Apr 2026 22:01:52 +0200 Subject: [PATCH 4/4] Document why BatchTask filters file_resources by loader identity --- ingestify/domain/models/ingestion/ingestion_job.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ingestify/domain/models/ingestion/ingestion_job.py b/ingestify/domain/models/ingestion/ingestion_job.py index f3866f2..32df9f1 100644 --- a/ingestify/domain/models/ingestion/ingestion_job.py +++ b/ingestify/domain/models/ingestion/ingestion_job.py @@ -253,13 +253,15 @@ def __init__(self, inner_tasks: list, loader: BatchLoader): self.loader = loader def run(self): - # Collect items for the shared loader across all inner tasks. A - # dataset_resource may have multiple files, so we match by loader - # identity to pick up every FileResource using this loader. + # Collect items for the shared loader across all inner tasks. file_resources, current_files, dataset_resources = [], [], [] for task in self.inner_tasks: dataset = getattr(task, "dataset", None) for file_id, file_resource in task.dataset_resource.files.items(): + # A DatasetResource can have multiple files, each potentially + # using a different file_loader (e.g. a plain loader for one + # file and a BatchLoader for another, or multiple BatchLoaders). + # We only want files whose loader is this BatchTask's loader. if file_resource.file_loader is not self.loader: continue current_file = None