Воркер в стиле Clean Architecture и тактического DDD: забирает вопросы из NATS, гоняет RAG-пайплайн (LangGraph + FAISS + OpenAI), стримит токены обратно в NATS, пишет статусы и ответы в Redis.
src/app/
├── domain/ # Чистый домен — value objects, исключения, без I/O
├── application/
│ ├── commands/ # ProcessQuestionHandler (одна команда)
│ └── common/ports/ # Порты: RagPipeline, StatusWriter и т.д.
├── infrastructure/
│ ├── adapters/ # Redis, NATS
│ ├── knowledge/ # YAML → доменные документы + FAISS (кеш)
│ └── rag/ # LangGraph: узлы, промпты, состояние графа
├── presentation/
│ └── messaging/ # NatsQuestionConsumer — «контроллер» брокера
└── setup/
├── config/ # WorkerSettings, structlog
└── ioc/ # Dishka: settings, infrastructure, application
Обработчик зависит только от абстрактного RagPipeline:
class RagPipeline(ABC):
def stream_tokens(self, session_id, question) -> AsyncIterator[str]: ...Реализация в инфраструктуре (LangGraphRagPipeline). Смена LLM или retrieval — новый адаптер, слой application не трогаем.
loader.py → доменные KnowledgeDocument без LangChain.
vector_store.py → перевод в LangChain и FAISS.
SHA-256 по (path, mtime_ns, size) для каждого YAML; при изменении знаний индекс перестраивается, лишний embed на каждый старт не делаем.
Как HTTP-контроллер мапит запрос в DTO, так consumer мапит сырой NATS → ProcessQuestionCommand; бизнес-логика только в хендлере.
Нужны запущенные Redis и NATS (например, из репозитория assistant-chat-backend).
make install # poetry install --extras dev
cp .env.example .env # задать OPENAI_API_KEY и пути (см. .env.example)
mkdir -p knowledge .vector_cacheУбедитесь, что в .env для хоста указаны REDIS_URL / NATS_URL на localhost (как в примере), а KNOWLEDGE_DIR / VECTOR_CACHE_DIR — на каталоги с YAML и кешом на машине.
Запуск воркера:
make runПроверка: GET http://localhost:8001/health
Логи: JSON в stdout (structlog); при заголовке X-Trace-Id он попадает в контекст логов.
Redis и NATS поднимаются в assistant-chat-backend (там же создаётся сеть assistant-net). В этом репозитории — только сервис rag-worker, подключённый к внешней сети assistant-net.
- В каталоге backend:
docker compose up -d(чтобы сеть и брокеры были живы). - В
.envворкера — URL видаredis://redis:6379/0,nats://nats:4222(илиassistant-redis/assistant-nats, если используетеcontainer_name), плюсKNOWLEDGE_DIR=/app/knowledge,VECTOR_CACHE_DIR=/app/.vector_cache. - Сборка и старт:
cp .env.example .env
mkdir -p knowledge .vector_cache
make app-build
make appЛоги воркера:
make logsОстановка контейнера воркера:
make app-downПересборка и перезапуск:
make app-restartСнаружи публикуется порт 8001 (HTTP health).
| Цель | Действие |
|---|---|
make install |
Установить зависимости (включая dev) |
make run |
Локально запустить воркер (PYTHONPATH=src) |
make app |
docker compose up -d |
make app-build |
Пересобрать образы |
make app-down |
Остановить compose-стек воркера |
make app-restart |
down + up -d |
make logs |
docker compose logs -f rag-worker |
make test |
Тесты |
make typecheck |
mypy src |
make lint |
ruff check . |
make clean |
Очистка кешей |
make testЮнит-тесты не требуют Redis, NATS и OpenAI.
- Порт —
AnswerCache(ABC)вapplication/common/ports/. - Адаптер — например
RedisAnswerCacheвinfrastructure/adapters/. - Провайдер — в
setup/ioc/infrastructure.py. - Хендлер — внедрить и вызвать в
ProcessQuestionHandler.
Остальные слои можно не трогать.