diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..730ac79 --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,3 @@ +## 2024-05-24 - Weaver Pipeline Vector Add Operations Bottleneck +**Learning:** The Weaver pipeline (`src/pipelines/weaver.py`) batched vector `ADD` operations was iterating sequentially to generate embeddings, which was a performance bottleneck specific to this codebase due to the nature of synchronous external calls embedded in an asynchronous flow without leveraging the thread pool for each item in the batch. +**Action:** Replaced the sequential iteration with `asyncio.gather` and `asyncio.get_running_loop().run_in_executor` to perform embedding generations concurrently. diff --git a/src/pipelines/weaver.py b/src/pipelines/weaver.py index 8138436..8b215aa 100644 --- a/src/pipelines/weaver.py +++ b/src/pipelines/weaver.py @@ -12,6 +12,7 @@ from __future__ import annotations +import asyncio import logging from typing import Any, Callable, Dict, List, Optional @@ -123,12 +124,11 @@ async def flush_add_batch(): if not add_batch_ops: return - # Prepare data for batch add - valid_ops = [] - texts = [] - embeddings = [] - metadatas = [] + loop = asyncio.get_running_loop() + # Filter valid ops and create tasks + ops_to_process = [] + tasks = [] for op in add_batch_ops: if not op.content: logger.warning("ADD with empty content — skipping.") @@ -137,22 +137,35 @@ async def flush_add_batch(): error="ADD requires content", )) continue + ops_to_process.append(op) + tasks.append(loop.run_in_executor(None, self.embed_fn, op.content)) - try: - emb = self.embed_fn(op.content) + if not ops_to_process: + return + + # Execute embedding generations concurrently + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Prepare data for batch add + valid_ops = [] + texts = [] + embeddings = [] + metadatas = [] + + for op, result in zip(ops_to_process, results): + if isinstance(result, Exception): + logger.error("Embedding generation failed for ADD: %s", result) + executed_ops.append(ExecutedOp( + type=op.type, status=OpStatus.FAILED, + content=op.content, error=str(result) + )) + else: meta = {"user_id": user_id, "domain": domain.value} meta.update(_extract_structured_metadata(op.content)) - valid_ops.append(op) texts.append(op.content) - embeddings.append(emb) + embeddings.append(result) metadatas.append(meta) - except Exception as exc: - logger.error("Embedding generation failed for ADD: %s", exc) - executed_ops.append(ExecutedOp( - type=op.type, status=OpStatus.FAILED, - content=op.content, error=str(exc) - )) if valid_ops: try: