diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a83f11e..f2ca0f1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -76,3 +76,43 @@ jobs: INGESTIFY_TEST_DATABASE_URL: mysql://root:root@127.0.0.1:3306/ingestify_test run: | pytest --color=yes + + test-postgres: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11"] + + services: + postgres: + image: postgres:15 + env: + POSTGRES_USER: ingestify + POSTGRES_PASSWORD: ingestify + POSTGRES_DB: ingestify_test + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[test]" + - name: Install Postgres dependencies + run: | + pip install psycopg2-binary + - name: Test with pytest + env: + INGESTIFY_TEST_DATABASE_URL: postgresql://ingestify:ingestify@localhost:5432/ingestify_test + run: | + pytest --color=yes diff --git a/docs/configuration.md b/docs/configuration.md index 0313e5d..f73bc04 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -103,12 +103,58 @@ dataset_types: - `provider`: Provider name (must match a source's provider) - `dataset_type`: Type of dataset (e.g., "match", "player", "team") +- `identifier_index`: When `true`, a partial PostgreSQL expression index on all `identifier_keys` is created when `ingestify sync-indexes` is run. Use this for high-cardinality dataset types (e.g. one dataset per keyword). Never runs automatically — must be triggered explicitly to avoid locking large tables. 
+ + Example — one dataset per keyword with an expression index: + ```yaml + dataset_types: + - provider: keyword_ads + dataset_type: keyword_metrics + identifier_index: true + identifier_keys: + keyword: + transformation: identity + key_type: str + ``` + + After adding `identifier_index: true`, run once to create the index: + ```bash + ingestify sync-indexes --config config.yaml + ``` + + This creates a partial index scoped to the specific `(provider, dataset_type)` pair: + ```sql + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_ads_keyword_metrics + ON dataset ((identifier->>'keyword')) + WHERE provider = 'keyword_ads' AND dataset_type = 'keyword_metrics'; + ``` + + For composite identifiers with mixed types, all keys are combined into a single index: + ```yaml + dataset_types: + - provider: keyword_ads + dataset_type: keyword_set + identifier_index: true + identifier_keys: + dataset_id: + transformation: identity + key_type: int + table_name: + transformation: identity + key_type: str + ``` + ```sql + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_keyword_ads_keyword_set + ON dataset (((identifier->>'dataset_id')::integer), (identifier->>'table_name')) + WHERE provider = 'keyword_ads' AND dataset_type = 'keyword_set'; + ``` + - `identifier_keys`: Keys that uniquely identify datasets - - Each key can have a transformation applied to standardize the format + - Each key can have a `transformation` and an optional `key_type` + - `key_type`: Declared value type of the key (`str` or `int`). When set, the repository uses this to cast JSONB values in queries and to generate the correct expression in `sync-indexes`. Defaults to `str`. 
def __init__(
    self,
    dataset_repository: "DatasetRepository",
    file_repository: "FileRepository",
    bucket: str,
    identifier_index_configs: "Optional[list[dict]]" = None,
):
    """Initialize the store with its repositories and target bucket.

    Args:
        dataset_repository: Metadata repository for datasets.
        file_repository: Storage backend for dataset files.
        bucket: Bucket name all operations are scoped to.
        identifier_index_configs: Optional list of index config dicts
            (see ``create_indexes``). Was mistyped as a bare ``list``
            with a ``None`` default; the honest type is Optional.
    """
    self.dataset_repository = dataset_repository
    self.file_repository = file_repository
    self.storage_compression_method = "gzip"
    self.bucket = bucket
    self.event_bus: "Optional[EventBus]" = None
    # Normalize None to an empty list so create_indexes() can always iterate.
    self._identifier_index_configs = identifier_index_configs or []

    # Pass current version to repository for validation/migration
    from ingestify import __version__

    self.dataset_repository.ensure_compatible_version(__version__)

def create_indexes(self):
    """Create identifier expression indexes for configured dataset types.

    Delegates to the dataset repository, which only acts on PostgreSQL and
    uses CREATE INDEX IF NOT EXISTS, so repeated calls are safe. Must be
    triggered explicitly (e.g. via `ingestify sync-indexes`), never
    automatically, as index creation can be slow on large tables.
    """
    self.dataset_repository.create_identifier_indexes(
        self._identifier_index_configs
    )
+ """ + engine = get_engine(config_file, bucket) + engine.store.create_indexes() + logger.info("Done") + + def main(): logging.basicConfig( level=logging.INFO, diff --git a/ingestify/domain/services/identifier_key_transformer.py b/ingestify/domain/services/identifier_key_transformer.py index a473f22..0aa86b3 100644 --- a/ingestify/domain/services/identifier_key_transformer.py +++ b/ingestify/domain/services/identifier_key_transformer.py @@ -64,6 +64,8 @@ class IdentifierTransformer: def __init__(self): # Mapping of (provider, dataset_type, id_key) to the transformation self.key_transformations: dict[tuple[str, str, str], Transformation] = {} + # Mapping of (provider, dataset_type, id_key) to the declared key type ('str' or 'int') + self.key_types: dict[tuple[str, str, str], str] = {} def register_transformation( self, @@ -71,14 +73,27 @@ def register_transformation( dataset_type: str, id_key: str, transformation: Union[Transformation, dict], + key_type: Optional[str] = None, ): """ Registers a transformation for a specific (provider, dataset_type, id_key). + + key_type: declared value type of this identifier key ('str' or 'int'). + When set, the repository uses this to cast JSONB values in queries and + to generate matching expression indexes via sync-indexes. 
""" if isinstance(transformation, dict): transformation = Transformation.from_dict(transformation) self.key_transformations[(provider, dataset_type, id_key)] = transformation + if key_type is not None: + self.key_types[(provider, dataset_type, id_key)] = key_type + + def get_key_type( + self, provider: str, dataset_type: str, id_key: str + ) -> Optional[str]: + """Returns the declared key type ('str' or 'int'), or None if not declared.""" + return self.key_types.get((provider, dataset_type, id_key)) def get_transformation( self, provider: str, dataset_type: str, id_key: str diff --git a/ingestify/infra/store/dataset/sqlalchemy/repository.py b/ingestify/infra/store/dataset/sqlalchemy/repository.py index 7627ec5..281d312 100644 --- a/ingestify/infra/store/dataset/sqlalchemy/repository.py +++ b/ingestify/infra/store/dataset/sqlalchemy/repository.py @@ -146,6 +146,54 @@ def close(self): def create_all_tables(self): self.metadata.create_all(self.engine) + def create_identifier_indexes(self, index_configs: list[dict]): + """Create partial expression indexes on identifier JSONB keys (Postgres only). + + Each entry in index_configs should have: + - name: a label used in the index name (typically provider_dataset_type) + - provider: provider value to match in the partial index predicate + - dataset_type: dataset_type value to match in the partial index predicate + - keys: list of identifier key dicts {name, key_type} to index + + Generates one partial index per config entry: + CREATE INDEX IF NOT EXISTS idx_dataset_identifier_ + ON dataset ((identifier->>'key1'), ((identifier->>'key2')::integer), ...) + WHERE provider = '' AND dataset_type = '' + + The WHERE clause limits the index to a single (provider, dataset_type) pair, + making it smaller and ensuring neither column is a post-scan filter. + + Call this explicitly (e.g. via `ingestify sync-indexes`) when datasets + have high-cardinality identifiers that are queried frequently. 
+ """ + if self.engine.dialect.name != "postgresql": + logger.info("Skipping identifier indexes: not a PostgreSQL database") + return + + table_name = f"{self.table_prefix}dataset" + with self.engine.connect() as conn: + for config in index_configs: + name = config["name"] + provider = config["provider"] + dataset_type = config["dataset_type"] + keys = config["keys"] + index_name = f"{self.table_prefix}idx_dataset_identifier_{name}" + expressions = ", ".join( + f"((identifier->>'{k['name']}')::integer)" + if isinstance(k, dict) and k.get("key_type") == "int" + else f"(identifier->>'{k['name'] if isinstance(k, dict) else k}')" + for k in keys + ) + conn.execute( + text( + f"CREATE INDEX IF NOT EXISTS {index_name} " + f"ON {table_name} ({expressions}) " + f"WHERE provider = '{provider}' AND dataset_type = '{dataset_type}'" + ) + ) + logger.info("Created index %s on keys: %s", index_name, keys) + conn.commit() + def drop_all_tables(self): """Drop all tables in the database. Useful for test cleanup.""" if hasattr(self, "metadata") and hasattr(self, "engine"): @@ -156,8 +204,14 @@ def get(self): class SqlAlchemyDatasetRepository(DatasetRepository): - def __init__(self, session_provider: SqlAlchemySessionProvider): + def __init__( + self, session_provider: SqlAlchemySessionProvider, identifier_transformer=None + ): self.session_provider = session_provider + self._identifier_transformer = identifier_transformer + + def create_identifier_indexes(self, index_configs: list[dict]): + self.session_provider.create_identifier_indexes(index_configs) @property def session(self): @@ -342,10 +396,19 @@ def _filter_query( if dialect == "postgresql": column = self.dataset_table.c.identifier[k] - # Take the value from the first selector to determine the type. - # TODO: check all selectors to determine the type - v = first_selector[k] - if isinstance(v, int): + # Use declared key_type when available so the cast matches + # the expression index created by sync-indexes. 
+ # Fall back to inferring from the runtime value type. + declared_type = ( + self._identifier_transformer.get_key_type( + provider, dataset_type, k + ) + if self._identifier_transformer + else None + ) + if declared_type == "int" or ( + declared_type is None and isinstance(first_selector[k], int) + ): column = column.as_integer() else: column = column.as_string() diff --git a/ingestify/main.py b/ingestify/main.py index e60b09a..f195631 100644 --- a/ingestify/main.py +++ b/ingestify/main.py @@ -116,6 +116,7 @@ def get_dataset_store_by_urls( dataset_type=dataset_type["dataset_type"], id_key=id_key, transformation=id_config["transformation"], + key_type=id_config.get("key_type"), ) file_repository = build_file_repository( @@ -135,12 +136,32 @@ def get_dataset_store_by_urls( metadata_url, table_prefix=table_prefix ) - dataset_repository = SqlAlchemyDatasetRepository(sqlalchemy_session_provider) + dataset_repository = SqlAlchemyDatasetRepository( + sqlalchemy_session_provider, identifier_transformer=identifier_transformer + ) + + identifier_index_configs = [ + { + "name": f"{dt['provider']}_{dt['dataset_type']}", + "provider": dt["provider"], + "dataset_type": dt["dataset_type"], + "keys": [ + { + "name": k, + "key_type": v.get("key_type", "str"), + } + for k, v in dt["identifier_keys"].items() + ], + } + for dt in dataset_types + if dt.get("identifier_index") + ] return DatasetStore( dataset_repository=dataset_repository, file_repository=file_repository, bucket=bucket, + identifier_index_configs=identifier_index_configs, ) diff --git a/ingestify/tests/test_identifier_indexes.py b/ingestify/tests/test_identifier_indexes.py new file mode 100644 index 0000000..b48b110 --- /dev/null +++ b/ingestify/tests/test_identifier_indexes.py @@ -0,0 +1,91 @@ +"""Tests for create_identifier_indexes / sync-indexes functionality.""" +import os + +import pytest +import sqlalchemy + +from ingestify.application.dataset_store import DatasetStore +from 
ingestify.infra.store.dataset.sqlalchemy.repository import ( + SqlAlchemySessionProvider, + SqlAlchemyDatasetRepository, +) + + +INDEX_CONFIGS = [ + { + "name": "test_keyword_metrics", + "provider": "test", + "dataset_type": "keyword_metrics", + "keys": [{"name": "keyword", "key_type": "str"}], + }, + { + "name": "test_keyword_set", + "provider": "test", + "dataset_type": "keyword_set", + "keys": [ + {"name": "dataset_id", "key_type": "int"}, + {"name": "table_name", "key_type": "str"}, + ], + }, +] + + +@pytest.fixture +def repository(ingestify_test_database_url, db_cleanup): + provider = SqlAlchemySessionProvider(ingestify_test_database_url) + repo = SqlAlchemyDatasetRepository(provider) + yield repo + # Drop test indexes so they don't leak between runs + if provider.engine.dialect.name == "postgresql": + with provider.engine.connect() as conn: + for config in INDEX_CONFIGS: + conn.execute( + sqlalchemy.text( + f"DROP INDEX IF EXISTS idx_dataset_identifier_{config['name']}" + ) + ) + conn.commit() + provider.drop_all_tables() + + +def test_create_identifier_indexes_sqlite_noop(repository): + """No-op on non-Postgres databases — must not raise.""" + repository.create_identifier_indexes(INDEX_CONFIGS) + + +def test_create_identifier_indexes_creates_indexes(repository): + """Creates composite expression indexes on Postgres; skipped on other DBs.""" + if repository.session_provider.engine.dialect.name != "postgresql": + pytest.skip("Expression index test requires PostgreSQL") + + repository.create_identifier_indexes(INDEX_CONFIGS) + + with repository.session_provider.engine.connect() as conn: + result = conn.execute( + sqlalchemy.text( + "SELECT indexname FROM pg_indexes " + "WHERE tablename = 'dataset' AND indexname LIKE 'idx_dataset_identifier_%'" + ) + ) + index_names = {row[0] for row in result} + + assert "idx_dataset_identifier_test_keyword_metrics" in index_names + assert "idx_dataset_identifier_test_keyword_set" in index_names + + +def 
test_create_identifier_indexes_idempotent(repository): + """Running twice does not raise (IF NOT EXISTS).""" + repository.create_identifier_indexes(INDEX_CONFIGS) + repository.create_identifier_indexes(INDEX_CONFIGS) + + +def test_dataset_store_create_indexes(engine): + """DatasetStore.create_indexes() delegates to the repository.""" + engine.store._identifier_index_configs = INDEX_CONFIGS + engine.store.create_indexes() + + +def test_dataset_store_create_indexes_empty(engine): + """create_indexes() with empty configs is a no-op.""" + engine.store._identifier_index_configs = [] + engine.store.create_indexes()