Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions docs/api_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,66 @@ class CustomSource(Source):
yield dataset_resource
```

### BatchLoader (batching file loads)

When the underlying data source is more efficient with batched requests (e.g. an API that accepts many items per call), wrap your loader function in a `BatchLoader` and share the instance across the `DatasetResource`s that should be batched together.

Ingestify groups resources by shared `BatchLoader` instance, chunks them into groups of `batch_size`, and calls the wrapped loader once per chunk.

```python
import json

from ingestify import BatchLoader, DatasetResource, Source
from ingestify.domain import DraftFile


def load_metrics(file_resources, current_files, dataset_resources):
"""Called once per batch. Each argument is a list of up to batch_size items.

current_files may contain None (for new datasets) or a File (for existing
ones). Return a list of DraftFile / NotModifiedFile in the same order.
"""
identifiers = [dr.dataset_resource_id for dr in dataset_resources]
results = fetch_batch_from_api(identifiers) # single API call
return [
DraftFile.from_input(
file_=json.dumps(result),
data_serialization_format="json",
data_feed_key=fr.data_feed_key,
data_spec_version=fr.data_spec_version,
modified_at=fr.last_modified,
)
for fr, result in zip(file_resources, results)
]


class MySource(Source):
provider = "my_provider"

def find_datasets(self, dataset_type, data_spec_versions, **selector):
# Share one BatchLoader instance across all DatasetResources that
# should be batched together.
batch_loader = BatchLoader(load_metrics, batch_size=20)

for item_id in items:
resource = DatasetResource(
dataset_resource_id={"item_id": item_id},
dataset_type=dataset_type,
provider=self.provider,
name=str(item_id),
)
resource.add_file(
last_modified=last_modified,
data_feed_key="metrics",
data_spec_version="v1",
file_loader=batch_loader,
data_serialization_format="json",
)
yield resource
```

Notes:
- Only resources that actually need loading (i.e. not skipped by `FetchPolicy`) are passed to the loader.
- The last batch of a group may contain fewer than `batch_size` items if the number of pending items is not a multiple of `batch_size`, or if the group crosses an internal chunk boundary.

### Custom Event Subscriber

To create a custom event subscriber, extend the `Subscriber` class:
Expand Down
1 change: 1 addition & 0 deletions ingestify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
if not __INGESTIFY_SETUP__:
from .infra import retrieve_http
from .source_base import Source, DatasetResource
from .domain.models.resources.batch_loader import BatchLoader
from .main import debug_source

__version__ = "0.13.0"
146 changes: 140 additions & 6 deletions ingestify/domain/models/ingestion/ingestion_job.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import inspect
import itertools
import json
import logging
import uuid
from enum import Enum
from functools import lru_cache
from typing import Optional, Iterator, Union

from pydantic import ValidationError
Expand All @@ -21,6 +23,7 @@
FileResource,
DatasetResource,
)
from ingestify.domain.models.resources.batch_loader import BatchLoader
from ingestify.domain.models.task.task_summary import TaskSummary
from ingestify.exceptions import SaveError, IngestifyError
from ingestify.utils import TaskExecutor, chunker
Expand Down Expand Up @@ -57,7 +60,9 @@ def to_batches(input_):


def load_file(
file_resource: FileResource, dataset: Optional[Dataset] = None
file_resource: FileResource,
dataset: Optional[Dataset] = None,
dataset_resource: Optional[DatasetResource] = None,
) -> Union[DraftFile, NotModifiedFile]:
current_file = None
if dataset:
Expand Down Expand Up @@ -99,14 +104,35 @@ def load_file(
**file_resource.loader_kwargs,
)
else:
extra_kwargs = {}
if _loader_accepts_dataset_resource(file_resource.file_loader):
extra_kwargs["dataset_resource"] = dataset_resource
return file_resource.file_loader(
file_resource,
current_file,
# TODO: check how to fix this with typehints
**extra_kwargs,
**file_resource.loader_kwargs,
)


@lru_cache(maxsize=None)
def _loader_accepts_dataset_resource(loader) -> bool:
"""Return True if loader accepts a `dataset_resource` keyword argument.

BatchLoader instances always do. Plain functions are introspected once.
"""
if isinstance(loader, BatchLoader):
return True
try:
sig = inspect.signature(loader)
except (TypeError, ValueError):
return False
params = sig.parameters
if "dataset_resource" in params:
return True
return any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values())


class UpdateDatasetTask(Task):
def __init__(
self,
Expand All @@ -131,7 +157,11 @@ def run(self):
) as task_summary:
files = {
file_id: task_summary.record_load_file(
lambda: load_file(file_resource, dataset=self.dataset),
lambda: load_file(
file_resource,
dataset=self.dataset,
dataset_resource=self.dataset_resource,
),
metadata={"file_id": file_id},
)
for file_id, file_resource in self.dataset_resource.files.items()
Expand Down Expand Up @@ -177,7 +207,11 @@ def run(self):
with TaskSummary.create(self.task_id, dataset_identifier) as task_summary:
files = {
file_id: task_summary.record_load_file(
lambda: load_file(file_resource, dataset=None),
lambda: load_file(
file_resource,
dataset=None,
dataset_resource=self.dataset_resource,
),
metadata={"file_id": file_id},
)
for file_id, file_resource in self.dataset_resource.files.items()
Expand Down Expand Up @@ -207,6 +241,96 @@ def __repr__(self):
return f"CreateDatasetTask({self.dataset_resource.provider} -> {self.dataset_resource.dataset_resource_id})"


class BatchTask(Task):
    """Wraps a group of inner tasks that share a BatchLoader instance.

    On run(), invokes the shared loader_fn once with all items in the batch,
    caches the results, then runs each inner task sequentially.
    """

    def __init__(self, inner_tasks: list, loader: BatchLoader):
        self.inner_tasks = inner_tasks
        self.loader = loader

    def run(self):
        # Gather, across every inner task, the (file_resource, current_file,
        # dataset_resource) triples that belong to this task's shared loader.
        pending = []
        for inner_task in self.inner_tasks:
            dataset = getattr(inner_task, "dataset", None)
            for file_id, file_resource in inner_task.dataset_resource.files.items():
                # A DatasetResource may mix loaders across its files (a plain
                # loader for one file, one or more BatchLoaders for others);
                # only files bound to *this* BatchTask's loader are collected.
                if file_resource.file_loader is not self.loader:
                    continue
                current_file = (
                    dataset.current_revision.modified_files_map.get(file_id)
                    if dataset is not None
                    else None
                )
                pending.append(
                    (file_resource, current_file, inner_task.dataset_resource)
                )

        file_resources = [entry[0] for entry in pending]
        current_files = [entry[1] for entry in pending]
        dataset_resources = [entry[2] for entry in pending]

        results = self.loader.loader_fn(
            file_resources, current_files, dataset_resources
        )
        if len(results) != len(file_resources):
            raise RuntimeError(
                f"BatchLoader expected {len(file_resources)} results, got {len(results)}"
            )
        self.loader._store_results(file_resources, results)

        # Run the wrapped inner tasks — they pick up cached results via
        # BatchLoader.__call__ inside load_file().
        return [inner_task.run() for inner_task in self.inner_tasks]

    def __repr__(self):
        return f"BatchTask(n={len(self.inner_tasks)})"


def _wrap_batch_tasks(task_set: "TaskSet") -> "TaskSet":
    """Rebuild a TaskSet, wrapping tasks that share a BatchLoader instance
    into BatchTasks chunked by the loader's batch_size.

    Tasks with no BatchLoader in any of their files remain unchanged.
    """
    unbatched: list = []
    batched_groups: dict = {}  # id(loader) -> (loader, [tasks])

    for task in task_set:
        loader = _find_first_batch_loader(task)
        if loader is None:
            unbatched.append(task)
            continue
        key = id(loader)
        if key not in batched_groups:
            batched_groups[key] = (loader, [])
        batched_groups[key][1].append(task)

    rebuilt = TaskSet()
    # Keep the original ordering: loose tasks first, then the batch wrappers.
    for task in unbatched:
        rebuilt.add(task)

    for loader, group_tasks in batched_groups.values():
        size = loader.batch_size
        for start in range(0, len(group_tasks), size):
            rebuilt.add(
                BatchTask(inner_tasks=group_tasks[start : start + size], loader=loader)
            )

    return rebuilt


def _find_first_batch_loader(task) -> "Optional[BatchLoader]":
    """Return the first BatchLoader encountered in the task's file resources."""
    dataset_resource = getattr(task, "dataset_resource", None)
    if dataset_resource is None:
        return None
    return next(
        (
            fr.file_loader
            for fr in dataset_resource.files.values()
            if isinstance(fr.file_loader, BatchLoader)
        ),
        None,
    )


MAX_TASKS_PER_CHUNK = 10_000


Expand Down Expand Up @@ -365,13 +489,23 @@ def execute(

with ingestion_job_summary.record_timing("tasks"):
if task_set:
original_task_count = len(task_set)
task_set = _wrap_batch_tasks(task_set)
logger.info(
f"Discovered {len(dataset_identifiers)} datasets from {self.ingestion_plan.source.__class__.__name__} "
f"using selector {self.selector} => {len(task_set)} tasks. {skipped_tasks} skipped."
f"using selector {self.selector} => {original_task_count} tasks. {skipped_tasks} skipped."
)
logger.info(f"Running {len(task_set)} tasks")

task_summaries = task_executor.run(run_task, task_set)
results = task_executor.run(run_task, task_set)

# BatchTasks return a list of TaskSummary; flatten.
task_summaries = []
for result in results:
if isinstance(result, list):
task_summaries.extend(result)
else:
task_summaries.append(result)

ingestion_job_summary.add_task_summaries(task_summaries)
else:
Expand Down
43 changes: 43 additions & 0 deletions ingestify/domain/models/resources/batch_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""BatchLoader wraps a file loader so multiple files are fetched in one call.

Use this when the data source is more efficient with batched requests. Create
one BatchLoader instance and share it across the DatasetResources that should
be batched together. Ingestify groups those resources by shared BatchLoader
instance, chunks them into groups of `batch_size`, and calls the wrapped
loader_fn once per chunk.

The wrapped loader_fn receives lists instead of single items and must return
a list of results in the same order:

def load(file_resources, current_files, dataset_resources):
...
return [DraftFile.from_input(...) for _ in file_resources]

batch_loader = BatchLoader(load, batch_size=20)
resource.add_file(file_loader=batch_loader, ...)

current_files may contain None entries (for create tasks) or a File (for
update tasks) — the loader_fn handles both.
"""
from typing import Callable, List


class BatchLoader:
    """Wraps a file loader so multiple files are fetched in one call.

    Share one instance across the DatasetResources that should be batched
    together; ingestify groups pending resources by instance and calls
    ``loader_fn`` once per chunk of at most ``batch_size`` items.
    """

    def __init__(self, loader_fn: Callable, batch_size: int):
        """
        Args:
            loader_fn: called as ``loader_fn(file_resources, current_files,
                dataset_resources)`` and must return a list of results in the
                same order as ``file_resources``.
            batch_size: maximum number of items passed to loader_fn per call.

        Raises:
            ValueError: if batch_size is not >= 1. A non-positive size would
                otherwise only fail later — and opaquely — inside the range()
                chunking in _wrap_batch_tasks.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        self.loader_fn = loader_fn
        self.batch_size = batch_size
        # id(file_resource) -> precomputed result; populated by a BatchTask
        # via _store_results() before the inner tasks execute.
        self._results: dict = {}

    def __call__(self, file_resource, current_file, dataset_resource=None, **kwargs):
        """Return (and consume) the precomputed result for file_resource.

        Raises:
            RuntimeError: if no result was precomputed for this resource.
        """
        key = id(file_resource)
        if key not in self._results:
            raise RuntimeError(
                "BatchLoader result not precomputed. A BatchTask must "
                "populate the cache before inner tasks execute."
            )
        # pop() releases each result as soon as it is consumed.
        return self._results.pop(key)

    def _store_results(self, file_resources: List, results: List):
        """Store batch results so they can be retrieved via __call__."""
        for file_resource, result in zip(file_resources, results):
            self._results[id(file_resource)] = result
23 changes: 23 additions & 0 deletions ingestify/tests/test_batch_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Tests for BatchLoader."""
import pytest

from ingestify.domain.models.resources.batch_loader import BatchLoader


def test_batch_loader_returns_cached_result():
    """__call__ returns the result stored for each file_resource."""
    loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5)
    resource_a = object()
    resource_b = object()
    loader._store_results([resource_a, resource_b], ["result_1", "result_2"])

    assert loader(resource_a, current_file=None) == "result_1"
    assert loader(resource_b, current_file=None) == "result_2"


def test_batch_loader_raises_if_not_precomputed():
    """__call__ raises when the cache has no entry for this file_resource."""
    loader = BatchLoader(lambda frs, cfs, drs: [], batch_size=5)
    unknown_resource = object()

    with pytest.raises(RuntimeError, match="not precomputed"):
        loader(unknown_resource, current_file=None)
Loading