Skip to content
Merged
40 changes: 40 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,43 @@ jobs:
INGESTIFY_TEST_DATABASE_URL: mysql://root:root@127.0.0.1:3306/ingestify_test
run: |
pytest --color=yes

test-postgres:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11"]

services:
postgres:
image: postgres:15
env:
POSTGRES_USER: ingestify
POSTGRES_PASSWORD: ingestify
POSTGRES_DB: ingestify_test
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5

steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -e ".[test]"
- name: Install Postgres dependencies
run: |
pip install psycopg2-binary
- name: Test with pytest
env:
INGESTIFY_TEST_DATABASE_URL: postgresql://ingestify:ingestify@localhost:5432/ingestify_test
run: |
pytest --color=yes
54 changes: 50 additions & 4 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,58 @@ dataset_types:

- `provider`: Provider name (must match a source's provider)
- `dataset_type`: Type of dataset (e.g., "match", "player", "team")
- `identifier_index`: When `true`, a partial PostgreSQL expression index on all `identifier_keys` is created when `ingestify sync-indexes` is run. Use this for high-cardinality dataset types (e.g. one dataset per keyword). Never runs automatically — must be triggered explicitly to avoid locking large tables.

Example — one dataset per keyword with an expression index:
```yaml
dataset_types:
- provider: keyword_ads
dataset_type: keyword_metrics
identifier_index: true
identifier_keys:
keyword:
transformation: identity
key_type: str
```

After adding `identifier_index: true`, run once to create the index:
```bash
ingestify sync-indexes --config config.yaml
```

This creates a partial index scoped to the specific `(provider, dataset_type)` pair:
```sql
CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_ads_keyword_metrics
ON dataset ((identifier->>'keyword'))
WHERE provider = 'keyword_ads' AND dataset_type = 'keyword_metrics';
```

For composite identifiers with mixed types, all keys are combined into a single index:
```yaml
dataset_types:
- provider: keyword_ads
dataset_type: keyword_set
identifier_index: true
identifier_keys:
dataset_id:
transformation: identity
key_type: int
table_name:
transformation: identity
key_type: str
```
```sql
CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_ads_keyword_set
ON dataset (((identifier->>'dataset_id')::integer), (identifier->>'table_name'))
WHERE provider = 'keyword_ads' AND dataset_type = 'keyword_set';
```

- `identifier_keys`: Keys that uniquely identify datasets
- Each key can have a `transformation` applied to standardize the format, plus an optional `key_type`
- `key_type`: Declared value type of the key (`str` or `int`). When set, the repository uses this to cast JSONB values in queries and to generate the correct expression in `sync-indexes`. Defaults to `str`.
- Common transformations:
  - `identity`: Use value as-is (default)
  - `str`: Convert to string
  - `int`: Convert to integer
- Bucket transformation (for file path organisation only — not related to indexing):
```yaml
transformation:
type: bucket
Expand Down
13 changes: 13 additions & 0 deletions ingestify/application/dataset_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,31 @@ def __init__(
dataset_repository: DatasetRepository,
file_repository: FileRepository,
bucket: str,
identifier_index_configs: list = None,
):
self.dataset_repository = dataset_repository
self.file_repository = file_repository
self.storage_compression_method = "gzip"
self.bucket = bucket
self.event_bus: Optional[EventBus] = None
self._identifier_index_configs = identifier_index_configs or []

# Pass current version to repository for validation/migration
from ingestify import __version__

self.dataset_repository.ensure_compatible_version(__version__)

def create_indexes(self):
    """Build expression indexes over identifier keys for configured dataset types.

    PostgreSQL-only (the repository skips other dialects) and idempotent
    thanks to IF NOT EXISTS. Intentionally never invoked automatically —
    trigger it explicitly (e.g. `ingestify sync-indexes`) because index
    builds can lock large tables.
    """
    configs = self._identifier_index_configs
    self.dataset_repository.create_identifier_indexes(configs)

@property
def _thread_local(self):
if not hasattr(self, "_thread_local_"):
Expand Down
28 changes: 28 additions & 0 deletions ingestify/cmdline.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,34 @@ def delete_dataset(
#


@cli.command("sync-indexes")
@click.option(
    "--config",
    "config_file",
    required=False,
    help="Yaml config file",
    type=click.Path(exists=True),
    default=get_default_config,
)
@click.option(
    "--bucket",
    "bucket",
    required=False,
    help="bucket",
    type=str,
)
def sync_indexes(config_file: str, bucket: Optional[str]):
    """Create identifier expression indexes for dataset types with identifier_index: true.

    Safe to run multiple times (uses IF NOT EXISTS). Only affects PostgreSQL.
    Run this explicitly after initial setup or after adding new indexed dataset types —
    never runs automatically to avoid locking large tables unexpectedly.
    """
    # Build the engine from config, then delegate to the store, which knows
    # which dataset types were configured with identifier_index: true.
    store = get_engine(config_file, bucket).store
    store.create_indexes()
    logger.info("Done")


def main():
logging.basicConfig(
level=logging.INFO,
Expand Down
15 changes: 15 additions & 0 deletions ingestify/domain/services/identifier_key_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,36 @@ class IdentifierTransformer:
def __init__(self):
    # (provider, dataset_type, id_key) -> registered Transformation
    self.key_transformations: dict[tuple[str, str, str], Transformation] = {}
    # (provider, dataset_type, id_key) -> declared key type ("str" or "int")
    self.key_types: dict[tuple[str, str, str], str] = {}

def register_transformation(
    self,
    provider: str,
    dataset_type: str,
    id_key: str,
    transformation: Union[Transformation, dict],
    key_type: Optional[str] = None,
):
    """Register a transformation (and optional key type) for one identifier key.

    Args:
        provider: Provider name the dataset type belongs to.
        dataset_type: Dataset type name.
        id_key: Identifier key the transformation applies to.
        transformation: A Transformation instance, or a dict accepted by
            Transformation.from_dict.
        key_type: Declared value type of this identifier key ('str' or
            'int'). When set, the repository uses this to cast JSONB values
            in queries and to generate matching expression indexes via
            sync-indexes.

    Raises:
        ValueError: If key_type is given but is neither 'str' nor 'int'.
            A typo (e.g. "integer") would otherwise be silently treated as
            a string cast downstream, producing indexes queries cannot use.
    """
    if key_type is not None and key_type not in ("str", "int"):
        raise ValueError(
            f"Invalid key_type {key_type!r} for "
            f"{provider}/{dataset_type}/{id_key}: expected 'str' or 'int'"
        )

    if isinstance(transformation, dict):
        transformation = Transformation.from_dict(transformation)

    self.key_transformations[(provider, dataset_type, id_key)] = transformation
    if key_type is not None:
        self.key_types[(provider, dataset_type, id_key)] = key_type

def get_key_type(
    self, provider: str, dataset_type: str, id_key: str
) -> Optional[str]:
    """Look up the declared key type for an identifier key.

    Returns 'str' or 'int' when a type was registered for this
    (provider, dataset_type, id_key), otherwise None.
    """
    try:
        return self.key_types[(provider, dataset_type, id_key)]
    except KeyError:
        return None

def get_transformation(
self, provider: str, dataset_type: str, id_key: str
Expand Down
73 changes: 68 additions & 5 deletions ingestify/infra/store/dataset/sqlalchemy/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,54 @@ def close(self):
def create_all_tables(self):
    """Create every table registered on this provider's metadata, if missing."""
    self.metadata.create_all(bind=self.engine)

def create_identifier_indexes(self, index_configs: list[dict]):
    """Create partial expression indexes on identifier JSONB keys (Postgres only).

    Each entry in index_configs should have:
    - name: a label used in the index name (typically provider_dataset_type)
    - provider: provider value to match in the partial index predicate
    - dataset_type: dataset_type value to match in the partial index predicate
    - keys: list of identifier key dicts {name, key_type} to index

    Generates one partial index per config entry:
        CREATE INDEX IF NOT EXISTS idx_dataset_identifier_<name>
        ON dataset ((identifier->>'key1'), ((identifier->>'key2')::integer), ...)
        WHERE provider = '<provider>' AND dataset_type = '<dataset_type>'

    The WHERE clause limits the index to a single (provider, dataset_type) pair,
    making it smaller and ensuring neither column is a post-scan filter.

    PostgreSQL does not accept bound parameters in DDL, so config values must
    be inlined into the statement: single quotes are escaped in string
    literals, and the index-name label is validated so a malformed config
    cannot corrupt the SQL.

    Call this explicitly (e.g. via `ingestify sync-indexes`) when datasets
    have high-cardinality identifiers that are queried frequently.

    Raises:
        ValueError: If a config "name" contains characters unsafe for an
            unquoted SQL identifier.
    """
    if self.engine.dialect.name != "postgresql":
        logger.info("Skipping identifier indexes: not a PostgreSQL database")
        return

    def _escape(value) -> str:
        # DDL cannot use bound parameters; escape single quotes manually.
        return str(value).replace("'", "''")

    def _key_expression(key) -> str:
        # Keys may be dicts ({"name": ..., "key_type": ...}) or bare strings;
        # declared int keys get a ::integer cast matching the query-side cast.
        key_name = key["name"] if isinstance(key, dict) else key
        if isinstance(key, dict) and key.get("key_type") == "int":
            return f"((identifier->>'{_escape(key_name)}')::integer)"
        return f"(identifier->>'{_escape(key_name)}')"

    table_name = f"{self.table_prefix}dataset"
    with self.engine.connect() as conn:
        for config in index_configs:
            name = config["name"]
            # The label is spliced into an unquoted identifier: restrict it
            # to characters that cannot break out of the statement.
            if not name or not all(c.isalnum() or c == "_" for c in name):
                raise ValueError(
                    f"Unsafe index name component: {name!r} "
                    "(only alphanumerics and underscores are allowed)"
                )
            index_name = f"{self.table_prefix}idx_dataset_identifier_{name}"
            expressions = ", ".join(
                _key_expression(key) for key in config["keys"]
            )
            conn.execute(
                text(
                    f"CREATE INDEX IF NOT EXISTS {index_name} "
                    f"ON {table_name} ({expressions}) "
                    f"WHERE provider = '{_escape(config['provider'])}' "
                    f"AND dataset_type = '{_escape(config['dataset_type'])}'"
                )
            )
            logger.info("Created index %s on keys: %s", index_name, config["keys"])
        conn.commit()

def drop_all_tables(self):
"""Drop all tables in the database. Useful for test cleanup."""
if hasattr(self, "metadata") and hasattr(self, "engine"):
Expand All @@ -156,8 +204,14 @@ def get(self):


class SqlAlchemyDatasetRepository(DatasetRepository):
def __init__(self, session_provider: SqlAlchemySessionProvider):
def __init__(
    self, session_provider: SqlAlchemySessionProvider, identifier_transformer=None
):
    """Wire the repository to its session provider.

    identifier_transformer, when given, supplies declared key types used
    to cast JSONB identifier values in query filters.
    """
    self._identifier_transformer = identifier_transformer
    self.session_provider = session_provider

def create_identifier_indexes(self, index_configs: list[dict]):
    """Forward index creation to the session provider (Postgres-only there)."""
    provider = self.session_provider
    provider.create_identifier_indexes(index_configs)

@property
def session(self):
Expand Down Expand Up @@ -342,10 +396,19 @@ def _filter_query(
if dialect == "postgresql":
column = self.dataset_table.c.identifier[k]

# Take the value from the first selector to determine the type.
# TODO: check all selectors to determine the type
v = first_selector[k]
if isinstance(v, int):
# Use declared key_type when available so the cast matches
# the expression index created by sync-indexes.
# Fall back to inferring from the runtime value type.
declared_type = (
self._identifier_transformer.get_key_type(
provider, dataset_type, k
)
if self._identifier_transformer
else None
)
if declared_type == "int" or (
declared_type is None and isinstance(first_selector[k], int)
):
column = column.as_integer()
else:
column = column.as_string()
Expand Down
23 changes: 22 additions & 1 deletion ingestify/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def get_dataset_store_by_urls(
dataset_type=dataset_type["dataset_type"],
id_key=id_key,
transformation=id_config["transformation"],
key_type=id_config.get("key_type"),
)

file_repository = build_file_repository(
Expand All @@ -135,12 +136,32 @@ def get_dataset_store_by_urls(
metadata_url, table_prefix=table_prefix
)

dataset_repository = SqlAlchemyDatasetRepository(sqlalchemy_session_provider)
dataset_repository = SqlAlchemyDatasetRepository(
sqlalchemy_session_provider, identifier_transformer=identifier_transformer
)

identifier_index_configs = [
{
"name": f"{dt['provider']}_{dt['dataset_type']}",
"provider": dt["provider"],
"dataset_type": dt["dataset_type"],
"keys": [
{
"name": k,
"key_type": v.get("key_type", "str"),
}
for k, v in dt["identifier_keys"].items()
],
}
for dt in dataset_types
if dt.get("identifier_index")
]

return DatasetStore(
dataset_repository=dataset_repository,
file_repository=file_repository,
bucket=bucket,
identifier_index_configs=identifier_index_configs,
)


Expand Down
Loading
Loading