Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions ingestify/domain/services/identifier_key_transformer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import hashlib
import re
import unicodedata
from abc import ABC, abstractmethod
from urllib.parse import quote
from enum import Enum
from typing import Callable, Optional, Union

Expand All @@ -8,6 +12,7 @@
class TransformationType(Enum):
IDENTITY = "IDENTITY"
BUCKET = "BUCKET"
PREFIX = "PREFIX"
RANGE = "RANGE"
CUSTOM = "CUSTOM"

Expand All @@ -30,6 +35,8 @@ def from_dict(cls, config: dict) -> "Transformation":
type_ = config.pop("type")
if type_ == "bucket":
return BucketTransformation(**config)
elif type_ == "prefix":
return PrefixTransformation(**config)
else:
raise IngestifyError(f"Cannot build Transformation from {config}")

Expand All @@ -51,7 +58,10 @@ def __init__(self, bucket_size: int = None, bucket_count: int = None):

def __call__(self, id_key_value: Union[str, int]) -> str:
if self.bucket_count:
return str(int(id_key_value) % self.bucket_count)
# Always hash the string representation so bucketing is consistent
# regardless of whether the value looks numeric or not.
value = int(hashlib.md5(str(id_key_value).encode()).hexdigest(), 16)
return str(value % self.bucket_count)
elif self.bucket_size:
bucket_start = int(id_key_value) // self.bucket_size * self.bucket_size
bucket_end = bucket_start + self.bucket_size - 1
Expand All @@ -60,6 +70,19 @@ def __call__(self, id_key_value: Union[str, int]) -> str:
raise IngestifyError("Invalid BucketTransformation")


class PrefixTransformation(Transformation):
transformation_type = TransformationType.PREFIX

def __init__(self, length: int = 1):
self.length = length

def __call__(self, id_key_value: Union[str, int]) -> str:
# Transliterate unicode (ü→u, é→e) then strip non-alphanumeric
text = unicodedata.normalize("NFKD", str(id_key_value).lower())
cleaned = re.sub(r"[^a-z0-9]", "", text)
return cleaned[: self.length] if cleaned else "_"


class IdentifierTransformer:
def __init__(self):
# Mapping of (provider, dataset_type, id_key) to the transformation
Expand Down Expand Up @@ -119,8 +142,9 @@ def to_path(self, provider: str, dataset_type: str, identifier: dict) -> str:
suffix = transformation.transformation_type.value.lower()
path_parts.append(f"{key}_{suffix}={transformed_value}")

# Append the original value (either standalone for identity or alongside transformed)
path_parts.append(f"{key}={value}")
# Append the original value (either standalone for identity or alongside transformed).
# URL-encode the value so special characters, spaces, etc. are safe in paths.
path_parts.append(f"{key}={quote(str(value), safe='')}")

# Join the parts with `/` to form the full path
return "/".join(path_parts)
Loading