Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

## 2024-05-28 - Pipeline Concurrent Tool Execution
**Learning:** LangGraph/LLM-based pipelines in this architecture often execute independent tool calls sequentially (e.g., retrieving context from multiple namespaces). Because these operations hit external vector and graph databases, running them one after another creates a significant latency bottleneck.
**Action:** Always scan for `for tc in ai_response.tool_calls:` loops and similar sequential aggregation patterns in pipeline execution logic. Replace them with concurrent execution via `asyncio.gather` over small internal helper coroutines to substantially reduce total retrieval latency, especially for queries requiring broad context. Keep `import asyncio` at module level (not inside the function body), and note that `asyncio.gather` returns results in the same order as the input awaitables, so downstream per-tool aggregation stays deterministic.
34 changes: 29 additions & 5 deletions src/pipelines/code_retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
from src.scanner.code_store import CodeStore
from src.schemas.code import (
annotations_namespace,
directories_namespace,
files_namespace,
snippets_namespace,
symbols_namespace,
Expand Down Expand Up @@ -375,7 +374,7 @@ async def run(
turn_records: List[SourceRecord] = []
only_read_tools = True

for tc in ai_response.tool_calls:
async def _process_tool_call(tc: Dict[str, Any]) -> Dict[str, Any]:
tool_name = tc["name"]
tool_args = tc["args"]
tool_id = tc["id"]
Expand All @@ -387,6 +386,20 @@ async def run(
)
tool_ms = (_time.perf_counter() - t1) * 1000
logger.info(" Tool: %s(%s) → %d results (%.0fms)", tool_name, tool_args, len(records), tool_ms)
return {
"tool_name": tool_name,
"tool_id": tool_id,
"records": records,
}

import asyncio
results = await asyncio.gather(*[_process_tool_call(tc) for tc in ai_response.tool_calls])

for res in results:
tool_name = res["tool_name"]
tool_id = res["tool_id"]
records = res["records"]

turn_records.extend(records)
sources.extend(records)

Expand Down Expand Up @@ -471,17 +484,28 @@ async def run_stream(
if ai_response.tool_calls:
yield json.dumps({"type": "status", "content": f"Running {len(ai_response.tool_calls)} search tool(s)..."}) + "\n"

for tc in ai_response.tool_calls:
async def _process_tool_call_stream(tc: Dict[str, Any]) -> Dict[str, Any]:
tool_name = tc["name"]
tool_args = tc["args"]
tool_id = tc["id"]

logger.info(" Tool: %s(%s)", tool_name, tool_args)

records = await self._execute_tool(
tool_name, tool_args, repo=repo, top_k=top_k,
user_id=user_id,
)
return {
"tool_name": tool_name,
"tool_id": tool_id,
"records": records,
}

import asyncio
results = await asyncio.gather(*[_process_tool_call_stream(tc) for tc in ai_response.tool_calls])

for res in results:
tool_id = res["tool_id"]
records = res["records"]

sources.extend(records)

tool_result_text = self._format_tool_results(records)
Expand Down
2 changes: 1 addition & 1 deletion src/pipelines/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
)
from src.schemas.events import EventResult
from src.schemas.image import ImageResult
from src.schemas.judge import JudgeDomain, JudgeResult, OperationType
from src.schemas.judge import JudgeDomain, JudgeResult
from src.schemas.profile import ProfileResult
from src.schemas.summary import SummaryResult
from src.schemas.weaver import WeaverResult
Expand Down
20 changes: 16 additions & 4 deletions src/pipelines/retrieval.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from __future__ import annotations

import logging
import os
from typing import Any, Callable, Dict, List, Optional

from dotenv import load_dotenv
Expand Down Expand Up @@ -177,16 +176,29 @@ async def run(

if ai_response.tool_calls:
called_tools = set()
for tc in ai_response.tool_calls:

async def _process_tool_call(tc: Dict[str, Any]) -> Dict[str, Any]:
tool_name = tc["name"]
tool_args = tc["args"]
tool_id = tc["id"]

logger.info(" Tool call: %s(%s)", tool_name, tool_args)

records = await self._execute_tool(
tool_name, tool_args, user_id, top_k,
)
return {
"tool_name": tool_name,
"tool_id": tool_id,
"records": records,
}

import asyncio
results = await asyncio.gather(*[_process_tool_call(tc) for tc in ai_response.tool_calls])

for res in results:
tool_name = res["tool_name"]
tool_id = res["tool_id"]
records = res["records"]

sources.extend(records)

# Build ToolMessage for the LLM
Expand Down