Upgrade agent platform to unified orchestrator with shared protocol #33
…ardening
Phase 1 - Security & Foundation:
- Fix SQL injection in MCP server database handler (parameterized queries only)
- Add timing-safe token comparison in AuthManager
- Add rate limiting, request validation, and structured error responses to MCP server
- Create shared protocol types for standardized inter-agent communication
- Create shared channel helpers (idempotent subscribe, validated send, response waiting)
- Create shared tool registry with schema validation and context management
- Add structured JSON logger with request tracing and timing metrics
Phase 2 - Unified Gateway:
- New gateway edge function as single platform entry point
- LLM-based request routing to specialized agents
- Model provider with Tumbler/OpenRouter/Gemini fallback chain
- Cost-optimized model tier selection (fast/balanced/powerful/auto)
- Graceful fallback to direct completion on agent timeout
Phase 3 - Agent Mesh:
- Upgrade agent_alpha from calculator-only to research agent (WebSearch, Summarize tools)
- Upgrade agent_beta from calculator-only to database agent (QueryDatabase, InsertData tools)
- Upgrade agent-manager from simple LLM router to workflow orchestrator with circuit breakers
- Implement agent handoff system for multi-agent workflows
- All agents use shared protocol, channel helpers, and tool registry
Phase 4 - Testing & Demo:
- Comprehensive integration tests for protocol, tools, auth, and MCP server
- End-to-end demo script showing gateway health, routing, and multi-agent workflows
https://claude.ai/code/session_019NETayw1yMR3frptLBL2rT
…ingSafeEqual
The timingSafeEqual method is not typed on SubtleCrypto in standard TypeScript, causing type-check failures. Replace it with a manual XOR-based constant-time comparison that works across all Deno versions.
https://claude.ai/code/session_019NETayw1yMR3frptLBL2rT
- Fix channel.subscribe callback signature (Error | null -> Error | undefined)
- Fix agent_alpha channel subscription to use callback pattern
All functions now pass deno check and all 26 integration tests pass.
https://claude.ai/code/session_019NETayw1yMR3frptLBL2rT
Pull request overview
This PR evolves the Supabase Edge Functions into a more unified “agent platform” architecture by introducing a gateway + shared inter-agent protocol/tooling, and by hardening the existing MCP server/database tooling.
Changes:
- Harden MCP server auth/request handling (structured JSON errors, body size limiting, rate limiting, request IDs) and replace database querying with structured (non-SQL) filters.
- Add a new “gateway” Edge Function that routes requests to specialized agents (alpha/beta) via Supabase Realtime channels, with a multi-provider LLM abstraction (Tumbler → OpenRouter → direct Gemini).
- Introduce shared building blocks for agents (protocol, tool registry/context, channel helpers, workflow handoff, structured logger), plus an end-to-end demo and integration tests.
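The provider order named in the changes above (Tumbler → OpenRouter → direct Gemini) can be pictured as a simple ordered fallback loop. The following is only a sketch of that idea: `NamedProvider`, `completeWithFallback`, and the provider callbacks are hypothetical names, not the PR's actual `model-provider.ts` API.

```typescript
// Hypothetical shape for a provider; the real ModelProvider class may differ.
interface NamedProvider {
  label: string;
  complete: (prompt: string) => Promise<string>;
}

interface FallbackCompletion {
  provider: string;
  text: string;
}

// Try each provider in order and return the first successful completion.
async function completeWithFallback(
  providers: NamedProvider[],
  prompt: string,
): Promise<FallbackCompletion> {
  const failures: string[] = [];
  for (const candidate of providers) {
    try {
      const text = await candidate.complete(prompt);
      return { provider: candidate.label, text };
    } catch (err) {
      // Record the failure and move on to the next provider in the chain.
      const reason = err instanceof Error ? err.message : String(err);
      failures.push(`${candidate.label}: ${reason}`);
    }
  }
  throw new Error(`All providers failed (${failures.join("; ")})`);
}
```

Collecting every failure reason before throwing keeps the final error useful when the whole chain is down.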
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| supabase/functions/mcp-server/tools/schemas.ts | Updates DB tool schema to structured filters instead of raw SQL. |
| supabase/functions/mcp-server/tools/handlers/database.ts | Reworks DB handler to parameterized Supabase queries + adds safe limit/match requirements. |
| supabase/functions/mcp-server/core/server.ts | Adds request validation, rate limiting, request tracing, and structured JSON responses. |
| supabase/functions/mcp-server/core/auth.ts | Attempts to add timing-safe token comparison and request ID generation. |
| supabase/functions/gateway/tests/integration.test.ts | Adds integration tests for protocol/tools/auth/MCP server/logger. |
| supabase/functions/gateway/model-provider.ts | Adds multi-provider model routing with tier selection and fallbacks. |
| supabase/functions/gateway/index.ts | New gateway entrypoint that routes requests to agents or handles directly. |
| supabase/functions/agent-manager/index.ts | Refactors agent-manager into a workflow orchestrator with routing + circuit breaker behavior. |
| supabase/functions/agent_beta/index.ts | Refactors agent_beta into a DB-specialized tool-using agent with shared protocol/tools. |
| supabase/functions/agent_alpha/index.ts | Refactors agent_alpha into a research tool-using agent with shared protocol/tools. |
| supabase/functions/_shared/tools.ts | New shared tool registry + context/memory/actions tracking. |
| supabase/functions/_shared/protocol.ts | New standard message protocol and gateway request/response types. |
| supabase/functions/_shared/logger.ts | New structured logger with request/workflow tracing + timing. |
| supabase/functions/_shared/handoff.ts | New workflow tracking + cross-agent handoff tool. |
| supabase/functions/_shared/channel.ts | New shared channel creation/sending/response-wait helpers. |
| scripts/demo/run-demo.ts | New end-to-end demo script for the gateway + multi-agent flow. |
| this.constantTimeCompare(b, b); | ||
| return false; | ||
| } | ||
| return token === this.secretKey; | ||
| return this.constantTimeCompare(a, b); |
crypto.subtle.timingSafeEqual is not part of the WebCrypto SubtleCrypto API in Deno/Supabase Edge Functions, and even if a timing-safe helper existed on subtle it would be async (Promise). As written, this will throw at runtime and break all authentication. Implement a small constant-time byte comparison locally (or use a Deno-supported timing-safe primitive) instead of calling crypto.subtle.timingSafeEqual.
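A local constant-time comparison along the lines suggested here might look like the sketch below. `constantTimeEqual` is a hypothetical helper name, not necessarily what `AuthManager` uses, and the loop still runs for a time proportional to the longer input, so token length is not hidden.

```typescript
// Hypothetical helper: compares two strings without exiting early at the
// first mismatching byte.
function constantTimeEqual(a: string, b: string): boolean {
  const encoder = new TextEncoder();
  const aBytes = encoder.encode(a);
  const bBytes = encoder.encode(b);

  // Fold the length difference into the accumulator instead of returning early.
  let diff = aBytes.length ^ bBytes.length;
  const len = Math.max(aBytes.length, bBytes.length);
  for (let i = 0; i < len; i++) {
    // Reads past the end of the shorter array are treated as 0.
    diff |= (aBytes[i] ?? 0) ^ (bBytes[i] ?? 0);
  }
  return diff === 0;
}
```

Because the function is synchronous, it can replace a direct `===` comparison without changing the caller's signature.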
| bucket.tokens = Math.min(RATE_LIMIT, bucket.tokens + refill); | ||
| bucket.lastRefill = now; |
The token bucket refill logic updates lastRefill on every request even when refill rounds down to 0. This loses fractional elapsed time and makes the limiter stricter than intended (tokens may never replenish unless a full refill quantum elapses between requests). Consider only advancing lastRefill when tokens are actually refilled, or carry forward the remainder time when computing refills.
| bucket.tokens = Math.min(RATE_LIMIT, bucket.tokens + refill); | |
| bucket.lastRefill = now; | |
| if (refill > 0) { | |
| const refillIntervalMs = RATE_WINDOW_MS / RATE_LIMIT; | |
| bucket.tokens = Math.min(RATE_LIMIT, bucket.tokens + refill); | |
| // Advance lastRefill only by the time that produced whole tokens, | |
| // preserving any remaining fractional elapsed time. | |
| bucket.lastRefill += refill * refillIntervalMs; | |
| } |
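To see why carrying the remainder forward matters, here is a self-contained sketch of the refill logic. The constant values and the `TokenBucket`, `refillBucket`, and `tryConsume` names are assumptions chosen for illustration; only `RATE_LIMIT`, `RATE_WINDOW_MS`, `tokens`, and `lastRefill` appear in the reviewed code.

```typescript
interface TokenBucket {
  tokens: number;
  lastRefill: number; // timestamp in ms
}

const RATE_LIMIT = 60; // assumed value: max tokens per window
const RATE_WINDOW_MS = 60_000; // assumed value: window length in ms

function refillBucket(bucket: TokenBucket, now: number): void {
  const refillIntervalMs = RATE_WINDOW_MS / RATE_LIMIT;
  const refill = Math.floor((now - bucket.lastRefill) / refillIntervalMs);
  if (refill > 0) {
    bucket.tokens = Math.min(RATE_LIMIT, bucket.tokens + refill);
    // Advance only by the time that produced whole tokens, so fractional
    // elapsed time still counts toward the next token.
    bucket.lastRefill += refill * refillIntervalMs;
  }
}

function tryConsume(bucket: TokenBucket, now: number): boolean {
  refillBucket(bucket, now);
  if (bucket.tokens < 1) return false;
  bucket.tokens -= 1;
  return true;
}
```

With these assumed constants one token accrues per second. A client polling every 600 ms gets a token on its second request; if `lastRefill` were reset on every call, it would never get one.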
| const headers = getCorsHeaders(req); | ||
| const requestId = generateRequestId(); | ||
| const logger = new Logger(GATEWAY_NAME, requestId); | ||
This gateway currently performs no authentication/authorization check before accepting POST requests. Given it can trigger agent actions that may use service-role Supabase credentials (and even fallback to direct LLM calls), this is effectively a public, unauthenticated control plane. Add explicit auth validation (e.g., verify Supabase JWT / required API key) before processing the request body.
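One possible shape for such a pre-check is sketched below, assuming the gateway accepts a shared API key as a Bearer token. `checkGatewayAuth`, `safeEqual`, and `GatewayAuthResult` are hypothetical names; a deployment that verifies Supabase JWTs would replace the key comparison with token verification.

```typescript
type GatewayAuthResult =
  | { ok: true }
  | { ok: false; status: 401 | 403; error: string };

// Compare without exiting at the first mismatching character.
function safeEqual(a: string, b: string): boolean {
  let diff = a.length ^ b.length;
  const len = Math.max(a.length, b.length);
  for (let i = 0; i < len; i++) {
    // charCodeAt returns NaN past the end of a string; treat that as 0.
    diff |= (a.charCodeAt(i) || 0) ^ (b.charCodeAt(i) || 0);
  }
  return diff === 0;
}

// Hypothetical check intended to run before the request body is parsed.
function checkGatewayAuth(
  authHeader: string | null,
  expectedKey: string,
): GatewayAuthResult {
  if (!authHeader) {
    return { ok: false, status: 401, error: "Missing Authorization header" };
  }
  const presented = /^Bearer\s+(\S+)$/i.exec(authHeader.trim())?.[1];
  if (!presented) {
    return { ok: false, status: 401, error: "Malformed Authorization header" };
  }
  if (!safeEqual(presented, expectedKey)) {
    return { ok: false, status: 403, error: "Invalid credentials" };
  }
  return { ok: true };
}
```

In a handler, the result's `status` and `error` could be mapped to a structured JSON error response before any agent or LLM call is made.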
| const existing = activeChannels.get(name); | ||
| if (existing) return existing; | ||
| const channel = supabase.channel(name); | ||
| return new Promise((resolve, reject) => { |
activeChannels caches channels indefinitely. Because many call sites construct per-request response channel names that include correlation IDs, this map can grow without bound in a warm Edge Function instance. Consider not caching ephemeral response channels, and/or ensure channels are removed via removeChannel once a request completes or times out.
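A sketch of one way to keep the cache bounded: cache only long-lived channels and release ephemeral ones explicitly. The `ChannelLike` and `RealtimeClientLike` interfaces are minimal stand-ins rather than the real supabase-js types, and `acquireChannel` / `releaseChannel` are hypothetical helper names.

```typescript
// Minimal stand-ins for the realtime client; NOT the real supabase-js types.
interface ChannelLike {
  topic: string;
}
interface RealtimeClientLike {
  channel(topic: string): ChannelLike;
  removeChannel(channel: ChannelLike): unknown;
}

// Only long-lived, shared channels are cached.
const sharedChannels = new Map<string, ChannelLike>();

function acquireChannel(
  client: RealtimeClientLike,
  topic: string,
  ephemeral: boolean,
): ChannelLike {
  if (!ephemeral) {
    const existing = sharedChannels.get(topic);
    if (existing) return existing;
  }
  const created = client.channel(topic);
  // Per-request response channels are never stored in the cache.
  if (!ephemeral) sharedChannels.set(topic, created);
  return created;
}

function releaseChannel(client: RealtimeClientLike, channel: ChannelLike): void {
  // Drop the cache entry only if it still points at this exact channel.
  if (sharedChannels.get(channel.topic) === channel) {
    sharedChannels.delete(channel.topic);
  }
  client.removeChannel(channel);
}
```

Calling `releaseChannel` from a `finally` block around each request would cover both the success and the timeout path.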
| const channel = await createChannel(supabase, channelName); | ||
| channel.on("broadcast", { event: "message" }, (payload: { payload?: unknown }) => { | ||
| const msg = payload?.payload; | ||
| if ( |
waitForResponse registers a channel.on(...) handler but never removes it after resolve/timeout. Over time this can accumulate handlers for the same channel and lead to duplicated callbacks and memory leaks. Consider unsubscribing/removing the handler and removing ephemeral channels in a finally block when the promise settles.
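The cleanup pattern can be sketched independently of Supabase. The example assumes a hypothetical `MessageSource` whose `subscribe` returns an unsubscribe function; supabase-js channels do not expose exactly this shape, so in the real helper cleanup would more likely mean removing the ephemeral channel.

```typescript
// Hypothetical message source: subscribe() returns an unsubscribe function.
interface MessageSource<M> {
  subscribe(handler: (msg: M) => void): () => void;
}

function waitForMatch<M>(
  source: MessageSource<M>,
  isMatch: (msg: M) => boolean,
  timeoutMs: number,
): Promise<M> {
  return new Promise<M>((resolve, reject) => {
    let settled = false;
    let unsubscribe: (() => void) | undefined;

    // Single exit path: clears the timer and handler exactly once.
    const finish = (action: () => void) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      unsubscribe?.();
      action();
    };

    const timer = setTimeout(
      () => finish(() => reject(new Error(`Timed out after ${timeoutMs} ms`))),
      timeoutMs,
    );

    unsubscribe = source.subscribe((msg) => {
      if (isMatch(msg)) finish(() => resolve(msg));
    });
    // The handler may have fired synchronously before `unsubscribe` was set.
    if (settled) unsubscribe();
  });
}
```

Routing both the resolve and the timeout path through `finish` means no handler or timer outlives the promise.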
| description: 'SQL query to execute', | ||
| select: { | ||
| type: 'array', | ||
| description: 'Columns to select (defaults to all)', |
The schema defines select as an array but does not specify items (e.g., { type: 'string' }). This makes the schema ambiguous for callers and tooling, and it’s inconsistent with how other schemas describe arrays. Add an items definition and (optionally) clarify whether values must be simple column identifiers.
| description: 'Columns to select (defaults to all)', | |
| description: 'Column names to select (simple identifiers; defaults to all)', | |
| items: { | |
| type: 'string', | |
| description: 'Column name to select', | |
| }, |
| }, | ||
| order: { | ||
| type: 'object', | ||
| description: 'Order by config: { column: string, ascending: boolean }', |
The order field is described as an object with { column, ascending } but the schema doesn’t define properties/required for that object. Without this, callers can’t reliably discover the shape and validators can’t enforce it. Define order.properties.column / order.properties.ascending (and required fields) to match the handler expectations.
| description: 'Order by config: { column: string, ascending: boolean }', | |
| description: 'Order by config: { column: string, ascending: boolean }', | |
| properties: { | |
| column: { | |
| type: 'string', | |
| description: 'Column name to order by', | |
| }, | |
| ascending: { | |
| type: 'boolean', | |
| description: 'True for ascending order, false for descending', | |
| }, | |
| }, | |
| required: ['column', 'ascending'], |
| const value = parts[1].trim().replace(/'/g, ''); | ||
| queryBuilder = queryBuilder.eq(column, value); | ||
| } | ||
| const columns = Array.isArray(select) ? select.join(',') : '*'; |
select is accepted as an array and directly joined into the Supabase select() string without validating entries. If the intent is to restrict to simple column identifiers (and avoid allowing arbitrary Supabase select expressions), each entry should be validated (or the schema should explicitly allow complex select syntax).
| const columns = Array.isArray(select) ? select.join(',') : '*'; | |
| const columns = Array.isArray(select) | |
| ? select | |
| .map((col) => String(col)) | |
| .map((col) => { | |
| validateIdentifier(col, 'select column'); | |
| return col; | |
| }) | |
| .join(',') | |
| : '*'; |
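The suggestion above calls `validateIdentifier`, whose body is not shown in this review. A minimal sketch of what such a check could look like follows, assuming column names are restricted to plain ASCII identifiers. The regular expression, the error format, and the `buildSelectList` wrapper are assumptions, and unlike the suggested change this sketch also falls back to `*` for an empty array.

```typescript
// Assumed rule: letters, digits, and underscores, not starting with a digit.
const IDENTIFIER_PATTERN = /^[A-Za-z_][A-Za-z0-9_]*$/;

function validateIdentifier(value: string, label: string): void {
  if (!IDENTIFIER_PATTERN.test(value)) {
    throw new Error(`Invalid ${label}: ${JSON.stringify(value)}`);
  }
}

// Hypothetical wrapper: builds the select() string from validated columns.
function buildSelectList(select: unknown): string {
  if (!Array.isArray(select) || select.length === 0) return "*";
  return select
    .map((col) => {
      const columnName = String(col);
      validateIdentifier(columnName, "select column");
      return columnName;
    })
    .join(",");
}
```

Rejecting anything outside the identifier pattern blocks nested or aliased select expressions; if those are meant to be supported, the schema should say so explicitly instead.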
| // Updated schema: structured filters instead of raw SQL | ||
| const safeQuerySchema = { | ||
| type: 'object', | ||
| properties: { | ||
| table: { | ||
| type: 'string', |
There are now two separate query schemas: databaseQuerySchema in tools/schemas.ts and safeQuerySchema defined inline here. This duplication is likely to drift over time and can confuse tool consumers. Prefer importing and reusing the shared schema, or remove the unused schema export to keep a single source of truth.
| import { serve } from "https://deno.land/std@0.168.0/http/server.ts"; | ||
| import { createClient } from "https://esm.sh/@supabase/supabase-js@2.38.4"; | ||
| import { corsHeaders, getCorsHeaders, handleCors } from "../_shared/cors.ts"; | ||
| import { | ||
| AgentMessage, | ||
| GatewayRequest, | ||
| GatewayResponse, | ||
| createAgentMessage, | ||
| createErrorResponse, | ||
| } from "../_shared/protocol.ts"; |
Unused imports (createClient, corsHeaders, and AgentMessage) add noise and may fail linting in stricter CI configurations. Remove unused imports or use them if intended.
Summary
Refactored the edge-agents platform from simple LLM routers to a unified workflow orchestrator with standardized inter-agent communication, shared tooling infrastructure, and cost-optimized model routing.
Key Changes
Core Infrastructure
- … (_shared/protocol.ts): Introduced standardized AgentMessage format for all inter-agent communication with correlation IDs, workflow tracking, and structured payloads
- … (_shared/channel.ts): Extracted idempotent Supabase channel management to prevent duplicate subscriptions and standardize message passing
- … (_shared/tools.ts): Ported tool registry from agentic-mcp with context management, action tracking, and memory for multi-step workflows
- … (_shared/logger.ts): Added request tracing, workflow correlation, and timing metrics across all agents
- … (_shared/handoff.ts): Implemented agent-to-agent handoff with workflow state tracking and tool-based delegation
Agent Manager Refactor
- … AgentMessage format
Agent Alpha (Research Agent)
Agent Beta (Database Agent)
New Gateway Service
- … (gateway/index.ts) for external client requests
Model Provider
- ModelProvider class with fallback chain: Tumbler → OpenRouter → direct Gemini API
Security & Reliability
Testing & Documentation
- … (gateway/tests/integration.test.ts) covering protocol, tool registry, and auth
- … (scripts/demo/run-demo.ts) showing health checks, routing, and multi-agent workflows
Notable Implementation Details
- … workflowId and correlationId for observability
https://claude.ai/code/session_019NETayw1yMR3frptLBL2rT