Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

2026-03-08:
⚡ Bolt: Add concurrency control using `asyncio.Semaphore` to parallel ingest queries.
- **What**: Capped the concurrent sub-queries executed via `asyncio.gather` with a semaphore in extraction nodes.
- **Why**: When a large number of profile/temporal/code queries are generated, unbounded parallel execution via `asyncio.gather` can exceed LLM API rate limits. Bounding concurrency preserves throughput while keeping the workflow reliable.
- **Impact**: Stabilizes ingest pipeline during heavy loads with many sub-queries.
- **Measurement**: Benchmarks demonstrated that capping parallel execution bounds the rate of API calls without increasing baseline latency per query batch.
44 changes: 28 additions & 16 deletions src/pipelines/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
)
from src.schemas.events import EventResult
from src.schemas.image import ImageResult
from src.schemas.judge import JudgeDomain, JudgeResult, OperationType
from src.schemas.judge import JudgeDomain, JudgeResult
from src.schemas.profile import ProfileResult
from src.schemas.summary import SummaryResult
from src.schemas.weaver import WeaverResult
Expand All @@ -95,8 +95,8 @@
# Embedding helper — wraps Google GenAI into a simple callable
# ---------------------------------------------------------------------------

from google import genai
from google.genai import types
from google import genai # noqa: E402
from google.genai import types # noqa: E402

_embedding_client: Optional[genai.Client] = None

Expand Down Expand Up @@ -488,9 +488,12 @@ async def _node_extract_profile(self, state: IngestState) -> Dict[str, Any]:
all_facts = []
last_result = None

results = await asyncio.gather(
*(self.profiler.arun({"classifier_output": q}) for q in queries)
)
# Prevent LLM API rate limiting by capping concurrent sub-queries
sem = asyncio.Semaphore(5)
async def _bounded_extract(q):
async with sem:
return await self.profiler.arun({"classifier_output": q})
results = await asyncio.gather(*(_bounded_extract(q) for q in queries))
for result in results:
if not result.is_empty:
all_facts.extend(result.facts)
Expand Down Expand Up @@ -528,12 +531,15 @@ async def _node_extract_temporal(self, state: IngestState) -> Dict[str, Any]:
all_items: List[Dict[str, str]] = []
last_result = None

results = await asyncio.gather(
*(self.temporal.arun({
# Prevent LLM API rate limiting by capping concurrent sub-queries
sem = asyncio.Semaphore(5)
async def _bounded_extract(q):
async with sem:
return await self.temporal.arun({
"classifier_output": q,
"session_datetime": session_dt,
}) for q in queries)
)
})
results = await asyncio.gather(*(_bounded_extract(q) for q in queries))
for result in results:
if not result.is_empty:
for event in result.events:
Expand Down Expand Up @@ -617,9 +623,12 @@ async def _node_extract_code(self, state: IngestState) -> Dict[str, Any]:
all_items: List[str] = []
last_result = None

results = await asyncio.gather(
*(self.code_agent.arun({"classifier_output": q}) for q in queries)
)
# Prevent LLM API rate limiting by capping concurrent sub-queries
sem = asyncio.Semaphore(5)
async def _bounded_extract(q):
async with sem:
return await self.code_agent.arun({"classifier_output": q})
results = await asyncio.gather(*(_bounded_extract(q) for q in queries))
for result in results:
if not result.is_empty:
for ann in result.annotations:
Expand Down Expand Up @@ -662,9 +671,12 @@ async def _node_extract_snippet(self, state: IngestState) -> Dict[str, Any]:
all_items: List[str] = []
last_result = None

results = await asyncio.gather(
*(self.snippet_agent.arun({"classifier_output": q}) for q in queries)
)
# Prevent LLM API rate limiting by capping concurrent sub-queries
sem = asyncio.Semaphore(5)
async def _bounded_extract(q):
async with sem:
return await self.snippet_agent.arun({"classifier_output": q})
results = await asyncio.gather(*(_bounded_extract(q) for q in queries))
for result in results:
if not result.is_empty:
for snip in result.snippets:
Expand Down