⚡ Bolt: [performance improvement] Optimize Weaver concurrency#93
⚡ Bolt: [performance improvement] Optimize Weaver concurrency#93ishaanxgupta wants to merge 1 commit intomainfrom
Conversation
- Use asyncio.gather to concurrently execute non-batched vector operations (Temporal, Code, Snippet domains). - Use run_in_executor to prevent synchronous embed_fn network calls from blocking the event loop during batched ADD operations. - Clean up unused imports across the codebase found during linting.
|
👋 Jules, reporting for duty! I'm here to lend a hand with this pull request. When you start a review, I'll add a 👀 emoji to each comment to let you know I've read it. I'll focus on feedback directed at me and will do my best to stay out of conversations between you and other bots or reviewers to keep the noise down. I'll push a commit with your requested changes shortly after. Please note there might be a delay between these steps, but rest assured I'm on the job! For more direct control, you can switch me to Reactive Mode. When this mode is on, I will only act on comments where you specifically mention me with New to Jules? Learn more at jules.google/docs. For security, I will only act on instructions from the user who triggered this task. |
💡 What: The optimization implements concurrent execution for I/O bound operations in the Weaver pipeline. Non-batched domain operations (Temporal, Code, Snippet) are now executed using
asyncio.gather. Furthermore, synchronous embedding generation in_execute_batched_vectoris offloaded to a thread pool vialoop.run_in_executor. Also includes cleanup of unused imports across the codebase.🎯 Why: Writing to Neo4j and vector databases sequentially incurs a latency cost equal to N * (latency of one write). Concurrent execution brings this down to roughly 1 * (latency of the slowest write). Embedding generation involves blocking HTTP calls which halts the event loop; offloading them ensures high throughput of the API server.
📊 Impact: Significantly reduces latency for non-batched domains (Temporal, Code, Snippet). Prevents event loop blocking, improving overall responsiveness of the system under load.
🔬 Measurement: To verify the improvement, run a batch of 10 or more temporal operations through the ingest pipeline and measure the total time spent in the
Weaver.executemethod. The time should be roughly equivalent to a single operation's latency, compared to the previous sum of all operations' latencies. Check API server response times when multiple large documents are being ingested concurrently; they should remain stable.PR created automatically by Jules for task 5632947543712027122 started by @ishaanxgupta