Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions app/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def bad_request(detail: str = "Bad request") -> HTTPException:
return HTTPException(status_code=400, detail=detail)

@staticmethod
def payement_required(detail: str = "Payment required") -> HTTPException:
    """Return a 402 Payment Required error.

    NOTE(review): the method name keeps the historical "payement" typo so
    existing callers do not break; the default detail message is corrected.
    """
    return HTTPException(status_code=402, detail=detail)

@staticmethod
def internal_error(detail: str = "Internal server error") -> HTTPException:
Expand All @@ -50,6 +50,15 @@ def image_quality_error(detail: str = "Image does not meet quality requirements"
def image_format_error(detail: str = "Unsupported image format") -> HTTPException:
return HTTPException(status_code=400, detail=detail)

@staticmethod
def image_blur_error(detail: str = "Image is too blurry and could not be recovered") -> HTTPException:
    """Return a 400 error for images rejected by the blur filter."""
    error = HTTPException(status_code=400, detail=detail)
    return error

@staticmethod
def unprocessable_image(detail: str = "Image could not be processed") -> HTTPException:
    """Return a 422 Unprocessable Entity error for images that fail processing."""
    error = HTTPException(status_code=422, detail=detail)
    return error


class DBException(ABC):
"""Abstract class to enforce DB error handling."""

Expand Down
29 changes: 29 additions & 0 deletions app/service/data_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
import numpy as np


class BaseFilter(ABC):
    """Interface for image filters: a quality check plus a fixing pass."""

    @abstractmethod
    def verify_image(self, image: np.ndarray) -> bool:
        """Return True when the image passes this filter's quality check."""
        ...

    @abstractmethod
    def process_image(self, image: np.ndarray) -> np.ndarray:
        """Return the (possibly corrected) image, or raise when unrecoverable."""
        ...


class FilterFactory:
    """Builds concrete BaseFilter instances from a string registry key."""

    def __init__(self) -> None:
        # Imported lazily inside __init__ to avoid a circular import
        # between data_processor and filters.
        from app.service.filters import BlurFilter, BrightnessFilter

        self.filters = {
            "blur_filter": BlurFilter,
            "brightness_filter": BrightnessFilter,
        }

    def get_filter(self, filter_type: str) -> BaseFilter:
        """Instantiate and return the filter registered under ``filter_type``."""
        filter_cls = self.filters.get(filter_type)
        if filter_cls is None:
            raise ValueError(f"Invalid filter type: '{filter_type}'")
        return filter_cls()
144 changes: 72 additions & 72 deletions app/service/face_embedding.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import List, Literal, Optional, Sequence, Tuple, TypedDict

import cv2 # type: ignore
import numpy as np
from insightface.app import FaceAnalysis # type: ignore[import-untyped]
from app.core.exceptions import AppException


BBox = tuple[int, int, int, int]
BBox = Tuple[int, int, int, int]


class FaceImagePayload(TypedDict):
Expand All @@ -18,6 +18,7 @@ class FaceImagePayload(TypedDict):
bytes: bytes


@dataclass # ① proper dataclass
class FaceStub:
bbox: Tuple[float, float, float, float]
det_score: float
Expand All @@ -42,68 +43,70 @@ def __init__(
self.det_size = det_size
self._initialized = False

# Centralized readiness guard shared by every inference entry point.
def _ensure_ready(self) -> None:
    """Raise RuntimeError unless the model is loaded and prepared."""
    ready = self.model is not None and self._initialized
    if not ready:
        raise RuntimeError("Model not ready. Call `prepare()` first.")

def load_model(self) -> None:
if self.model is not None:
return

self.model = FaceAnalysis(
name=self.model_name,
providers=list(self.providers),
)
print("[FaceEmbedding] model loaded!")
name=self.model_name, providers=list(self.providers))

def init_model(self) -> None:
if self.model is None:
raise ValueError("Model not loaded")

raise ValueError("Model not loaded. Call `load_model()` first.")
if self._initialized:
return

self.model.prepare(ctx_id=self.ctx_id, det_size=self.det_size) # type: ignore
self.model.prepare(ctx_id=self.ctx_id,
det_size=self.det_size) # type: ignore
self._initialized = True
print("[FaceEmbedding] model initialized")

def prepare(self) -> None:
self.load_model()
self.init_model()

def embed(self, image: np.ndarray, bboxes: Sequence[BBox]) -> list[float]:
if not bboxes:
raise ValueError("No faces to embed")
# Explicit detect() keeps callers off the raw model object (no abstraction leak).
def detect(self, image_bgr: np.ndarray) -> list[FaceStub]:
    """Run detection + embedding on a BGR image (OpenCV native format)."""
    self._ensure_ready()
    faces = self.model.get(image_bgr)  # type: ignore
    return faces

if self.model is None or not self._initialized:
raise RuntimeError("Model not ready. Call `prepare()` first.")

image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

faces: list[FaceStub] = self.model.get(image_rgb) # type: ignore
def embed(self, image_bgr: np.ndarray, bbox_hint: BBox | None = None) -> list[float]:
"""
Extract embedding of the face closest to bbox_hint (centroid match).
Falls back to highest-confidence face when bbox_hint is None.
"""
self._ensure_ready()
faces: list[FaceStub] = self.model.get(image_bgr) # type: ignore

if not faces:
raise ValueError("No faces detected by the model")

x1, y1, x2, y2 = bboxes[0]
target_cx = (x1 + x2) / 2
target_cy = (y1 + y2) / 2

best_face: Optional[FaceStub] = None
best_dist = float("inf")

for face in faces:
fx1, fy1, fx2, fy2 = face.bbox
cx = (fx1 + fx2) / 2
cy = (fy1 + fy2) / 2

dist = np.sqrt((cx - target_cx) ** 2 + (cy - target_cy) ** 2)

if dist < best_dist:
best_dist = dist
best_face = face
raise ValueError("No faces detected in image")

if best_face is None or best_face.embedding is None:
raise ValueError("Failed to generate embedding for selected face")
face = (
self._pick_by_bbox(faces, bbox_hint)
if bbox_hint is not None
else max(faces, key=lambda f: f.det_score) # ④ best score fallback
)

embedding = best_face.embedding.flatten()
return embedding.tolist()
if face.embedding is None:
raise ValueError("No embedding produced for the selected face")

return face.embedding.flatten().tolist()

@staticmethod
def _pick_by_bbox(faces: list[FaceStub], bbox: BBox) -> FaceStub:
    """Return the detected face whose bbox centroid is closest to ``bbox``'s."""
    x1, y1, x2, y2 = bbox
    target_x = (x1 + x2) / 2
    target_y = (y1 + y2) / 2

    def centroid_distance(face: FaceStub) -> float:
        # Euclidean distance between face centroid and target centroid.
        fx1, fy1, fx2, fy2 = face.bbox
        return np.hypot((fx1 + fx2) / 2 - target_x, (fy1 + fy2) / 2 - target_y)

    return min(faces, key=centroid_distance)


class FaceEmbeddingService:
Expand All @@ -115,50 +118,47 @@ async def compute_average_embedding(
self,
payloads: Sequence[FaceImagePayload],
) -> list[float]:

if not payloads:
raise AppException.bad_request(
"At least one image is required for enrollment"
)

embeddings: list[np.ndarray] = []

for payload in payloads:
image = self._decode_image(payload)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Single detection pass — model.get() already returns embeddings
faces: list[FaceStub] = await asyncio.to_thread( # type: ignore
self.face_embedding.model.get, image_rgb # type: ignore
)
# ⑥ parallel processing — all images are embedded concurrently
embeddings: list[np.ndarray] = await asyncio.gather(
*[self._embed_payload(p) for p in payloads]
)

if not faces:
raise AppException.bad_request(
f"No faces detected in image {payload['filename']}"
)
averaged = np.mean(np.stack(embeddings, axis=0), axis=0)
return averaged.astype(float).tolist()

face = faces[0]
async def _embed_payload(self, payload: FaceImagePayload) -> np.ndarray:
"""⑦ Extracted per-image logic into its own async method."""
image = self._decode_image(payload)

if face.embedding is None:
raise AppException.bad_request(
f"Failed to generate embedding for {payload['filename']}"
)
faces: list[FaceStub] = await asyncio.to_thread(
self.face_embedding.detect, image # ③ uses detect(), not model.get()
)

embeddings.append(face.embedding.astype(np.float32))
if not faces:
raise AppException.bad_request(
f"No faces detected in image '{payload['filename']}'"
)

stacked = np.stack(embeddings, axis=0)
averaged = np.mean(stacked, axis=0)
face = max(faces, key=lambda f: f.det_score)

return averaged.astype(float).tolist()
if face.embedding is None:
raise AppException.bad_request(
f"Failed to generate embedding for '{payload['filename']}'"
)

def _decode_image(self, payload: FaceImagePayload) -> np.ndarray:
return face.embedding.astype(np.float32)

@staticmethod
def _decode_image(payload: FaceImagePayload) -> np.ndarray:
    """Decode the payload's raw bytes into a BGR image array.

    Raises AppException.bad_request when the bytes are not a decodable image.
    (Resolves the diff artifact: keeps only the new, quoted error message.)
    """
    buffer = np.frombuffer(payload["bytes"], dtype=np.uint8)
    image = cv2.imdecode(buffer, cv2.IMREAD_COLOR)

    if image is None:
        raise AppException.bad_request(
            f"Cannot decode uploaded image '{payload['filename']}'"
        )

    return image
111 changes: 111 additions & 0 deletions app/service/filters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import cv2
import numpy as np
from app.service.data_processor import BaseFilter
from app.core.exceptions import AppException


class BlurFilter(BaseFilter):
    """
    Detects if an image is too blurry.
    Uses Laplacian variance — sharp images have high variance,
    blurry images have low variance.
    Blur cannot be fully fixed, but we attempt a sharpening pass once.
    """

    # Laplacian-variance cutoff: scores below this count as blurry.
    BLUR_THRESHOLD = 15.0

    def _measure_blur(self, image: np.ndarray) -> float:
        """Returns the Laplacian variance score. Higher = sharper."""
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return cv2.Laplacian(gray, cv2.CV_64F).var()

    def _sharpen(self, image: np.ndarray) -> np.ndarray:
        """Applies an unsharp mask to try to recover some sharpness."""
        kernel = np.array([
            [0, -1, 0],
            [-1, 5, -1],
            [0, -1, 0]
        ])
        return cv2.filter2D(image, -1, kernel)

    def verify_image(self, image: np.ndarray) -> bool:
        """Return True when the blur score meets the threshold."""
        score = self._measure_blur(image)
        print(
            f"[BlurFilter] Blur score: {score:.2f} (threshold: {self.BLUR_THRESHOLD})")
        return score >= self.BLUR_THRESHOLD

    def process_image(self, image: np.ndarray) -> np.ndarray:
        """
        Return the image if sharp enough; otherwise try one sharpening pass
        and raise AppException.image_blur_error if it is still too blurry.

        The original blur score is measured ONCE and reused for both the
        initial test and the rejection message (the previous version ran the
        expensive Laplacian up to three times per rejected image).
        """
        # Step 1 — test (single measurement, reused below)
        score = self._measure_blur(image)
        print(
            f"[BlurFilter] Blur score: {score:.2f} (threshold: {self.BLUR_THRESHOLD})")
        if score >= self.BLUR_THRESHOLD:
            print("[BlurFilter] PASS — image is sharp enough.")
            return image

        print("[BlurFilter] FAIL — image is blurry. Attempting sharpening fix...")

        # Step 2 — fix
        fixed = self._sharpen(image)

        # Step 3 — retest
        if self.verify_image(fixed):
            print("[BlurFilter] PASS after fix — sharpening worked.")
            return fixed

        # Step 4 — reject, reporting the score already measured above
        raise AppException.image_blur_error(
            f"Image is too blurry (score: {score:.2f}, "
            f"threshold: {self.BLUR_THRESHOLD}) and could not be recovered."
        )


class BrightnessFilter(BaseFilter):
    """
    Flags images whose mean grayscale value falls outside an acceptable
    brightness window, and attempts a single gamma-correction pass to
    bring them back into range before rejecting them.
    """

    MIN_BRIGHTNESS = 70  # below this = too dark
    MAX_BRIGHTNESS = 220  # above this = too bright

    def _measure_brightness(self, image: np.ndarray) -> float:
        """Mean grayscale intensity in [0, 255]; 0 = black, 255 = white."""
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        return float(np.mean(grayscale))

    def _gamma_correction(self, image: np.ndarray, gamma: float) -> np.ndarray:
        """
        Gamma < 1 = darken, Gamma > 1 = brighten.
        A 256-entry lookup table turns the per-pixel power curve into a
        single fast table lookup.
        """
        inv_gamma = 1.0 / gamma
        lut = np.array([
            ((level / 255.0) ** inv_gamma) * 255
            for level in range(256)
        ], dtype=np.uint8)
        return cv2.LUT(image, lut)

    def verify_image(self, image: np.ndarray) -> bool:
        """True when mean brightness lies inside [MIN, MAX]."""
        mean_value = self._measure_brightness(image)
        print(
            f"[BrightnessFilter] Brightness: {mean_value:.2f} (range: {self.MIN_BRIGHTNESS}-{self.MAX_BRIGHTNESS})")
        return self.MIN_BRIGHTNESS <= mean_value <= self.MAX_BRIGHTNESS

    def process_image(self, image: np.ndarray) -> np.ndarray:
        """Pass the image through, gamma-correct it once, or reject it."""
        # Accept immediately when brightness is already in range.
        if self.verify_image(image):
            return image

        brightness = self._measure_brightness(image)

        # Dark images get gamma > 1 (brighten); bright ones gamma < 1 (darken).
        if brightness < self.MIN_BRIGHTNESS:
            gamma = 2.0
        else:
            gamma = 0.5
        corrected = self._gamma_correction(image, gamma)

        # Keep the corrected image only if it now verifies.
        if self.verify_image(corrected):
            return corrected

        raise AppException.bad_request(
            f"Image brightness {brightness:.2f} is out of acceptable range "
            f"({self.MIN_BRIGHTNESS}–{self.MAX_BRIGHTNESS}) and could not be corrected."
        )