
LLM Chain: Add foundation for chain execution with database schema #616

Open

vprashrex wants to merge 3 commits into main from feature/llm-chain-setup

Conversation

@vprashrex
Collaborator

vprashrex commented Feb 20, 2026

Summary

Target issue is #542
Explain the motivation for making this change. What existing problem does the pull request solve?
This PR adds the foundation for llm_chain, a workflow manager that orchestrates sequential llm_call executions as a single request. It reuses the existing llm_call as its core primitive, so every existing llm_call capability (providers, configs, guardrails, prompt templates) works automatically in chains with zero code duplication.
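To make the request shape concrete, here is a rough, illustrative sketch of submitting a two-block chain; the block keys and URLs below are placeholders, not the actual LLMChainRequest schema:

import httpx

# Hypothetical payload: the real ChainBlock fields may differ from these placeholder keys.
payload = {
    "blocks": [
        {"query": "Summarize the attached support ticket."},          # block 1
        {"query": "Rewrite the previous summary as bullet points."},  # block 2 consumes block 1's output
    ],
    "callback_url": "https://client.example.com/llm/webhook",  # final result is POSTed here
}

resp = httpx.post("https://api.example.com/llm/chain", json=payload, timeout=30)
print(resp.json())  # acknowledgement; the chain result arrives asynchronously via the callback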

Checklist

Before submitting a pull request, please ensure that you mark these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested the change.
  • If you've fixed a bug or added code, ensure it is covered by test cases.

Notes

Please add here if any other information is required for the reviewer.

Summary by CodeRabbit

  • New Features

    • LLM chain orchestration: run ordered blocks that pass results between steps
    • Async chain jobs with final callback delivery and optional intermediate callbacks
    • Chain status & progress tracking (pending/running/completed/failed) with per-block progress updates
    • Prompt template support for dynamic inputs and option to include raw provider responses
    • Public API endpoint to submit chain requests
  • Documentation

    • Added detailed docs describing endpoint parameters, block modes, callbacks, and best practices

@coderabbitai

coderabbitai bot commented Feb 20, 2026

📝 Walkthrough

Adds LLM chain orchestration: DB migration and model for chain tracking, API endpoints for chain submission and callbacks, CRUD helpers, chain execution framework (blocks, executor, types), job orchestration with guardrails, and docs for the new endpoint.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Database Migration: `backend/app/alembic/versions/048_create_llm_chain_table.py` | Creates llm_chain table with PK/FKs and indices, adds chain_id to llm_call (FK with ON DELETE SET NULL), and appends LLM_CHAIN to the jobtype enum. |
| API Routes: `backend/app/api/routes/llm_chain.py`, `backend/app/api/main.py` | Adds POST /llm/chain endpoint, callback router and validation, and registers the router in API main. |
| Models (LLM): `backend/app/models/llm/request.py`, `backend/app/models/llm/response.py`, `backend/app/models/llm/__init__.py`, `backend/app/models/__init__.py`, `backend/app/models/job.py` | Adds chain request/response models (LLMChainRequest, LLMChainResponse, IntermediateChainResponse), LlmChain ORM model, ChainBlock, ChainStatus, PromptTemplate; extends ConfigBlob and adds JobType.LLM_CHAIN. |
| CRUD Layer: `backend/app/crud/llm_chain.py`, `backend/app/crud/llm.py` | New CRUD functions to create/update chains and mark blocks completed; create_llm_call accepts an optional chain_id and persists it. |
| Chain Execution Framework: `backend/app/services/llm/chain/chain.py`, `backend/app/services/llm/chain/executor.py`, `backend/app/services/llm/chain/types.py` | Implements ChainContext, ChainBlock, LLMChain, the BlockResult type, and ChainExecutor managing lifecycle, intermediate callbacks, DB updates, and error handling. |
| Job Orchestration & Guardrails: `backend/app/services/llm/jobs.py` | Adds start_chain_job, execute_chain_job, execute_llm_call, apply_input_guardrails, and apply_output_guardrails; integrates chain scheduling and execution flow. |
| Docs: `backend/app/api/docs/llm/llm_chain.md` | Documentation for the chain endpoint, block config modes, callback behavior, and usage notes. |

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client/API
    participant Router as LLM Chain Route
    participant JobSvc as Job Service
    participant DB as Database
    participant Worker as Worker/Executor
    participant Provider as LLM Provider

    Client->>Router: POST /llm/chain (request + callback_url)
    Router->>JobSvc: start_chain_job(request, project_id, org_id)
    JobSvc->>DB: Create Job (type: LLM_CHAIN) & LlmChain record (PENDING)
    JobSvc->>Worker: Enqueue chain task
    JobSvc-->>Client: Return job UUID

    Worker->>DB: _setup() - mark job PROCESSING, chain RUNNING
    Worker->>Worker: apply_input_guardrails, resolve first block

    loop per block
        Worker->>Provider: execute LLM call for block
        Provider-->>Worker: LLM response
        Worker->>DB: create llm_call (chain_id), mark block completed
        Worker->>Worker: apply_output_guardrails, send intermediate callback (optional)
        Worker->>Worker: derive next block input
    end

    Worker->>DB: update chain status COMPLETED with output/usage
    Worker->>DB: update job status SUCCESS
    Worker->>Client: send final callback (if configured)

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • Prajna1999
  • kartpop

Poem

🐰 I hop through prompts and blocks all day,

linking answers in a playful way.
Callbacks flutter, guardrails in tow,
from pending seed to final glow.
A chain of LLMs — hop! — watch results flow.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 72.73%, which is insufficient; the required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly summarizes the main change: introducing LLM Chain foundation with database schema for chain execution workflow. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)

🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/llm-chain-setup

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai bot left a comment

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/models/job.py (1)

55-60: ⚠️ Potential issue | 🟡 Minor

Stale column comment — LLM_CHAIN missing from the example list.

The sa_column_kwargs comment still reads "(e.g., RESPONSE, LLM_API)". Update it to include the new value for accuracy.

📝 Proposed fix
-            "comment": "Type of job being executed (e.g., RESPONSE, LLM_API)"
+            "comment": "Type of job being executed (e.g., RESPONSE, LLM_API, LLM_CHAIN)"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/models/job.py` around lines 55 - 60, Update the stale column
comment in the job_type Field: edit the sa_column_kwargs comment on the job_type
field (JobType) to include the new enum value LLM_CHAIN (e.g., change "Type of
job being executed (e.g., RESPONSE, LLM_API)" to include LLM_CHAIN in the
example list) so the DB column comment accurately reflects all relevant job
types.
backend/app/crud/llm.py (1)

62-75: ⚠️ Potential issue | 🟡 Minor

Docstring Args section missing chain_id.

The new parameter is undocumented in the function's docstring.

📝 Proposed fix
     Args:
         session: Database session
         request: The LLM call request containing query and config
         job_id: Reference to the parent job
+        chain_id: Optional reference to a parent LLM chain record
         project_id: Project this LLM call belongs to
         organization_id: Organization this LLM call belongs to
         resolved_config: The resolved configuration blob (either from stored config or ad-hoc)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/crud/llm.py` around lines 62 - 75, The docstring for the function
that creates a new LLM call record (create_llm_call) is missing documentation
for the new chain_id parameter; update the Args section to include an entry for
chain_id with its type and a short description (e.g., "chain_id: Optional[str] —
Reference to the parent chain for this LLM call, if any"), ensuring it matches
the parameter name used in the function signature and follows the same
formatting style as the other Args entries.
🧹 Nitpick comments (8)
backend/app/models/llm/response.py (1)

67-80: LLMChainResponse is structurally identical to LLMCallResponse — consider sharing a base class.

Both models have the exact same three fields (response, usage, provider_raw_response). Extracting a common base avoids diverging in the future.

♻️ Proposed refactor
+class LLMCallResponseBase(SQLModel):
+    """Shared fields for single-call and chain LLM responses."""
+    response: LLMResponse = Field(..., description="Normalized, structured LLM response.")
+    usage: Usage = Field(..., description="Token usage and cost information.")
+    provider_raw_response: dict[str, object] | None = Field(
+        default=None,
+        description="Unmodified raw response from the LLM provider.",
+    )
+

-class LLMCallResponse(SQLModel):
-    """Top-level response schema for an LLM API call."""
-
-    response: LLMResponse = Field(...)
-    usage: Usage = Field(...)
-    provider_raw_response: dict[str, object] | None = Field(default=None, ...)
+class LLMCallResponse(LLMCallResponseBase):
+    """Top-level response schema for an LLM API call."""


-class LLMChainResponse(SQLModel):
-    """Response schema for an LLM chain execution."""
-
-    response: LLMResponse = Field(...)
-    usage: Usage = Field(...)
-    provider_raw_response: dict[str, object] | None = Field(default=None, ...)
+class LLMChainResponse(LLMCallResponseBase):
+    """Response schema for an LLM chain execution."""
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/models/llm/response.py` around lines 67 - 80, LLMChainResponse
and LLMCallResponse share identical fields—extract those three fields into a
shared base SQLModel (e.g., LLMResponseBase or BaseLLMCall) that defines
response: LLMResponse, usage: Usage, and provider_raw_response: dict[str,
object] | None with the same Field descriptions, then have both LLMChainResponse
and LLMCallResponse inherit from that base (removing their duplicate field
declarations); ensure imports and type annotations remain unchanged and run
tests/validators to confirm serialization and schema behavior is preserved.
backend/app/services/llm/chain/types.py (1)

7-18: BlockResult.usage is redundant with response.usage — potential consistency footgun.

When response is not None, block_result.usage duplicates block_result.response.usage. Callers that populate both independently can silently produce inconsistent data. Consider removing usage from BlockResult and accessing it via result.response.usage instead.

♻️ Proposed refactor
 @dataclass
 class BlockResult:
     """Result of a single block/LLM call execution."""

     response: LLMCallResponse | None = None
     llm_call_id: UUID | None = None
-    usage: Usage | None = None
     error: str | None = None

     @property
     def success(self) -> bool:
         return self.error is None and self.response is not None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/chain/types.py` around lines 7 - 18, BlockResult
currently stores a redundant usage field that can diverge from response.usage;
remove the usage attribute from the BlockResult dataclass and any assignments
that set block_result.usage, and update all call sites to read usage from
block_result.response.usage (guarding for response being None). Update the
BlockResult type signature and the success property remains unchanged; search
for references to BlockResult.usage and replace them with the guarded access
(e.g., result.response and then .usage) to maintain null-safety.
backend/app/crud/llm_chain.py (1)

85-99: Use elif for mutually exclusive status branches and capture timestamp once.

The status branches are mutually exclusive (enum values), but using independent if statements is misleading. Also, now() is called multiple times — updated_at and started_at/completed_at will have slightly different values within the same update.

♻️ Proposed fix
-    db_chain.status = status
-    db_chain.updated_at = now()
-
-    if status == ChainStatus.RUNNING:
-        db_chain.started_at = now()
-
-    if status == ChainStatus.FAILED:
-        db_chain.error = error
-        db_chain.total_usage = total_usage
-        db_chain.completed_at = now()
-
-    if status == ChainStatus.COMPLETED:
-        db_chain.output = output
-        db_chain.total_usage = total_usage
-        db_chain.completed_at = now()
+    current_time = now()
+    db_chain.status = status
+    db_chain.updated_at = current_time
+
+    if status == ChainStatus.RUNNING:
+        db_chain.started_at = current_time
+    elif status == ChainStatus.FAILED:
+        db_chain.error = error
+        db_chain.total_usage = total_usage
+        db_chain.completed_at = current_time
+    elif status == ChainStatus.COMPLETED:
+        db_chain.output = output
+        db_chain.total_usage = total_usage
+        db_chain.completed_at = current_time
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/crud/llm_chain.py` around lines 85 - 99, The status-update block
uses separate ifs and calls now() multiple times; refactor the update in the
function that sets db_chain to compute a single timestamp once (e.g., ts =
now()) and assign db_chain.updated_at = ts, then use an if/elif/elif chain on
status (checking ChainStatus.RUNNING, ChainStatus.FAILED, ChainStatus.COMPLETED)
to set the mutually exclusive fields (db_chain.started_at, db_chain.error,
db_chain.total_usage, db_chain.completed_at, db_chain.output) so only the
matching branch runs and all timestamps use the same ts value.
backend/app/services/llm/jobs.py (1)

461-474: job_id parameter shadowed by reassignment on Line 474.

job_id is declared as str in the function signature (Line 465) and then rebound to UUID on Line 474: job_id: UUID = UUID(job_id). This works at runtime but is a type annotation violation — the variable changes type. Use a different name for the converted value.

Proposed fix
-    request = LLMCallRequest(**request_data)
-    job_id: UUID = UUID(job_id)
+    request = LLMCallRequest(**request_data)
+    job_uuid: UUID = UUID(job_id)

Then replace all subsequent references to job_id within execute_job with job_uuid (matching the pattern already used in execute_chain_job at Line 603).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 461 - 474, The function
execute_job currently reassigns the str parameter job_id to a UUID on Line 474
which shadows the original type; create a new variable (e.g., job_uuid) by
converting the incoming job_id string to UUID and replace all subsequent uses of
the old job_id variable inside execute_job with job_uuid (mirroring the pattern
used in execute_chain_job) so the original parameter type is preserved and
annotations remain correct.
backend/app/models/llm/request.py (3)

570-576: Use StrEnum instead of (str, Enum) — Python 3.11+ is the target.

Per the coding guidelines, this codebase targets Python 3.11+, which provides enum.StrEnum. Ruff also flags this (UP042).

♻️ Proposed fix

Update the import at Line 1:

-from enum import Enum
+from enum import Enum, StrEnum

Then update the class:

-class ChainStatus(str, Enum):
+class ChainStatus(StrEnum):
     """Status of an LLM chain execution."""
 
     PENDING = "pending"
     RUNNING = "running"
     FAILED = "failed"
     COMPLETED = "completed"

As per coding guidelines: "Use Python 3.11+ with type hints throughout the codebase".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/models/llm/request.py` around lines 570 - 576, Replace the Enum
subclass for ChainStatus with StrEnum: import StrEnum from enum and change class
ChainStatus(str, Enum) to class ChainStatus(StrEnum); keep the same member
names/values (PENDING, RUNNING, FAILED, COMPLETED) so behavior is unchanged and
ruff UP042 is satisfied. Ensure any type hints or comparisons that relied on str
behavior remain valid (StrEnum inherits str) and run tests/lint to confirm no
further adjustments needed.

640-647: block_sequences: default_factory=list conflicts with nullable=True and list[str] | None type.

The default factory always produces [], so a new LlmChain instance will never have block_sequences=None. But the type hint allows None and the DB column is nullable. This inconsistency can cause confusion — callers might check if chain.block_sequences is None thinking it indicates "not started" but that will never be true for newly created records.

Consider aligning: either use default=None (and handle the empty case as None) or make the column NOT NULL with a default of [].
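
A short sketch of the two alignments described above (the surrounding model here is a stripped-down stand-in, not the real LlmChain table):

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB
from sqlmodel import Field, SQLModel

class ChainNullableSketch(SQLModel, table=True):
    """Option A: block_sequences may genuinely be None."""

    id: int | None = Field(default=None, primary_key=True)
    block_sequences: list[str] | None = Field(
        default=None,
        sa_column=sa.Column(JSONB, nullable=True),
    )

class ChainNonNullSketch(SQLModel, table=True):
    """Option B: block_sequences is always a list and the column is NOT NULL."""

    id: int | None = Field(default=None, primary_key=True)
    block_sequences: list[str] = Field(
        default_factory=list,
        sa_column=sa.Column(JSONB, nullable=False),
    )

The non-nullable variant is usually easier on callers, since block_sequences can always be iterated without a None check.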

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/models/llm/request.py` around lines 640 - 647, The Field
definition for block_sequences in the LlmChain model is inconsistent: it
declares type list[str] | None and sa_column nullable=True but uses
default_factory=list so new instances are never None; change this to be
consistent by either removing default_factory and using default=None (so
block_sequences: list[str] | None = Field(default=None,
sa_column=sa.Column(JSONB, nullable=True, ...)) to allow None) or make the
column non-nullable and keep default_factory=list (set sa.Column(...,
nullable=False) and keep default_factory=list and type list[str]), and update
any callers that check for None to instead check for empty list if you choose
the non-nullable option; locate and update the block_sequences Field in the
LlmChain/request model accordingly.

522-567: Naming collision: ChainBlock (request model) vs ChainBlock (execution class).

Two distinct ChainBlock classes share the same name in backend/app/models/llm/request.py (line 522, SQLModel) and backend/app/services/llm/chain/chain.py (line 125, execution class). In execute_chain_job, both exist in overlapping scope — the request model is accessed via the LLMChainRequest type (containing list[ChainBlock]), while the local import at line 599 brings in the execution class. The code works because request.blocks is already deserialized before the import shadows, but this is fragile and confusing.

Consider renaming the execution class (e.g., ChainBlockNode or ExecutableChainBlock) to eliminate ambiguity.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/models/llm/request.py` around lines 522 - 567, The request-side
SQLModel ChainBlock class conflicts with the execution-side ChainBlock class in
services; rename the execution class in backend/app/services/llm/chain/chain.py
(e.g., to ExecutableChainBlock or ChainBlockNode) and update all
references/imports (including the local import used in execute_chain_job and any
usages, method names, or type hints that reference the old ChainBlock) so the
request model list[ChainBlock] and the runtime execution class no longer share
the same identifier.
backend/app/services/llm/chain/chain.py (1)

151-191: Recursive chain execution — unbounded stack depth for long chains.

ChainBlock.execute calls self._next.execute(next_query) recursively. Python's default recursion limit is ~1000. With typical chain sizes (2–10 blocks) this is fine, but there's no upper-bound validation on LLMChainRequest.blocks length to prevent a user from submitting a pathologically long chain.

Consider adding a max_length constraint on LLMChainRequest.blocks in the request model, or converting this to an iterative loop.
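
For reference, a self-contained sketch of the iterative pattern; the _Block, _Result, and execute_chain names are illustrative stand-ins, not the actual classes in chain.py:

from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class _Result:
    """Toy stand-in for BlockResult."""
    output: str
    error: str | None = None

    @property
    def success(self) -> bool:
        return self.error is None

@dataclass
class _Block:
    """Toy stand-in for a chain block: a callable plus a link to the next block."""
    run: Callable[[str], _Result]
    next: Optional["_Block"] = None

def execute_chain(first: _Block, query: str) -> _Result:
    """Walk the linked blocks with a loop instead of recursion."""
    current: Optional[_Block] = first
    result = _Result(output="", error="chain contained no blocks")
    while current is not None:
        result = current.run(query)     # one LLM call per block
        if not result.success:
            return result               # stop at the first failing block
        query = result.output           # this block's output feeds the next block
        current = current.next
    return result

With the loop, chain length no longer consumes Python stack frames, so a max_length validator on LLMChainRequest.blocks becomes a usability guard rather than a correctness requirement.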

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/chain/chain.py` around lines 151 - 191,
ChainBlock.execute currently recurses via self._next.execute causing unbounded
stack depth; replace the recursion with an iterative traversal: implement a loop
in ChainBlock.execute that starts with current=self and current_query=query,
calls execute_llm_call (same args), calls
self._context.on_block_completed(current._index, result), returns immediately on
failure, otherwise sets current_query = result_to_query(result) and current =
current._next until current is None, then return the last result; remove the
recursive call to self._next.execute; additionally add a defensive max_length
constraint on LLMChainRequest.blocks in the request model (e.g., pydantic
Field(max_items=...)) to prevent extremely long chains if desired.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/api/routes/llm_chain.py`:
- Line 20: Both route handlers missing return type annotations: add explicit
return type hints for llm_callback_notification and llm_chain to match what they
actually return (e.g., APIResponse[LLMChainResponse], fastapi.Response,
starlette.responses.JSONResponse, or None). Update the function signatures
(llm_callback_notification(body: APIResponse[LLMChainResponse]) and
llm_chain(...)) to include the correct -> <ReturnType> annotation, ensuring the
declared type aligns with the handler’s actual return values.
- Line 34: The Swagger description for the /llm/chain endpoint is loading the
wrong markdown (description=load_description("llm/llm_call.md")) — replace the
call to load_description with the chain-specific file (e.g.,
load_description("llm/llm_chain.md")), add or update llm_chain.md with the
appropriate chain documentation, and ensure the route handler that registers
/llm/chain uses this updated description so the API docs accurately reflect the
chain behavior.
- Around line 48-62: The endpoint currently treats request.callback_url as
optional but always returns "will be delivered via callback", which is incorrect
when LLMChainRequest.callback_url is None; either enforce a non-null callback by
validating request.callback_url and raising/returning a 400 before calling
start_chain_job (use validate_callback_url and LLMChainRequest.callback_url to
gate behavior), or change the flow when callback_url is None to return a
synchronous result or a job identifier for polling (have start_chain_job return
a job_id/status and update the APIResponse.success_response payload and Message
accordingly); ensure the response text matches the chosen path and do not claim
callback delivery when no callback_url was provided.
- Around line 16-17: The route decorator uses an invalid OpenAPI runtime
expression "{$callback_url}"; update the expression to a valid OpenAPI runtime
expression that points to where the callback URL actually lives (e.g. request
body or query). Replace "{$callback_url}" in the llm_callback_router.post
decorator with a compliant expression such as "{$request.body#/callback_url}" or
"{$request.query.callback_url}" depending on where the callback_url is provided,
ensuring the symbol llm_callback_router.post is updated accordingly.

In `@backend/app/models/llm/response.py`:
- Around line 83-89: The class docstring for IntermediateChainResponse contains
a typo: change "Flattend structure matching LLMCallResponse keys for
consistency" to "Flattened structure matching LLMCallResponse keys for
consistency" in the IntermediateChainResponse definition in response.py so the
documentation reads correctly.

In `@backend/app/services/llm/chain/chain.py`:
- Around line 31-51: ChainContext currently types callback_url as str but
callers can pass None; update the dataclass to allow Optional types by changing
callback_url: str to callback_url: str | None and tighten request_metadata to
dict[str, Any] | None; ensure any code instantiating ChainContext (e.g., in
execute_chain_job) continues to pass None or str and adjust any downstream uses
to handle None safely (refer to the ChainContext dataclass and the callback_url
and request_metadata fields).

In `@backend/app/services/llm/chain/executor.py`:
- Around line 160-197: The _handle_error method can abort before updating DB if
send_callback raises; modify _handle_error so DB state is always persisted
first: call update_llm_chain_status(...) and JobCrud(session).update(...) before
attempting send_callback, and/or wrap send_callback in its own try/except that
logs but does not re-raise. Ensure you still build callback_response from
APIResponse.failure_response and include references to _handle_error,
send_callback, update_llm_chain_status, and JobCrud.update when making the
change so the chain status is set to ChainStatus.FAILED and the job status to
JobStatus.FAILED even if the callback fails.
- Around line 127-158: The success path in _teardown currently calls
send_callback before persisting DB state, so if send_callback raises the job and
chain updates may never be applied; move the send_callback call (and its
callback_url check) to after JobCrud(session).update(...) and
update_llm_chain_status(...) so the DB updates are performed first, then send
the callback with callback_response.model_dump(), and finally return the
callback response; keep all existing objects (LLMChainResponse, APIResponse) and
use the same self._request.callback_url check.

In `@backend/app/services/llm/jobs.py`:
- Around line 687-691: The callback_url being passed to handle_job_error is a
Pydantic v2 HttpUrl (request.callback_url) and must be converted to a native str
before use; update all calls that pass request.callback_url (the ones that
ultimately call handle_job_error -> send_callback) to pass
str(request.callback_url) if request.callback_url else None so the signature
handle_job_error(callback_url: str | None, ...) and send_callback(callback_url:
str, ...) receive proper types; apply this change to every occurrence in this
module where request.callback_url is forwarded (e.g., the calls around job error
handling that reference handle_job_error and send_callback).
- Around line 294-300: apply_output_guardrails is returning the rephrased text
as the "error" tuple element, causing execute_job to treat safe-but-rephrased
outputs as failures; change apply_output_guardrails so that when safe["success"]
is True and safe["data"].get("rephrase_needed") is truthy, it still returns
(result, None) (i.e., no error) after updating
result.response.response.output.content.value, and if you need to signal that
rephrasing occurred add a non-error flag/metadata on the result (e.g.,
result.response.metadata["rephrased"]=True) instead of returning the text as the
error; this aligns apply_output_guardrails with apply_input_guardrails and
prevents execute_job from misclassifying rephrased outputs as errors.

---

Outside diff comments:
In `@backend/app/crud/llm.py`:
- Around line 62-75: The docstring for the function that creates a new LLM call
record (create_llm_call) is missing documentation for the new chain_id
parameter; update the Args section to include an entry for chain_id with its
type and a short description (e.g., "chain_id: Optional[str] — Reference to the
parent chain for this LLM call, if any"), ensuring it matches the parameter name
used in the function signature and follows the same formatting style as the
other Args entries.

In `@backend/app/models/job.py`:
- Around line 55-60: Update the stale column comment in the job_type Field: edit
the sa_column_kwargs comment on the job_type field (JobType) to include the new
enum value LLM_CHAIN (e.g., change "Type of job being executed (e.g., RESPONSE,
LLM_API)" to include LLM_CHAIN in the example list) so the DB column comment
accurately reflects all relevant job types.

---

Nitpick comments:
In `@backend/app/crud/llm_chain.py`:
- Around line 85-99: The status-update block uses separate ifs and calls now()
multiple times; refactor the update in the function that sets db_chain to
compute a single timestamp once (e.g., ts = now()) and assign
db_chain.updated_at = ts, then use an if/elif/elif chain on status (checking
ChainStatus.RUNNING, ChainStatus.FAILED, ChainStatus.COMPLETED) to set the
mutually exclusive fields (db_chain.started_at, db_chain.error,
db_chain.total_usage, db_chain.completed_at, db_chain.output) so only the
matching branch runs and all timestamps use the same ts value.

In `@backend/app/models/llm/request.py`:
- Around line 570-576: Replace the Enum subclass for ChainStatus with StrEnum:
import StrEnum from enum and change class ChainStatus(str, Enum) to class
ChainStatus(StrEnum); keep the same member names/values (PENDING, RUNNING,
FAILED, COMPLETED) so behavior is unchanged and ruff UP042 is satisfied. Ensure
any type hints or comparisons that relied on str behavior remain valid (StrEnum
inherits str) and run tests/lint to confirm no further adjustments needed.
- Around line 640-647: The Field definition for block_sequences in the LlmChain
model is inconsistent: it declares type list[str] | None and sa_column
nullable=True but uses default_factory=list so new instances are never None;
change this to be consistent by either removing default_factory and using
default=None (so block_sequences: list[str] | None = Field(default=None,
sa_column=sa.Column(JSONB, nullable=True, ...)) to allow None) or make the
column non-nullable and keep default_factory=list (set sa.Column(...,
nullable=False) and keep default_factory=list and type list[str]), and update
any callers that check for None to instead check for empty list if you choose
the non-nullable option; locate and update the block_sequences Field in the
LlmChain/request model accordingly.
- Around line 522-567: The request-side SQLModel ChainBlock class conflicts with
the execution-side ChainBlock class in services; rename the execution class in
backend/app/services/llm/chain/chain.py (e.g., to ExecutableChainBlock or
ChainBlockNode) and update all references/imports (including the local import
used in execute_chain_job and any usages, method names, or type hints that
reference the old ChainBlock) so the request model list[ChainBlock] and the
runtime execution class no longer share the same identifier.

In `@backend/app/models/llm/response.py`:
- Around line 67-80: LLMChainResponse and LLMCallResponse share identical
fields—extract those three fields into a shared base SQLModel (e.g.,
LLMResponseBase or BaseLLMCall) that defines response: LLMResponse, usage:
Usage, and provider_raw_response: dict[str, object] | None with the same Field
descriptions, then have both LLMChainResponse and LLMCallResponse inherit from
that base (removing their duplicate field declarations); ensure imports and type
annotations remain unchanged and run tests/validators to confirm serialization
and schema behavior is preserved.

In `@backend/app/services/llm/chain/chain.py`:
- Around line 151-191: ChainBlock.execute currently recurses via
self._next.execute causing unbounded stack depth; replace the recursion with an
iterative traversal: implement a loop in ChainBlock.execute that starts with
current=self and current_query=query, calls execute_llm_call (same args), calls
self._context.on_block_completed(current._index, result), returns immediately on
failure, otherwise sets current_query = result_to_query(result) and current =
current._next until current is None, then return the last result; remove the
recursive call to self._next.execute; additionally add a defensive max_length
constraint on LLMChainRequest.blocks in the request model (e.g., pydantic
Field(max_items=...)) to prevent extremely long chains if desired.

In `@backend/app/services/llm/chain/types.py`:
- Around line 7-18: BlockResult currently stores a redundant usage field that
can diverge from response.usage; remove the usage attribute from the BlockResult
dataclass and any assignments that set block_result.usage, and update all call
sites to read usage from block_result.response.usage (guarding for response
being None). Update the BlockResult type signature and the success property
remains unchanged; search for references to BlockResult.usage and replace them
with the guarded access (e.g., result.response and then .usage) to maintain
null-safety.

In `@backend/app/services/llm/jobs.py`:
- Around line 461-474: The function execute_job currently reassigns the str
parameter job_id to a UUID on Line 474 which shadows the original type; create a
new variable (e.g., job_uuid) by converting the incoming job_id string to UUID
and replace all subsequent uses of the old job_id variable inside execute_job
with job_uuid (mirroring the pattern used in execute_chain_job) so the original
parameter type is preserved and annotations remain correct.

Comment on lines +16 to +17
@llm_callback_router.post(
"{$callback_url}",

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, find and read the file
find . -name "llm_chain.py" -path "*/backend/app/api/*" | head -5

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 112


🏁 Script executed:

# Also check file structure to understand the path
find . -type f -name "llm_chain.py"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 144


🏁 Script executed:

# Let me also search for the specific file with less strict matching
fd "llm_chain\.py$"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 140


🏁 Script executed:

cat -n backend/app/api/routes/llm_chain.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2400


🏁 Script executed:

# Check if the description files exist
find . -name "llm_call.md" -o -name "llm_chain.md" | grep -E "(llm_call|llm_chain)"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 113


🏁 Script executed:

# Search for where load_description is defined to understand its usage
rg "def load_description" -A 5

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 403


🏁 Script executed:

# Check the LLMChainRequest model to verify callback_url optionality
rg "class LLMChainRequest" -A 20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1424


🏁 Script executed:

# Check if llm_chain.md exists
find . -name "llm_chain.md"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Let's also check what description files exist in the llm directory
ls -la backend/app/api/docs/llm/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 249


🌐 Web query:

OpenAPI 3.0 callback runtime expression syntax specification

💡 Result:

In OpenAPI 3.0.x, the URL template key inside a Callback Object is a runtime expression (“key expression”) evaluated at runtime to produce the callback URL (e.g., $request.body#/url). (spec.openapis.org)

Runtime expression syntax (ABNF)

expression = ( "$url" / "$method" / "$statusCode"
             / "$request." source / "$response." source )
source = ( header-reference / query-reference / path-reference / body-reference )
header-reference = "header." token
query-reference  = "query." name
path-reference   = "path." name
body-reference   = "body" ["#" json-pointer ]
json-pointer     = *( "/" reference-token )
reference-token  = *( unescaped / escaped )
escaped          = "~" ( "0" / "1" )   ; "~0" => "~", "~1" => "/"

(spec.openapis.org)

Common callback/key examples

Embedding into strings (typical for callback URLs)

Runtime expressions can be embedded in a string by wrapping them in {} (for example, a callback key like {$request.body#/callbackUrl}). (spec.openapis.org)


🏁 Script executed:

# Check the entire llm_chain function to see all return paths
sed -n '39,62p' backend/app/api/routes/llm_chain.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 750


🏁 Script executed:

# Verify what the llm_call.md content says and if it's appropriate for llm_chain endpoint
cat backend/app/api/docs/llm/llm_call.md | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 981


🏁 Script executed:

# Check if there are any return type hints elsewhere in the file or similar endpoints
rg "@router.post" -A 10 backend/app/api/routes/ | head -50

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2876


🏁 Script executed:

# Check what happens in start_chain_job when callback_url is None
rg "def start_chain_job" -A 20 backend/app/services/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1519


🏁 Script executed:

# Also check the llm_callback_notification function return type more carefully
sed -n '20,29p' backend/app/api/routes/llm_chain.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 484


🏁 Script executed:

# Check response_model to infer what return type should be
sed -n '32,38p' backend/app/api/routes/llm_chain.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 320


🏁 Script executed:

# Check if there are other endpoints with similar callback patterns to understand the intended usage
rg "callback" -i backend/app/api/routes/ -l

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 284


🏁 Script executed:

# Let me also check if execute_chain_job uses the callback_url
rg "def execute_chain_job" -A 30 backend/app/services/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 2098


🏁 Script executed:

# Check what happens when callback_url is None in the job execution
rg "callback_url" backend/app/services/llm/jobs.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 845


🏁 Script executed:

# Final verification: confirm the exact issue exists in the code
sed -n '15,19p' backend/app/api/routes/llm_chain.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 158


Fix invalid OpenAPI runtime expression — {$callback_url} does not conform to spec.

OpenAPI 3.0 defines runtime expressions using the ABNF grammar: expression = ( "$url" / "$method" / "$statusCode" / "$request." source / "$response." source ). The token $callback_url is not a recognized form; valid patterns require $request.body, $response.body, or similar qualified sources.

Proposed fix
-    "{$callback_url}",
+    "{$request.body#/callback_url}",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/llm_chain.py` around lines 16 - 17, The route
decorator uses an invalid OpenAPI runtime expression "{$callback_url}"; update
the expression to a valid OpenAPI runtime expression that points to where the
callback URL actually lives (e.g. request body or query). Replace
"{$callback_url}" in the llm_callback_router.post decorator with a compliant
expression such as "{$request.body#/callback_url}" or
"{$request.query.callback_url}" depending on where the callback_url is provided,
ensuring the symbol llm_callback_router.post is updated accordingly.

"{$callback_url}",
name="llm_chain_callback",
)
def llm_callback_notification(body: APIResponse[LLMChainResponse]):

⚠️ Potential issue | 🟠 Major

Missing return type hints on both route handlers — coding guideline violation.

llm_callback_notification and llm_chain both lack return type annotations.

🐛 Proposed fix
-def llm_callback_notification(body: APIResponse[LLMChainResponse]):
+def llm_callback_notification(body: APIResponse[LLMChainResponse]) -> None:
-def llm_chain(
-    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
-):
+def llm_chain(
+    _current_user: AuthContextDep, _session: SessionDep, request: LLMChainRequest
+) -> APIResponse[Message]:

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."

Also applies to: 39-41

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/llm_chain.py` at line 20, Both route handlers missing
return type annotations: add explicit return type hints for
llm_callback_notification and llm_chain to match what they actually return
(e.g., APIResponse[LLMChainResponse], fastapi.Response,
starlette.responses.JSONResponse, or None). Update the function signatures
(llm_callback_notification(body: APIResponse[LLMChainResponse]) and
llm_chain(...)) to include the correct -> <ReturnType> annotation, ensuring the
declared type aligns with the handler’s actual return values.

Comment on lines +48 to +62
if request.callback_url:
    validate_callback_url(str(request.callback_url))

start_chain_job(
    db=_session,
    request=request,
    project_id=project_id,
    organization_id=organization_id,
)

return APIResponse.success_response(
    data=Message(
        message="Your response is being generated and will be delivered via callback."
    ),
)

⚠️ Potential issue | 🟠 Major

Unconditional "callback" success message when callback_url is optional — results are silently lost.

LLMChainRequest.callback_url is HttpUrl | None with default=None, so it's entirely valid to submit a chain request without a callback URL. When that happens the endpoint still responds "Your response is being generated and will be delivered via callback.", which is incorrect: there is no delivery mechanism and the caller has no way to retrieve the result.

Consider either (a) requiring callback_url to be non-null, or (b) returning a synchronous result / job-ID for polling when no callback URL is supplied, and adjusting the response message accordingly.

🐛 Minimal guard (option a — require callback_url)
+from fastapi import HTTPException, status

 if request.callback_url:
     validate_callback_url(str(request.callback_url))
+else:
+    raise HTTPException(
+        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+        detail="callback_url is required for LLM chain execution.",
+    )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Current:

if request.callback_url:
    validate_callback_url(str(request.callback_url))

start_chain_job(
    db=_session,
    request=request,
    project_id=project_id,
    organization_id=organization_id,
)

return APIResponse.success_response(
    data=Message(
        message="Your response is being generated and will be delivered via callback."
    ),
)

Suggested:

from fastapi import HTTPException, status

if request.callback_url:
    validate_callback_url(str(request.callback_url))
else:
    raise HTTPException(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        detail="callback_url is required for LLM chain execution.",
    )

start_chain_job(
    db=_session,
    request=request,
    project_id=project_id,
    organization_id=organization_id,
)

return APIResponse.success_response(
    data=Message(
        message="Your response is being generated and will be delivered via callback."
    ),
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/routes/llm_chain.py` around lines 48 - 62, The endpoint
currently treats request.callback_url as optional but always returns "will be
delivered via callback", which is incorrect when LLMChainRequest.callback_url is
None; either enforce a non-null callback by validating request.callback_url and
raising/returning a 400 before calling start_chain_job (use
validate_callback_url and LLMChainRequest.callback_url to gate behavior), or
change the flow when callback_url is None to return a synchronous result or a
job identifier for polling (have start_chain_job return a job_id/status and
update the APIResponse.success_response payload and Message accordingly); ensure
the response text matches the chosen path and do not claim callback delivery
when no callback_url was provided.

Comment on lines +83 to +89
class IntermediateChainResponse(SQLModel):
    """
    Intermediate callback response from the intermediate blocks
    from the llm chain execution. (if configured)

    Flattend structure matching LLMCallResponse keys for consistency
    """

⚠️ Potential issue | 🟡 Minor

Typo in docstring: Flattend → Flattened.

📝 Proposed fix
-    Flattend structure matching LLMCallResponse keys for consistency
+    Flattened structure matching LLMCallResponse keys for consistency
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Current:

class IntermediateChainResponse(SQLModel):
    """
    Intermediate callback response from the intermediate blocks
    from the llm chain execution. (if configured)

    Flattend structure matching LLMCallResponse keys for consistency
    """

Suggested:

class IntermediateChainResponse(SQLModel):
    """
    Intermediate callback response from the intermediate blocks
    from the llm chain execution. (if configured)

    Flattened structure matching LLMCallResponse keys for consistency
    """
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/models/llm/response.py` around lines 83 - 89, The class docstring
for IntermediateChainResponse contains a typo: change "Flattend structure
matching LLMCallResponse keys for consistency" to "Flattened structure matching
LLMCallResponse keys for consistency" in the IntermediateChainResponse
definition in response.py so the documentation reads correctly.

Comment on lines +31 to +51
@dataclass
class ChainContext:
    """Shared state passed to all blocks. Accumulates responses."""

    job_id: UUID
    chain_id: UUID
    project_id: int
    organization_id: int
    callback_url: str
    total_blocks: int

    langfuse_credentials: dict[str, Any] | None = None
    request_metadata: dict | None = None
    intermediate_callback_flags: list[bool] = field(default_factory=list)
    aggregated_usage: Usage = field(
        default_factory=lambda: Usage(
            input_tokens=0,
            output_tokens=0,
            total_tokens=0,
        )
    )

⚠️ Potential issue | 🟡 Minor

callback_url type is str but receives None from callers.

In execute_chain_job (backend/app/services/llm/jobs.py, Line 644), callback_url is set to None when request.callback_url is absent:

callback_url=str(request.callback_url) if request.callback_url else None,

But here it's typed as str, not str | None. This is a type-safety gap. Also, request_metadata should be dict[str, Any] | None for full type annotation.

Proposed fix
 @dataclass
 class ChainContext:
     """Shared state passed to all blocks. Accumulates responses."""
 
     job_id: UUID
     chain_id: UUID
     project_id: int
     organization_id: int
-    callback_url: str
+    callback_url: str | None
     total_blocks: int
 
     langfuse_credentials: dict[str, Any] | None = None
-    request_metadata: dict | None = None
+    request_metadata: dict[str, Any] | None = None

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/chain/chain.py` around lines 31 - 51, ChainContext
currently types callback_url as str but callers can pass None; update the
dataclass to allow Optional types by changing callback_url: str to callback_url:
str | None and tighten request_metadata to dict[str, Any] | None; ensure any
code instantiating ChainContext (e.g., in execute_chain_job) continues to pass
None or str and adjust any downstream uses to handle None safely (refer to the
ChainContext dataclass and the callback_url and request_metadata fields).

Comment on lines +127 to +158
def _teardown(self, result: BlockResult) -> dict:
    """Finalize chain record, send callback, and update job status."""

    with Session(engine) as session:
        if result.success:
            final = LLMChainResponse(
                response=result.response.response,
                usage=result.usage,
                provider_raw_response=result.response.provider_raw_response,
            )
            callback_response = APIResponse.success_response(
                data=final, metadata=self._request.request_metadata
            )
            if self._request.callback_url:
                send_callback(
                    callback_url=str(self._request.callback_url),
                    data=callback_response.model_dump(),
                )
            JobCrud(session).update(
                job_id=self._context.job_id,
                job_update=JobUpdate(status=JobStatus.SUCCESS),
            )
            update_llm_chain_status(
                session=session,
                chain_id=self._context.chain_id,
                status=ChainStatus.COMPLETED,
                output=result.response.response.output.model_dump(),
                total_usage=self._context.aggregated_usage.model_dump(),
            )
            return callback_response.model_dump()
        else:
            return self._handle_error(result.error)

⚠️ Potential issue | 🟠 Major

Same send_callback risk in _teardown — callback before DB updates.

On the success path (Lines 140–144), send_callback is called before JobCrud.update and update_llm_chain_status. If send_callback throws, DB state won't be finalized. Apply the same pattern: persist DB state first, then send callback.

Proposed fix — DB updates before callback
     def _teardown(self, result: BlockResult) -> dict:
         """Finalize chain record, send callback, and update job status."""
 
         with Session(engine) as session:
             if result.success:
                 final = LLMChainResponse(
                     response=result.response.response,
                     usage=result.usage,
                     provider_raw_response=result.response.provider_raw_response,
                 )
                 callback_response = APIResponse.success_response(
                     data=final, metadata=self._request.request_metadata
                 )
-                if self._request.callback_url:
-                    send_callback(
-                        callback_url=str(self._request.callback_url),
-                        data=callback_response.model_dump(),
-                    )
                 JobCrud(session).update(
                     job_id=self._context.job_id,
                     job_update=JobUpdate(status=JobStatus.SUCCESS),
                 )
                 update_llm_chain_status(
                     session=session,
                     chain_id=self._context.chain_id,
                     status=ChainStatus.COMPLETED,
                     output=result.response.response.output.model_dump(),
                     total_usage=self._context.aggregated_usage.model_dump(),
                 )
+
+                if self._request.callback_url:
+                    try:
+                        send_callback(
+                            callback_url=str(self._request.callback_url),
+                            data=callback_response.model_dump(),
+                        )
+                    except Exception as cb_err:
+                        logger.warning(
+                            f"[ChainExecutor] Failed to send success callback: {cb_err} | "
+                            f"chain_id={self._context.chain_id}"
+                        )
+
                 return callback_response.model_dump()
             else:
                 return self._handle_error(result.error)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
Current:

def _teardown(self, result: BlockResult) -> dict:
    """Finalize chain record, send callback, and update job status."""

    with Session(engine) as session:
        if result.success:
            final = LLMChainResponse(
                response=result.response.response,
                usage=result.usage,
                provider_raw_response=result.response.provider_raw_response,
            )
            callback_response = APIResponse.success_response(
                data=final, metadata=self._request.request_metadata
            )
            if self._request.callback_url:
                send_callback(
                    callback_url=str(self._request.callback_url),
                    data=callback_response.model_dump(),
                )
            JobCrud(session).update(
                job_id=self._context.job_id,
                job_update=JobUpdate(status=JobStatus.SUCCESS),
            )
            update_llm_chain_status(
                session=session,
                chain_id=self._context.chain_id,
                status=ChainStatus.COMPLETED,
                output=result.response.response.output.model_dump(),
                total_usage=self._context.aggregated_usage.model_dump(),
            )
            return callback_response.model_dump()
        else:
            return self._handle_error(result.error)

Suggested:

def _teardown(self, result: BlockResult) -> dict:
    """Finalize chain record, send callback, and update job status."""

    with Session(engine) as session:
        if result.success:
            final = LLMChainResponse(
                response=result.response.response,
                usage=result.usage,
                provider_raw_response=result.response.provider_raw_response,
            )
            callback_response = APIResponse.success_response(
                data=final, metadata=self._request.request_metadata
            )
            JobCrud(session).update(
                job_id=self._context.job_id,
                job_update=JobUpdate(status=JobStatus.SUCCESS),
            )
            update_llm_chain_status(
                session=session,
                chain_id=self._context.chain_id,
                status=ChainStatus.COMPLETED,
                output=result.response.response.output.model_dump(),
                total_usage=self._context.aggregated_usage.model_dump(),
            )
            if self._request.callback_url:
                try:
                    send_callback(
                        callback_url=str(self._request.callback_url),
                        data=callback_response.model_dump(),
                    )
                except Exception as cb_err:
                    logger.warning(
                        f"[_teardown] Failed to send success callback: {cb_err} | "
                        f"chain_id={self._context.chain_id}"
                    )
            return callback_response.model_dump()
        else:
            return self._handle_error(result.error)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/chain/executor.py` around lines 127 - 158, The
success path in _teardown currently calls send_callback before persisting DB
state, so if send_callback raises the job and chain updates may never be
applied; move the send_callback call (and its callback_url check) to after
JobCrud(session).update(...) and update_llm_chain_status(...) so the DB updates
are performed first, then send the callback with callback_response.model_dump(),
and finally return the callback response; keep all existing objects
(LLMChainResponse, APIResponse) and use the same self._request.callback_url
check.

Comment on lines +160 to +197
def _handle_error(self, error: str) -> dict:
    callback_response = APIResponse.failure_response(
        error=error or "Unknown error occurred",
        metadata=self._request.request_metadata,
    )
    logger.error(
        f"[ChainExecutor] Chain execution failed | "
        f"chain_id={self._context.chain_id}, job_id={self._context.job_id}, error={error}"
    )

    with Session(engine) as session:
        if self._request.callback_url:
            send_callback(
                callback_url=str(self._request.callback_url),
                data=callback_response.model_dump(),
            )

        update_llm_chain_status(
            session,
            chain_id=self._context.chain_id,
            status=ChainStatus.FAILED,
            output=None,
            total_usage=self._context.aggregated_usage.model_dump(),
            error=error,
        )
        JobCrud(session).update(
            job_id=self._context.job_id,
            job_update=JobUpdate(status=JobStatus.FAILED, error_message=error),
        )
    return callback_response.model_dump()

def _handle_unexpected_error(self, e: Exception) -> dict:
    logger.error(
        f"[ChainExecutor.run] Unexpected error: {e} | "
        f"job_id={self._context.job_id}",
        exc_info=True,
    )
    return self._handle_error("Unexpected error occurred")

⚠️ Potential issue | 🟠 Major

_handle_error can throw, leaving chain/job in inconsistent DB state.

If send_callback (Line 172) raises an unhandled exception, the subsequent update_llm_chain_status and JobCrud.update calls (Lines 177–188) are skipped. Since _handle_unexpected_error delegates to _handle_error, there's no fallback — the exception propagates to the Celery runner, and the job remains in PROCESSING / chain in RUNNING forever.

Consider wrapping send_callback in a try/except within _handle_error, or ensuring DB updates happen before the callback.

Proposed fix — update DB before sending callback
     def _handle_error(self, error: str) -> dict:
         callback_response = APIResponse.failure_response(
             error=error or "Unknown error occurred",
             metadata=self._request.request_metadata,
         )
         logger.error(
             f"[ChainExecutor] Chain execution failed | "
             f"chain_id={self._context.chain_id}, job_id={self._context.job_id}, error={error}"
         )
 
         with Session(engine) as session:
-            if self._request.callback_url:
-                send_callback(
-                    callback_url=str(self._request.callback_url),
-                    data=callback_response.model_dump(),
-                )
-
             update_llm_chain_status(
                 session,
                 chain_id=self._context.chain_id,
                 status=ChainStatus.FAILED,
                 output=None,
                 total_usage=self._context.aggregated_usage.model_dump(),
                 error=error,
             )
             JobCrud(session).update(
                 job_id=self._context.job_id,
                 job_update=JobUpdate(status=JobStatus.FAILED, error_message=error),
             )
+
+        if self._request.callback_url:
+            try:
+                send_callback(
+                    callback_url=str(self._request.callback_url),
+                    data=callback_response.model_dump(),
+                )
+            except Exception as cb_err:
+                logger.warning(
+                    f"[ChainExecutor] Failed to send error callback: {cb_err} | "
+                    f"chain_id={self._context.chain_id}"
+                )
+
         return callback_response.model_dump()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/chain/executor.py` around lines 160 - 197, The
_handle_error method can abort before updating DB if send_callback raises;
modify _handle_error so DB state is always persisted first: call
update_llm_chain_status(...) and JobCrud(session).update(...) before attempting
send_callback, and/or wrap send_callback in its own try/except that logs but
does not re-raise. Ensure you still build callback_response from
APIResponse.failure_response and include references to _handle_error,
send_callback, update_llm_chain_status, and JobCrud.update when making the
change so the chain status is set to ChainStatus.FAILED and the job status to
JobStatus.FAILED even if the callback fails.

Comment on lines +687 to +691
callback_response = APIResponse.failure_response(
error="Unexpected error occurred",
metadata=request.request_metadata,
)
return handle_job_error(job_uuid, request.callback_url, callback_response)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Pydantic v2 HttpUrl str subclass type compatibility

💡 Result:

In Pydantic v2, HttpUrl is not a str subclass anymore (unlike v1). It’s a constrained URL type built on Pydantic’s core Url object, so things like isinstance(model.url, str) will be False, and you may need explicit conversion when calling APIs that expect str. This is an intentional breaking change noted in the migration guide. [1][2]

What to do for “expects str” compatibility

  • Convert explicitly: str(model.url) (or f"{model.url}"). Pydantic’s migration guide specifically recommends this approach. [1]
  • If you need a URL type that is a str (for isinstance(..., str) and “stringy” library interop), a common workaround is using pydantic-string-url, which provides drop-in URL types based on str. [3][4]

Typing notes (why it can look odd)

Historically in v2, HttpUrl was defined via typing.Annotated over Url + UrlConstraints (so it wasn’t a real subclass at runtime), e.g. HttpUrl = Annotated[Url, UrlConstraints(...)]. [5] Pydantic later improved URL types to be concrete subclasses of the base URL type (to enable better isinstance checks between URL types), but they still do not inherit from str. [6]

Sources:
[1] Pydantic v2 Migration Guide (URL types no longer inherit from str) [1]
[2] Pydantic “URLs” docs (shows URL values are Url(...) objects and stringify via str()) [2]
[3] pydantic-string-url PyPI description (v2 URLs not str, provides str-based replacements) [3]
[4] Pydantic GitHub discussion #6395 (practical interop issue + pydantic-string-url suggestion) [4]
[5] Pydantic network types API docs (shows HttpUrl defined via Annotated[..., UrlConstraints(...)]) [5]
[6] Pydantic v2.10 release notes (URL types moved from Annotated approach to concrete URL subclasses) [6]


🏁 Script executed:

# First, find and examine the jobs.py file to see the actual code
find . -name "jobs.py" -path "*/backend/app/services/*" -type f

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 149


🏁 Script executed:

# Check the handle_job_error function signature
rg -A 5 "def handle_job_error" --type py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 459


🏁 Script executed:

# Look at the code around lines 687-691 and 500 in jobs.py
fd jobs.py -x wc -l {} \; 2>/dev/null | head -5

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 317


🏁 Script executed:

# Check the actual code at lines 687-691 and around line 500
sed -n '685,691p' backend/app/services/llm/jobs.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 338


🏁 Script executed:

# Check line 500 area
sed -n '498,505p' backend/app/services/llm/jobs.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 361


🏁 Script executed:

# Check the request object type to confirm callback_url is HttpUrl | None
rg -B 10 "def handle_job_error" backend/app/services/llm/jobs.py | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 376


🏁 Script executed:

# Look for the request type definition to verify callback_url field type
rg "callback_url" backend/app/services/llm/jobs.py -B 2 -A 2 | head -30

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 975


🏁 Script executed:

# Find the request type definition to confirm callback_url is HttpUrl | None
rg "class.*Request" backend/app/services/llm/jobs.py -A 30 | head -60

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Look for imports to understand what HttpUrl is being used
rg "from pydantic" backend/app/services/llm/jobs.py -A 2

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Check send_callback to understand what it expects
rg -A 10 "def send_callback" backend/app/services/llm/jobs.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Count all occurrences of handle_job_error being called with request.callback_url
rg "handle_job_error.*request\.callback_url" backend/app/services/llm/jobs.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 497


🏁 Script executed:

# Search for request classes more broadly
rg "callback_url.*HttpUrl\|HttpUrl.*callback_url" backend/app/services/llm/ -A 2 -B 2

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


🏁 Script executed:

# Look at the entire file structure to find the request definition
sed -n '1,100p' backend/app/services/llm/jobs.py | grep -E "^import|^from|class.*Request"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1186


🏁 Script executed:

# Find where request is defined in the function signature at line 687
sed -n '670,691p' backend/app/services/llm/jobs.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 884


🏁 Script executed:

# Search for HttpUrl in the entire backend
rg "HttpUrl" backend/ --type py | head -20

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 580


🏁 Script executed:

# Count all occurrences of handle_job_error being called
rg "return handle_job_error.*request\.callback_url" backend/app/services/llm/jobs.py -n

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 517


🏁 Script executed:

# Check if send_callback expects str
rg -A 5 "def send_callback" backend/app/utils.py backend/app/services/

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 404


🏁 Script executed:

# Verify what function contains line 500
sed -n '480,520p' backend/app/services/llm/jobs.py | head -50

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 1648


🏁 Script executed:

# Check all 5 occurrences in context
rg -n -B 8 "return handle_job_error.*request\.callback_url" backend/app/services/llm/jobs.py | grep -E "^[0-9]+-\s*(def |return handle_job_error)"

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 57


Convert request.callback_url (Pydantic v2 HttpUrl) to str before passing to handle_job_error.

In Pydantic v2, HttpUrl is not a str subclass and requires explicit conversion. The function handle_job_error expects callback_url: str | None, and internally calls send_callback(callback_url: str, ...). Passing HttpUrl directly will cause type errors.

This pattern appears 5 times in the file (lines 517, 550, 575, 586, 691). Convert all occurrences using str(request.callback_url) if request.callback_url else None.

Proposed fix example (line 691)
-        return handle_job_error(job_uuid, request.callback_url, callback_response)
+        return handle_job_error(
+            job_uuid,
+            str(request.callback_url) if request.callback_url else None,
+            callback_response,
+        )
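
For reference, a standalone sketch of the Pydantic v2 behaviour behind this finding; the `CallbackConfig` model below is hypothetical, used only to demonstrate the conversion, and is not part of this PR.

```python
from pydantic import BaseModel, HttpUrl


class CallbackConfig(BaseModel):
    # Hypothetical model, used only to show how HttpUrl behaves in Pydantic v2.
    callback_url: HttpUrl | None = None


cfg = CallbackConfig(callback_url="https://example.com/hook")

# HttpUrl no longer subclasses str in Pydantic v2, so this prints False.
print(isinstance(cfg.callback_url, str))

# Convert explicitly before handing the value to APIs that expect a plain str.
url_as_str = str(cfg.callback_url) if cfg.callback_url else None
print(url_as_str)
```
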
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 687 - 691, The callback_url
being passed to handle_job_error is a Pydantic v2 HttpUrl (request.callback_url)
and must be converted to a native str before use; update all calls that pass
request.callback_url (the ones that ultimately call handle_job_error ->
send_callback) to pass str(request.callback_url) if request.callback_url else
None so the signature handle_job_error(callback_url: str | None, ...) and
send_callback(callback_url: str, ...) receive proper types; apply this change to
every occurrence in this module where request.callback_url is forwarded (e.g.,
the calls around job error handling that reference handle_job_error and
send_callback).

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🧹 Nitpick comments (2)
backend/app/services/llm/jobs.py (2)

589-595: task_id is unused in execute_chain_job and task_instance lacks a type annotation.

Unlike execute_job (which logs task_id at startup and in the error handler), execute_chain_job never uses task_id, making it dead noise. Additionally, task_instance has no type annotation in either Celery task function, violating the project type-hint convention.

♻️ Proposed fix
 def execute_chain_job(
     request_data: dict,
     project_id: int,
     organization_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: Any,  # celery.Task — available via bind=True
 ) -> dict:

Add from typing import Any if not already imported, and include task_id in the startup log message for consistency:

     logger.info(
         f"[execute_chain_job] Starting chain execution | "
-        f"job_id={job_uuid}, total_blocks={len(request.blocks)}"
+        f"job_id={job_uuid}, task_id={task_id}, total_blocks={len(request.blocks)}"
     )

As per coding guidelines: "Always add type hints to all function parameters and return values in Python code."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 589 - 595, The function
execute_chain_job currently has an unused task_id parameter and an untyped
task_instance; update it to include a proper type hint (e.g., task_instance:
Any) and add the matching import (from typing import Any) if missing, and use
task_id in the startup log (and mirror execute_job’s error handling log) so
task_id is logged on start and on failure; ensure the function signature and
return type follow the project's type-hint convention and that the startup/error
log messages reference task_id.

81-121: Consider extracting a shared _create_and_schedule_job helper to eliminate duplication with start_job.

start_chain_job is structurally identical to start_job, differing only in job_type, function_path, and error string. This is a clear DRY violation.

♻️ Proposed refactor
+def _create_and_schedule_job(
+    db: Session,
+    request: LLMCallRequest | LLMChainRequest,
+    project_id: int,
+    organization_id: int,
+    *,
+    job_type: JobType,
+    function_path: str,
+    error_detail: str,
+) -> UUID:
+    trace_id = correlation_id.get() or "N/A"
+    fn_label = function_path.rsplit(".", 1)[-1]
+    job_crud = JobCrud(session=db)
+    job = job_crud.create(job_type=job_type, trace_id=trace_id)
+    db.flush()
+    db.commit()
+    logger.info(f"[{fn_label}] Created job | job_id={job.id}, status={job.status}, project_id={project_id}")
+    try:
+        task_id = start_high_priority_job(
+            function_path=function_path,
+            project_id=project_id,
+            job_id=str(job.id),
+            trace_id=trace_id,
+            request_data=request.model_dump(mode="json"),
+            organization_id=organization_id,
+        )
+    except Exception as e:
+        logger.error(f"[{fn_label}] Error starting Celery task: {e} | job_id={job.id}", exc_info=True)
+        job_crud.update(job_id=job.id, job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e)))
+        raise HTTPException(status_code=500, detail=error_detail)
+    logger.info(f"[{fn_label}] Job scheduled | job_id={job.id}, project_id={project_id}, task_id={task_id}")
+    return job.id

 def start_job(db, request, project_id, organization_id) -> UUID:
-    # ... full body ...
+    return _create_and_schedule_job(db, request, project_id, organization_id,
+        job_type=JobType.LLM_API,
+        function_path="app.services.llm.jobs.execute_job",
+        error_detail="Internal server error while executing LLM call",
+    )

 def start_chain_job(db, request, project_id, organization_id) -> UUID:
-    # ... full body ...
+    return _create_and_schedule_job(db, request, project_id, organization_id,
+        job_type=JobType.LLM_CHAIN,
+        function_path="app.services.llm.jobs.execute_chain_job",
+        error_detail="Internal server error while executing LLM chain job",
+    )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 81 - 121, Extract a shared
helper (e.g., _create_and_schedule_job) to remove duplication between
start_chain_job and start_job: the helper should accept parameters job_type
(JobType.LLM_CHAIN / other), function_path (string), project_id,
organization_id, request_data (request.model_dump(mode="json") or None), and an
error message/template; it should perform JobCrud(session=db).create(...),
db.flush(), db.commit(), call start_high_priority_job with the provided
function_path and args, catch exceptions to log with logger.error (include
exception and job.id), update the job via job_crud.update(job_id=job.id,
job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e))) and
re-raise an HTTPException with the provided error detail, and finally return
job.id (or task_id if needed). Replace start_chain_job and start_job to call
this helper with their respective JobType, function_path, and error text,
keeping unique symbols JobCrud, JobUpdate, JobStatus, start_high_priority_job,
and logger for locating code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/api/docs/llm/llm_chain.md`:
- Line 9: The docs line incorrectly states `input` is a string; update the
documentation for QueryParams.input to reflect that it is a discriminated union
(TextInput | AudioInput) and can accept either a text string or an audio
payload; mention both variants (TextInput and AudioInput) and their
distinguishing fields (e.g., type/discriminator and text vs. audio
data/uri/format) so consumers know audio inputs are supported and how to
construct them when using LLMChain or related functions that reference
QueryParams.input.
- Line 48: The docs line "If not provided, response is only accessible through
job status" is inaccurate because the LLM chain creation endpoint in
llm_chain.py does not return a job_id; either remove or reword that sentence in
llm_chain.md to reflect current behavior (e.g., state that no job identifier is
returned and callers cannot poll for status) or alternatively update the
endpoint implementation in llm_chain.py to include and return a job_id
(generate/attach the Job id from the job creation flow and include it in the
JSON response) so the documentation is correct.

In `@backend/app/services/llm/jobs.py`:
- Around line 683-688: The inner except in execute_chain_job catches exceptions
without binding them, but the logger.error message references the outer
exception variable e; change the inner except to capture the exception (e.g.
except Exception as update_err:) and update the logger.error call in the
status-update block to log update_err (and keep exc_info=True) so the message
correctly reports the failure to update chain status for chain_uuid and uses the
correct exception object; reference the execute_chain_job function, the
chain_uuid variable, and the logger.error call to locate the change.
- Around line 414-416: The cleanup condition is incorrect because resolved_input
(str) is being compared to query.input (a TextInput|AudioInput model), so it
always yields True; update the resolver to track whether a temp file was created
(e.g., set a boolean flag like resolved_from_temp when you call the resolver
that writes a temp file) and then change the finally block to only call
cleanup_temp_file(resolved_input) when resolved_from_temp is True and
resolved_input is truthy; alternatively, if you prefer not to add a flag,
compare resolved_input against the model's text field (like getattr(query.input,
"text", None)) or check isinstance(query.input, AudioInput) before deciding to
clean up. Ensure you update the resolver location where resolved_input is
produced and reference resolved_input, query.input, and cleanup_temp_file in the
fix.
- Around line 614-617: execute_chain_job currently never updates the job status
to JobStatus.PROCESSING like execute_job does; update execute_chain_job to mark
the job as PROCESSING as soon as the Celery task starts (before creating the
chain or doing heavy work). Locate the execute_chain_job function and add the
same status transition logic used in execute_job (set status to
JobStatus.PROCESSING and persist the change via the session/ORM call that your
project uses) prior to calling create_llm_chain or invoking downstream
processing so monitoring/alerting sees the in-progress state.

---

Duplicate comments:
In `@backend/app/api/routes/llm_chain.py`:
- Around line 48-62: start_chain_job returns a job UUID that is currently
discarded; capture its return value and include it in the API response so
callers can poll when no callback_url is provided. Specifically, assign job_id =
start_chain_job(db=_session, request=request, project_id=project_id,
organization_id=organization_id) and then return
APIResponse.success_response(data=Message(message=...), job_id=job_id) or embed
the job_id into the Message payload; also make the success message conditional:
if request.callback_url is set keep the callback wording, otherwise return a
polling message including the job_id. Ensure you reference start_chain_job,
request.callback_url, APIResponse.success_response and Message when making the
change.
- Line 20: Both route handler functions are missing return type annotations;
update llm_callback_notification(body: APIResponse[LLMChainResponse]) and the
other route handler in this module to include precise return types (e.g., ->
Response, -> JSONResponse, or -> dict depending on what they actually return).
Ensure parameters already typed remain, add the appropriate importing type if
needed (from fastapi.responses or typing), and keep annotations consistent with
the rest of the codebase.
- Around line 16-17: The route decorator is using an invalid OpenAPI runtime
expression "{$callback_url}"; update the string passed to
llm_callback_router.post to a valid form — either change it to a standard path
parameter like "{callback_url}" if you intend a URL path segment, or replace it
with a proper OpenAPI runtime expression such as "{$request.body#/callback_url}"
or "{$request.header.callback_url}" depending on where the callback URL comes
from; make the change in the llm_callback_router.post decorator so the routing
and generated OpenAPI spec are valid.

In `@backend/app/services/llm/jobs.py`:
- Line 694: The call to handle_job_error passes request.callback_url (a Pydantic
v2 HttpUrl) directly but handle_job_error expects str | None; convert the
HttpUrl to str the same way used earlier: pass str(request.callback_url) if
request.callback_url else None when calling handle_job_error (reference the
handle_job_error call and the existing conversion pattern used previously near
the other callback usage).

---

Nitpick comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 589-595: The function execute_chain_job currently has an unused
task_id parameter and an untyped task_instance; update it to include a proper
type hint (e.g., task_instance: Any) and add the matching import (from typing
import Any) if missing, and use task_id in the startup log (and mirror
execute_job’s error handling log) so task_id is logged on start and on failure;
ensure the function signature and return type follow the project's type-hint
convention and that the startup/error log messages reference task_id.
- Around line 81-121: Extract a shared helper (e.g., _create_and_schedule_job)
to remove duplication between start_chain_job and start_job: the helper should
accept parameters job_type (JobType.LLM_CHAIN / other), function_path (string),
project_id, organization_id, request_data (request.model_dump(mode="json") or
None), and an error message/template; it should perform
JobCrud(session=db).create(...), db.flush(), db.commit(), call
start_high_priority_job with the provided function_path and args, catch
exceptions to log with logger.error (include exception and job.id), update the
job via job_crud.update(job_id=job.id,
job_update=JobUpdate(status=JobStatus.FAILED, error_message=str(e))) and
re-raise an HTTPException with the provided error detail, and finally return
job.id (or task_id if needed). Replace start_chain_job and start_job to call
this helper with their respective JobType, function_path, and error text,
keeping unique symbols JobCrud, JobUpdate, JobStatus, start_high_priority_job,
and logger for locating code.

### Key Parameters

**`query`** (required) - Initial query input for the first block in the chain:
- `input` (required, string, min 1 char): User question/prompt/query

⚠️ Potential issue | 🟡 Minor

input documented as string only — audio input support is omitted.

QueryParams.input is a discriminated union of TextInput and AudioInput. Describing it as string misleads consumers who may want to use audio inputs in chains.
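
To make the documentation concrete, a hedged sketch of what the two variants could look like in a request body; the discriminator value and the audio field names below are assumptions drawn from the prompt that follows, not verified against the actual models.

```python
# Hypothetical payloads for QueryParams.input; field names are illustrative only.
text_query = {
    "query": {
        "input": {"type": "text", "text": "Summarize the uploaded report"},
    },
}

audio_query = {
    "query": {
        "input": {
            "type": "audio",                   # assumed discriminator value
            "data": "<base64-encoded audio>",  # could instead be a uri/format pair
        },
    },
}
```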

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/docs/llm/llm_chain.md` at line 9, The docs line incorrectly
states `input` is a string; update the documentation for QueryParams.input to
reflect that it is a discriminated union (TextInput | AudioInput) and can accept
either a text string or an audio payload; mention both variants (TextInput and
AudioInput) and their distinguishing fields (e.g., type/discriminator and text
vs. audio data/uri/format) so consumers know audio inputs are supported and how
to construct them when using LLMChain or related functions that reference
QueryParams.input.

**`callback_url`** (optional, HTTPS URL):
- Webhook endpoint to receive the final response and intermediate callbacks
- Must be a valid HTTPS URL
- If not provided, response is only accessible through job status

⚠️ Potential issue | 🟡 Minor

"Accessible through job status" is misleading — the endpoint currently does not return a job_id.

Without a job_id in the response, callers have no identifier to query for job status when callback_url is omitted. This documentation claim is inaccurate until the route returns a job ID (see the comment in llm_chain.py).
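
A rough sketch of how the route could surface the job identifier, following the duplicate comment on llm_chain.py; this is an excerpt of the handler using the PR's own names (`start_chain_job`, `APIResponse.success_response`, `Message`), and carrying `job_id` in the response metadata is an assumption rather than the agreed design.

```python
# Hypothetical excerpt of the chain route handler, not the committed implementation.
job_id = start_chain_job(
    db=_session,
    request=request,
    project_id=project_id,
    organization_id=organization_id,
)

if request.callback_url:
    message = "Chain job accepted; the final response will be sent to the callback URL."
else:
    message = f"Chain job accepted; poll job {job_id} for status."

return APIResponse.success_response(
    data=Message(message=message),
    metadata={"job_id": str(job_id)},  # where job_id lives in the response is an assumption
)
```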

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/api/docs/llm/llm_chain.md` at line 48, The docs line "If not
provided, response is only accessible through job status" is inaccurate because
the LLM chain creation endpoint in llm_chain.py does not return a job_id; either
remove or reword that sentence in llm_chain.md to reflect current behavior
(e.g., state that no job identifier is returned and callers cannot poll for
status) or alternatively update the endpoint implementation in llm_chain.py to
include and return a job_id (generate/attach the Job id from the job creation
flow and include it in the JSON response) so the documentation is correct.

Comment on lines 414 to 416
     finally:
         # Clean up temp files for audio inputs
-        if resolved_input and resolved_input != request.query.input:
+        if resolved_input and resolved_input != query.input:
             cleanup_temp_file(resolved_input)

⚠️ Potential issue | 🟡 Minor

Temp-file cleanup condition is always True: `resolved_input != query.input` compares a str to a Pydantic model.

resolved_input is a str; query.input is a TextInput | AudioInput object. The != comparison is always True, so cleanup_temp_file fires for every non-empty input, including plain-text inputs. This is benign (due to missing_ok=True) but semantically wrong.

🐛 Proposed fix
-            if resolved_input and resolved_input != query.input:
+            if resolved_input and not isinstance(query.input, TextInput):
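
The prompt below also mentions a flag-based alternative that avoids type checks entirely; a rough sketch, where `resolve_query_input` is a made-up name standing in for whatever resolver actually produces `resolved_input`:

```python
# Alternative sketch: have the resolver report whether it created a temp file.
# resolve_query_input is a hypothetical helper name; the real resolver differs.
resolved_input, resolved_from_temp = resolve_query_input(query.input)

try:
    ...  # run the block's LLM call with resolved_input
finally:
    # Only delete files this code actually wrote (i.e. audio-derived temp files).
    if resolved_from_temp and resolved_input:
        cleanup_temp_file(resolved_input)
```
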
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 414 - 416, The cleanup
condition is incorrect because resolved_input (str) is being compared to
query.input (a TextInput|AudioInput model), so it always yields True; update the
resolver to track whether a temp file was created (e.g., set a boolean flag like
resolved_from_temp when you call the resolver that writes a temp file) and then
change the finally block to only call cleanup_temp_file(resolved_input) when
resolved_from_temp is True and resolved_input is truthy; alternatively, if you
prefer not to add a flag, compare resolved_input against the model's text field
(like getattr(query.input, "text", None)) or check isinstance(query.input,
AudioInput) before deciding to clean up. Ensure you update the resolver location
where resolved_input is produced and reference resolved_input, query.input, and
cleanup_temp_file in the fix.

Comment on lines +614 to +617
try:
with Session(engine) as session:
chain_record = create_llm_chain(
session,

⚠️ Potential issue | 🟠 Major

execute_chain_job never sets the job to JobStatus.PROCESSING — inconsistency with execute_job.

execute_job immediately marks the job as PROCESSING (lines 483–486) when the Celery task starts. execute_chain_job skips this step entirely, leaving the job in its initial state until completion or failure. This breaks monitoring/alerting that relies on status transitions and makes it impossible to distinguish stuck-queued jobs from in-progress ones.

🐛 Proposed fix
     try:
         with Session(engine) as session:
+            JobCrud(session=session).update(
+                job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+            )
             chain_record = create_llm_chain(
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    try:
-        with Session(engine) as session:
-            chain_record = create_llm_chain(
-                session,
+    try:
+        with Session(engine) as session:
+            JobCrud(session=session).update(
+                job_id=job_uuid, job_update=JobUpdate(status=JobStatus.PROCESSING)
+            )
+            chain_record = create_llm_chain(
+                session,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 614 - 617, execute_chain_job
currently never updates the job status to JobStatus.PROCESSING like execute_job
does; update execute_chain_job to mark the job as PROCESSING as soon as the
Celery task starts (before creating the chain or doing heavy work). Locate the
execute_chain_job function and add the same status transition logic used in
execute_job (set status to JobStatus.PROCESSING and persist the change via the
session/ORM call that your project uses) prior to calling create_llm_chain or
invoking downstream processing so monitoring/alerting sees the in-progress
state.

Comment on lines +683 to +688
except Exception:
logger.error(
f"[execute_chain_job] Failed to update chain status: {e} | "
f"chain_id={chain_uuid}",
exc_info=True,
)

⚠️ Potential issue | 🟡 Minor

Inner except block logs the outer exception e instead of the status-update exception.

The bare except Exception: at line 683 does not bind the caught exception, so {e} in the log message still refers to the outer chain-execution error — not the failure to update chain status. This is misleading during debugging.

🐛 Proposed fix
-        except Exception:
+        except Exception as update_error:
             logger.error(
-                f"[execute_chain_job] Failed to update chain status: {e} | "
-                f"chain_id={chain_uuid}",
+                f"[execute_chain_job] Failed to update chain status: {update_error} | "
+                f"chain_id={chain_uuid}",
                 exc_info=True,
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 683 - 688, The inner except in
execute_chain_job catches exceptions without binding them, but the logger.error
message references the outer exception variable e; change the inner except to
capture the exception (e.g. except Exception as update_err:) and update the
logger.error call in the status-update block to log update_err (and keep
exc_info=True) so the message correctly reports the failure to update chain
status for chain_uuid and uses the correct exception object; reference the
execute_chain_job function, the chain_uuid variable, and the logger.error call
to locate the change.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
backend/app/services/llm/chain/executor.py (1)

63-91: Unnecessary session acquired on the error path.

with Session(engine) as session: at Line 63 wraps both branches. When result.success is False, the session object is created but never used, then _handle_error (Line 91) opens its own session at Line 103. Consider dispatching the error case before entering the session context.

♻️ Proposed refactor
 def _teardown(self, result: BlockResult) -> dict:
     """Finalize chain record, send callback, and update job status."""
+    if not result.success:
+        return self._handle_error(result.error)
+
     with Session(engine) as session:
-        if result.success:
-            final = LLMChainResponse(
-                ...
-            )
-            ...
-            return callback_response.model_dump()
-        else:
-            return self._handle_error(result.error)
+        final = LLMChainResponse(...)
+        ...
+        return callback_response.model_dump()
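
Filled out, and combined with the DB-before-callback ordering raised in the earlier comments, the guard-clause version might look roughly like this; it is a sketch assembled from the code quoted above, not the committed implementation:

```python
def _teardown(self, result: BlockResult) -> dict:
    """Finalize chain record, send callback, and update job status."""
    # Dispatch the failure path before a session is ever opened.
    if not result.success:
        return self._handle_error(result.error)

    final = LLMChainResponse(
        response=result.response.response,
        usage=result.usage,
        provider_raw_response=result.response.provider_raw_response,
    )
    callback_response = APIResponse.success_response(
        data=final, metadata=self._request.request_metadata
    )

    # Persist job and chain state first so a callback failure cannot lose it.
    with Session(engine) as session:
        JobCrud(session).update(
            job_id=self._context.job_id,
            job_update=JobUpdate(status=JobStatus.SUCCESS),
        )
        update_llm_chain_status(
            session=session,
            chain_id=self._context.chain_id,
            status=ChainStatus.COMPLETED,
            output=result.response.response.output.model_dump(),
            total_usage=self._context.aggregated_usage.model_dump(),
        )

    if self._request.callback_url:
        try:
            send_callback(
                callback_url=str(self._request.callback_url),
                data=callback_response.model_dump(),
            )
        except Exception as cb_err:
            logger.warning(
                f"[_teardown] Failed to send success callback: {cb_err} | "
                f"chain_id={self._context.chain_id}"
            )
    return callback_response.model_dump()
```
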
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/chain/executor.py` around lines 63 - 91, The session
is opened unconditionally but only used on the success path; change executor
logic to check result.success before entering the Session(engine) context so the
error path returns early via self._handle_error(result.error) without creating a
session. Concretely, move or add the early guard (if not result.success: return
self._handle_error(result.error)) before the with Session(engine) as session:
block, and keep the existing success logic that constructs LLMChainResponse,
calls APIResponse.success_response, send_callback, JobCrud(session).update, and
update_llm_chain_status inside the session context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/app/services/llm/jobs.py`:
- Around line 480-487: The functions execute_job and execute_chain_job currently
omit type annotations for the task_instance (and task_id where noted) which
triggers static-analysis and ARG001 unused-parameter warnings; edit the
signatures for execute_job and execute_chain_job to annotate task_instance with
an appropriate type (e.g., celery.app.task.Task or typing.Any) and add a return
type hint, and if the parameter must remain unused keep the annotation but add a
trailing comment "# noqa: ARG001" to suppress the unused-argument warning;
update only the function signatures for locateable symbols execute_job and
execute_chain_job and ensure imports (Task or Any) are added as needed.

---

Duplicate comments:
In `@backend/app/services/llm/chain/executor.py`:
- Around line 73-88: The callback is sent before persisting success state; move
the DB updates (calls to JobCrud(session).update and update_llm_chain_status) to
execute and commit first, then call send_callback (only if
self._request.callback_url) afterwards; wrap send_callback in its own try/except
so any exception from send_callback (or network issues) is caught and logged but
does not prevent the job/chain from remaining marked SUCCESS/COMPLETED
(referenced symbols: send_callback, self._request.callback_url,
callback_response, JobCrud.update, update_llm_chain_status).
- Around line 103-121: The send_callback call in _handle_error can raise and
prevent the DB updates, leaving ChainStatus and JobStatus stuck; move the
update_llm_chain_status(session, chain_id=..., status=ChainStatus.FAILED, ...)
and JobCrud(session).update(job_id=self._context.job_id,
job_update=JobUpdate(...)) to execute before calling send_callback, and then
call send_callback afterwards inside its own try/except so callback failures are
logged/ignored without preventing the DB writes; locate the relevant block using
send_callback, update_llm_chain_status, and JobCrud.update to reorder and add
the error handling.

In `@backend/app/services/llm/jobs.py`:
- Around line 529-531: The callback_url from the request is a Pydantic Url
object (not a str) so update every call site that passes request.callback_url to
send_callback and handle_job_error to pass a string instead; specifically change
the send_callback call in execute_job (callback_url=request.callback_url) and
all handle_job_error calls in execute_job and execute_chain_job (currently
callback_url=request.callback_url) to use callback_url=str(request.callback_url)
so the functions receive a plain str.
- Around line 658-663: The inner bare "except Exception:" in execute_chain_job
incorrectly references the outer exception variable `e`; change the inner
handler to capture its own exception (e.g., "except Exception as update_err:")
and update the logger.error call in that block to include the new variable
(e.g., `update_err`) in the message and keep `exc_info=True` so the actual
status-update failure is logged; locate this in jobs.py around the logger.error
inside execute_chain_job.
- Around line 424-426: The current cleanup condition compares resolved_input
(str) to query.input (TextInput | AudioInput) which is always True, so update
the logic to only attempt temp-file cleanup for inputs that can produce a temp
file (i.e., AudioInput). Replace the condition using the cross-type comparison
with an explicit type check against AudioInput (use the query.input model class
name) and only call cleanup_temp_file(resolved_input) when resolved_input is
truthy and query.input is an AudioInput; remove the resolved_input !=
query.input comparison. This ensures cleanup_temp_file is only invoked for
audio-derived temp files and leaves TextInput untouched.

---

Nitpick comments:
In `@backend/app/services/llm/chain/executor.py`:
- Around line 63-91: The session is opened unconditionally but only used on the
success path; change executor logic to check result.success before entering the
Session(engine) context so the error path returns early via
self._handle_error(result.error) without creating a session. Concretely, move or
add the early guard (if not result.success: return
self._handle_error(result.error)) before the with Session(engine) as session:
block, and keep the existing success logic that constructs LLMChainResponse,
calls APIResponse.success_response, send_callback, JobCrud(session).update, and
update_llm_chain_status inside the session context.

Comment on lines +480 to +487
def execute_job(
request_data: dict,
project_id: int,
organization_id: int,
job_id: str,
task_id: str,
task_instance,
) -> dict:

⚠️ Potential issue | 🟡 Minor

task_instance (and task_id in execute_chain_job) missing type annotations — coding guideline violation.

Both execute_job (Line 486) and execute_chain_job (Line 569) leave task_instance without a type hint. The static analysis tools also flag these as unused parameters; if the Celery task signature requires them, add a type annotation (e.g., Task from celery) or Any, and suppress with # noqa: ARG001 if intentionally unused.

🛠️ Proposed fix
 def execute_job(
     request_data: dict,
     project_id: int,
     organization_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: Any,  # required by Celery task signature
 ) -> dict:
 def execute_chain_job(
     request_data: dict,
     project_id: int,
     organization_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: Any,  # required by Celery task signature
 ) -> dict:

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code."

Also applies to: 563-570

🧰 Tools
🪛 Ruff (0.15.1)

[warning] 486-486: Unused function argument: task_instance

(ARG001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/llm/jobs.py` around lines 480 - 487, The functions
execute_job and execute_chain_job currently omit type annotations for the
task_instance (and task_id where noted) which triggers static-analysis and
ARG001 unused-parameter warnings; edit the signatures for execute_job and
execute_chain_job to annotate task_instance with an appropriate type (e.g.,
celery.app.task.Task or typing.Any) and add a return type hint, and if the
parameter must remain unused keep the annotation but add a trailing comment "#
noqa: ARG001" to suppress the unused-argument warning; update only the function
signatures for locateable symbols execute_job and execute_chain_job and ensure
imports (Task or Any) are added as needed.


Projects

Status: In Progress

Development

Successfully merging this pull request may close these issues.

- LLM Chain: Add sequential multi-step LLM call execution
- LLM-chain: database schema for chain execution tracking

1 participant