diff --git a/app/core/exceptions.py b/app/core/exceptions.py index 5f4a9b5..12fcb3f 100644 --- a/app/core/exceptions.py +++ b/app/core/exceptions.py @@ -23,8 +23,8 @@ def bad_request(detail: str = "Bad request") -> HTTPException: return HTTPException(status_code=400, detail=detail) @staticmethod - def payement_required(detail:str = "payement required")->HTTPException: - return HTTPException(status_code=402,detail=detail) + def payement_required(detail: str = "payement required") -> HTTPException: + return HTTPException(status_code=402, detail=detail) @staticmethod def internal_error(detail: str = "Internal server error") -> HTTPException: @@ -50,6 +50,15 @@ def image_quality_error(detail: str = "Image does not meet quality requirements" def image_format_error(detail: str = "Unsupported image format") -> HTTPException: return HTTPException(status_code=400, detail=detail) + @staticmethod + def image_blur_error(detail: str = "Image is too blurry and could not be recovered") -> HTTPException: + return HTTPException(status_code=400, detail=detail) + + @staticmethod + def unprocessable_image(detail: str = "Image could not be processed") -> HTTPException: + return HTTPException(status_code=422, detail=detail) + + class DBException(ABC): """Abstract class to enforce DB error handling.""" diff --git a/app/service/data_processor.py b/app/service/data_processor.py new file mode 100644 index 0000000..a7736cd --- /dev/null +++ b/app/service/data_processor.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod +import numpy as np + + +class BaseFilter(ABC): + + @abstractmethod + def verify_image(self, image: np.ndarray) -> bool: + pass + + @abstractmethod + def process_image(self, image: np.ndarray) -> np.ndarray: + pass + + +class FilterFactory: + def __init__(self) -> None: + from app.service.filters import BlurFilter, BrightnessFilter + + self.filters = { + "blur_filter": BlurFilter, + "brightness_filter": BrightnessFilter, + } + + def get_filter(self, filter_type: str) -> BaseFilter: + if filter_type in self.filters: + return self.filters[filter_type]() + else: + raise ValueError(f"Invalid filter type: '{filter_type}'") diff --git a/app/service/face_embedding.py b/app/service/face_embedding.py index 7d4f6d4..5f37918 100644 --- a/app/service/face_embedding.py +++ b/app/service/face_embedding.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +from dataclasses import dataclass from typing import List, Literal, Optional, Sequence, Tuple, TypedDict import cv2 # type: ignore @@ -8,8 +9,7 @@ from insightface.app import FaceAnalysis # type: ignore[import-untyped] from app.core.exceptions import AppException - -BBox = tuple[int, int, int, int] +BBox = Tuple[int, int, int, int] class FaceImagePayload(TypedDict): @@ -18,6 +18,7 @@ class FaceImagePayload(TypedDict): bytes: bytes +@dataclass # ① proper dataclass class FaceStub: bbox: Tuple[float, float, float, float] det_score: float @@ -42,68 +43,70 @@ def __init__( self.det_size = det_size self._initialized = False + # ② single centralized readiness guard + def _ensure_ready(self) -> None: + if self.model is None or not self._initialized: + raise RuntimeError("Model not ready. Call `prepare()` first.") + def load_model(self) -> None: if self.model is not None: return - self.model = FaceAnalysis( - name=self.model_name, - providers=list(self.providers), - ) - print("[FaceEmbedding] model loaded!") + name=self.model_name, providers=list(self.providers)) def init_model(self) -> None: if self.model is None: - raise ValueError("Model not loaded") - + raise ValueError("Model not loaded. Call `load_model()` first.") if self._initialized: return - - self.model.prepare(ctx_id=self.ctx_id, det_size=self.det_size) # type: ignore + self.model.prepare(ctx_id=self.ctx_id, + det_size=self.det_size) # type: ignore self._initialized = True - print("[FaceEmbedding] model initialized") def prepare(self) -> None: self.load_model() self.init_model() - def embed(self, image: np.ndarray, bboxes: Sequence[BBox]) -> list[float]: - if not bboxes: - raise ValueError("No faces to embed") + # ③ explicit detect() method — fixes the abstraction leak in the service layer + def detect(self, image_bgr: np.ndarray) -> list[FaceStub]: + """Run detection + embedding on a BGR image (OpenCV native format).""" + self._ensure_ready() + return self.model.get(image_bgr) # type: ignore - if self.model is None or not self._initialized: - raise RuntimeError("Model not ready. Call `prepare()` first.") - - image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - faces: list[FaceStub] = self.model.get(image_rgb) # type: ignore + def embed(self, image_bgr: np.ndarray, bbox_hint: BBox | None = None) -> list[float]: + """ + Extract embedding of the face closest to bbox_hint (centroid match). + Falls back to highest-confidence face when bbox_hint is None. + """ + self._ensure_ready() + faces: list[FaceStub] = self.model.get(image_bgr) # type: ignore if not faces: - raise ValueError("No faces detected by the model") - - x1, y1, x2, y2 = bboxes[0] - target_cx = (x1 + x2) / 2 - target_cy = (y1 + y2) / 2 - - best_face: Optional[FaceStub] = None - best_dist = float("inf") - - for face in faces: - fx1, fy1, fx2, fy2 = face.bbox - cx = (fx1 + fx2) / 2 - cy = (fy1 + fy2) / 2 - - dist = np.sqrt((cx - target_cx) ** 2 + (cy - target_cy) ** 2) - - if dist < best_dist: - best_dist = dist - best_face = face + raise ValueError("No faces detected in image") - if best_face is None or best_face.embedding is None: - raise ValueError("Failed to generate embedding for selected face") + face = ( + self._pick_by_bbox(faces, bbox_hint) + if bbox_hint is not None + else max(faces, key=lambda f: f.det_score) # ④ best score fallback + ) - embedding = best_face.embedding.flatten() - return embedding.tolist() + if face.embedding is None: + raise ValueError("No embedding produced for the selected face") + + return face.embedding.flatten().tolist() + + @staticmethod + def _pick_by_bbox(faces: list[FaceStub], bbox: BBox) -> FaceStub: + """⑤ Concise centroid matching with np.hypot instead of manual sqrt.""" + tx = (bbox[0] + bbox[2]) / 2 + ty = (bbox[1] + bbox[3]) / 2 + return min( + faces, + key=lambda f: np.hypot( + (f.bbox[0] + f.bbox[2]) / 2 - tx, + (f.bbox[1] + f.bbox[3]) / 2 - ty, + ), + ) class FaceEmbeddingService: @@ -115,50 +118,47 @@ async def compute_average_embedding( self, payloads: Sequence[FaceImagePayload], ) -> list[float]: - if not payloads: raise AppException.bad_request( "At least one image is required for enrollment" ) - embeddings: list[np.ndarray] = [] - - for payload in payloads: - image = self._decode_image(payload) - image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) - - # Single detection pass — model.get() already returns embeddings - faces: list[FaceStub] = await asyncio.to_thread( # type: ignore - self.face_embedding.model.get, image_rgb # type: ignore - ) + # ⑥ parallel processing — all images are embedded concurrently + embeddings: list[np.ndarray] = await asyncio.gather( + *[self._embed_payload(p) for p in payloads] + ) - if not faces: - raise AppException.bad_request( - f"No faces detected in image {payload['filename']}" - ) + averaged = np.mean(np.stack(embeddings, axis=0), axis=0) + return averaged.astype(float).tolist() - face = faces[0] + async def _embed_payload(self, payload: FaceImagePayload) -> np.ndarray: + """⑦ Extracted per-image logic into its own async method.""" + image = self._decode_image(payload) - if face.embedding is None: - raise AppException.bad_request( - f"Failed to generate embedding for {payload['filename']}" - ) + faces: list[FaceStub] = await asyncio.to_thread( + self.face_embedding.detect, image # ③ uses detect(), not model.get() + ) - embeddings.append(face.embedding.astype(np.float32)) + if not faces: + raise AppException.bad_request( + f"No faces detected in image '{payload['filename']}'" + ) - stacked = np.stack(embeddings, axis=0) - averaged = np.mean(stacked, axis=0) + face = max(faces, key=lambda f: f.det_score) - return averaged.astype(float).tolist() + if face.embedding is None: + raise AppException.bad_request( + f"Failed to generate embedding for '{payload['filename']}'" + ) - def _decode_image(self, payload: FaceImagePayload) -> np.ndarray: + return face.embedding.astype(np.float32) + @staticmethod + def _decode_image(payload: FaceImagePayload) -> np.ndarray: buffer = np.frombuffer(payload["bytes"], dtype=np.uint8) image = cv2.imdecode(buffer, cv2.IMREAD_COLOR) - if image is None: raise AppException.bad_request( - f"Cannot decode uploaded image {payload['filename']}" + f"Cannot decode uploaded image '{payload['filename']}'" ) - return image diff --git a/app/service/filters.py b/app/service/filters.py new file mode 100644 index 0000000..9358ba8 --- /dev/null +++ b/app/service/filters.py @@ -0,0 +1,111 @@ +import cv2 +import numpy as np +from app.service.data_processor import BaseFilter +from app.core.exceptions import AppException + + +class BlurFilter(BaseFilter): + """ + Detects if an image is too blurry. + Uses Laplacian variance — sharp images have high variance, + blurry images have low variance. + Blur cannot be fully fixed, but we attempt a sharpening pass once. + """ + + BLUR_THRESHOLD = 15.0 + + def _measure_blur(self, image: np.ndarray) -> float: + """Returns the Laplacian variance score. Higher = sharper.""" + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + return cv2.Laplacian(gray, cv2.CV_64F).var() + + def _sharpen(self, image: np.ndarray) -> np.ndarray: + """Applies an unsharp mask to try to recover some sharpness.""" + kernel = np.array([ + [0, -1, 0], + [-1, 5, -1], + [0, -1, 0] + ]) + return cv2.filter2D(image, -1, kernel) + + def verify_image(self, image: np.ndarray) -> bool: + score = self._measure_blur(image) + print( + f"[BlurFilter] Blur score: {score:.2f} (threshold: {self.BLUR_THRESHOLD})") + return score >= self.BLUR_THRESHOLD + + def process_image(self, image: np.ndarray) -> np.ndarray: + # Step 1 — test + if self.verify_image(image): + print("[BlurFilter] PASS — image is sharp enough.") + return image + + print("[BlurFilter] FAIL — image is blurry. Attempting sharpening fix...") + + # Step 2 — fix + fixed = self._sharpen(image) + + # Step 3 — retest + if self.verify_image(fixed): + print("[BlurFilter] PASS after fix — sharpening worked.") + return fixed + + # Step 4 — reject + raise AppException.image_blur_error( + f"Image is too blurry (score: {self._measure_blur(image):.2f}, threshold: {self.BLUR_THRESHOLD}) and could not be recovered." + ) + + +class BrightnessFilter(BaseFilter): + """ + Detects if an image is too dark or too bright. + Uses the mean pixel value of the grayscale image. + Attempts gamma correction as a fix. + """ + + MIN_BRIGHTNESS = 70 # below this = too dark + MAX_BRIGHTNESS = 220 # above this = too bright + + def _measure_brightness(self, image: np.ndarray) -> float: + """Returns mean brightness (0-255). 0 = black, 255 = white.""" + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + return float(np.mean(gray)) + + def _gamma_correction(self, image: np.ndarray, gamma: float) -> np.ndarray: + """ + Gamma < 1 = darken, Gamma > 1 = brighten. + Builds a lookup table for fast per-pixel correction. + """ + inv_gamma = 1.0 / gamma + table = np.array([ + ((i / 255.0) ** inv_gamma) * 255 + for i in range(256) + ], dtype=np.uint8) + return cv2.LUT(image, table) + + def verify_image(self, image: np.ndarray) -> bool: + brightness = self._measure_brightness(image) + print( + f"[BrightnessFilter] Brightness: {brightness:.2f} (range: {self.MIN_BRIGHTNESS}-{self.MAX_BRIGHTNESS})") + return self.MIN_BRIGHTNESS <= brightness <= self.MAX_BRIGHTNESS + + def process_image(self, image: np.ndarray) -> np.ndarray: + # Step 1 — test + if self.verify_image(image): + return image + + brightness = self._measure_brightness(image) + + # Step 2 — fix + gamma = 2.0 if brightness < self.MIN_BRIGHTNESS else 0.5 + fixed = self._gamma_correction(image, gamma) + + # Step 3 — retest + if self.verify_image(fixed): + return fixed + + # Step 4 — reject + raise AppException.bad_request( + f"Image brightness {brightness:.2f} is out of acceptable range " + f"({self.MIN_BRIGHTNESS}–{self.MAX_BRIGHTNESS}) and could not be corrected." + )