diff --git a/src/pipelines/weaver.py b/src/pipelines/weaver.py index 8138436..48b0fd1 100644 --- a/src/pipelines/weaver.py +++ b/src/pipelines/weaver.py @@ -12,6 +12,7 @@ from __future__ import annotations +import asyncio import logging from typing import Any, Callable, Dict, List, Optional @@ -92,9 +93,12 @@ async def execute( batched_executed = await self._execute_batched_vector(judge_result.operations, domain, user_id) result.executed.extend(batched_executed) else: - for op in judge_result.operations: - executed = await self._execute_one(op, domain, user_id) - result.executed.append(executed) + tasks = [ + self._execute_one(op, domain, user_id) + for op in judge_result.operations + ] + executed_list = await asyncio.gather(*tasks) + result.executed.extend(executed_list) self._log_summary(domain, result) return result @@ -539,7 +543,6 @@ async def _code_add(self, op: Operation, user_id: str) -> ExecutedOp: content=op.content, error="No vector store for code domain", ) - import asyncio loop = asyncio.get_running_loop() embedding = await loop.run_in_executor(None, self.embed_fn, op.content) metadata: Dict[str, Any] = { @@ -599,7 +602,6 @@ async def _code_update(self, op: Operation, user_id: str) -> ExecutedOp: ) parsed = _parse_code_annotation_content(op.content) - import asyncio loop = asyncio.get_running_loop() embedding = await loop.run_in_executor(None, self.embed_fn, op.content) metadata: Dict[str, Any] = { @@ -642,7 +644,6 @@ async def _code_delete(self, op: Operation) -> ExecutedOp: embedding_id=op.embedding_id, error="No vector store for code domain", ) - import asyncio loop = asyncio.get_running_loop() from functools import partial success = await loop.run_in_executor(None, partial(store.delete, ids=[op.embedding_id])) @@ -695,7 +696,6 @@ async def _snippet_add(self, op: Operation, user_id: str) -> ExecutedOp: parsed = _parse_snippet_content(op.content) searchable = parsed.get("content", op.content) - import asyncio loop = asyncio.get_running_loop() embedding = await loop.run_in_executor(None, self.embed_fn, searchable) @@ -741,7 +741,6 @@ async def _snippet_update(self, op: Operation, user_id: str) -> ExecutedOp: parsed = _parse_snippet_content(op.content) searchable = parsed.get("content", op.content) - import asyncio loop = asyncio.get_running_loop() embedding = await loop.run_in_executor(None, self.embed_fn, searchable) @@ -785,7 +784,6 @@ async def _snippet_delete(self, op: Operation) -> ExecutedOp: embedding_id=op.embedding_id, error="No vector store for snippet domain", ) - import asyncio loop = asyncio.get_running_loop() from functools import partial success = await loop.run_in_executor(None, partial(store.delete, ids=[op.embedding_id]))