diff --git a/.jules/bolt.md b/.jules/bolt.md new file mode 100644 index 0000000..0600d9a --- /dev/null +++ b/.jules/bolt.md @@ -0,0 +1,3 @@ +## 2024-03-16 - [Optimize non-batched vector operations in Weaver] +**Learning:** Network/I/O bound operations in an asynchronous environment should not be sequentially looped. The `Weaver` pipeline was executing non-batched operations sequentially, causing performance to degrade linearly with the number of operations (`O(N)`). +**Action:** Used `asyncio.gather` to concurrently execute non-batched operations, turning an `O(N)` network wait time into approximately `O(1)` (limited by connection pool and concurrency caps). Applied to `src/pipelines/weaver.py`'s execution method. Always look for synchronous or sequentially awaited operations when iterating over a batch of network requests. diff --git a/server.py b/server.py index 6f2cc84..241f2b7 100644 --- a/server.py +++ b/server.py @@ -13,7 +13,6 @@ import asyncio import logging import time -import uuid from contextlib import asynccontextmanager from pathlib import Path from typing import Any, Dict, List @@ -21,12 +20,10 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, JSONResponse -from fastapi.staticfiles import StaticFiles from pydantic import BaseModel # ── Project root setup ──────────────────────────────────────────── import sys -import os PROJECT_ROOT = Path(__file__).resolve().parent sys.path.insert(0, str(PROJECT_ROOT)) diff --git a/src/api/app.py b/src/api/app.py index 4e9539e..f819f89 100644 --- a/src/api/app.py +++ b/src/api/app.py @@ -30,7 +30,6 @@ from src.api.routes.memory import router as memory_router from src.api.routes.scanner import router as scanner_router from src.api.schemas import APIResponse, StatusEnum -from src.config import settings logger = logging.getLogger("xmem.api") diff --git a/src/api/routes/scanner.py b/src/api/routes/scanner.py index 50e8807..1f56c1c 100644 --- a/src/api/routes/scanner.py +++ b/src/api/routes/scanner.py @@ -70,7 +70,7 @@ def _parse_github_url(url: str) -> tuple: if m: return m.group(1), m.group(2) raise ValueError( - f"Invalid GitHub URL. Expected format: https://github.com/org/repo" + "Invalid GitHub URL. Expected format: https://github.com/org/repo" ) diff --git a/src/api/schemas.py b/src/api/schemas.py index 7c9b3a0..62b5542 100644 --- a/src/api/schemas.py +++ b/src/api/schemas.py @@ -7,7 +7,6 @@ from __future__ import annotations -from datetime import datetime from enum import Enum from typing import Any, Dict, List, Optional diff --git a/src/config/logging.py b/src/config/logging.py index 1e3f70c..b272071 100644 --- a/src/config/logging.py +++ b/src/config/logging.py @@ -41,13 +41,12 @@ import logging -from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler +from logging.handlers import RotatingFileHandler import sys -import os from pathlib import Path from typing import Optional from enum import Enum -from dataclasses import dataclass, field +from dataclasses import dataclass class LogLevel(str, Enum): diff --git a/src/pipelines/code_retrieval.py b/src/pipelines/code_retrieval.py index 69e4abc..51e765f 100644 --- a/src/pipelines/code_retrieval.py +++ b/src/pipelines/code_retrieval.py @@ -37,7 +37,6 @@ from src.scanner.code_store import CodeStore from src.schemas.code import ( annotations_namespace, - directories_namespace, files_namespace, snippets_namespace, symbols_namespace, diff --git a/src/pipelines/ingest.py b/src/pipelines/ingest.py index c1cb3a4..7eee427 100644 --- a/src/pipelines/ingest.py +++ b/src/pipelines/ingest.py @@ -82,7 +82,7 @@ ) from src.schemas.events import EventResult from src.schemas.image import ImageResult -from src.schemas.judge import JudgeDomain, JudgeResult, OperationType +from src.schemas.judge import JudgeDomain, JudgeResult from src.schemas.profile import ProfileResult from src.schemas.summary import SummaryResult from src.schemas.weaver import WeaverResult diff --git a/src/pipelines/retrieval.py b/src/pipelines/retrieval.py index d54cc0d..ec8a29d 100644 --- a/src/pipelines/retrieval.py +++ b/src/pipelines/retrieval.py @@ -21,7 +21,6 @@ from __future__ import annotations import logging -import os from typing import Any, Callable, Dict, List, Optional from dotenv import load_dotenv diff --git a/src/pipelines/weaver.py b/src/pipelines/weaver.py index 8138436..787029d 100644 --- a/src/pipelines/weaver.py +++ b/src/pipelines/weaver.py @@ -92,9 +92,11 @@ async def execute( batched_executed = await self._execute_batched_vector(judge_result.operations, domain, user_id) result.executed.extend(batched_executed) else: - for op in judge_result.operations: - executed = await self._execute_one(op, domain, user_id) - result.executed.append(executed) + import asyncio + # Execute non-batched operations concurrently to significantly reduce latency + tasks = [self._execute_one(op, domain, user_id) for op in judge_result.operations] + executed_ops = await asyncio.gather(*tasks) + result.executed.extend(executed_ops) self._log_summary(domain, result) return result diff --git a/src/prompts/profiler_topics.py b/src/prompts/profiler_topics.py index f2f6c74..3b08223 100644 --- a/src/prompts/profiler_topics.py +++ b/src/prompts/profiler_topics.py @@ -3,7 +3,6 @@ from dataclasses import dataclass, field from typing import Dict, List, Union -from src.config.constants import LLM_TAB_SEPARATOR @dataclass diff --git a/src/prompts/summarizer.py b/src/prompts/summarizer.py index af956a1..c7b3f70 100644 --- a/src/prompts/summarizer.py +++ b/src/prompts/summarizer.py @@ -1,7 +1,7 @@ from __future__ import annotations from functools import lru_cache -from typing import List, Tuple +from typing import List import inspect from src.prompts.examples.summary import SUMMARY_EXAMPLES diff --git a/src/scanner/ast_parser.py b/src/scanner/ast_parser.py index 84d3cb4..4e155d2 100644 --- a/src/scanner/ast_parser.py +++ b/src/scanner/ast_parser.py @@ -20,10 +20,8 @@ import hashlib import logging import re -import textwrap from dataclasses import dataclass, field -from pathlib import Path -from typing import Any, Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Tuple # Tree-sitter imports (optional — graceful degradation if not installed) try: diff --git a/src/scanner/git_ops.py b/src/scanner/git_ops.py index 9a3e3b8..a79c6bd 100644 --- a/src/scanner/git_ops.py +++ b/src/scanner/git_ops.py @@ -8,7 +8,6 @@ from __future__ import annotations import logging -import os import subprocess from dataclasses import dataclass, field from enum import Enum diff --git a/src/scanner/indexer.py b/src/scanner/indexer.py index 6e44b39..db0af4d 100644 --- a/src/scanner/indexer.py +++ b/src/scanner/indexer.py @@ -32,10 +32,8 @@ from src.scanner.ast_parser import ParsedFile, ParsedSymbol, parse_file, compute_content_hash from src.scanner.code_store import CodeStore from src.scanner.git_ops import ( - DiffResult, clone_or_pull, get_diff, - get_head_sha, get_language, list_all_files, should_skip_file, diff --git a/src/scanner/runner.py b/src/scanner/runner.py index b5e9852..9279aa2 100644 --- a/src/scanner/runner.py +++ b/src/scanner/runner.py @@ -58,7 +58,6 @@ import os import sys import time -from pathlib import Path from typing import Any, Dict, List from dotenv import load_dotenv diff --git a/src/schemas/retrieval.py b/src/schemas/retrieval.py index 8896726..ae58392 100644 --- a/src/schemas/retrieval.py +++ b/src/schemas/retrieval.py @@ -5,7 +5,7 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List @dataclass diff --git a/src/schemas/summary.py b/src/schemas/summary.py index a4f7f75..c9d8eda 100644 --- a/src/schemas/summary.py +++ b/src/schemas/summary.py @@ -1,5 +1,4 @@ from __future__ import annotations -from typing import List from pydantic import BaseModel, Field diff --git a/src/storage/base.py b/src/storage/base.py index a0ddde2..e45e291 100644 --- a/src/storage/base.py +++ b/src/storage/base.py @@ -67,10 +67,7 @@ def process_memories(store: BaseVectorStore): # <- Takes ANY vector store from enum import Enum from ..config import get_logger from ..utils.exceptions import ( - VectorStoreError, - VectorStoreConnectionError, VectorStoreValidationError, - VectorNotFoundError, ) logger = get_logger(__name__) diff --git a/src/utils/retry.py b/src/utils/retry.py index 09e83e9..75f3061 100644 --- a/src/utils/retry.py +++ b/src/utils/retry.py @@ -50,7 +50,7 @@ def another_api_call(): import time import logging from dataclasses import dataclass, field -from .exceptions import XMemError, ValidationError +from .exceptions import ValidationError logger = logging.getLogger(__name__) T = TypeVar("T") diff --git a/test_weaver_perf.py b/test_weaver_perf.py new file mode 100644 index 0000000..52d263c --- /dev/null +++ b/test_weaver_perf.py @@ -0,0 +1,38 @@ +import asyncio +import time + +class MockJudgeResult: + def __init__(self, n_ops=10): + self.is_empty = False + self.has_writes = True + self.operations = [{"type": "ADD", "content": f"test {i}"} for i in range(n_ops)] + +class MockWeaver: + async def _execute_one(self, op, domain, user_id): + await asyncio.sleep(0.1) # Simulate DB op + return f"Executed {op['content']}" + +async def main_sequential(): + weaver = MockWeaver() + judge_result = MockJudgeResult() + start = time.time() + result = [] + for op in judge_result.operations: + executed = await weaver._execute_one(op, "domain", "user_id") + result.append(executed) + print(f"Sequential took: {time.time() - start:.2f}s") + +async def main_concurrent(): + weaver = MockWeaver() + judge_result = MockJudgeResult() + start = time.time() + + # Run concurrently using asyncio.gather + tasks = [weaver._execute_one(op, "domain", "user_id") for op in judge_result.operations] + result = await asyncio.gather(*tasks) + + print(f"Concurrent took: {time.time() - start:.2f}s") + +if __name__ == "__main__": + asyncio.run(main_sequential()) + asyncio.run(main_concurrent())