diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..dde1e4b --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,4 @@ + +## 2024-05-20 - Batch multi-repository searches in pipelines +**Learning:** In pipelines performing multi-repository operations (like `CodeRetrievalPipeline`), iterating sequentially over the configured repositories and awaiting each asynchronous task in turn (e.g., `await self._search_namespace`) serializes the queries: total latency is the sum of the per-repository latencies, so performance degrades linearly with the number of attached repositories. +**Action:** When the same asynchronous operation must run against multiple repositories, avoid the sequential bottleneck by building one awaitable per repository from the async helper, running them concurrently with `asyncio.gather(*tasks)`, and flattening the results, exactly as optimized in `src/pipelines/code_retrieval.py` for `_search_symbols` and `_search_files`. diff --git a/src/pipelines/code_retrieval.py b/src/pipelines/code_retrieval.py index 69e4abc..e2511df 100644 --- a/src/pipelines/code_retrieval.py +++ b/src/pipelines/code_retrieval.py @@ -25,6 +25,7 @@ from __future__ import annotations +import asyncio import logging from typing import Any, Callable, Dict, List, Optional @@ -37,7 +38,6 @@ from src.scanner.code_store import CodeStore from src.schemas.code import ( annotations_namespace, - directories_namespace, files_namespace, snippets_namespace, symbols_namespace, @@ -589,14 +589,24 @@ async def _search_symbols( ) -> List[SourceRecord]: if not repo: logger.warning("search_symbols called without repo — searching all repos") - results = [] - for r in self.repos: - results.extend(await self._search_namespace( + + # Fetch from all repos concurrently + tasks = [ + self._search_namespace( namespace=symbols_namespace(self.org_id, r), query=query, domain="symbol", top_k=top_k, - )) + ) + for r in self.repos + ] + results_list = await asyncio.gather(*tasks) + + # Flatten results + results = [] + for res in results_list: + results.extend(res) + return results[:top_k] return await self._search_namespace( @@ 
-612,14 +622,23 @@ async def _search_files( self, query: str, repo: str, top_k: int = 10, ) -> List[SourceRecord]: if not repo: - results = [] - for r in self.repos: - results.extend(await self._search_namespace( + # Fetch from all repos concurrently + tasks = [ + self._search_namespace( namespace=files_namespace(self.org_id, r), query=query, domain="file", top_k=top_k, - )) + ) + for r in self.repos + ] + results_list = await asyncio.gather(*tasks) + + # Flatten results + results = [] + for res in results_list: + results.extend(res) + return results[:top_k] return await self._search_namespace(