
feat: switch to 1:1 eventType as a topic #207

Merged
HardMax71 merged 9 commits into main from feat/v3-less-useless-services
Feb 19, 2026

Conversation

@HardMax71 (Owner) commented Feb 18, 2026


Summary by cubic

Switched to 1:1 EventType-as-topic names (no prefix) and moved DLQ handling fully in-process with per-event retry policies. Removed kafka-init, mapping code, and the generic dlq_events topic; fixed SSE OpenAPI media type and added ExecutionCancelled to both saga handling and SSE execution streams.

  • Refactors

    • Removed KafkaTopic and GroupId; producers publish to EventType string topics, consumers subscribe by EventType.
    • Deleted kafka/mappings.py and topic creation script; simplified topic logic.
    • DLQ: internal manager with preset per-event policies; removed DLQ routes, schemas, and E2E tests; DLQ events publish to their own EventType topics.
    • Replay configs now map EventType to string topic names; simplified service identifiers to string names.
    • Updated OpenAPI and regenerated frontend; SSE endpoints show text/event-stream; SSE execution stream includes ExecutionCancelled.
    • Moved non-E2E tests to unit; added unit app tests; consolidated subscribers into a single handler.
  • Migration

    • No manual topic creation; Kafka auto-creates topics (compose sets KAFKA_NUM_PARTITIONS=3).
    • Topic names match EventType strings with no prefix; update any dashboards/alerts expecting prefixed names.
    • DLQ REST endpoints are removed; retire any external integrations relying on them.
    • The generic dlq_events topic is removed; point any consumers to the DLQ EventType topics.

Written for commit 7d0ceb5. Summary will update on new commits.
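
A minimal sketch of the 1:1 EventType-as-topic model summarized above, assuming a str-valued EventType enum, no topic prefix (per the migration notes), and an aiokafka-style producer; the names topic_for and publish are illustrative, not this PR's actual code:

# Illustrative sketch only; the real producer in this PR may use different names and signatures.
from enum import Enum

class EventType(str, Enum):
    EXECUTION_REQUESTED = "execution_requested"
    DLQ_MESSAGE_RECEIVED = "dlq_message_received"

def topic_for(event_type: EventType) -> str:
    # 1:1 mapping, no prefix: the topic name is the EventType string itself.
    return event_type.value

async def publish(producer, event_type: EventType, payload: bytes) -> None:
    # Kafka auto-creates the topic on first publish (compose sets KAFKA_NUM_PARTITIONS=3).
    await producer.send_and_wait(topic_for(event_type), payload)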

Summary by CodeRabbit

  • New Features

    • Per-event retry policies with several predefined strategies; topics are now derived from event types (string names).
  • Bug Fixes & Improvements

    • Simplified event routing and DLQ processing; services now wait for Kafka health, and a default partition count is set.
  • Removed

    • DLQ management HTTP endpoints, DLQ schemas and SDK methods, and related tests.
  • Docs

    • Architecture and deployment docs updated to reflect per-event-topic design and operational changes.

@coderabbitai bot commented Feb 18, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Removed KafkaTopic/GroupId enums and the mappings layer; topics now use EventType string values. DLQ retry-policy resolution moved internal to DLQManager and DLQ HTTP/schema surface and topic-creation automation were removed. Producers, handlers, configs, docs, tests, and frontend types updated accordingly.

Changes

  • Enums & mappings removed (backend/app/domain/enums/kafka.py, backend/app/domain/enums/__init__.py, backend/app/infrastructure/kafka/mappings.py, backend/app/infrastructure/kafka/__init__.py): Deleted KafkaTopic and GroupId, removed EVENT_TYPE_TO_TOPIC and consumer-group/topic mapping helpers, and reduced re-exports.
  • DLQ manager & providers (backend/app/dlq/manager.py, backend/app/core/providers.py, backend/tests/e2e/dlq/test_dlq_manager.py): DLQManager ctor params removed (no default_retry_policy/retry_policies/dlq_topic); added _retry_overrides, _resolve_retry_policy, set_retry_policy; MessagingProvider/UnifiedProducer signature adjusted; tests updated to EventType usage.
  • Retry policy models (backend/app/dlq/models.py): Removed topic from RetryPolicy; added predefined policies (AGGRESSIVE_RETRY, CAUTIOUS_RETRY, IMMEDIATE_RETRY, DEFAULT_RETRY) and retry_policy_for(event_type) selector.
  • Producer & handlers: topic derivation (backend/app/events/core/producer.py, backend/app/events/handlers.py): Removed mapping lookup and Settings/topic-prefix usage; producers and handlers derive topics directly from EventType (string) and register per-event subscriptions instead of topic-filtering.
  • Topic categorization (backend/app/infrastructure/kafka/topics.py): Replaced KafkaTopic configs with EventType sets (e.g., COMMAND_TYPES, RESULT_TYPES, etc.); removed get_all_topics/get_topic_configs and KafkaTopic typing.
  • Replay/schemas/configs (backend/app/db/docs/replay.py, backend/app/domain/replay/models.py, backend/app/schemas_pydantic/replays.py, backend/app/services/pod_monitor/config.py): Changed target_topics types from dict[EventType, KafkaTopic] to dict[EventType, str]; removed KafkaTopic-typed config fields and related imports.
  • DLQ API & schemas removed (backend/app/api/routes/dlq.py, backend/app/schemas_pydantic/dlq.py, frontend/src/lib/api/sdk.gen.ts, frontend/src/lib/api/types.gen.ts): Deleted DLQ REST routes and Pydantic DLQ schemas; removed frontend SDK DLQ endpoints and DLQ public types; OpenAPI/schema KafkaTopic refs replaced with string.
  • Topic creation & deployment changes (backend/scripts/create_topics.py, docker-compose.yaml, docs/operations/deployment.md): Removed topic-creation script and kafka-init service; added KAFKA_NUM_PARTITIONS: 3; services now depend on Kafka health; docs updated for on-demand topic creation.
  • SSE & service-name adjustments (backend/app/services/sse/redis_bus.py, backend/app/services/pod_monitor/event_mapper.py, backend/app/services/result_processor/processor.py): Removed SSE_ROUTED_EVENTS and EventType/ClassVar imports; replaced GroupId enum values with literal service-name strings.
  • Tests cleanup & updates (backend/tests/e2e/conftest.py, backend/tests/unit/events/test_mappings_and_types.py, backend/tests/unit/services/pod_monitor/test_config_and_init.py, backend/tests/e2e/test_dlq_routes.py): Updated topic construction to use EventType strings; removed mapping-related unit tests and many DLQ e2e/unit tests.
  • Docs & OpenAPI (docs/architecture/*, docs/reference/openapi.json): Docs and OpenAPI updated to a 1:1 EventType→topic model; removed mapping layer; renamed SSEExecutionEventData → SSEExecutionEventSchema; removed KafkaTopic component.
  • Frontend formatting & types (frontend/src/lib/api/*, frontend/src/lib/editor/execution.svelte.ts, frontend/src/lib/api/index.ts): Formatting/type refactors and renames to SseExecutionEventSchema; removed KafkaTopic usages in types. Note: a TypeScript syntax error was introduced in frontend/src/lib/api/client/types.gen.ts (extraneous tokens).

Sequence Diagram(s)

sequenceDiagram
    participant Producer
    participant DLQManager
    participant Repository
    participant Publisher
    Note over Producer: message processing failed / error
    Producer->>DLQManager: submit failed message
    DLQManager->>DLQManager: _resolve_retry_policy(message)
    DLQManager->>Repository: persist DLQMessage (status=PENDING)
    DLQManager->>Publisher: publish DLQMessageReceivedEvent (topic = EventType.DLQ_MESSAGE_RECEIVED)
    Publisher-->>DLQManager: ack
    DLQManager->>DLQManager: schedule/apply retries per resolved policy or _retry_overrides

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Poem

🐰
I hopped through enums and mappings today,
EventType strings now lead the way,
DLQ policies tucked safely inside,
Topics bloom on-demand, far and wide,
A rabbit cheers as messages play!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 21.28%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Description Check (✅ Passed): check skipped; CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): the PR title accurately summarizes the main change: switching to a 1:1 mapping between EventType and Kafka topics, eliminating the previous prefix+mapping approach.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@cubic-dev-ai bot left a comment

No issues found across 41 files

@coderabbitai bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (5)
backend/app/events/handlers.py (1)

366-370: ⚠️ Potential issue | 🟠 Major

Replace assert with an explicit guard — assertions are stripped at -O

assert not isinstance(raw, tuple) is used as a runtime type guard before accessing raw.offset and raw.partition. Python's -O optimization flag silently drops assert statements, causing an AttributeError on the subsequent attribute access if raw is ever a tuple (e.g., if batch processing is inadvertently enabled).

🛡️ Proposed fix
-        assert not isinstance(raw, tuple)
-        body.dlq_offset = raw.offset
-        body.dlq_partition = raw.partition
+        if isinstance(raw, tuple):
+            raise TypeError(
+                f"DLQ subscriber received a batch message; expected a single AIOKafkaMessage, got tuple"
+            )
+        body.dlq_offset = raw.offset
+        body.dlq_partition = raw.partition
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` around lines 366 - 370, Replace the fragile
assert in the message handler with an explicit runtime guard: check if
msg.raw_message (variable raw) is a tuple using isinstance(raw, tuple) and if so
raise a clear exception (e.g., TypeError or RuntimeError) or log and return
early so you don't attempt to access raw.offset/raw.partition; update the block
around start = asyncio.get_running_loop().time(), raw = msg.raw_message,
body.dlq_offset and body.dlq_partition to perform this explicit check and
fail-fast with a descriptive error mentioning the unexpected type and message
metadata (use the symbols raw, msg.raw_message, body.dlq_offset,
body.dlq_partition).
docs/architecture/kafka-topic-architecture.md (1)

130-140: ⚠️ Potential issue | 🟡 Minor

Diagram topic name should match the code.

Line 134 references dead_letter_queue topic in the diagram, but DLQManager derives the DLQ events topic from EventType.DLQ_MESSAGE_RECEIVED (see manager.py line 60). Ensure the diagram uses the actual topic name for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/architecture/kafka-topic-architecture.md` around lines 130 - 140, The
diagram label "dead_letter_queue topic" is inconsistent with the code: update
the Mermaid diagram to use the actual DLQ topic name derived from
EventType.DLQ_MESSAGE_RECEIVED (as used in DLQManager in manager.py) so the
diagram matches the implementation; replace the current text with the exact
topic identifier or constant name used by EventType.DLQ_MESSAGE_RECEIVED and
ensure any explanatory text references that same constant.
docs/reference/openapi.json (2)

10484-10492: ⚠️ Potential issue | 🟡 Minor

Document expected keys for target_topics.

Line 10484 now documents only string values, but the keys are still implicitly EventType. Without a hint, client validation and docs will allow arbitrary keys. Consider adding a description clarifying the key semantics.

📝 Suggested documentation tweak
           "target_topics": {
             "anyOf": [
               {
                 "additionalProperties": {
                   "type": "string"
                 },
                 "type": "object"
               },
               {
                 "type": "null"
               }
             ],
-            "title": "Target Topics"
+            "title": "Target Topics",
+            "description": "Map of EventType values to target topic names."
           },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/reference/openapi.json` around lines 10484 - 10492, The target_topics
schema currently only documents string values but doesn't indicate that object
keys represent EventType, allowing arbitrary keys; update the OpenAPI schema for
target_topics (the "target_topics" property in the same object) to include a
clear description that keys are EventType identifiers and constrain them—either
add a descriptive "description" explaining key semantics and examples, or use
schema features (patternProperties or additionalProperties with a key
pattern/enumeration) to validate keys as EventType values so clients and docs
know the expected keys.

11346-11503: ⚠️ Potential issue | 🟠 Major

Breaking rename for generated clients—add a deprecated alias or migration notice.

The OpenAPI spec now references SSEExecutionEventSchema, but if this is a rename from SSEExecutionEventData that previously existed in the schema, existing generated clients will break. While the backend code maintains both the domain dataclass (SSEExecutionEventData) and Pydantic schema (SSEExecutionEventSchema) separately, clients consuming the OpenAPI spec will only see the new name.

To preserve compatibility:

  • Add a deprecated alias SSEExecutionEventData in the OpenAPI schema pointing to SSEExecutionEventSchema, or
  • Document the breaking change in migration/upgrade documentation (currently absent).
🛠️ Suggested compatibility alias
      "SSEExecutionEventSchema": {
        ...
      },
+     "SSEExecutionEventData": {
+       "allOf": [
+         { "$ref": "#/components/schemas/SSEExecutionEventSchema" }
+       ],
+       "deprecated": true,
+       "description": "Deprecated alias for SSEExecutionEventSchema. Use SSEExecutionEventSchema instead."
+     },
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/reference/openapi.json` around lines 11346 - 11503, The OpenAPI rename
breaks generated clients; add a deprecated alias named SSEExecutionEventData
that references the new SSEExecutionEventSchema (so tools resolving $ref
"#/components/schemas/SSEExecutionEventData" will point to
"#/components/schemas/SSEExecutionEventSchema"), or alternatively add a clear
migration note in docs describing the rename from SSEExecutionEventData ->
SSEExecutionEventSchema and mark the old name as deprecated; update the
components/schemas section to include the SSEExecutionEventData alias
referencing SSEExecutionEventSchema and add a deprecation description.
frontend/src/lib/api/client/client.gen.ts (1)

94-100: ⚠️ Potential issue | 🟡 Minor

Error interceptors are not chained properly.

The loop passes the original error to each interceptor instead of the transformed finalError from the previous interceptor. This breaks the chaining pattern used for request interceptors (which pass transformed values forward). Each error interceptor receives the original error rather than being able to build on previous transformations.

Suggested fix
-          finalError = (await fn(error, undefined as any, request, opts)) as unknown;
+          finalError = (await fn(finalError, undefined as any, request, opts)) as unknown;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/api/client/client.gen.ts` around lines 94 - 100, The error
interceptor loop currently always passes the original error variable to each
function, breaking chaining; update the loop over interceptors.error.fns so each
call receives the most-recent transformed error (finalError) and then assign the
awaited result back to finalError (preserving the cast/typing), i.e., call
fn(finalError, undefined as any, request, opts) and set finalError = (await
fn(...)) as unknown so subsequent interceptors see prior transformations.
🧹 Nitpick comments (8)
frontend/src/lib/api/core/queryKeySerializer.gen.ts (1)

1-1: Formatting edits to an auto-generated file may be silently overwritten.

The `// This file is auto-generated by @hey-api/openapi-ts` header signals that the file's canonical source is the OpenAPI spec + generator config, not the file itself. Any formatting changes applied here will be lost the next time openapi-ts regenerates this file. If the intent is to persist these stylistic preferences, they should instead be expressed through the generator's config options or a post-generation prettier/ESLint pass that is run automatically (e.g., as part of the generation script).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/lib/api/core/queryKeySerializer.gen.ts` at line 1, This file has
a generator header "// This file is auto-generated by `@hey-api/openapi-ts`" so
stop making direct formatting edits in queryKeySerializer.gen.ts; instead, move
any desired style/config changes into the OpenAPI generator pipeline — update
the `@hey-api/openapi-ts` generator config or add a post-generation step
(prettier/ESLint) in the generation script so the formatting is applied
automatically and persisted on regen.
backend/app/services/pod_monitor/event_mapper.py (1)

184-188: Consider a module-level constant for "pod-monitor".

Hardcoding the service name inline is fine for a single call site, but defining it as a module-level constant (e.g., _SERVICE_NAME = "pod-monitor") makes future renames grep-safe and consistent with how service_version is already handled at the module level in other services.

♻️ Optional: extract as a module-level constant
+_SERVICE_NAME = "pod-monitor"
+_SERVICE_VERSION = "1.0.0"

 ...

     md = EventMetadata(
         user_id=labels.get("user-id", str(uuid4())),
-        service_name="pod-monitor",
-        service_version="1.0.0",
+        service_name=_SERVICE_NAME,
+        service_version=_SERVICE_VERSION,
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/pod_monitor/event_mapper.py` around lines 184 - 188,
Extract the hardcoded "pod-monitor" into a module-level constant (e.g.,
_SERVICE_NAME = "pod-monitor") and replace the inline literal in the
EventMetadata construction (where md is assigned) with that constant; keep
service_version usage consistent with the existing pattern and ensure the
constant name is private (leading underscore) to match module conventions.
backend/app/services/result_processor/processor.py (1)

147-147: Extract the duplicated service-name literal to a module-level constant.

"result-processor" is duplicated at lines 147 and 162 in _publish_result_stored and _publish_result_failed. Extracting it to a constant prevents typos and improves maintainability.

♻️ Proposed refactor — hoist to a module constant
 import structlog
 
 from app.core.metrics import ExecutionMetrics
+
+_SERVICE_NAME = "result-processor"

 ...

-                service_name="result-processor",
+                service_name=_SERVICE_NAME,

Apply in both _publish_result_stored (line 147) and _publish_result_failed (line 162).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/result_processor/processor.py` at line 147, Extract the
duplicated literal "result-processor" into a module-level constant (e.g.,
RESULT_PROCESSOR_SERVICE_NAME) and replace the hardcoded strings in both
_publish_result_stored and _publish_result_failed with that constant; declare
the constant at the top of the processor.py module and use the constant in the
service_name parameter in both functions to avoid duplication and typos.
backend/app/events/handlers.py (2)

284-301: All 16 SSE handlers share the name on_sse_event — breaks AsyncAPI docs and test mocks

Each loop iteration shadows the on_sse_event name, leaving only the last iteration's function object in scope. The subscriptions themselves are registered correctly (the topic argument to @broker.subscriber is evaluated eagerly), but:

  1. AsyncAPI / observability: FastStream uses the handler function's __name__ for documentation and logging. All 16 subscribers will be labelled on_sse_event, making them indistinguishable in traces and generated docs.
  2. Testing: In TestKafkaBroker, the on_sse_event variable refers only to the last registered handler, so mocking or asserting on earlier topics via this name is impossible.

Use a unique name per event type or pass an explicit title to the decorator:

♻️ Proposed fix — unique handler names via functools.partial / factory
-    for et in _SSE_EVENT_TYPES:
-        topic = f"{prefix}{et}"
-
-        @broker.subscriber(
-            topic,
-            group_id="sse-bridge-pool",
-            ack_policy=AckPolicy.ACK_FIRST,
-            auto_offset_reset="latest",
-            max_workers=settings.SSE_CONSUMER_POOL_SIZE,
-        )
-        async def on_sse_event(
-                body: DomainEvent,
-                sse_bus: FromDishka[SSERedisBus],
-        ) -> None:
-            await sse_bus.route_domain_event(body)
+    def _make_sse_handler(et: EventType) -> Any:
+        topic = f"{prefix}{et}"
+
+        @broker.subscriber(
+            topic,
+            group_id="sse-bridge-pool",
+            ack_policy=AckPolicy.ACK_FIRST,
+            auto_offset_reset="latest",
+            max_workers=settings.SSE_CONSUMER_POOL_SIZE,
+            title=f"on_sse_event_{et.name.lower()}",
+        )
+        async def _handler(
+                body: DomainEvent,
+                sse_bus: FromDishka[SSERedisBus],
+        ) -> None:
+            await sse_bus.route_domain_event(body)
+
+        return _handler
+
+    for et in _SSE_EVENT_TYPES:
+        _make_sse_handler(et)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` around lines 284 - 301, The loop in
register_sse_subscriber creates multiple subscriber functions all named
on_sse_event which causes shadowing and indistinguishable handlers; fix by
generating a unique handler per event type (capturing et/topic) — either create
a handler factory that returns an async function (e.g., make_on_sse_event(et) ->
async def handler(body, sse_bus): ...) and set its __name__ to include the event
type, or pass an explicit title/name into broker.subscriber for each topic;
ensure the handler references the captured topic/et and calls
sse_bus.route_domain_event(body) so each subscription has a distinct identifier
for docs/tests.

64-122: Missing explicit auto_offset_reset on coordinator and saga subscribers may silently drop messages

The coordinator and saga subscribers omit auto_offset_reset, so they inherit FastStream's Kafka default of "latest". For a new consumer group (first deploy, group rename, or offset reset), any EXECUTION_REQUESTED / EXECUTION_COMPLETED / etc. messages published before the consumer starts will be skipped entirely.

The result-processor and DLQ already explicitly use "earliest" — the coordinator and saga should do the same to be consistent and safe for cold starts.

🛡️ Proposed fix — add explicit offset reset to all coordinator subscribers
     @broker.subscriber(
         f"{prefix}{EventType.EXECUTION_REQUESTED}",
         group_id="execution-coordinator",
         ack_policy=AckPolicy.ACK,
+        auto_offset_reset="earliest",
     )

(Apply the same addition to all four coordinator and four saga subscribers.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` around lines 64 - 122, The coordinator
subscriber handlers on_execution_requested, on_execution_completed,
on_execution_failed and on_execution_cancelled currently rely on the
broker.subscriber default offset policy ("latest") and must explicitly set
auto_offset_reset="earliest"; update each broker.subscriber decorator for these
functions (and do the same for the corresponding saga subscriber decorators) to
include auto_offset_reset="earliest" so new consumer groups/cold starts will
consume past EXECUTION_* events instead of skipping them.
backend/app/events/core/producer.py (1)

34-34: Production auto-create topic governance needs explicit policy.

The per-event-type topic change creates 57 distinct Kafka topics (previously many-to-one mapping). Infrastructure is appropriately configured for this volume: docker-compose sets KAFKA_NUM_PARTITIONS: 3 and KAFKA_LOG_RETENTION_HOURS: 168. All 7 workers have dedicated consumer groups (execution-coordinator, dlq-processor, event-replay, k8s-worker, pod-monitor, result-processor, saga-orchestrator). Monitoring is in place via otel-collector kafkametrics (brokers, topics, consumers scrapers).

However, production deployment reuses the same docker-compose.yaml, leaving KAFKA_AUTO_CREATE_TOPICS_ENABLE: true active. While documented, this should be explicitly governed in production—either via a separate prod config that disables auto-create, or documented as an intentional decision with compensating controls (e.g., topic allowlist, approval workflow, or audit logging).
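
For illustration, a sketch of the allowlist guard described in option (B); APPROVED_EVENT_TYPES, TopicNotAllowedError, and resolve_topic are hypothetical names, not code from this PR:

# Hypothetical allowlist guard (option B); names are illustrative, not from the PR.
APPROVED_EVENT_TYPES: frozenset[str] = frozenset({
    "execution_requested",
    "execution_completed",
    "dlq_message_received",
})

class TopicNotAllowedError(RuntimeError):
    pass

def resolve_topic(topic_prefix: str, event_type: str) -> str:
    # Fail fast instead of letting Kafka auto-create an unapproved topic in production.
    if event_type not in APPROVED_EVENT_TYPES:
        raise TopicNotAllowedError(f"Refusing to publish to unapproved topic: {event_type}")
    return f"{topic_prefix}{event_type}"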

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/core/producer.py` at line 34, The new per-event topic
naming (topic = f"{self._topic_prefix}{event_to_produce.event_type}") can cause
uncontrolled auto-creation of many Kafka topics in production when
KAFKA_AUTO_CREATE_TOPICS_ENABLE is left true; update deployment/config
management to either (A) disable auto-create in production (provide a separate
prod config that sets KAFKA_AUTO_CREATE_TOPICS_ENABLE=false) or (B) implement an
explicit topic governance control: add a topic allowlist and an approval/audit
workflow that validates event_to_produce.event_type values before producing, and
ensure the producer (the code building topic from self._topic_prefix and
event_to_produce.event_type) logs denied attempts and fails fast when a topic is
not in the approved list. Ensure the chosen approach is documented and applied
to the docker-compose/prod config used by the seven consumer groups.
backend/app/dlq/manager.py (1)

70-73: Policy resolution is clean; topic-key semantics are consistent.

The override-first, then event-type-fallback approach is sound. Note that message.original_topic includes the prefix (e.g., "dev_execution_requested"), so callers of set_retry_policy must pass the full prefixed topic name — which is the case in the test file. This is worth a brief docstring note on set_retry_policy for clarity.

📝 Optional docstring addition
     def set_retry_policy(self, topic: str, policy: RetryPolicy) -> None:
+        """Override the retry policy for a specific topic (must include prefix)."""
         self._retry_overrides[topic] = policy
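
For reference, a self-contained approximation of the override-first resolution described above; DLQManagerSketch and the simplified signatures are illustrative, not the PR's exact DLQManager code:

from dataclasses import dataclass

@dataclass(frozen=True)
class RetryPolicy:
    max_retries: int = 3

DEFAULT_RETRY = RetryPolicy()

def retry_policy_for(event_type: str) -> RetryPolicy:
    # Preset per-event-type policy lookup (details elided); falls back to DEFAULT_RETRY.
    return DEFAULT_RETRY

class DLQManagerSketch:
    def __init__(self) -> None:
        self._retry_overrides: dict[str, RetryPolicy] = {}

    def set_retry_policy(self, topic: str, policy: RetryPolicy) -> None:
        # The key must be the full prefixed topic name, e.g. "dev_execution_requested".
        self._retry_overrides[topic] = policy

    def _resolve_retry_policy(self, original_topic: str, event_type: str) -> RetryPolicy:
        # Override-first, then the preset policy for the event type.
        return self._retry_overrides.get(original_topic) or retry_policy_for(event_type)
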
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/dlq/manager.py` around lines 70 - 73, Add a brief docstring to
the set_retry_policy method clarifying that the topic key must be the full
prefixed topic name (e.g., "dev_execution_requested") because
_resolve_retry_policy uses message.original_topic to lookup overrides; reference
DLQMessage.original_topic and the fallback to retry_policy_for(event_type) (see
_resolve_retry_policy) so callers/tests know to pass prefixed topic names.
backend/app/dlq/models.py (1)

113-125: Shared mutable dataclass instances — consider frozen=True.

These module-level RetryPolicy instances are returned directly by retry_policy_for() and _resolve_retry_policy(). Since RetryPolicy is a regular (non-frozen) dataclass, any caller could accidentally mutate a shared instance (e.g., policy.max_retries = 10), silently affecting all subsequent callers.

Current usage appears read-only, so this is low risk today, but adding frozen=True to the dataclass would prevent future accidents at zero cost.

🔒 Proposed fix
-@dataclass
+@dataclass(frozen=True)
 class RetryPolicy:
     """Retry policy configuration for DLQ messages."""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/dlq/models.py` around lines 113 - 125, The module-level
RetryPolicy instances (AGGRESSIVE_RETRY, CAUTIOUS_RETRY, IMMEDIATE_RETRY,
DEFAULT_RETRY) are mutable and returned by
retry_policy_for()/_resolve_retry_policy(), so make the RetryPolicy dataclass
immutable by adding frozen=True to its `@dataclass` decorator (i.e., change
`@dataclass`(...) -> `@dataclass`(..., frozen=True)); then run a quick grep for any
code that mutates RetryPolicy fields and update that code to construct new
instances instead of in-place mutation if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/architecture/event-system-design.md`:
- Line 176: Update the docs entry for topics.py to accurately reflect its
current contents: change the description from "Category-based topic configs for
partition/retention tuning" to something like "Event-type category sets for
policy resolution and grouping" and mention that topics.py defines sets such as
EXECUTION_TYPES and POD_TYPES (no partition or retention configuration exists in
this file); ensure the table row linking to infrastructure/kafka/topics.py
references these symbol names to make the intended purpose clear.

In `@docs/architecture/kafka-topic-architecture.md`:
- Line 42: The DLQ row in the topic architecture table currently lists an
invalid EventType value `dead_letter_queue`; update that row's "Event Types"
cell to list the three real DLQ event types: `dlq_message_received`,
`dlq_message_retried`, and `dlq_message_discarded` (each should be
comma-separated and formatted consistently with the other rows) so each
corresponds to its own Kafka topic entry for the DLQ.

In `@docs/operations/deployment.md`:
- Around line 45-46: The docs and current behavior diverge because auto-created
topics use broker defaults; implement explicit topic creation in
infrastructure/kafka/topics.py rather than relying on auto.create.topics.enable.
Add a startup routine (e.g., ensure_topics_exist or create_topics) that uses
KafkaAdminClient to create topics from the documented mapping (use the existing
category constants like TOPIC_CATEGORIES or DLQ_RETRY_SETS and add a
TOPIC_CONFIGS map) with the correct partition counts and retention.ms per
category, make the operation idempotent (check existing topics before creating),
and call this routine from application startup so topics match
kafka-topic-architecture.md.

In `@frontend/src/lib/api/core/types.gen.ts`:
- Line 56: The generated union type for header values ("string | number |
boolean | (string | number | boolean)[] | null | undefined | unknown") collapses
to unknown and erases header typing; do not edit types.gen.ts directly—inspect
the OpenAPI input (docs/reference/openapi.json) for the headers schema and
adjust the generator configuration or source schema so the generator emits a
concrete union (e.g., string | number | boolean | string[] | null | undefined)
instead of unknown, then regenerate with `@hey-api/openapi-ts` (check generator
version/options that introduced unknown in v0.92.4) so the generated symbol for
header values is correctly typed.
- Line 56: The headers value union currently contains `unknown`, which collapses
the whole union to `unknown`; remove the `unknown` member from the union used
for header values so the type remains `string | number | boolean | (string |
number | boolean)[] | null | undefined`. Locate the generated type for
`Config.headers` (the Record<string, …> value union) in
frontend/src/lib/api/core/types.gen.ts and edit the union to drop `unknown`,
then run the generator or regenerate the file to ensure the change persists
across regenerations.


---

Duplicate comments:
In `@docs/reference/openapi.json`:
- Around line 10768-10776: The "Target Topics" schema is missing the descriptive
key documentation for target_topics (same issue as ReplayConfigSchema); locate
the schema titled "Target Topics" and add the same key-level description for the
target_topics property explaining its shape (a map whose values are either an
object or null, and what that object represents) so the OpenAPI entry mirrors
the corrected ReplayConfigSchema documentation and clarifies expected types and
semantics.
- Line 1972: The OpenAPI reference still points to the old
SSEExecutionEventSchema symbol; update this $ref to use the new canonical schema
name used in components/schemas (or add a compatibility alias there) and ensure
the schema definition for SSEExecutionEventSchema includes the documented
deprecation note so consumers see the compatibility guidance; locate references
to SSEExecutionEventSchema and replace them with the renamed schema reference
(or add the alias in components/schemas) and keep the deprecation comment on the
original schema.


@coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
docs/reference/openapi.json (1)

1744-1893: ⚠️ Potential issue | 🟠 Major

Document both CSV and JSON export response types in OpenAPI spec.

The export endpoint returns either text/csv or application/json depending on the export_format parameter, but the spec documents only application/json with an empty schema. This causes generated clients to expect JSON regardless of format. Define both content types with proper string schemas.

💡 Suggested OpenAPI fix
-          "application/json": {
-            "schema": {}
-          }
+          "text/csv": {
+            "schema": {
+              "type": "string"
+            }
+          },
+          "application/json": {
+            "schema": {
+              "type": "string"
+            }
+          }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/reference/openapi.json` around lines 1744 - 1893, The 200 response for
the export endpoint (/api/v1/admin/events/export/{export_format}, operationId
export_events_api_v1_admin_events_export__export_format__get) currently only
lists application/json with an empty schema; update the responses. Replace the
empty schema with two content entries: "application/json" with a string schema
(e.g., type: "string" or an array/object schema if JSON objects are returned)
and "text/csv" with a string schema (type: "string", format: "binary" or plain
string) so clients can handle both CSV and JSON outputs based on export_format;
ensure you remove the empty schema and add clear descriptions for each content
type.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/reference/openapi.json`:
- Around line 1589-1604: Update the OpenAPI responses for the SSE endpoints to
use the text/event-stream media type instead of application/json: for the path
"/api/v1/events/notifications/stream" (operationId
notification_stream_api_v1_events_notifications_stream_get) replace
"application/json" with "text/event-stream" under responses -> 200 (and adjust
the schema to a simple string/event-stream representation instead of
components/schemas/NotificationResponse if needed), and make the same change for
the other SSE endpoint referenced in the spec (the second SSE operation shown
around lines 1611-1637) so the spec matches the backend EventSourceResponse
behavior.


@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (4)
backend/tests/unit/app/test_main_app.py (4)

138-163: CORS origin assertions are hardcoded — consider deriving them from test_settings.

The specific values "https://localhost:5001", "https://127.0.0.1:5001", "https://localhost" are hardcoded. If test_settings.CORS_ORIGINS ever changes, these tests fail with a misleading assertion error rather than a clear "expected vs actual origins" message. Asserting against test_settings.CORS_ORIGINS would be more resilient:

♻️ Proposed fix
     def test_cors_allows_localhost_origins(self, app: FastAPI, test_settings: Settings) -> None:
         cors_kwargs = self._get_cors_kwargs(app)
         assert cors_kwargs is not None
         allowed = cors_kwargs.get("allow_origins", [])
-        assert "https://localhost:5001" in allowed
-        assert "https://127.0.0.1:5001" in allowed
-        assert "https://localhost" in allowed
+        for origin in test_settings.CORS_ORIGINS:
+            assert origin in allowed
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/unit/app/test_main_app.py` around lines 138 - 163, Replace the
hardcoded origin checks in test_cors_allows_localhost_origins (and anywhere else
checking fixed origin strings) with assertions that compare the app's CORS
allow_origins to the configured test_settings.CORS_ORIGINS: retrieve cors_kwargs
via _get_cors_kwargs(app), get allowed = cors_kwargs.get("allow_origins", []),
then assert that each entry from test_settings.CORS_ORIGINS is present (or
assert equality/issuperset as appropriate) so the test reflects the canonical
source of truth rather than fixed literal strings.

14-24: _get_all_paths only recurses one level into Mount — nested mounts are silently missed.

For FastAPI's standard include_router this is fine (routes are flattened into APIRoute/Route objects directly). However, if any mounted sub-application adds another Mount level (e.g., a sub-app with its own prefix), those paths are silently dropped and route-presence assertions would give false negatives. A recursive walk would be more robust:

♻️ Proposed recursive implementation
 def _get_all_paths(app: FastAPI) -> set[str]:
     """Extract all route paths from app, including mounted routers."""
-    paths: set[str] = set()
-    for route in app.router.routes:
-        if isinstance(route, Route):
-            paths.add(route.path)
-        elif isinstance(route, Mount) and route.routes is not None:
-            for sub_route in route.routes:
-                if isinstance(sub_route, Route):
-                    paths.add(f"{route.path}{sub_route.path}")
-    return paths
+    def _walk(routes: list, prefix: str = "") -> None:
+        for route in routes:
+            if isinstance(route, Route):
+                paths.add(f"{prefix}{route.path}")
+            elif isinstance(route, Mount) and route.routes is not None:
+                _walk(route.routes, prefix=f"{prefix}{route.path}")
+
+    paths: set[str] = set()
+    _walk(app.router.routes)
+    return paths
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/unit/app/test_main_app.py` around lines 14 - 24, _get_all_paths
currently only descends one level into Mount objects and thus drops paths from
nested Mounts; update _get_all_paths to perform a recursive walk of
app.router.routes: for each item check isinstance(..., Route) and add its path,
if isinstance(..., Mount) then iterate its .routes and recursively collect
sub-paths, concatenating the parent mount.path with each child's path (and
handle nested Mounts similarly) until all Route paths are gathered; ensure the
function still returns a flat set[str] of fully prefixed paths.

176-182: import_module("app.main").create_app is redundant with the top-level import.

create_app is already imported directly on line 6. Using import_module here tests that the symbol is importable via the module path, but it fetches the same already-cached module object from sys.modules — it doesn't exercise a materially different code path. The indirection adds cognitive noise without value.

♻️ Proposed simplification
-    def test_create_app_returns_fastapi(self, test_settings: Settings) -> None:
-        create_app_fn = import_module("app.main").create_app
-        assert isinstance(create_app_fn(settings=test_settings), FastAPI)
+    def test_create_app_returns_fastapi(self, test_settings: Settings) -> None:
+        assert isinstance(create_app(settings=test_settings), FastAPI)

-    def test_create_app_uses_provided_settings(self, test_settings: Settings) -> None:
-        create_app_fn = import_module("app.main").create_app
-        assert create_app_fn(settings=test_settings).title == test_settings.PROJECT_NAME
+    def test_create_app_uses_provided_settings(self, test_settings: Settings) -> None:
+        assert create_app(settings=test_settings).title == test_settings.PROJECT_NAME

If these tests are intended to guard against accidental removal of the export, a standalone import check is clearer:

from app.main import create_app as _create_app  # noqa: F401
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/unit/app/test_main_app.py` around lines 176 - 182, The tests
redundantly call import_module("app.main").create_app even though create_app is
already imported at the top; remove the import_module indirection and call the
top-level create_app directly (or replace the redundant calls with a standalone
import assertion like "from app.main import create_app as _create_app  # noqa:
F401" if you want an explicit export-check). Update
test_create_app_returns_fastapi and test_create_app_uses_provided_settings to
use the already-imported create_app symbol instead of
import_module("app.main").create_app.

100-101: app.user_middleware is an internal implementation detail — prefer public middleware APIs instead.

app.user_middleware is not part of Starlette's public documented API. It's an internal list of starlette.middleware.Middleware objects (each with .cls, .args, and .kwargs attributes), mutated by app.add_middleware(). While the current structure works, it has changed across minor Starlette versions (e.g., Starlette 0.24.0, 0.25.0) without deprecation notice.

Use the documented public API instead: pass middleware=[...] at app construction or call app.add_middleware(...) before the app starts.
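
A small sketch of the public-API approach described above; the helper name and middleware list are made up for illustration:

# Sketch: assert on the middleware list the test constructs, not on app.user_middleware.
from fastapi import FastAPI
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware

def middleware_class_names(middleware: list[Middleware]) -> set[str]:
    return {m.cls.__name__ for m in middleware}

middleware = [Middleware(CORSMiddleware, allow_origins=["https://localhost:5001"])]
app = FastAPI(middleware=middleware)

assert "CORSMiddleware" in middleware_class_names(middleware)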

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/tests/unit/app/test_main_app.py` around lines 100 - 101, The helper
_get_middleware_class_names currently depends on the internal
app.user_middleware list; instead stop reading that internal attribute and use
the public middleware API by extracting names from the middleware list passed at
app construction or from middleware objects added via app.add_middleware()
before startup. Update tests to construct the FastAPI app with middleware=[...]
(or call app.add_middleware(...) in test setup) and change
_get_middleware_class_names to accept that middleware iterable (or accept the
middleware list as an argument) and return {getattr(m.cls, "__name__",
str(m.cls)) for m in middleware_list}, removing any direct access to
app.user_middleware and referencing the symbol _get_middleware_class_names and
the created middleware list in the test changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/tests/unit/app/test_main_app.py`:
- Around line 184-186: The test calls create_app() with no args which
instantiates real Settings(), causing non-hermetic failures; update the test
(test_create_app_without_settings_uses_defaults) to pass a controlled test
Settings instance instead of relying on env/config by importing or using the
existing test_settings fixture (or constructing a minimal Settings test helper)
and call create_app(test_settings) so create_app uses the test Settings object
rather than reading real environment/config files.


@coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/events/handlers.py (2)

366-366: ⚠️ Potential issue | 🔴 Critical

assert for runtime guard is stripped under Python -O — use an explicit check.

With python -O (or PYTHONOPTIMIZE=1), all assert statements are compiled away. If raw is a tuple, the next line raw.offset will raise an AttributeError at runtime in optimised deployments rather than failing cleanly with a meaningful error.

🐛 Proposed fix
-        assert not isinstance(raw, tuple)
+        if isinstance(raw, tuple):
+            raise TypeError(f"Expected a single Kafka message, got tuple: {type(raw)}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` at line 366, Replace the runtime-only assert
with an explicit runtime check: in the handler where the code currently has
"assert not isinstance(raw, tuple)" (referencing the variable raw and subsequent
use of raw.offset), perform an explicit isinstance check and raise a clear
exception (e.g., TypeError or ValueError) with a helpful message when raw is a
tuple so optimized Python runs won't skip the guard; update any surrounding
comment to reflect the validated precondition.

370-377: ⚠️ Potential issue | 🟠 Major

record_dlq_message_received / record_dlq_message_age must be recorded even when handle_message raises.

Both of these metrics track receipt, not successful processing. If handle_message raises an exception (line 370), lines 372-375 are never executed, under-counting DLQ intake and skewing the age histogram. Record them before the call and keep the duration metric in a try/finally:

🛡️ Proposed fix
-        await manager.handle_message(body)
-
-        manager.metrics.record_dlq_message_received(body.original_topic, body.event.event_type)
-        manager.metrics.record_dlq_message_age(
-            (datetime.now(timezone.utc) - body.failed_at).total_seconds()
-        )
-        manager.metrics.record_dlq_processing_duration(
-            asyncio.get_running_loop().time() - start, "process"
-        )
+        manager.metrics.record_dlq_message_received(body.original_topic, body.event.event_type)
+        manager.metrics.record_dlq_message_age(
+            (datetime.now(timezone.utc) - body.failed_at).total_seconds()
+        )
+        try:
+            await manager.handle_message(body)
+        finally:
+            manager.metrics.record_dlq_processing_duration(
+                asyncio.get_running_loop().time() - start, "process"
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` around lines 370 - 377, Record
manager.metrics.record_dlq_message_received(...) and
manager.metrics.record_dlq_message_age(...) before the await
manager.handle_message(body) call so they run even when handle_message raises,
then wrap the handle_message call in try/finally and keep
manager.metrics.record_dlq_processing_duration(...) in the finally block so the
processing duration is always recorded; reference manager.handle_message,
manager.metrics.record_dlq_message_received,
manager.metrics.record_dlq_message_age,
manager.metrics.record_dlq_processing_duration and the start timing variable
when making the change.
🧹 Nitpick comments (2)
backend/app/events/handlers.py (2)

48-48: Prefer structlog's key-value API over f-string interpolation.

Using an f-string collapses structured fields into an unindexable string, losing the ability to filter/query by event_type or event_id in log aggregators.

♻️ Suggested refactor
-        logger.info(f"Duplicate event: {event.event_type} ({event.event_id})")
+        logger.info("duplicate_event", event_type=event.event_type, event_id=str(event.event_id))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` at line 48, Replace the f-string log call
that collapses structured fields into a single message with structlog's
key-value style so fields remain queryable; specifically change the logger.info
call that currently uses f"Duplicate event: {event.event_type}
({event.event_id})" to call logger.info with a short message like "Duplicate
event" and pass event_type=event.event_type and event_id=event.event_id as
separate keyword/context fields so the event_type and event_id remain structured
and indexable.
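
As a standalone illustration (not code from this PR), structlog keeps keyword arguments as separate keys in the rendered event, while an f-string folds everything into a single message string:

import structlog

structlog.configure(processors=[structlog.processors.JSONRenderer()])
log = structlog.get_logger()

# Key-value style: event_type and event_id stay as top-level JSON fields.
log.info("duplicate_event", event_type="execution_requested", event_id="abc-123")
# {"event_type": "execution_requested", "event_id": "abc-123", "event": "duplicate_event"}

# f-string style: both values are flattened into the single "event" string.
log.info(f"Duplicate event: execution_requested (abc-123)")
# {"event": "Duplicate event: execution_requested (abc-123)"}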

353-353: Extracting the DLQ topic suffix to a named constant is optional.

Only a single occurrence of "dead_letter_queue" was found in the codebase, so there is no duplication risk from typos. However, extracting it to a constant (e.g., in settings or constants.py) could improve maintainability if this topic name needs to be referenced elsewhere in the future.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` at line 353, The code currently concatenates
settings.KAFKA_TOPIC_PREFIX with the literal "dead_letter_queue"; extract that
suffix into a named constant (e.g., DEAD_LETTER_QUEUE = "dead_letter_queue") in
a shared config module such as settings or constants.py and replace the literal
in the handlers.py call with that constant (use settings.DEAD_LETTER_QUEUE or
constants.DEAD_LETTER_QUEUE) so the topic name is centralized and easier to
maintain.
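
A minimal sketch of the extraction; the module path and the dlq_topic variable name are illustrative choices, not something this PR prescribes, and settings is the existing settings object referenced in the comment:

# app/core/constants.py (hypothetical location)
DEAD_LETTER_QUEUE = "dead_letter_queue"

# app/events/handlers.py
from app.core.constants import DEAD_LETTER_QUEUE

dlq_topic = f"{settings.KAFKA_TOPIC_PREFIX}{DEAD_LETTER_QUEUE}"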
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/events/handlers.py`:
- Around line 219-261: register_saga_subscriber currently omits a subscriber for
EventType.EXECUTION_CANCELLED, so the SagaOrchestrator never sees cancellation
events; add a broker.subscriber block mirroring the other handlers that
subscribes to f"{prefix}{EventType.EXECUTION_CANCELLED}"
(group_id="saga-orchestrator", ack_policy=AckPolicy.ACK) and implement async def
on_execution_cancelled(body: ExecutionCancelledEvent, orchestrator:
FromDishka[SagaOrchestrator]) -> None: await
orchestrator.handle_execution_cancelled(body) (or if that method is not present,
call orchestrator.cancel_saga(...) with the execution id from body) so the
orchestrator transitions the saga to CANCELLED and triggers compensation.
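
A sketch of the missing subscriber, mirroring the pattern described above; the topic expression, group_id, and ack_policy are taken from the comment, and the surrounding module is assumed to already provide broker, prefix, EventType, AckPolicy, FromDishka, ExecutionCancelledEvent, and SagaOrchestrator:

@broker.subscriber(
    f"{prefix}{EventType.EXECUTION_CANCELLED}",
    group_id="saga-orchestrator",
    ack_policy=AckPolicy.ACK,
)
async def on_execution_cancelled(
    body: ExecutionCancelledEvent,
    orchestrator: FromDishka[SagaOrchestrator],
) -> None:
    # Let the orchestrator move the saga to a CANCELLED terminal state
    # and trigger compensation, per the handler contract described above.
    await orchestrator.handle_execution_cancelled(body)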

---

Outside diff comments:
In `@backend/app/events/handlers.py`:
- Line 366: Replace the runtime-only assert with an explicit runtime check: in
the handler where the code currently has "assert not isinstance(raw, tuple)"
(referencing the variable raw and subsequent use of raw.offset), perform an
explicit isinstance check and raise a clear exception (e.g., TypeError or
ValueError) with a helpful message when raw is a tuple so optimized Python runs
won't skip the guard; update any surrounding comment to reflect the validated
precondition.
- Around line 370-377: Record manager.metrics.record_dlq_message_received(...)
and manager.metrics.record_dlq_message_age(...) before the await
manager.handle_message(body) call so they run even when handle_message raises,
then wrap the handle_message call in try/finally and keep
manager.metrics.record_dlq_processing_duration(...) in the finally block so the
processing duration is always recorded; reference manager.handle_message,
manager.metrics.record_dlq_message_received,
manager.metrics.record_dlq_message_age,
manager.metrics.record_dlq_processing_duration and the start timing variable
when making the change.

---

Nitpick comments:
In `@backend/app/events/handlers.py`:
- Line 48: Replace the f-string log call that collapses structured fields into a
single message with structlog's key-value style so fields remain queryable;
specifically change the logger.info call that currently uses f"Duplicate event:
{event.event_type} ({event.event_id})" to call logger.info with a short message
like "Duplicate event" and pass event_type=event.event_type and
event_id=event.event_id as separate keyword/context fields so the event_type and
event_id remain structured and indexable.
- Line 353: The code currently concatenates settings.KAFKA_TOPIC_PREFIX with the
literal "dead_letter_queue"; extract that suffix into a named constant (e.g.,
DEAD_LETTER_QUEUE = "dead_letter_queue") in a shared config module such as
settings or constants.py and replace the literal in the handlers.py call with
that constant (use settings.DEAD_LETTER_QUEUE or constants.DEAD_LETTER_QUEUE) so
the topic name is centralized and easier to maintain.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Nitpick comments (2)
backend/app/events/handlers.py (2)

256-273: RESULT_FAILED is absent from _SSE_EVENT_TYPES.

RESULT_STORED is present (line 265) but RESULT_FAILED is not. A result-storage failure event won't reach SSE clients, leaving the frontend with no real-time notification when result persistence fails after execution completes.

🛠️ Proposed fix
     EventType.RESULT_STORED,
+    EventType.RESULT_FAILED,
     EventType.POD_CREATED,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` around lines 256 - 273, The SSE event
whitelist _SSE_EVENT_TYPES is missing the EventType.RESULT_FAILED entry so
listeners won't receive result-persistence failure events; update the list in
backend/app/events/handlers.py to include EventType.RESULT_FAILED alongside
EventType.RESULT_STORED (ensuring the constant name matches the EventType enum)
so result-storage failures are emitted to SSE clients.

339-341: Extract "dead_letter_queue" to a named constant or add to EventType for consistency.

The codebase follows a convention where all @broker.subscriber() calls use EventType values. This DLQ subscriber is the only exception, using a hardcoded string literal instead. While "dead_letter_queue" is Kafka's infrastructure topic (not a producer publishing destination), extracting it to a shared constant or adding DEAD_LETTER_QUEUE = "dead_letter_queue" to EventType would maintain consistency across the codebase and reduce the risk of future changes introducing silent mismatches.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/events/handlers.py` around lines 339 - 341, Replace the hardcoded
"dead_letter_queue" literal used in the `@broker.subscriber` decorator with a
named constant to match the project's convention: add DEAD_LETTER_QUEUE =
"dead_letter_queue" to the EventType enum (or a shared constants module) and
update the decorator call in the handler (the `@broker.subscriber`(...) usage) to
reference EventType.DEAD_LETTER_QUEUE (or the shared constant name) so the
subscriber uses the same centralized identifier as other topics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/tests/e2e/conftest.py`:
- Around line 140-143: The variable topics is annotated as list[str] but is set
to list(EventType) (a list of enum members); change the assignment so topics is
a list of raw strings by using a comprehension that extracts values from
EventType (e.g., [e.value for e in EventType]), leaving the type annotation
topics: list[str] intact; this keeps
EventWaiter(test_settings.KAFKA_BOOTSTRAP_SERVERS, topics) and the subsequent
_logger.info call unchanged (see the sketch after these inline comments).

In `@docs/architecture/kafka-topic-architecture.md`:
- Around line 7-10: Add a language specifier to the fenced code block that
contains "Topic name = EventType.EXECUTION_REQUESTED = \"execution_requested\""
so the block is rendered with proper syntax highlighting; update the opening
fence to include the `text` language specifier (e.g., ```text) for that code
block around the Topic name lines.
- Around line 40-41: Update the table so the "Default" row no longer mentions
"DLQ": edit the Default category's description "Everything else (saga,
notification, DLQ, etc.)" to remove "DLQ" (e.g., "Everything else (saga,
notification, etc.)"), leaving the separate "DLQ" row and its
`dead_letter_queue` entry untouched to avoid the contradictory duplicate.
- Line 41: Update the DLQ row so it uses the actual EventType enum members
instead of `dead_letter_queue`: replace the value `dead_letter_queue` with
`dlq_message_received`, `dlq_message_retried`, and `dlq_message_discarded` in
the table entry for the dead_letter_queue topic; also add a brief parenthetical
or note that this topic is an exception to the 1:1 topic-to-EventType mapping
described earlier (topic `dead_letter_queue` maps to three EventType members) so
readers understand why it differs from the EventType enum naming convention.
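
For the conftest comment above, the fix is a one-line comprehension over the enum values; EventWaiter and test_settings are the names used in the review, and the assignment target is illustrative:

topics: list[str] = [e.value for e in EventType]
waiter = EventWaiter(test_settings.KAFKA_BOOTSTRAP_SERVERS, topics)  # call unchanged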

---

Duplicate comments:
In `@backend/app/events/handlers.py`:
- Around line 208-253: Add a subscriber for EventType.EXECUTION_CANCELLED inside
register_saga_subscriber so the SagaOrchestrator can transition sagas to a
cancelled terminal state; mirror the pattern used by
on_execution_failed/on_execution_timeout: register a `@broker.subscriber` for
EventType.EXECUTION_CANCELLED with group_id="saga-orchestrator" and
ack_policy=AckPolicy.ACK, implement async def on_execution_cancelled(body:
ExecutionCancelledEvent, orchestrator: FromDishka[SagaOrchestrator]) -> None and
call await orchestrator.handle_execution_cancelled(body) so
SagaOrchestrator.handle_execution_cancelled is invoked when cancellation events
arrive.

---

Nitpick comments:
In `@backend/app/events/handlers.py`:
- Around line 256-273: The SSE event whitelist _SSE_EVENT_TYPES is missing the
EventType.RESULT_FAILED entry so listeners won't receive result-persistence
failure events; update the list in backend/app/events/handlers.py to include
EventType.RESULT_FAILED alongside EventType.RESULT_STORED (ensuring the constant
name matches the EventType enum) so result-storage failures are emitted to SSE
clients.
- Around line 339-341: Replace the hardcoded "dead_letter_queue" literal used in
the `@broker.subscriber` decorator with a named constant to match the project's
convention: add DEAD_LETTER_QUEUE = "dead_letter_queue" to the EventType enum
(or a shared constants module) and update the decorator call in the handler (the
`@broker.subscriber`(...) usage) to reference EventType.DEAD_LETTER_QUEUE (or the
shared constant name) so the subscriber uses the same centralized identifier as
other topics.

@sonarqubecloud

@HardMax71 HardMax71 changed the title from Feat/v3 less useless services to feat: switch to 1:1 eventType as a topic on Feb 19, 2026
@HardMax71 HardMax71 merged commit 95389b4 into main Feb 19, 2026
19 checks passed
@HardMax71 HardMax71 deleted the feat/v3-less-useless-services branch February 19, 2026 23:58