diff --git a/app/container.py b/app/container.py index fd94a4f..be30b37 100644 --- a/app/container.py +++ b/app/container.py @@ -17,6 +17,7 @@ from app.service.users import AuthService from app.service.user_notification import UserNotificationService from db.generated import devices as device_queries +from db.generated import photo_faces as photo_face_queries from db.generated import photos as photo_queries from db.generated import session as session_queries from db.generated import staff_drive_connections as staff_drive_queries @@ -54,6 +55,7 @@ def __init__( self.upload_request_querier = upload_request_queries.AsyncQuerier(conn) self.upload_request_photo_querier = upload_request_photo_queries.AsyncQuerier(conn) self.photo_querier = photo_queries.AsyncQuerier(conn) + self.photo_face_querier = photo_face_queries.AsyncQuerier(conn) self.staff_notification_querier = staff_notification_queries.AsyncQuerier(conn) self.notification_querier = notification_queries.AsyncQuerier(conn) self.audit_querier = audit_queries.AsyncQuerier(conn) @@ -115,7 +117,6 @@ def __init__( ) self.staff_user_service = StaffUserService() - self.staff_user_service.init( staff_user_querier=self.staff_user_querier,) diff --git a/app/core/config.py b/app/core/config.py index 2adf52e..55d0841 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -1,4 +1,5 @@ -from pydantic_settings import BaseSettings +from pydantic_settings import BaseSettings, SettingsConfigDict +from pydantic import field_validator class Settings(BaseSettings): @@ -16,6 +17,8 @@ class Settings(BaseSettings): NATS_HOST: str NATS_PASSWORD: str NATS_USER: str + NATS_SINGLE_FACE_MATCH_STREAM: str = "single_face_matches" + NATS_SINGLE_FACE_MATCH_DURABLE: str = "single_face_match_worker" # MinIO @@ -23,6 +26,8 @@ class Settings(BaseSettings): MINIO_ROOT_USER: str MINIO_ROOT_PASSWORD: str MINIO_HOST: str + MINIO_RETRY_ATTEMPTS: int = 3 + MINIO_RETRY_BASE_SECONDS: float = 0.5 # PostgreSQL POSTGRES_USER: str @@ -44,6 +49,13 @@ class 
Settings(BaseSettings): encryption_key: str totp_issuer: str = "multAI" + # Face embedding model + FACE_EMBEDDING_MODEL_NAME: str = "buffalo_l" + FACE_EMBEDDING_PROVIDERS: str = "CPUExecutionProvider" + FACE_EMBEDDING_CTX_ID: int = -1 + FACE_EMBEDDING_DET_WIDTH: int = 640 + FACE_EMBEDDING_DET_HEIGHT: int = 640 + # Google Drive OAuth GOOGLE_CLIENT_ID: str = "" GOOGLE_CLIENT_SECRET: str = "" @@ -55,9 +67,21 @@ class Settings(BaseSettings): FACE_ENCRYPTION_KEY: str FIREBASE_CREDENTIALS_PATH: str = "multiai-c9380-firebase-adminsdk-fbsvc-cb6e5ce41b.json" - class Config: - env_file = ".env" - extra = "ignore" + model_config = SettingsConfigDict( + env_file=".env", + extra="ignore", + ) + + @field_validator("debug", mode="before") + @classmethod + def _parse_debug(cls, value): # type: ignore[no-untyped-def] + if isinstance(value, str): + lowered = value.strip().lower() + if lowered in {"release", "prod", "production", "false", "0", "no"}: + return False + if lowered in {"true", "1", "yes"}: + return True + return value settings = Settings() # type: ignore diff --git a/app/core/constant.py b/app/core/constant.py index 9f1f512..0925bbd 100644 --- a/app/core/constant.py +++ b/app/core/constant.py @@ -23,8 +23,6 @@ class AuditEventType(str, Enum): UPLOAD_REQUEST_REJECTED = "upload_request.rejected" - BlacklistedSession = "blacklist:session:{session_id}" - IMAGE_ALLOWED_TYPES = { "image/jpeg", "image/png", @@ -32,6 +30,19 @@ class AuditEventType(str, Enum): "image/heif" } +DEFAULT_CONTENT_TYPE = "application/octet-stream" +DRIVE_ALLOWED_HOSTS = {"drive.google.com", "docs.google.com"} +MINIO_URL_PREFIX = "minio://" + +IMAGES_BUCKET_NAME = "images" +DOCUMENTS_BUCKET_NAME = "documents" +WA_SIM_BUCKET_NAME = "wa-sim" + +GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" +GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" +GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo" +GOOGLE_DRIVE_FILES_URL = 
"https://www.googleapis.com/drive/v3/files/{file_id}" + MAX_IMAGE_SIZE = 5 * 1024 * 1024 MIN_ENROLL_IMAGES = 3 MAX_ENROLL_IMAGES = 5 diff --git a/app/core/exceptions.py b/app/core/exceptions.py index 5f4a9b5..ddf0d36 100644 --- a/app/core/exceptions.py +++ b/app/core/exceptions.py @@ -75,6 +75,9 @@ def handle_check_violation(exc: Exception) -> HTTPException: def handle(exc: Exception) -> HTTPException: logger.error("Database error: %s", exc) + if isinstance(exc, HTTPException): + return exc + if isinstance(exc, IntegrityError): orig = getattr(exc, "orig", None) sqlstate = getattr(orig, "sqlstate", None) diff --git a/app/infra/google_drive.py b/app/infra/google_drive.py index 0b32ad6..4816ea5 100644 --- a/app/infra/google_drive.py +++ b/app/infra/google_drive.py @@ -9,12 +9,14 @@ from app.core.exceptions import AppException from app.core.config import settings +from app.core.constant import ( + GOOGLE_AUTH_URL, + GOOGLE_DRIVE_FILES_URL, + GOOGLE_TOKEN_URL, + GOOGLE_USERINFO_URL, +) -GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" -GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" -GOOGLE_USERINFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo" -GOOGLE_DRIVE_FILES_URL = "https://www.googleapis.com/drive/v3/files/{file_id}" @dataclass diff --git a/app/infra/minio.py b/app/infra/minio.py index 09104ea..e6249da 100644 --- a/app/infra/minio.py +++ b/app/infra/minio.py @@ -9,11 +9,18 @@ from app.core.utils import check_extension from app.core.exceptions import AppException +from app.core.constant import ( + DEFAULT_CONTENT_TYPE, + DOCUMENTS_BUCKET_NAME as CORE_DOCUMENTS_BUCKET_NAME, + IMAGES_BUCKET_NAME as CORE_IMAGES_BUCKET_NAME, + WA_SIM_BUCKET_NAME as CORE_WA_SIM_BUCKET_NAME, +) -IMAGES_BUCKET_NAME = "images" -DOCUMENTS_BUCKET_NAME = "documents" -WA_SIM_BUCKET_NAME = "wa-sim" +# Re-export bucket names for compatibility with existing imports. 
+IMAGES_BUCKET_NAME = CORE_IMAGES_BUCKET_NAME +DOCUMENTS_BUCKET_NAME = CORE_DOCUMENTS_BUCKET_NAME +WA_SIM_BUCKET_NAME = CORE_WA_SIM_BUCKET_NAME async def init_minio_client( minio_host: str, minio_port: int, minio_root_user: str, minio_root_password: str @@ -48,7 +55,7 @@ async def put(self, file: UploadFile, object_name: str | None = None) -> str: object_name = str(uuid.uuid4()) if file.content_type is None: - file.content_type = "application/octet-stream" + file.content_type = DEFAULT_CONTENT_TYPE if file.filename is None: file.filename = object_name @@ -80,7 +87,7 @@ async def get(self, object_name: str) -> tuple[bytes, str, str]: data = await res.read() content_type = ( - res.content_type if res.content_type else "application/octet-stream" + res.content_type if res.content_type else DEFAULT_CONTENT_TYPE ) filename = res.headers.get("x-amz-meta-filename", f"{object_name}") diff --git a/app/infra/nats.py b/app/infra/nats.py index cf5b91e..5f01b55 100644 --- a/app/infra/nats.py +++ b/app/infra/nats.py @@ -2,7 +2,8 @@ from typing import Any, Callable, Optional from nats.aio.client import Client as NATS from nats.js.client import JetStreamContext -from nats.js.api import DeliverPolicy, AckPolicy +from nats.js.api import DeliverPolicy, AckPolicy, StreamConfig +from nats.js.errors import NotFoundError from nats.aio.msg import Msg from pydantic import BaseModel @@ -28,6 +29,7 @@ class NatsSubjects(Enum): STAFF_UPLOAD_REQUEST_CREATED = "staff.upload_request.created" STAFF_UPLOAD_REQUEST_APPROVED = "staff.upload_request.approved" STAFF_UPLOAD_REQUEST_REJECTED = "staff.upload_request.rejected" + SINGLE_FACE_MATCH_REQUESTED = "photo_faces.single.requested" class NatsClient: _nc: Optional[NATS] = None @@ -102,6 +104,8 @@ async def js_subscribe( if NatsClient._js is None: await NatsClient.connect() + await NatsClient.ensure_stream(stream_name=stream_name, subjects=[subject.value]) + async def _wrapper(msg: Msg) -> None: await callback(msg.data) await msg.ack() @@ -116,3 
+120,20 @@ async def _wrapper(msg: Msg) -> None: deliver_policy=DeliverPolicy.NEW, # ack_policy=ack_policy ) + + @staticmethod + async def ensure_stream(*, stream_name: str, subjects: list[str]) -> None: + if NatsClient._js is None: + await NatsClient.connect() + js = NatsClient._js + assert js is not None + try: + await js.stream_info(stream_name) + except NotFoundError: + await js.add_stream( + name=stream_name, + config=StreamConfig( + name=stream_name, + subjects=subjects, + ), + ) diff --git a/app/router/mobile/enrollement.py b/app/router/mobile/enrollement.py index 109dfda..1a5f652 100644 --- a/app/router/mobile/enrollement.py +++ b/app/router/mobile/enrollement.py @@ -5,7 +5,13 @@ from app.container import Container, get_container from app.deps.token_auth import MobileUserSchema, get_current_mobile_user from app.core.exceptions import AppException -from app.core.constant import IMAGE_ALLOWED_TYPES, MAX_ENROLL_IMAGES, MAX_IMAGE_SIZE, MIN_ENROLL_IMAGES +from app.core.constant import ( + DEFAULT_CONTENT_TYPE, + IMAGE_ALLOWED_TYPES, + MAX_ENROLL_IMAGES, + MAX_IMAGE_SIZE, + MIN_ENROLL_IMAGES, +) from app.service.face_embedding import FaceImagePayload from db.generated.models import User @@ -57,7 +63,7 @@ async def enroll_face( payload: FaceImagePayload = FaceImagePayload( filename=file.filename or "unknown", - content_type=file.content_type or "application/octet-stream", + content_type=file.content_type or DEFAULT_CONTENT_TYPE, bytes=contents, ) diff --git a/app/router/web/users.py b/app/router/web/users.py index 4866d5f..f167376 100644 --- a/app/router/web/users.py +++ b/app/router/web/users.py @@ -10,7 +10,6 @@ from app.schema.response.web.user import AdminUserSchema, to_admin_user_schema from db.generated.models import StaffUser - router = APIRouter(prefix="/users") @router.post("/", response_model=AdminUserSchema, status_code=status.HTTP_201_CREATED) @@ -28,7 +27,6 @@ async def create_user( logger.info("admin %s created user %s", current_staff_user.id, 
user.id) return to_admin_user_schema(user) - @router.get("/", response_model=list[AdminUserSchema]) async def list_users( limit: int = Query( diff --git a/app/schema/dto/single_face_match.py b/app/schema/dto/single_face_match.py new file mode 100644 index 0000000..e691808 --- /dev/null +++ b/app/schema/dto/single_face_match.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from uuid import UUID, uuid4 + +from pydantic import BaseModel, Field + + +class BBoxPayload(BaseModel): + x1: float + y1: float + x2: float + y2: float + + +class SingleFaceMatchJob(BaseModel): + job_id: UUID = Field(default_factory=uuid4) + photo_id: UUID + face_index: int = 0 + image_ref: str + bbox: BBoxPayload | None = None + faces_detected: int | None = None + submitted_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + + model_config = {"extra": "allow"} diff --git a/app/service/face_embedding.py b/app/service/face_embedding.py index 076d29e..71295e3 100644 --- a/app/service/face_embedding.py +++ b/app/service/face_embedding.py @@ -1,11 +1,13 @@ from __future__ import annotations import asyncio +from dataclasses import dataclass from typing import List, Literal, Optional, Sequence, Tuple, TypedDict import cv2 # type: ignore import numpy as np from insightface.app import FaceAnalysis # type: ignore[import-untyped] +from app.core.config import settings from app.core.exceptions import AppException @@ -27,18 +29,35 @@ class FaceStub: embedding: Optional[np.ndarray] = None +@dataclass(frozen=True) +class DetectedFace: + embedding: list[float] + bbox: Tuple[float, float, float, float] + + class FaceEmbedding: def __init__( self, - model_name: str = "buffalo_l", - providers: Sequence[str] = ("CPUExecutionProvider",), - ctx_id: int = -1, - det_size: Tuple[int, int] = (640, 640), + model_name: str | None = None, + providers: Sequence[str] | None = None, + ctx_id: int | None = None, + det_size: Tuple[int, int] | None = None, ) 
-> None: self.model: FaceAnalysis | None = None - self.model_name = model_name + self.model_name = model_name or settings.FACE_EMBEDDING_MODEL_NAME + if providers is None: + providers = tuple( + p.strip() + for p in settings.FACE_EMBEDDING_PROVIDERS.split(",") + if p.strip() + ) self.providers = providers - self.ctx_id = ctx_id + self.ctx_id = settings.FACE_EMBEDDING_CTX_ID if ctx_id is None else ctx_id + if det_size is None: + det_size = ( + settings.FACE_EMBEDDING_DET_WIDTH, + settings.FACE_EMBEDDING_DET_HEIGHT, + ) self.det_size = det_size self._initialized = False @@ -184,6 +203,26 @@ async def compute_event_embedding( return results + async def detect_faces( + self, + payload: FaceImagePayload, + ) -> list[DetectedFace]: + image = self._decode_image(payload) + image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + + faces: list[FaceStub] = await asyncio.to_thread( # type: ignore + self.face_embedding.model.get, image_rgb # type: ignore + ) + + detected: list[DetectedFace] = [] + for face in faces: + if face.embedding is None: + continue + embedding = face.embedding.astype(float).flatten().tolist() + detected.append(DetectedFace(embedding=embedding, bbox=face.bbox)) + + return detected + def _decode_image(self, payload: FaceImagePayload) -> np.ndarray: buffer = np.frombuffer(payload["bytes"], dtype=np.uint8) diff --git a/app/service/session.py b/app/service/session.py index d0792d7..e441fc9 100644 --- a/app/service/session.py +++ b/app/service/session.py @@ -22,6 +22,8 @@ class SessionService : def init(self, session: session_queries.AsyncQuerier, redis: RedisClient) -> None: self.session_querier = session self.redis = redis + SessionService.session_querier = session + SessionService.redis = redis @staticmethod async def create_session(user_id:uuid.UUID,device_id:uuid.UUID)->UpsertSessionRow: diff --git a/app/service/single_face_match.py b/app/service/single_face_match.py new file mode 100644 index 0000000..30676c1 --- /dev/null +++ 
b/app/service/single_face_match.py @@ -0,0 +1,301 @@ +from __future__ import annotations + +import asyncio +import json +from dataclasses import dataclass +from uuid import UUID + +import sqlalchemy +import sqlalchemy.ext.asyncio + +from app.core.constant import MINIO_URL_PREFIX +from app.core.config import settings +from app.core.logger import logger +from sqlalchemy.exc import DBAPIError, SQLAlchemyError +from app.infra.minio import Bucket, IMAGES_BUCKET_NAME +from app.service.face_embedding import FaceEmbeddingService, FaceImagePayload +from app.schema.dto.single_face_match import BBoxPayload, SingleFaceMatchJob +from db.generated import photo_faces as photo_face_queries +from db.generated import models + + +@dataclass(frozen=True) +class ClosestUserMatch: + user_id: UUID + distance: float + + +PHOTO_EXISTS = """ +SELECT 1 +FROM photos +WHERE id = :photo_id +""" + +GET_CLOSEST_USER = """ +SELECT id, (face_embedding <=> CAST(:embedding AS vector)) AS distance +FROM users +WHERE face_embedding IS NOT NULL +ORDER BY distance ASC +LIMIT 1 +""" + +INSERT_FACE_MATCH = """ +INSERT INTO face_matches (photo_face_id, user_id, confidence) +VALUES (:photo_face_id, :user_id, :confidence) +RETURNING id +""" + +CHECK_MATCH_FOR_PHOTO = """ +SELECT 1 +FROM face_matches fm +JOIN photo_faces pf ON pf.id = fm.photo_face_id +WHERE pf.photo_id = :photo_id +LIMIT 1 +""" + +CHECK_MATCH_FOR_PHOTO_FACE = """ +SELECT 1 +FROM face_matches +WHERE photo_face_id = :photo_face_id +LIMIT 1 +""" + + + +class SingleFaceMatchService: + def __init__( + self, + *, + conn: sqlalchemy.ext.asyncio.AsyncConnection, + face_embedding_service: FaceEmbeddingService, + photo_face_querier: photo_face_queries.AsyncQuerier, + ) -> None: + self.conn = conn + self.face_embedding_service = face_embedding_service + self.photo_face_querier = photo_face_querier + + async def process_job(self, job: SingleFaceMatchJob) -> None: # noqa: C901 + if job.faces_detected is not None and job.faces_detected != 1: + logger.info( 
+ "Skipping photo %s: faces_detected=%s (single-face worker)", + job.photo_id, + job.faces_detected, + ) + return + + if not job.image_ref: + logger.warning("Missing image_ref in event payload for photo %s", job.photo_id) + return + + if not await self._photo_exists(job.photo_id): + logger.warning("Photo not found: %s", job.photo_id) + return + if await self._match_exists_for_photo(job.photo_id): + logger.info("Photo %s already matched; skipping", job.photo_id) + return + + embedding, bbox = await self._resolve_embedding(job) + if embedding is None: + return + + try: + photo_face = await self._upsert_photo_face( + photo_id=job.photo_id, + face_index=job.face_index, + embedding=embedding, + bbox=bbox, + ) + if photo_face is None: + logger.warning("Failed to upsert photo_face for photo %s", job.photo_id) + return + await self._commit_best_effort() + except (DBAPIError, SQLAlchemyError) as exc: + await self._rollback_best_effort() + logger.warning("DB write failed for photo %s: %s", job.photo_id, exc) + return + except MemoryError: + logger.error("Out of memory while processing photo %s", job.photo_id) + return + + match = await self._find_closest_user(embedding) + if match is None: + logger.info("No user embeddings available for matching") + return + + if await self._match_exists_for_photo_face(photo_face.id): + logger.info("Match already exists for photo_face %s; skipping", photo_face.id) + return + + try: + await self._insert_face_match( + photo_face_id=photo_face.id, + user_id=match.user_id, + confidence=match.distance, + ) + await self._commit_best_effort() + except (DBAPIError, SQLAlchemyError) as exc: + await self._rollback_best_effort() + logger.warning("Failed to insert face match for photo %s: %s", job.photo_id, exc) + return + except MemoryError: + logger.error("Out of memory while matching photo %s", job.photo_id) + return + + async def _photo_exists(self, photo_id: UUID) -> bool: + row = (await self.conn.execute( + sqlalchemy.text(PHOTO_EXISTS), + 
{"photo_id": photo_id}, + )).first() + return row is not None + + async def _resolve_embedding( + self, + job: SingleFaceMatchJob, + ) -> tuple[list[float] | None, BBoxPayload | None]: + try: + payload = await self._load_payload(job) + except Exception as exc: + logger.warning("Failed to load image payload for photo %s: %s", job.photo_id, exc) + return None, None + + try: + faces = await self.face_embedding_service.detect_faces(payload) + except Exception as exc: + logger.warning("Face detection failed for photo %s: %s", job.photo_id, exc) + return None, None + + if len(faces) != 1: + logger.info( + "Skipping photo %s: detected %s faces (single-face worker)", + job.photo_id, + len(faces), + ) + return None, None + + face = faces[0] + bbox = BBoxPayload( + x1=float(face.bbox[0]), + y1=float(face.bbox[1]), + x2=float(face.bbox[2]), + y2=float(face.bbox[3]), + ) + return face.embedding, bbox + + async def _load_payload(self, job: SingleFaceMatchJob) -> FaceImagePayload: + if not job.image_ref: + raise ValueError("Missing image_ref in event payload") + + bucket_name, object_name = self._parse_minio_ref(job.image_ref) + bucket = Bucket(bucket_name, "") + last_exc: Exception | None = None + for attempt in range(1, settings.MINIO_RETRY_ATTEMPTS + 1): + try: + data, filename, content_type = await bucket.get(object_name) + return FaceImagePayload( + filename=filename, + content_type=content_type, + bytes=data, + ) + except Exception as exc: + last_exc = exc + logger.warning( + "MinIO fetch failed for %s (attempt %s/%s): %s", + object_name, + attempt, + settings.MINIO_RETRY_ATTEMPTS, + exc, + ) + if attempt < settings.MINIO_RETRY_ATTEMPTS: + await asyncio.sleep(settings.MINIO_RETRY_BASE_SECONDS * attempt) + assert last_exc is not None + raise last_exc + + async def _upsert_photo_face( + self, + *, + photo_id: UUID, + face_index: int, + embedding: list[float], + bbox: BBoxPayload | None, + ) -> models.PhotoFace | None: + embedding_literal = self._vector_literal(embedding) + 
bbox_payload = None + if bbox is not None: + bbox_payload = json.dumps( + {"x1": bbox.x1, "y1": bbox.y1, "x2": bbox.x2, "y2": bbox.y2} + ) + return await self.photo_face_querier.upsert_photo_face( + photo_id=photo_id, + face_index=face_index, + dollar_3=embedding_literal, + bbox=bbox_payload, + ) + + async def _find_closest_user( + self, + embedding: list[float], + ) -> ClosestUserMatch | None: + embedding_literal = self._vector_literal(embedding) + row = (await self.conn.execute( + sqlalchemy.text(GET_CLOSEST_USER), + {"embedding": embedding_literal}, + )).first() + if row is None: + return None + return ClosestUserMatch(user_id=row[0], distance=float(row[1])) + + async def _insert_face_match( + self, + *, + photo_face_id: UUID, + user_id: UUID, + confidence: float, + ) -> None: + await self.conn.execute( + sqlalchemy.text(INSERT_FACE_MATCH), + { + "photo_face_id": photo_face_id, + "user_id": user_id, + "confidence": confidence, + }, + ) + + async def _match_exists_for_photo(self, photo_id: UUID) -> bool: + row = (await self.conn.execute( + sqlalchemy.text(CHECK_MATCH_FOR_PHOTO), + {"photo_id": photo_id}, + )).first() + return row is not None + + async def _match_exists_for_photo_face(self, photo_face_id: UUID) -> bool: + row = (await self.conn.execute( + sqlalchemy.text(CHECK_MATCH_FOR_PHOTO_FACE), + {"photo_face_id": photo_face_id}, + )).first() + return row is not None + + async def _commit_best_effort(self) -> None: + try: + await self.conn.commit() + except Exception: + pass + + async def _rollback_best_effort(self) -> None: + try: + await self.conn.rollback() + except Exception: + pass + + @staticmethod + def _vector_literal(embedding: list[float]) -> str: + return "[" + ", ".join(str(x) for x in embedding) + "]" + + @staticmethod + def _parse_minio_ref(image_ref: str) -> tuple[str, str]: + if image_ref.startswith(MINIO_URL_PREFIX): + raw = image_ref[len(MINIO_URL_PREFIX) :] + parts = raw.split("/", 1) + if len(parts) != 2 or not parts[0] or not parts[1]: + 
raise ValueError("Invalid MinIO image_ref format") + return parts[0], parts[1] + return IMAGES_BUCKET_NAME, image_ref diff --git a/app/service/users.py b/app/service/users.py index eb853da..8ab3f81 100644 --- a/app/service/users.py +++ b/app/service/users.py @@ -1,10 +1,8 @@ from datetime import datetime, timedelta, timezone import uuid -from app.core import constant from app.core.exceptions import AppException, DBException from app.core.securite import ( - # EmbeddingCrypto, hash_password, verify_password, create_acces_mobile_token, @@ -12,6 +10,7 @@ decode_refresh_mobile_token, Get_expiry_time, ) +from app.core import constant from app.core.config import settings from app.infra.redis import RedisClient @@ -143,7 +142,7 @@ async def mobile_register_login( return MobileAuthResponse( access_token=access_token, refresh_token=refresh_token, - session_id=str(session.id), + session_id=str(session.id), expires_in=expiry, ) @@ -197,16 +196,13 @@ async def add_embbed_user( self, user_id: uuid.UUID, image_payloads: list[FaceImagePayload], - ) ->User: + ) -> User: logger.info("Generating face embeddings for user %s", user_id) averaging = await self.face_embedding_service.compute_average_embedding( image_payloads ) - # pgvector accepts input like: "[0.1, 0.2, ...]". Convert list to a vector literal. 
vector_literal = "[" + ", ".join(str(x) for x in averaging) + "]" - #TODO:we encrypt it here we wont store it as plaintext in the db but the porblmem is were lossing the search as trade of in the vestor so i will let it like this until i found somthing tht fit - # encrypted_embedding = EmbeddingCrypto.encrypt(averaging) user = await self.user_querier.set_user_embedding( dollar_1=vector_literal, id=user_id, diff --git a/app/worker/single_face_match/__init__.py b/app/worker/single_face_match/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/app/worker/single_face_match/__init__.py @@ -0,0 +1 @@ + diff --git a/app/worker/single_face_match/worker.py b/app/worker/single_face_match/worker.py new file mode 100644 index 0000000..fa8ef37 --- /dev/null +++ b/app/worker/single_face_match/worker.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import asyncio + +from app.container import Container +from app.core.config import settings +from app.core.logger import logger +from app.infra.database import engine +from app.infra.minio import Bucket, init_minio_client +from app.infra.nats import NatsClient, NatsSubjects +from app.infra.redis import RedisClient +from app.schema.dto.single_face_match import SingleFaceMatchJob +from app.service.single_face_match import SingleFaceMatchService + + +class SingleFaceMatchWorker: + def __init__(self, service: SingleFaceMatchService) -> None: + self.service = service + + async def handle_message(self, data: bytes) -> None: + try: + job = SingleFaceMatchJob.model_validate_json(data) + except Exception as exc: + logger.warning("Failed to parse single face match job: %s", exc) + return + + try: + await self.service.process_job(job) + except Exception as exc: + logger.exception("Failed to process single face match job: %s", exc) + return + + +async def run_worker() -> None: + await init_minio_client( + minio_host=settings.MINIO_HOST, + minio_port=settings.MINIO_API_PORT, + 
minio_root_user=settings.MINIO_ROOT_USER, + minio_root_password=settings.MINIO_ROOT_PASSWORD, + ) + RedisClient( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + password=settings.REDIS_PASSWORD, + ) + + async with engine.connect() as conn: + container = Container(conn) + service = SingleFaceMatchService( + conn=conn, + face_embedding_service=container.face_embedding_service, + photo_face_querier=container.photo_face_querier, + ) + worker = SingleFaceMatchWorker(service) + + await NatsClient.js_subscribe( + subject=NatsSubjects.SINGLE_FACE_MATCH_REQUESTED, + callback=worker.handle_message, + stream_name=settings.NATS_SINGLE_FACE_MATCH_STREAM, + durable_name=settings.NATS_SINGLE_FACE_MATCH_DURABLE, + ) + + logger.info("SingleFaceMatchWorker subscribed; waiting for jobs") + try: + await asyncio.Event().wait() + finally: + await _close_minio() + await NatsClient.close() + + +async def _close_minio() -> None: + client = getattr(Bucket, "client", None) + if client is None: + return + close_session = getattr(client, "close_session", None) + if close_session is None: + return + try: + await close_session() + except Exception: + pass + + +if __name__ == "__main__": + asyncio.run(run_worker()) diff --git a/db/generated/photo_faces.py b/db/generated/photo_faces.py new file mode 100644 index 0000000..09d76f1 --- /dev/null +++ b/db/generated/photo_faces.py @@ -0,0 +1,50 @@ +# Code generated by sqlc. DO NOT EDIT.
+# versions: +# sqlc v1.30.0 +# source: photo_faces.sql +from typing import Any, Optional +import uuid + +import sqlalchemy +import sqlalchemy.ext.asyncio + +from db.generated import models + + +UPSERT_PHOTO_FACE = """-- name: upsert_photo_face \\:one +INSERT INTO photo_faces ( + photo_id, + face_index, + embedding, + bbox +) VALUES ( + :p1, :p2, :p3\\:\\:vector, :p4 +) +ON CONFLICT (photo_id, face_index) +DO UPDATE SET embedding = EXCLUDED.embedding, + bbox = EXCLUDED.bbox +RETURNING id, photo_id, face_index, embedding, bbox, created_at +""" + + +class AsyncQuerier: + def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): + self._conn = conn + + async def upsert_photo_face(self, *, photo_id: uuid.UUID, face_index: int, dollar_3: Any, bbox: Optional[str]) -> Optional[models.PhotoFace]: + row = (await self._conn.execute(sqlalchemy.text(UPSERT_PHOTO_FACE), { + "p1": photo_id, + "p2": face_index, + "p3": dollar_3, + "p4": bbox, + })).first() + if row is None: + return None + return models.PhotoFace( + id=row[0], + photo_id=row[1], + face_index=row[2], + embedding=row[3], + bbox=row[4], + created_at=row[5], + ) diff --git a/db/generated/user.py b/db/generated/user.py index 823be6a..e812192 100644 --- a/db/generated/user.py +++ b/db/generated/user.py @@ -88,8 +88,14 @@ class AsyncQuerier: def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection): self._conn = conn - async def create_user(self, *, email: str, hashed_password: Optional[str]) -> Optional[models.User]: - row = (await self._conn.execute(sqlalchemy.text(CREATE_USER), {"p1": email, "p2": hashed_password})).first() + async def create_user( + self, *, email: str, hashed_password: Optional[str] + ) -> Optional[models.User]: + row = ( + await self._conn.execute( + sqlalchemy.text(CREATE_USER), {"p1": email, "p2": hashed_password} + ) + ).first() if row is None: return None return models.User( @@ -108,7 +114,11 @@ async def delete_user(self, *, id: uuid.UUID) -> None: await 
self._conn.execute(sqlalchemy.text(DELETE_USER), {"p1": id}) async def get_user_by_email(self, *, email: str) -> Optional[models.User]: - row = (await self._conn.execute(sqlalchemy.text(GET_USER_BY_EMAIL), {"p1": email})).first() + row = ( + await self._conn.execute( + sqlalchemy.text(GET_USER_BY_EMAIL), {"p1": email} + ) + ).first() if row is None: return None return models.User( @@ -124,7 +134,9 @@ async def get_user_by_email(self, *, email: str) -> Optional[models.User]: ) async def get_user_by_id(self, *, id: uuid.UUID) -> Optional[models.User]: - row = (await self._conn.execute(sqlalchemy.text(GET_USER_BY_ID), {"p1": id})).first() + row = ( + await self._conn.execute(sqlalchemy.text(GET_USER_BY_ID), {"p1": id}) + ).first() if row is None: return None return models.User( @@ -139,8 +151,12 @@ async def get_user_by_id(self, *, id: uuid.UUID) -> Optional[models.User]: deleted_at=row[8], ) - async def list_users(self, *, limit: int, offset: int) -> AsyncIterator[models.User]: - result = await self._conn.stream(sqlalchemy.text(LIST_USERS), {"p1": limit, "p2": offset}) + async def list_users( + self, *, limit: int, offset: int + ) -> AsyncIterator[models.User]: + result = await self._conn.stream( + sqlalchemy.text(LIST_USERS), {"p1": limit, "p2": offset} + ) async for row in result: yield models.User( id=row[0], @@ -155,7 +171,11 @@ async def list_users(self, *, limit: int, offset: int) -> AsyncIterator[models.U ) async def set_user_blocked(self, *, blocked: bool, id: uuid.UUID) -> Optional[models.User]: - row = (await self._conn.execute(sqlalchemy.text(SET_USER_BLOCKED), {"p1": blocked, "p2": id})).first() + row = ( + await self._conn.execute( + sqlalchemy.text(SET_USER_BLOCKED), {"p1": blocked, "p2": id} + ) + ).first() if row is None: return None return models.User( @@ -171,7 +191,11 @@ async def set_user_blocked(self, *, blocked: bool, id: uuid.UUID) -> Optional[mo ) async def set_user_embedding(self, *, dollar_1: Any, id: uuid.UUID) -> Optional[models.User]: - 
row = (await self._conn.execute(sqlalchemy.text(SET_USER_EMBEDDING), {"p1": dollar_1, "p2": id})).first() + row = ( + await self._conn.execute( + sqlalchemy.text(SET_USER_EMBEDDING), {"p1": dollar_1, "p2": id} + ) + ).first() if row is None: return None return models.User( @@ -186,13 +210,25 @@ async def set_user_embedding(self, *, dollar_1: Any, id: uuid.UUID) -> Optional[ deleted_at=row[8], ) - async def update_user(self, *, email: str, display_name: Optional[str], blocked: bool, id: uuid.UUID) -> Optional[models.User]: - row = (await self._conn.execute(sqlalchemy.text(UPDATE_USER), { - "p1": email, - "p2": display_name, - "p3": blocked, - "p4": id, - })).first() + async def update_user( + self, + *, + email: str, + display_name: Optional[str], + blocked: bool, + id: uuid.UUID, + ) -> Optional[models.User]: + row = ( + await self._conn.execute( + sqlalchemy.text(UPDATE_USER), + { + "p1": email, + "p2": display_name, + "p3": blocked, + "p4": id, + }, + ) + ).first() if row is None: return None return models.User( @@ -207,8 +243,15 @@ async def update_user(self, *, email: str, display_name: Optional[str], blocked: deleted_at=row[8], ) - async def update_user_password(self, *, hashed_password: Optional[str], id: uuid.UUID) -> Optional[models.User]: - row = (await self._conn.execute(sqlalchemy.text(UPDATE_USER_PASSWORD), {"p1": hashed_password, "p2": id})).first() + async def update_user_password( + self, *, hashed_password: Optional[str], id: uuid.UUID + ) -> Optional[models.User]: + row = ( + await self._conn.execute( + sqlalchemy.text(UPDATE_USER_PASSWORD), + {"p1": hashed_password, "p2": id}, + ) + ).first() if row is None: return None return models.User( diff --git a/db/queries/photo_faces.sql b/db/queries/photo_faces.sql new file mode 100644 index 0000000..de3ffbb --- /dev/null +++ b/db/queries/photo_faces.sql @@ -0,0 +1,13 @@ +-- name: UpsertPhotoFace :one +INSERT INTO photo_faces ( + photo_id, + face_index, + embedding, + bbox +) VALUES ( + $1, $2, 
$3::vector, $4 +) +ON CONFLICT (photo_id, face_index) +DO UPDATE SET embedding = EXCLUDED.embedding, + bbox = EXCLUDED.bbox +RETURNING *; diff --git a/migrations/sql/down/alter-photo-faces-embedding-dim.sql b/migrations/sql/down/alter-photo-faces-embedding-dim.sql new file mode 100644 index 0000000..f3be603 --- /dev/null +++ b/migrations/sql/down/alter-photo-faces-embedding-dim.sql @@ -0,0 +1,2 @@ +ALTER TABLE photo_faces +ALTER COLUMN embedding TYPE vector(1536); diff --git a/migrations/sql/up/alter-photo-faces-embedding-dim.sql b/migrations/sql/up/alter-photo-faces-embedding-dim.sql new file mode 100644 index 0000000..6538447 --- /dev/null +++ b/migrations/sql/up/alter-photo-faces-embedding-dim.sql @@ -0,0 +1,2 @@ +ALTER TABLE photo_faces +ALTER COLUMN embedding TYPE vector(512); diff --git a/migrations/versions/4dd6658b9f83_merge_heads.py b/migrations/versions/4dd6658b9f83_merge_heads.py new file mode 100644 index 0000000..b63cff0 --- /dev/null +++ b/migrations/versions/4dd6658b9f83_merge_heads.py @@ -0,0 +1,28 @@ +"""merge heads + +Revision ID: 4dd6658b9f83 +Revises: 9f6c1b4a3d21, c3b8d0f1e2a4 +Create Date: 2026-03-21 23:29:09.967007 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
+revision: str = '4dd6658b9f83' +down_revision: Union[str, Sequence[str], None] = ('9f6c1b4a3d21', 'c3b8d0f1e2a4') +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + pass + + +def downgrade() -> None: + """Downgrade schema.""" + pass diff --git a/migrations/versions/9f6c1b4a3d21_alter_photo_faces_embedding_dim.py b/migrations/versions/9f6c1b4a3d21_alter_photo_faces_embedding_dim.py new file mode 100644 index 0000000..86df9cc --- /dev/null +++ b/migrations/versions/9f6c1b4a3d21_alter_photo_faces_embedding_dim.py @@ -0,0 +1,24 @@ +"""alter photo_faces embedding dimension to 512 + +Revision ID: 9f6c1b4a3d21 +Revises: 5ead72a95638 +Create Date: 2026-03-21 23:23:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "9f6c1b4a3d21" +down_revision: Union[str, Sequence[str], None] = "5ead72a95638" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute("ALTER TABLE photo_faces ALTER COLUMN embedding TYPE vector(512);") + + +def downgrade() -> None: + op.execute("ALTER TABLE photo_faces ALTER COLUMN embedding TYPE vector(1536);")