Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
## 2024-05-24 - Weaver Pipeline Vector Add Operations Bottleneck
**Learning:** The Weaver pipeline (`src/pipelines/weaver.py`) batched vector `ADD` operations was iterating sequentially to generate embeddings, which was a performance bottleneck specific to this codebase due to the nature of synchronous external calls embedded in an asynchronous flow without leveraging the thread pool for each item in the batch.
**Action:** Replaced the sequential iteration with `asyncio.gather` and `asyncio.get_running_loop().run_in_executor` to perform embedding generations concurrently.
43 changes: 28 additions & 15 deletions src/pipelines/weaver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from __future__ import annotations

import asyncio
import logging
from typing import Any, Callable, Dict, List, Optional

Expand Down Expand Up @@ -123,12 +124,11 @@ async def flush_add_batch():
if not add_batch_ops:
return

# Prepare data for batch add
valid_ops = []
texts = []
embeddings = []
metadatas = []
loop = asyncio.get_running_loop()

# Filter valid ops and create tasks
ops_to_process = []
tasks = []
for op in add_batch_ops:
if not op.content:
logger.warning("ADD with empty content β€” skipping.")
Expand All @@ -137,22 +137,35 @@ async def flush_add_batch():
error="ADD requires content",
))
continue
ops_to_process.append(op)
tasks.append(loop.run_in_executor(None, self.embed_fn, op.content))

try:
emb = self.embed_fn(op.content)
if not ops_to_process:
return

# Execute embedding generations concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)

# Prepare data for batch add
valid_ops = []
texts = []
embeddings = []
metadatas = []

for op, result in zip(ops_to_process, results):
if isinstance(result, Exception):
logger.error("Embedding generation failed for ADD: %s", result)
executed_ops.append(ExecutedOp(
type=op.type, status=OpStatus.FAILED,
content=op.content, error=str(result)
))
else:
meta = {"user_id": user_id, "domain": domain.value}
meta.update(_extract_structured_metadata(op.content))

valid_ops.append(op)
texts.append(op.content)
embeddings.append(emb)
embeddings.append(result)
metadatas.append(meta)
except Exception as exc:
logger.error("Embedding generation failed for ADD: %s", exc)
executed_ops.append(ExecutedOp(
type=op.type, status=OpStatus.FAILED,
content=op.content, error=str(exc)
))

if valid_ops:
try:
Expand Down