From 157055080fe25a325c0e041d02442dd0aeaecfa8 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Wed, 1 Apr 2026 14:46:58 +0200 Subject: [PATCH 1/9] Add identifier expression indexes for high-cardinality dataset types - SqlAlchemySessionProvider.create_identifier_indexes(): creates composite expression indexes on identifier JSONB keys (Postgres only, IF NOT EXISTS) - DatasetStore.create_indexes(): delegates to repository, configured via identifier_index_configs from dataset_types config - `ingestify sync-indexes` CLI command to trigger index creation explicitly (never automatic to avoid locking large tables) - identifier_index: true option in dataset_types config - test-postgres job in test.yml with Postgres 15 service --- .github/workflows/test.yml | 40 ++++++++ docs/configuration.md | 1 + ingestify/application/dataset_store.py | 11 +++ ingestify/cmdline.py | 28 ++++++ .../store/dataset/sqlalchemy/repository.py | 37 ++++++++ ingestify/main.py | 10 ++ ingestify/tests/test_identifier_indexes.py | 92 +++++++++++++++++++ 7 files changed, 219 insertions(+) create mode 100644 ingestify/tests/test_identifier_indexes.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a83f11e..f2ca0f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -76,3 +76,43 @@ jobs: INGESTIFY_TEST_DATABASE_URL: mysql://root:root@127.0.0.1:3306/ingestify_test run: | pytest --color=yes + + test-postgres: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11"] + + services: + postgres: + image: postgres:15 + env: + POSTGRES_USER: ingestify + POSTGRES_PASSWORD: ingestify + POSTGRES_DB: ingestify_test + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[test]" + - name: Install Postgres dependencies + run: | + pip install psycopg2-binary + - name: Test with pytest + env: + INGESTIFY_TEST_DATABASE_URL: postgresql://ingestify:ingestify@localhost:5432/ingestify_test + run: | + pytest --color=yes diff --git a/docs/configuration.md b/docs/configuration.md index 0313e5d..784f362 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -103,6 +103,7 @@ dataset_types: - `provider`: Provider name (must match a source's provider) - `dataset_type`: Type of dataset (e.g., "match", "player", "team") +- `identifier_index`: When `true`, a composite PostgreSQL expression index on all `identifier_keys` is created when `ingestify sync-indexes` is run. Use this for high-cardinality dataset types (e.g. one dataset per keyword). Never runs automatically — must be triggered explicitly to avoid locking large tables. - `identifier_keys`: Keys that uniquely identify datasets - Each key can have a transformation applied to standardize the format - Common transformations: diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index ffc6b46..81c469f 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -111,18 +111,29 @@ def __init__( dataset_repository: DatasetRepository, file_repository: FileRepository, bucket: str, + identifier_index_configs: list = None, ): self.dataset_repository = dataset_repository self.file_repository = file_repository self.storage_compression_method = "gzip" self.bucket = bucket self.event_bus: Optional[EventBus] = None + self._identifier_index_configs = identifier_index_configs or [] # Pass current version to repository for validation/migration from ingestify import __version__ self.dataset_repository.ensure_compatible_version(__version__) + def create_indexes(self): + """Create identifier expression indexes for configured dataset types. + + Only runs on PostgreSQL. Safe to call multiple times (IF NOT EXISTS). + Should be triggered explicitly (e.g. via `ingestify sync-indexes`), + never automatically, as it can be slow on large tables. + """ + self.dataset_repository.create_identifier_indexes(self._identifier_index_configs) + @property def _thread_local(self): if not hasattr(self, "_thread_local_"): diff --git a/ingestify/cmdline.py b/ingestify/cmdline.py index 130ca48..785c41e 100644 --- a/ingestify/cmdline.py +++ b/ingestify/cmdline.py @@ -244,6 +244,34 @@ def delete_dataset( # +@cli.command("sync-indexes") +@click.option( + "--config", + "config_file", + required=False, + help="Yaml config file", + type=click.Path(exists=True), + default=get_default_config, +) +@click.option( + "--bucket", + "bucket", + required=False, + help="bucket", + type=str, +) +def sync_indexes(config_file: str, bucket: Optional[str]): + """Create identifier expression indexes for dataset types with identifier_index: true. + + Safe to run multiple times (uses IF NOT EXISTS). Only affects PostgreSQL. + Run this explicitly after initial setup or after adding new indexed dataset types — + never runs automatically to avoid locking large tables unexpectedly. + """ + engine = get_engine(config_file, bucket) + engine.store.create_indexes() + logger.info("Done") + + def main(): logging.basicConfig( level=logging.INFO, diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 7627ec5..3f3a19c 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -146,6 +146,40 @@ def close(self): def create_all_tables(self): self.metadata.create_all(self.engine) + def create_identifier_indexes(self, index_configs: list[dict]): + """Create composite expression indexes on identifier JSONB keys (Postgres only). + + Each entry in index_configs should have: + - name: a label used in the index name + - keys: list of identifier key names to include in the composite index + + Generates one functional index per config entry: + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_ + ON dataset ((identifier->>'key1'), (identifier->>'key2'), ...) + + Call this explicitly (e.g. via `ingestify sync-indexes`) when datasets + have high-cardinality identifiers that are queried frequently. + """ + if self.engine.dialect.name != "postgresql": + logger.info("Skipping identifier indexes: not a PostgreSQL database") + return + + table_name = f"{self.table_prefix}dataset" + with self.engine.connect() as conn: + for config in index_configs: + name = config["name"] + keys = config["keys"] + index_name = f"{self.table_prefix}idx_dataset_identifier_{name}" + expressions = ", ".join(f"(identifier->>'{k}')" for k in keys) + conn.execute( + text( + f"CREATE INDEX IF NOT EXISTS {index_name} " + f"ON {table_name} ({expressions})" + ) + ) + logger.info("Created index %s on keys: %s", index_name, keys) + conn.commit() + def drop_all_tables(self): """Drop all tables in the database. Useful for test cleanup.""" if hasattr(self, "metadata") and hasattr(self, "engine"): @@ -159,6 +193,9 @@ class SqlAlchemyDatasetRepository(DatasetRepository): def __init__(self, session_provider: SqlAlchemySessionProvider): self.session_provider = session_provider + def create_identifier_indexes(self, index_configs: list[dict]): + self.session_provider.create_identifier_indexes(index_configs) + @property def session(self): return self.session_provider.get() diff --git a/ingestify/main.py b/ingestify/main.py index e60b09a..64d8601 100644 --- a/ingestify/main.py +++ b/ingestify/main.py @@ -137,10 +137,20 @@ def get_dataset_store_by_urls( dataset_repository = SqlAlchemyDatasetRepository(sqlalchemy_session_provider) + identifier_index_configs = [ + { + "name": dt["dataset_type"], + "keys": list(dt["identifier_keys"].keys()), + } + for dt in dataset_types + if dt.get("identifier_index") + ] + return DatasetStore( dataset_repository=dataset_repository, file_repository=file_repository, bucket=bucket, + identifier_index_configs=identifier_index_configs, ) diff --git a/ingestify/tests/test_identifier_indexes.py b/ingestify/tests/test_identifier_indexes.py new file mode 100644 index 0000000..b0f10bb --- /dev/null +++ b/ingestify/tests/test_identifier_indexes.py @@ -0,0 +1,92 @@ +"""Tests for create_identifier_indexes / sync-indexes functionality.""" +import os + +import pytest +import sqlalchemy + +from ingestify.application.dataset_store import DatasetStore +from ingestify.infra.store.dataset.sqlalchemy.repository import ( + SqlAlchemySessionProvider, + SqlAlchemyDatasetRepository, +) + + +INDEX_CONFIGS = [ + {"name": "keyword_metrics", "keys": ["keyword"]}, + {"name": "keyword_set", "keys": ["dataset_id", "table_name"]}, +] + + +@pytest.fixture +def repository(ingestify_test_database_url, db_cleanup): + provider = SqlAlchemySessionProvider(ingestify_test_database_url) + repo = SqlAlchemyDatasetRepository(provider) + yield repo + # Drop test indexes so they don't leak between runs + if provider.engine.dialect.name == "postgresql": + with provider.engine.connect() as conn: + for config in INDEX_CONFIGS: + conn.execute( + sqlalchemy.text( + f"DROP INDEX IF EXISTS idx_dataset_identifier_{config['name']}" + ) + ) + conn.commit() + provider.drop_all_tables() + + +def test_create_identifier_indexes_sqlite_noop(repository): + """No-op on non-Postgres databases — must not raise.""" + repository.create_identifier_indexes(INDEX_CONFIGS) + + +def test_create_identifier_indexes_creates_indexes(repository): + """Creates composite expression indexes on Postgres; skipped on other DBs.""" + if repository.session_provider.engine.dialect.name != "postgresql": + pytest.skip("Expression index test requires PostgreSQL") + + repository.create_identifier_indexes(INDEX_CONFIGS) + + with repository.session_provider.engine.connect() as conn: + result = conn.execute( + sqlalchemy.text( + "SELECT indexname FROM pg_indexes " + "WHERE tablename = 'dataset' AND indexname LIKE 'idx_dataset_identifier_%'" + ) + ) + index_names = {row[0] for row in result} + + assert "idx_dataset_identifier_keyword_metrics" in index_names + assert "idx_dataset_identifier_keyword_set" in index_names + + +def test_create_identifier_indexes_idempotent(repository): + """Running twice does not raise (IF NOT EXISTS).""" + repository.create_identifier_indexes(INDEX_CONFIGS) + repository.create_identifier_indexes(INDEX_CONFIGS) + + +def test_dataset_store_create_indexes(repository, tmp_path): + """DatasetStore.create_indexes() delegates to the repository.""" + from ingestify.infra.store.file.local_file_repository import LocalFileRepository + + store = DatasetStore( + dataset_repository=repository, + file_repository=LocalFileRepository(str(tmp_path)), + bucket="test", + identifier_index_configs=INDEX_CONFIGS, + ) + store.create_indexes() + + +def test_dataset_store_create_indexes_empty(repository, tmp_path): + """create_indexes() with empty configs is a no-op.""" + from ingestify.infra.store.file.local_file_repository import LocalFileRepository + + store = DatasetStore( + dataset_repository=repository, + file_repository=LocalFileRepository(str(tmp_path)), + bucket="test", + identifier_index_configs=[], + ) + store.create_indexes() From bf8e465b5da573b96667770303f8d3739e9c52dc Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Wed, 1 Apr 2026 14:56:41 +0200 Subject: [PATCH 2/9] Expand docs for identifier_index with YAML examples and sync-indexes usage --- docs/configuration.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index 784f362..9204228 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -104,6 +104,46 @@ dataset_types: - `provider`: Provider name (must match a source's provider) - `dataset_type`: Type of dataset (e.g., "match", "player", "team") - `identifier_index`: When `true`, a composite PostgreSQL expression index on all `identifier_keys` is created when `ingestify sync-indexes` is run. Use this for high-cardinality dataset types (e.g. one dataset per keyword). Never runs automatically — must be triggered explicitly to avoid locking large tables. + + Example — one dataset per keyword with an expression index: + ```yaml + dataset_types: + - provider: keyword_ads + dataset_type: keyword_metrics + identifier_index: true + identifier_keys: + keyword: + transformation: str + ``` + + After adding `identifier_index: true`, run once to create the index: + ```bash + ingestify sync-indexes --config config.yaml + ``` + + This creates: + ```sql + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_metrics + ON dataset ((identifier->>'keyword')); + ``` + + For composite identifiers all keys are combined into a single index: + ```yaml + dataset_types: + - provider: keyword_ads + dataset_type: keyword_set + identifier_index: true + identifier_keys: + dataset_id: + transformation: str + table_name: + transformation: str + ``` + ```sql + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_set + ON dataset ((identifier->>'dataset_id'), (identifier->>'table_name')); + ``` + - `identifier_keys`: Keys that uniquely identify datasets - Each key can have a transformation applied to standardize the format - Common transformations: From bfcdc594781e17da013594cd374ff49470dfbb32 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Wed, 1 Apr 2026 15:14:49 +0200 Subject: [PATCH 3/9] Apply black formatting --- ingestify/application/dataset_store.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ingestify/application/dataset_store.py b/ingestify/application/dataset_store.py index 81c469f..a98d425 100644 --- a/ingestify/application/dataset_store.py +++ b/ingestify/application/dataset_store.py @@ -132,7 +132,9 @@ def create_indexes(self): Should be triggered explicitly (e.g. via `ingestify sync-indexes`), never automatically, as it can be slow on large tables. """ - self.dataset_repository.create_identifier_indexes(self._identifier_index_configs) + self.dataset_repository.create_identifier_indexes( + self._identifier_index_configs + ) @property def _thread_local(self): From 27b75877612108bcffd91cea8bdb54c00de6cb85 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Wed, 1 Apr 2026 15:46:34 +0200 Subject: [PATCH 4/9] Fix test fixtures: use engine from conftest instead of LocalFileRepository --- ingestify/tests/test_identifier_indexes.py | 26 +++++----------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/ingestify/tests/test_identifier_indexes.py b/ingestify/tests/test_identifier_indexes.py index b0f10bb..c7edc90 100644 --- a/ingestify/tests/test_identifier_indexes.py +++ b/ingestify/tests/test_identifier_indexes.py @@ -66,27 +66,13 @@ def test_create_identifier_indexes_idempotent(repository): repository.create_identifier_indexes(INDEX_CONFIGS) -def test_dataset_store_create_indexes(repository, tmp_path): +def test_dataset_store_create_indexes(engine): """DatasetStore.create_indexes() delegates to the repository.""" - from ingestify.infra.store.file.local_file_repository import LocalFileRepository + engine.store._identifier_index_configs = INDEX_CONFIGS + engine.store.create_indexes() - store = DatasetStore( - dataset_repository=repository, - file_repository=LocalFileRepository(str(tmp_path)), - bucket="test", - identifier_index_configs=INDEX_CONFIGS, - ) - store.create_indexes() - -def test_dataset_store_create_indexes_empty(repository, tmp_path): +def test_dataset_store_create_indexes_empty(engine): """create_indexes() with empty configs is a no-op.""" - from ingestify.infra.store.file.local_file_repository import LocalFileRepository - - store = DatasetStore( - dataset_repository=repository, - file_repository=LocalFileRepository(str(tmp_path)), - bucket="test", - identifier_index_configs=[], - ) - store.create_indexes() + engine.store._identifier_index_configs = [] + engine.store.create_indexes() From 54f20cac881b370490b91825c618608d16b5143c Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Fri, 3 Apr 2026 12:46:16 +0200 Subject: [PATCH 5/9] Add key_type to identifier keys for typed JSONB index expressions - IdentifierTransformer now stores and returns declared key_type per key - register_transformation() accepts optional key_type ('str' or 'int') - Repository query building uses declared key_type for JSONB cast instead of inferring from Python value type at runtime - create_identifier_indexes() generates typed expressions: (identifier->>'key') for str, ((identifier->>'key')::integer) for int - main.py passes key_type from config to both transformer and index configs - Tests updated to use new dict key format {name, key_type} --- .../services/identifier_key_transformer.py | 15 ++++++++++ .../store/dataset/sqlalchemy/repository.py | 29 +++++++++++++++---- ingestify/main.py | 13 +++++++-- ingestify/tests/test_identifier_indexes.py | 10 +++++-- 4 files changed, 57 insertions(+), 10 deletions(-) diff --git a/ingestify/domain/services/identifier_key_transformer.py b/ingestify/domain/services/identifier_key_transformer.py index a473f22..0aa86b3 100644 --- a/ingestify/domain/services/identifier_key_transformer.py +++ b/ingestify/domain/services/identifier_key_transformer.py @@ -64,6 +64,8 @@ class IdentifierTransformer: def __init__(self): # Mapping of (provider, dataset_type, id_key) to the transformation self.key_transformations: dict[tuple[str, str, str], Transformation] = {} + # Mapping of (provider, dataset_type, id_key) to the declared key type ('str' or 'int') + self.key_types: dict[tuple[str, str, str], str] = {} def register_transformation( self, @@ -71,14 +73,27 @@ def register_transformation( dataset_type: str, id_key: str, transformation: Union[Transformation, dict], + key_type: Optional[str] = None, ): """ Registers a transformation for a specific (provider, dataset_type, id_key). + + key_type: declared value type of this identifier key ('str' or 'int'). + When set, the repository uses this to cast JSONB values in queries and + to generate matching expression indexes via sync-indexes. """ if isinstance(transformation, dict): transformation = Transformation.from_dict(transformation) self.key_transformations[(provider, dataset_type, id_key)] = transformation + if key_type is not None: + self.key_types[(provider, dataset_type, id_key)] = key_type + + def get_key_type( + self, provider: str, dataset_type: str, id_key: str + ) -> Optional[str]: + """Returns the declared key type ('str' or 'int'), or None if not declared.""" + return self.key_types.get((provider, dataset_type, id_key)) def get_transformation( self, provider: str, dataset_type: str, id_key: str diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 3f3a19c..30d85af 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -170,7 +170,12 @@ def create_identifier_indexes(self, index_configs: list[dict]): name = config["name"] keys = config["keys"] index_name = f"{self.table_prefix}idx_dataset_identifier_{name}" - expressions = ", ".join(f"(identifier->>'{k}')" for k in keys) + expressions = ", ".join( + f"((identifier->>'{k['name']}')::integer)" + if isinstance(k, dict) and k.get("key_type") == "int" + else f"(identifier->>'{k['name'] if isinstance(k, dict) else k}')" + for k in keys + ) conn.execute( text( f"CREATE INDEX IF NOT EXISTS {index_name} " @@ -190,8 +195,11 @@ def get(self): class SqlAlchemyDatasetRepository(DatasetRepository): - def __init__(self, session_provider: SqlAlchemySessionProvider): + def __init__( + self, session_provider: SqlAlchemySessionProvider, identifier_transformer=None + ): self.session_provider = session_provider + self._identifier_transformer = identifier_transformer def create_identifier_indexes(self, index_configs: list[dict]): self.session_provider.create_identifier_indexes(index_configs) @@ -379,10 +387,19 @@ def _filter_query( if dialect == "postgresql": column = self.dataset_table.c.identifier[k] - # Take the value from the first selector to determine the type. - # TODO: check all selectors to determine the type - v = first_selector[k] - if isinstance(v, int): + # Use declared key_type when available so the cast matches + # the expression index created by sync-indexes. + # Fall back to inferring from the runtime value type. + declared_type = ( + self._identifier_transformer.get_key_type( + provider, dataset_type, k + ) + if self._identifier_transformer + else None + ) + if declared_type == "int" or ( + declared_type is None and isinstance(first_selector[k], int) + ): column = column.as_integer() else: column = column.as_string() diff --git a/ingestify/main.py b/ingestify/main.py index 64d8601..b40ea08 100644 --- a/ingestify/main.py +++ b/ingestify/main.py @@ -116,6 +116,7 @@ def get_dataset_store_by_urls( dataset_type=dataset_type["dataset_type"], id_key=id_key, transformation=id_config["transformation"], + key_type=id_config.get("key_type"), ) file_repository = build_file_repository( @@ -135,12 +136,20 @@ def get_dataset_store_by_urls( metadata_url, table_prefix=table_prefix ) - dataset_repository = SqlAlchemyDatasetRepository(sqlalchemy_session_provider) + dataset_repository = SqlAlchemyDatasetRepository( + sqlalchemy_session_provider, identifier_transformer=identifier_transformer + ) identifier_index_configs = [ { "name": dt["dataset_type"], - "keys": list(dt["identifier_keys"].keys()), + "keys": [ + { + "name": k, + "key_type": v.get("key_type", "str"), + } + for k, v in dt["identifier_keys"].items() + ], } for dt in dataset_types if dt.get("identifier_index") diff --git a/ingestify/tests/test_identifier_indexes.py b/ingestify/tests/test_identifier_indexes.py index c7edc90..4703a0c 100644 --- a/ingestify/tests/test_identifier_indexes.py +++ b/ingestify/tests/test_identifier_indexes.py @@ -12,8 +12,14 @@ INDEX_CONFIGS = [ - {"name": "keyword_metrics", "keys": ["keyword"]}, - {"name": "keyword_set", "keys": ["dataset_id", "table_name"]}, + {"name": "keyword_metrics", "keys": [{"name": "keyword", "key_type": "str"}]}, + { + "name": "keyword_set", + "keys": [ + {"name": "dataset_id", "key_type": "int"}, + {"name": "table_name", "key_type": "str"}, + ], + }, ] From 0a1f78557e01106d121e9eb1afe9f5ff88e1f657 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Fri, 3 Apr 2026 14:18:26 +0200 Subject: [PATCH 6/9] Use partial index (WHERE dataset_type = '...') for identifier indexes Limits each index to a single dataset_type, so it is smaller and dataset_type is an implicit condition rather than a post-scan filter. --- .../store/dataset/sqlalchemy/repository.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 30d85af..b0b68c1 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -147,15 +147,19 @@ def create_all_tables(self): self.metadata.create_all(self.engine) def create_identifier_indexes(self, index_configs: list[dict]): - """Create composite expression indexes on identifier JSONB keys (Postgres only). + """Create partial expression indexes on identifier JSONB keys (Postgres only). Each entry in index_configs should have: - - name: a label used in the index name - - keys: list of identifier key names to include in the composite index + - name: a label used in the index name (typically the dataset_type) + - keys: list of identifier key dicts {name, key_type} to index - Generates one functional index per config entry: + Generates one partial index per config entry: CREATE INDEX IF NOT EXISTS idx_dataset_identifier_ - ON dataset ((identifier->>'key1'), (identifier->>'key2'), ...) + ON dataset ((identifier->>'key1'), ((identifier->>'key2')::integer), ...) + WHERE dataset_type = '' + + The WHERE clause limits the index to a single dataset_type, making it + smaller and ensuring dataset_type is never a post-scan filter. Call this explicitly (e.g. via `ingestify sync-indexes`) when datasets have high-cardinality identifiers that are queried frequently. @@ -179,7 +183,8 @@ def create_identifier_indexes(self, index_configs: list[dict]): conn.execute( text( f"CREATE INDEX IF NOT EXISTS {index_name} " - f"ON {table_name} ({expressions})" + f"ON {table_name} ({expressions}) " + f"WHERE dataset_type = '{name}'" ) ) logger.info("Created index %s on keys: %s", index_name, keys) From 4190d16f8063ddb8f8ca28fdc6adf16e4465fc3d Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Fri, 3 Apr 2026 14:27:10 +0200 Subject: [PATCH 7/9] Include provider in partial index predicate and index name Two different providers can share the same dataset_type name, so the partial index WHERE clause now matches both provider and dataset_type. Index name uses provider_dataset_type to avoid collisions. --- .../infra/store/dataset/sqlalchemy/repository.py | 14 +++++++++----- ingestify/main.py | 4 +++- ingestify/tests/test_identifier_indexes.py | 11 +++++++++-- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index b0b68c1..281d312 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -150,16 +150,18 @@ def create_identifier_indexes(self, index_configs: list[dict]): """Create partial expression indexes on identifier JSONB keys (Postgres only). Each entry in index_configs should have: - - name: a label used in the index name (typically the dataset_type) + - name: a label used in the index name (typically provider_dataset_type) + - provider: provider value to match in the partial index predicate + - dataset_type: dataset_type value to match in the partial index predicate - keys: list of identifier key dicts {name, key_type} to index Generates one partial index per config entry: CREATE INDEX IF NOT EXISTS idx_dataset_identifier_ ON dataset ((identifier->>'key1'), ((identifier->>'key2')::integer), ...) - WHERE dataset_type = '' + WHERE provider = '' AND dataset_type = '' - The WHERE clause limits the index to a single dataset_type, making it - smaller and ensuring dataset_type is never a post-scan filter. + The WHERE clause limits the index to a single (provider, dataset_type) pair, + making it smaller and ensuring neither column is a post-scan filter. Call this explicitly (e.g. via `ingestify sync-indexes`) when datasets have high-cardinality identifiers that are queried frequently. @@ -172,6 +174,8 @@ def create_identifier_indexes(self, index_configs: list[dict]): with self.engine.connect() as conn: for config in index_configs: name = config["name"] + provider = config["provider"] + dataset_type = config["dataset_type"] keys = config["keys"] index_name = f"{self.table_prefix}idx_dataset_identifier_{name}" expressions = ", ".join( @@ -184,7 +188,7 @@ def create_identifier_indexes(self, index_configs: list[dict]): text( f"CREATE INDEX IF NOT EXISTS {index_name} " f"ON {table_name} ({expressions}) " - f"WHERE dataset_type = '{name}'" + f"WHERE provider = '{provider}' AND dataset_type = '{dataset_type}'" ) ) logger.info("Created index %s on keys: %s", index_name, keys) diff --git a/ingestify/main.py b/ingestify/main.py index b40ea08..f195631 100644 --- a/ingestify/main.py +++ b/ingestify/main.py @@ -142,7 +142,9 @@ def get_dataset_store_by_urls( identifier_index_configs = [ { - "name": dt["dataset_type"], + "name": f"{dt['provider']}_{dt['dataset_type']}", + "provider": dt["provider"], + "dataset_type": dt["dataset_type"], "keys": [ { "name": k, diff --git a/ingestify/tests/test_identifier_indexes.py b/ingestify/tests/test_identifier_indexes.py index 4703a0c..433da44 100644 --- a/ingestify/tests/test_identifier_indexes.py +++ b/ingestify/tests/test_identifier_indexes.py @@ -12,9 +12,16 @@ INDEX_CONFIGS = [ - {"name": "keyword_metrics", "keys": [{"name": "keyword", "key_type": "str"}]}, { - "name": "keyword_set", + "name": "test_keyword_metrics", + "provider": "test", + "dataset_type": "keyword_metrics", + "keys": [{"name": "keyword", "key_type": "str"}], + }, + { + "name": "test_keyword_set", + "provider": "test", + "dataset_type": "keyword_set", "keys": [ {"name": "dataset_id", "key_type": "int"}, {"name": "table_name", "key_type": "str"}, From 755de1dc25aaae06005f2a3254a3745524b8d759 Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Fri, 3 Apr 2026 14:32:37 +0200 Subject: [PATCH 8/9] Fix test assertions to match provider_dataset_type index name format --- ingestify/tests/test_identifier_indexes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestify/tests/test_identifier_indexes.py b/ingestify/tests/test_identifier_indexes.py index 433da44..b48b110 100644 --- a/ingestify/tests/test_identifier_indexes.py +++ b/ingestify/tests/test_identifier_indexes.py @@ -69,8 +69,8 @@ def test_create_identifier_indexes_creates_indexes(repository): ) index_names = {row[0] for row in result} - assert "idx_dataset_identifier_keyword_metrics" in index_names - assert "idx_dataset_identifier_keyword_set" in index_names + assert "idx_dataset_identifier_test_keyword_metrics" in index_names + assert "idx_dataset_identifier_test_keyword_set" in index_names def test_create_identifier_indexes_idempotent(repository): From 1710ef1e827cfb2f084bb64c5f3994ba8ed3cc6b Mon Sep 17 00:00:00 2001 From: Koen Vossen Date: Fri, 3 Apr 2026 14:44:34 +0200 Subject: [PATCH 9/9] Update docs: key_type, partial index predicate, correct index names --- docs/configuration.md | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 9204228..f73bc04 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -103,7 +103,7 @@ dataset_types: - `provider`: Provider name (must match a source's provider) - `dataset_type`: Type of dataset (e.g., "match", "player", "team") -- `identifier_index`: When `true`, a composite PostgreSQL expression index on all `identifier_keys` is created when `ingestify sync-indexes` is run. Use this for high-cardinality dataset types (e.g. one dataset per keyword). Never runs automatically — must be triggered explicitly to avoid locking large tables. +- `identifier_index`: When `true`, a partial PostgreSQL expression index on all `identifier_keys` is created when `ingestify sync-indexes` is run. Use this for high-cardinality dataset types (e.g. one dataset per keyword). Never runs automatically — must be triggered explicitly to avoid locking large tables. Example — one dataset per keyword with an expression index: ```yaml @@ -113,7 +113,8 @@ dataset_types: identifier_index: true identifier_keys: keyword: - transformation: str + transformation: identity + key_type: str ``` After adding `identifier_index: true`, run once to create the index: @@ -121,13 +122,14 @@ dataset_types: ingestify sync-indexes --config config.yaml ``` - This creates: + This creates a partial index scoped to the specific `(provider, dataset_type)` pair: ```sql - CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_metrics - ON dataset ((identifier->>'keyword')); + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_ads_keyword_metrics + ON dataset ((identifier->>'keyword')) + WHERE provider = 'keyword_ads' AND dataset_type = 'keyword_metrics'; ``` - For composite identifiers all keys are combined into a single index: + For composite identifiers with mixed types, all keys are combined into a single index: ```yaml dataset_types: - provider: keyword_ads @@ -135,21 +137,24 @@ dataset_types: identifier_index: true identifier_keys: dataset_id: - transformation: str + transformation: identity + key_type: int table_name: - transformation: str + transformation: identity + key_type: str ``` ```sql - CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_set - ON dataset ((identifier->>'dataset_id'), (identifier->>'table_name')); + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_ads_keyword_set + ON dataset (((identifier->>'dataset_id')::integer), (identifier->>'table_name')) + WHERE provider = 'keyword_ads' AND dataset_type = 'keyword_set'; ``` - `identifier_keys`: Keys that uniquely identify datasets - - Each key can have a transformation applied to standardize the format + - Each key can have a `transformation` and an optional `key_type` + - `key_type`: Declared value type of the key (`str` or `int`). When set, the repository uses this to cast JSONB values in queries and to generate the correct expression in `sync-indexes`. Defaults to `str`. - Common transformations: - - `str`: Convert to string - - `int`: Convert to integer - - Bucket transformation: + - `identity`: Use value as-is (default) + - Bucket transformation (for file path organisation only — not related to indexing): ```yaml transformation: type: bucket