diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..c0e7d20 --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,4 @@ + +## 2024-05-28 - Pipeline Concurrent Tool Execution +**Learning:** LangGraph/LLM-based pipelines in this architecture often sequentialize independent tool calls (e.g., retrieving context from multiple namespaces). Given that these operations involve external vector and graph databases, running them sequentially acts as a significant latency bottleneck. +**Action:** Always scan for `for tc in ai_response.tool_calls:` loops and similar sequential aggregation patterns in pipeline execution logic. Replace them with concurrent execution using `asyncio.gather` and internal helper functions to drastically reduce total retrieval latency, especially when processing queries requiring broad context. diff --git a/src/pipelines/code_retrieval.py b/src/pipelines/code_retrieval.py index 69e4abc..f4c3108 100644 --- a/src/pipelines/code_retrieval.py +++ b/src/pipelines/code_retrieval.py @@ -37,7 +37,6 @@ from src.scanner.code_store import CodeStore from src.schemas.code import ( annotations_namespace, - directories_namespace, files_namespace, snippets_namespace, symbols_namespace, @@ -375,7 +374,7 @@ async def run( turn_records: List[SourceRecord] = [] only_read_tools = True - for tc in ai_response.tool_calls: + async def _process_tool_call(tc: Dict[str, Any]) -> Dict[str, Any]: tool_name = tc["name"] tool_args = tc["args"] tool_id = tc["id"] @@ -387,6 +386,20 @@ async def run( ) tool_ms = (_time.perf_counter() - t1) * 1000 logger.info(" Tool: %s(%s) → %d results (%.0fms)", tool_name, tool_args, len(records), tool_ms) + return { + "tool_name": tool_name, + "tool_id": tool_id, + "records": records, + } + + import asyncio + results = await asyncio.gather(*[_process_tool_call(tc) for tc in ai_response.tool_calls]) + + for res in results: + tool_name = res["tool_name"] + tool_id = res["tool_id"] + records = res["records"] + turn_records.extend(records) sources.extend(records) @@ -471,17 +484,28 @@ async def run_stream( if ai_response.tool_calls: yield json.dumps({"type": "status", "content": f"Running {len(ai_response.tool_calls)} search tool(s)..."}) + "\n" - for tc in ai_response.tool_calls: + async def _process_tool_call_stream(tc: Dict[str, Any]) -> Dict[str, Any]: tool_name = tc["name"] tool_args = tc["args"] tool_id = tc["id"] - logger.info(" Tool: %s(%s)", tool_name, tool_args) - records = await self._execute_tool( tool_name, tool_args, repo=repo, top_k=top_k, user_id=user_id, ) + return { + "tool_name": tool_name, + "tool_id": tool_id, + "records": records, + } + + import asyncio + results = await asyncio.gather(*[_process_tool_call_stream(tc) for tc in ai_response.tool_calls]) + + for res in results: + tool_id = res["tool_id"] + records = res["records"] + sources.extend(records) tool_result_text = self._format_tool_results(records) diff --git a/src/pipelines/ingest.py b/src/pipelines/ingest.py index f78e983..b3e3c0b 100644 --- a/src/pipelines/ingest.py +++ b/src/pipelines/ingest.py @@ -82,7 +82,7 @@ ) from src.schemas.events import EventResult from src.schemas.image import ImageResult -from src.schemas.judge import JudgeDomain, JudgeResult, OperationType +from src.schemas.judge import JudgeDomain, JudgeResult from src.schemas.profile import ProfileResult from src.schemas.summary import SummaryResult from src.schemas.weaver import WeaverResult diff --git a/src/pipelines/retrieval.py b/src/pipelines/retrieval.py index d54cc0d..61c2f63 100644 --- a/src/pipelines/retrieval.py +++ b/src/pipelines/retrieval.py @@ -21,7 +21,6 @@ from __future__ import annotations import logging -import os from typing import Any, Callable, Dict, List, Optional from dotenv import load_dotenv @@ -177,16 +176,29 @@ async def run( if ai_response.tool_calls: called_tools = set() - for tc in ai_response.tool_calls: + + async def _process_tool_call(tc: Dict[str, Any]) -> Dict[str, Any]: tool_name = tc["name"] tool_args = tc["args"] tool_id = tc["id"] - logger.info(" Tool call: %s(%s)", tool_name, tool_args) - records = await self._execute_tool( tool_name, tool_args, user_id, top_k, ) + return { + "tool_name": tool_name, + "tool_id": tool_id, + "records": records, + } + + import asyncio + results = await asyncio.gather(*[_process_tool_call(tc) for tc in ai_response.tool_calls]) + + for res in results: + tool_name = res["tool_name"] + tool_id = res["tool_id"] + records = res["records"] + sources.extend(records) # Build ToolMessage for the LLM