Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0dc29a5
feat: Introduce new audit, notification, and storage cleaner workers,
wailbentafat Mar 20, 2026
bb04045
feat: Introduce new services for auditing and user notifications, and…
wailbentafat Mar 20, 2026
338b46a
feat: Add user notification system with API endpoints for listing and…
wailbentafat Mar 20, 2026
fb5288a
feat: Implement push notification support using Firebase Admin and AP…
wailbentafat Mar 20, 2026
fc2b2cd
feat: Add type stubs for googleapiclient and pywebpush, refactor noti…
wailbentafat Mar 20, 2026
b570044
feat: Introduce an audit logging system with a dedicated worker, serv…
wailbentafat Mar 20, 2026
ec6cd71
feat: Add audit table migrations and refactor audit and notification …
wailbentafat Mar 20, 2026
195f49b
refactor: migrate notification event payload schema to Pydantic with …
wailbentafat Mar 20, 2026
2e4bb34
Refactor notification providers to directly use device/subscription i…
wailbentafat Mar 20, 2026
65660ef
feat:refactor the worker to unify firebase message sending and handli…
wailbentafat Mar 24, 2026
aef9861
chore:remove firebase stubs replace it with internal typing cast and…
wailbentafat Mar 25, 2026
2de9900
Add device token lifecycle tracking
wailbentafat Mar 25, 2026
53f04d9
Fix lint issues
wailbentafat Mar 25, 2026
a30c44b
feat: fix file structure
wailbentafat Mar 25, 2026
e8a8da7
Fix type checking errors
wailbentafat Mar 25, 2026
fa023db
Add audit gateway and notification service
wailbentafat Mar 25, 2026
152a3c4
add notifcation gateway in the service
wailbentafat Mar 25, 2026
6cc3969
Enhance audit filters
wailbentafat Mar 25, 2026
5a4b52f
feat: add notifcaation and audit endpoint with propre filter link th…
wailbentafat Mar 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .env.staging.example
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,11 @@ encryption_key=super_secret_encryption_key
totp_issuer=MultiAI




GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET=
GOOGLE_REDIRECT_URI=http://127.0.0.1:8000/staff/drive/callback
GOOGLE_OAUTH_SCOPES=https://www.googleapis.com/auth/drive.readonly openid email profile

FACE_ENCRYPTION_KEY=base64-encoded-32-byte-key
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ __pycache__
db/schema.sql

.vscode/settings.json

multiai-c9380-firebase-adminsdk-fbsvc-cb6e5ce41b.json
db.txt

23 changes: 20 additions & 3 deletions app/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
from app.service.staff_notifications import StaffNotificationsService
from app.service.staff_user import StaffUserService

from app.service.audit import AuditService
from app.service.upload_requests import UploadRequestsService
from app.service.users import AuthService
from app.service.user_notification import UserNotificationService
from db.generated import devices as device_queries
from db.generated import photos as photo_queries
from db.generated import session as session_queries
Expand All @@ -27,7 +29,11 @@
from db.generated import events as event_queries
from db.generated import eventParticipant as participant_queries
from db.generated import stuff_user as staff_queries
from db.generated import notifications as notification_queries
from db.generated import audit as audit_queries
from app.service.event import EventService
from app.worker.notification.notification_queue import NotificationQueue
from app.worker.notification.settings import NotifSetting

class Container:
def __init__(
Expand All @@ -49,6 +55,8 @@ def __init__(
self.upload_request_photo_querier = upload_request_photo_queries.AsyncQuerier(conn)
self.photo_querier = photo_queries.AsyncQuerier(conn)
self.staff_notification_querier = staff_notification_queries.AsyncQuerier(conn)
self.notification_querier = notification_queries.AsyncQuerier(conn)
self.audit_querier = audit_queries.AsyncQuerier(conn)
self.event_querier = event_queries.AsyncQuerier(conn)
self.participant_querier = participant_queries.AsyncQuerier(conn)
self.staff_querier = staff_queries.AsyncQuerier(conn)
Expand Down Expand Up @@ -94,6 +102,18 @@ def __init__(
staff_notifications_service=self.staff_notifications_service,
)

notification_queue = NotificationQueue(settings=NotifSetting)

self.user_notifications_service = UserNotificationService(
notification_querier=self.notification_querier,
notification_queue=notification_queue,
)

self.audit_service = AuditService(
audit_querier=self.audit_querier,
user_querier=self.user_querier,
)

self.staff_user_service = StaffUserService()

self.staff_user_service.init(
Expand All @@ -104,9 +124,6 @@ def __init__(
p_querier=self.participant_querier,
)




async def get_container(
conn: sqlalchemy.ext.asyncio.AsyncConnection = Depends(get_db),
) -> Container:
Expand Down
1 change: 1 addition & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Settings(BaseSettings):
)

FACE_ENCRYPTION_KEY: str
FIREBASE_CREDENTIALS_PATH: str = "multiai-c9380-firebase-adminsdk-fbsvc-cb6e5ce41b.json"

class Config:
env_file = ".env"
Expand Down
16 changes: 16 additions & 0 deletions app/core/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@
class RedisKey(str, Enum):
UserSession = "user_session"
UserSessionByUser = "user_session:{user_id}"
INVALID_TOKEN_SET_KEY= "notifications:invalid_tokens"


NOTIFICATION_EVENT_SUBJECT = "notification_event"
AUDIT_EVENT_SUBJECT = "audit.event"


class AuditEventType(str, Enum):
USER_SIGNUP = "user.signup"
USER_LOGIN = "user.login"
USER_LOGOUT = "user.logout"
UPLOAD_REQUEST_CREATED = "upload_request.created"
UPLOAD_REQUEST_APPROVED = "upload_request.approved"
UPLOAD_REQUEST_REJECTED = "upload_request.rejected"



IMAGE_ALLOWED_TYPES = {
"image/jpeg",
Expand Down
35 changes: 25 additions & 10 deletions app/infra/nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@
from pydantic import BaseModel

from app.core.config import settings
from app.core.constant import NOTIFICATION_EVENT_SUBJECT, AUDIT_EVENT_SUBJECT


class Message(BaseModel):
data: dict[str, Any]


class NatsSubjects(Enum):
USER_SIGNUP = "user.signup"
USER_LOGIN = "user.login"
USER_LOGOUT = "user.logout"
NOTIFICATION_EVENT = NOTIFICATION_EVENT_SUBJECT
AUDIT_EVENT = AUDIT_EVENT_SUBJECT
STAFF_UPLOAD_REQUEST_CREATED = "staff.upload_request.created"
STAFF_UPLOAD_REQUEST_APPROVED = "staff.upload_request.approved"
STAFF_UPLOAD_REQUEST_REJECTED = "staff.upload_request.rejected"
Expand All @@ -24,13 +29,19 @@ class NatsClient:
_js: Optional[JetStreamContext] = None

@staticmethod
async def connect() -> None:
async def connect(
*,
host: str | None = None,
port: int | None = None,
user: str | None = None,
password: str | None = None,
) -> None:
if NatsClient._nc is None:
nc = NATS()
await nc.connect(
servers=[f"nats://{settings.NATS_HOST}:{settings.NATS_PORT}"],
user=settings.NATS_USER,
password=settings.NATS_PASSWORD,
servers=[f"nats://{host or settings.NATS_HOST}:{port or settings.NATS_PORT}"],
user=user or settings.NATS_USER,
password=password or settings.NATS_PASSWORD,
)
NatsClient._nc = nc
NatsClient._js = nc.jetstream() # type: ignore
Expand All @@ -45,23 +56,25 @@ async def close() -> None:


@staticmethod
async def publish(subject: NatsSubjects, message: bytes) -> None:
async def publish(subject: NatsSubjects | str, message: bytes) -> None:
if NatsClient._nc is None:
await NatsClient.connect()
nc = NatsClient._nc
assert nc is not None
await nc.publish(subject.value, message)
subject_name = subject.value if isinstance(subject, NatsSubjects) else subject
await nc.publish(subject_name, message)

@staticmethod
async def subscribe(subject: NatsSubjects, callback: Callable[[Any], Any]) -> None:
async def subscribe(subject: NatsSubjects | str, callback: Callable[[Any], Any]) -> None:
if NatsClient._nc is None:
await NatsClient.connect()
nc = NatsClient._nc
assert nc is not None
async def _wrapper(msg: Msg) -> None:
await callback(msg.data)

await nc.subscribe(subject.value, cb=_wrapper) # type: ignore
subject_name = subject.value if isinstance(subject, NatsSubjects) else subject
await nc.subscribe(subject_name, cb=_wrapper) # type: ignore


@staticmethod
Expand All @@ -70,7 +83,8 @@ async def js_publish(subject: NatsSubjects, message: bytes, stream_name: str) ->
await NatsClient.connect()
js = NatsClient._js
assert js is not None
await js.publish(subject.value, message, stream=stream_name)
subject_name = subject.value if isinstance(subject, NatsSubjects) else subject # type: ignore
await js.publish(subject_name, message, stream=stream_name)

@staticmethod
async def js_subscribe(
Expand All @@ -88,8 +102,9 @@ async def _wrapper(msg: Msg) -> None:
await msg.ack()
js = NatsClient._js
assert js is not None
subject_name = subject.value
await js.subscribe(
subject=subject.value,
subject=subject_name,
stream=stream_name,
durable=durable_name,
cb=_wrapper,
Expand Down
67 changes: 46 additions & 21 deletions app/infra/redis.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,37 @@
from typing import Any
from typing import cast, ClassVar

from redis.asyncio import Redis

from app.core.constant import RedisKey


class RedisClient:
client: Redis
_instance = None
_client: Redis
_instance: ClassVar["RedisClient | None"] = None

def __new__(cls, *args: Any, **kwargs: Any) -> "RedisClient":
def __init__(self, host: str, port: int, password: str) -> None:
self._client = Redis.from_url( # type: ignore
f"redis://{host}:{port}",
password=password,
decode_responses=True,
)


@classmethod
def init(cls, host: str, port: int, password: str) -> "RedisClient":
if cls._instance is not None:
raise RuntimeError("RedisClient already initialized")

cls._instance = cls(host, port, password)
return cls._instance

@classmethod
def get_instance(cls) -> "RedisClient":
if cls._instance is None:
cls._instance = super().__new__(cls)
raise RuntimeError("RedisClient not initialized")

return cls._instance

def __init__(self, host: str, port: int, password: str) -> None:
if not hasattr(self, "client"):
self.client = Redis.from_url( # type: ignore
f"redis://{host}:{port}", password=password, decode_responses=True
)

async def set(
self,
Expand All @@ -27,25 +40,37 @@ async def set(
expire: int | None = None,
nx: bool = False,
) -> bool:
return await self.client.set(key, value, ex=expire, nx=nx)
result = await self._client.set(key, value, ex=expire, nx=nx)
return bool(result)

async def get(self, key: RedisKey | str) -> str | None:
return await self.client.get(key)
return await self._client.get(key)

async def delete(self, key: RedisKey | str) -> int:
return await self.client.delete(key)
result = await self._client.delete(key)
return int(cast(int, result))

async def exists(self, key: RedisKey | str) -> bool:
return await self.client.exists(key) > 0
result = await self._client.exists(key)
return int(cast(int, result)) > 0

async def expire(self, key: RedisKey | str, seconds: int) -> bool:
return await self.client.expire(key, seconds)
result = await self._client.expire(key, seconds)
return int(cast(int, result)) == 1


async def sadd(self, key: RedisKey | str, *values: str) -> int:
result = self._client.sadd(key, *values)
return int(cast(int, result))

async def sismember(self, key: RedisKey | str, value: str) -> bool:
result = self._client.sismember(key, value)
return int(cast(int, result)) == 1

async def srem(self, key: RedisKey | str, *values: str) -> int:
result = self._client.srem(key, *values)
return int(cast(int, result))

@classmethod
def get_instance(cls) -> "RedisClient":
if cls._instance is None:
raise RuntimeError("RedisClient not initialized")
return cls._instance

async def close(self) -> None:
await self.client.close()
await self._client.close()
1 change: 0 additions & 1 deletion app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint

from app.core.config import settings
from app.infra.minio import init_minio_client
from app.infra.nats import NatsClient
Expand Down
5 changes: 3 additions & 2 deletions app/router/mobile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
from app.router.mobile.auth import router as mobile_auth_router
from app.router.mobile.enrollement import router as onboarding_router
from app.router.mobile.event import router as event_router
from app.router.mobile.notifications import router as mobile_notifications_router


router = APIRouter(prefix="/user",tags=["user"])
router = APIRouter(prefix="/user", tags=["user"])
router.add_api_route
router.include_router(mobile_auth_router)
router.include_router(onboarding_router)
router.include_router(event_router)

router.include_router(mobile_notifications_router)
40 changes: 40 additions & 0 deletions app/router/mobile/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from __future__ import annotations

from datetime import datetime
from uuid import UUID

from fastapi import APIRouter, Depends, Query

from app.container import Container, get_container
from app.core.constant import AuditEventType
from app.deps.token_auth import MobileUserSchema, get_current_mobile_user
from app.schema.response.mobile.audit import AuditEventListResponse, AuditEventSchema

router = APIRouter(prefix="/audits", tags=["audits"])


@router.get("", response_model=AuditEventListResponse)
async def list_audits(
event_type: AuditEventType | None = Query(None),
user_id: UUID | None = Query(None),
created_from: datetime | None = Query(None, alias="from"),
created_to: datetime | None = Query(None, alias="to"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
container: Container = Depends(get_container),
_: MobileUserSchema = Depends(get_current_mobile_user),
) -> AuditEventListResponse:
events = await container.audit_service.list_audit_events(
event_type=event_type,
user_id=user_id,
created_from=created_from,
created_to=created_to,
limit=limit,
offset=offset,
)
return AuditEventListResponse(
items=[
AuditEventSchema.from_model(audit_event, actor=actor)
for audit_event, actor in events
]
)
Loading