diff --git a/packages/modal-infra/src/app.py b/packages/modal-infra/src/app.py index ca0b1a21..609cd955 100644 --- a/packages/modal-infra/src/app.py +++ b/packages/modal-infra/src/app.py @@ -29,10 +29,9 @@ # Secrets for LLM API keys - defined in Modal dashboard or CLI # These are injected into sandboxes but never stored in snapshots -llm_secrets = modal.Secret.from_name( - "llm-api-keys", - required_keys=["ANTHROPIC_API_KEY"], -) +# Either ANTHROPIC_API_KEY or CLAUDE_CODE_OAUTH_TOKEN can be set +# OAuth token takes precedence (uses subscription billing, much cheaper) +llm_secrets = modal.Secret.from_name("llm-api-keys") # Secrets for GitHub App - used for git operations (clone, push) # These are used to generate installation tokens, NOT injected into sandboxes diff --git a/packages/modal-infra/src/images/__init__.py b/packages/modal-infra/src/images/__init__.py index 79ab9f4a..5bf6acb1 100644 --- a/packages/modal-infra/src/images/__init__.py +++ b/packages/modal-infra/src/images/__init__.py @@ -1,5 +1,5 @@ """Image definitions for Open-Inspect sandboxes.""" -from .base import OPENCODE_VERSION, base_image +from .base import CACHE_BUSTER, base_image -__all__ = ["OPENCODE_VERSION", "base_image"] +__all__ = ["CACHE_BUSTER", "base_image"] diff --git a/packages/modal-infra/src/images/base.py b/packages/modal-infra/src/images/base.py index e05c761a..c839dd1d 100644 --- a/packages/modal-infra/src/images/base.py +++ b/packages/modal-infra/src/images/base.py @@ -5,7 +5,7 @@ - Debian slim base with git, curl, build-essential - Node.js 22 LTS, pnpm, Bun runtime - Python 3.12 with uv -- OpenCode CLI pre-installed +- Claude Code ACP adapter for agent communication - Playwright with headless Chrome for visual verification - Sandbox entrypoint and bridge code """ @@ -17,14 +17,9 @@ # Get the path to the sandbox code SANDBOX_DIR = Path(__file__).parent.parent / "sandbox" -# Plugin is now bundled with sandbox code at /app/sandbox/inspect-plugin.js - -# OpenCode version to install 
-OPENCODE_VERSION = "latest" - # Cache buster - change this to force Modal image rebuild -# v34: Replace print() with structured JSON logging in sandbox code -CACHE_BUSTER = "v34-structured-logging" +# v41: Cache tool titles from ToolCallStart for use in ToolCallUpdate +CACHE_BUSTER = "v45-acp-10mb-buffer" # Base image with all development tools base_image = ( @@ -64,6 +59,13 @@ "node --version", "npm --version", ) + # Install GitHub CLI (gh) + .run_commands( + "curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg | dd of=/usr/share/keyrings/githubcli-archive-keyring.gpg", + 'echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" | tee /etc/apt/sources.list.d/github-cli.list > /dev/null', + "apt-get update && apt-get install -y gh", + "gh --version", + ) # Install pnpm and Bun .run_commands( # Install pnpm globally @@ -83,14 +85,13 @@ "playwright", "pydantic>=2.0", # Required for sandbox types "PyJWT[crypto]", # For GitHub App token generation (includes cryptography) + "agent-client-protocol", # Python ACP SDK for agent communication ) - # Install OpenCode CLI and plugin for custom tools + # Install Claude Code ACP adapter for agent communication + # Uses Zed's wrapper which bridges Claude Agent SDK <-> ACP protocol .run_commands( - "npm install -g opencode-ai@latest", - "opencode --version || echo 'OpenCode installed'", - # Install @opencode-ai/plugin globally for custom tools - # This ensures tools can import the plugin without needing to run bun add - "npm install -g @opencode-ai/plugin@latest zod", + "npm install -g @zed-industries/claude-code-acp@latest", + "claude-code-acp --version || echo 'Claude Code ACP installed'", ) # Install Playwright browsers (Chromium only to save space) .run_commands( @@ -101,8 +102,7 @@ .run_commands( "mkdir -p /workspace", "mkdir -p /app/plugins", - "mkdir -p /tmp/opencode", - "echo 'Image rebuilt at: 
v21-force-rebuild' > /app/image-version.txt", + f"echo 'Image rebuilt at: {CACHE_BUSTER}' > /app/image-version.txt", ) # Set environment variables (including cache buster to force rebuild) .env( @@ -114,11 +114,11 @@ "PLAYWRIGHT_BROWSERS_PATH": "/root/.cache/ms-playwright", "PYTHONPATH": "/app", "SANDBOX_VERSION": CACHE_BUSTER, - # NODE_PATH for globally installed modules (used by custom tools) - "NODE_PATH": "/usr/lib/node_modules", + # Auto-approve all permissions in sandbox (running in isolated container) + "CLAUDE_CODE_SKIP_PERMISSIONS": "true", } ) - # Add sandbox code to the image (includes plugin at /app/sandbox/inspect-plugin.js) + # Add sandbox code to the image .add_local_dir( str(SANDBOX_DIR), remote_path="/app/sandbox", diff --git a/packages/modal-infra/src/sandbox/bridge.py b/packages/modal-infra/src/sandbox/bridge.py index 510b60e8..8540c466 100644 --- a/packages/modal-infra/src/sandbox/bridge.py +++ b/packages/modal-infra/src/sandbox/bridge.py @@ -4,7 +4,8 @@ This module handles: - WebSocket connection to control plane Durable Object - Heartbeat loop for connection health -- Event forwarding from OpenCode to control plane +- ACP (Agent Client Protocol) communication with Claude Code +- Event forwarding from ACP agent to control plane - Command handling from control plane (prompt, stop, snapshot) - Git identity configuration per prompt author """ @@ -13,22 +14,45 @@ import asyncio import json import os -import secrets import subprocess import tempfile import time -from collections.abc import AsyncIterator from pathlib import Path -from typing import Any, ClassVar, NamedTuple +from typing import TYPE_CHECKING, Any, NamedTuple -import httpx import websockets + +# ACP SDK - handles JSON-RPC over stdio +from acp import ( + Client as ACPClientBase, +) +from acp import ( + RequestError, + spawn_agent_process, + text_block, +) +from acp.schema import ( + AgentMessageChunk, + AgentThoughtChunk, + AllowedOutcome, + NewSessionResponse, + PermissionOption, + 
PromptResponse, + RequestPermissionResponse, + TextContentBlock, + ToolCall, + ToolCallStart, + ToolCallUpdate, +) from websockets import ClientConnection, State from websockets.exceptions import InvalidStatus from .log_config import configure_logging, get_logger from .types import GitUser +if TYPE_CHECKING: + from acp.core import ClientSideConnection + configure_logging() @@ -39,90 +63,164 @@ class TokenResolution(NamedTuple): source: str -class OpenCodeIdentifier: - """ - Generate OpenCode-compatible ascending IDs. - - Port of OpenCode's TypeScript implementation: - https://github.com/anomalyco/opencode/blob/8f0d08fae07c97a090fcd31d0d4c4a6fa7eeaa1d/packages/opencode/src/id/id.ts - - Format: {prefix}_{timestamp_hex}{random_base62} - - prefix: type identifier (e.g., "msg" for messages) - - timestamp_hex: 12 hex chars encoding (timestamp_ms * 0x1000 + counter) - - random_base62: 14 random base62 characters - - IDs are monotonically increasing, ensuring new user messages always have - IDs greater than previous assistant messages (required for OpenCode's - prompt loop). +class SessionTerminatedError(Exception): + """Raised when the control plane has terminated the session (HTTP 410). - Note: Uses class-level state for monotonic generation. Safe for async code - but NOT thread-safe. + This is a non-recoverable error - the bridge should exit gracefully + rather than retry. The session can be restored via user action (sending + a new prompt), which will trigger snapshot restoration on the control plane. 
""" - PREFIXES: ClassVar[dict[str, str]] = { - "session": "ses", - "message": "msg", - "part": "prt", - } - BASE62_CHARS: ClassVar[str] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" - RANDOM_LENGTH: ClassVar[int] = 14 - - _last_timestamp: ClassVar[int] = 0 - _counter: ClassVar[int] = 0 - - @classmethod - def ascending(cls, prefix: str) -> str: - """Generate an ascending ID with the given prefix.""" - if prefix not in cls.PREFIXES: - raise ValueError(f"Unknown prefix: {prefix}") + pass - prefix_str = cls.PREFIXES[prefix] - current_timestamp = int(time.time() * 1000) - if current_timestamp != cls._last_timestamp: - cls._last_timestamp = current_timestamp - cls._counter = 0 - cls._counter += 1 +class ACPClient(ACPClientBase): + """ + ACP Client implementation for sandbox environment. - encoded = current_timestamp * 0x1000 + cls._counter - encoded_48bit = encoded & 0xFFFFFFFFFFFF - timestamp_bytes = encoded_48bit.to_bytes(6, byteorder="big") - timestamp_hex = timestamp_bytes.hex() - random_suffix = cls._random_base62(cls.RANDOM_LENGTH) + Handles permission requests, file operations, and terminal management + for the Claude Code agent running in the sandbox. + """ - return f"{prefix_str}_{timestamp_hex}{random_suffix}" + def __init__(self, bridge: "AgentBridge"): + self.bridge = bridge + self.log = bridge.log - @classmethod - def _random_base62(cls, length: int) -> str: - """Generate random base62 string.""" - return "".join(cls.BASE62_CHARS[secrets.randbelow(62)] for _ in range(length)) + async def request_permission( + self, + options: list[PermissionOption], + session_id: str, + tool_call: ToolCallUpdate, + **kwargs: Any, + ) -> RequestPermissionResponse: + """Auto-approve all permissions in sandbox environment. + The sandbox runs in an isolated container with no access to user + systems, so all permissions can be safely granted. 
+ """ + # Find an "allow" option to select + allow_option = None + for opt in options: + if opt.kind in ("allow_once", "allow_always"): + allow_option = opt + break + + if allow_option: + self.log.debug( + "acp.permission_auto_approve", + option_id=allow_option.option_id, + option_kind=allow_option.kind, + tool=tool_call.title if tool_call else None, + ) + return RequestPermissionResponse( + outcome=AllowedOutcome(outcome="selected", option_id=allow_option.option_id) + ) + else: + # Fallback: if no allow option found, use first option + first_option = options[0] if options else None + if first_option: + self.log.debug( + "acp.permission_fallback", + option_id=first_option.option_id, + option_kind=first_option.kind, + ) + return RequestPermissionResponse( + outcome=AllowedOutcome(outcome="selected", option_id=first_option.option_id) + ) + # No options at all - this shouldn't happen + self.log.warn("acp.permission_no_options") + raise ValueError("No permission options provided") -class SSEConnectionError(Exception): - """Raised when SSE connection fails.""" + async def write_text_file( + self, + session_id: str, + path: str, + text: str, + **kwargs: Any, + ) -> None: + """Write a text file to the filesystem.""" + self.log.debug("acp.write_file", path=path) + Path(path).write_text(text) + + async def read_text_file( + self, + session_id: str, + path: str, + **kwargs: Any, + ) -> str: + """Read a text file from the filesystem.""" + self.log.debug("acp.read_file", path=path) + return Path(path).read_text() + + async def create_terminal( + self, + session_id: str, + command: str | None = None, + **kwargs: Any, + ) -> str: + """Create a terminal - not implemented in sandbox context.""" + raise RequestError.method_not_found("create_terminal not implemented") - pass + async def terminal_output( + self, + session_id: str, + terminal_id: str, + output: str, + **kwargs: Any, + ) -> None: + """Send terminal output - not implemented in sandbox context.""" + raise 
RequestError.method_not_found("terminal_output not implemented") + + async def wait_for_terminal_exit( + self, + session_id: str, + terminal_id: str, + **kwargs: Any, + ) -> int: + """Wait for terminal exit - not implemented in sandbox context.""" + raise RequestError.method_not_found("wait_for_terminal_exit not implemented") + async def kill_terminal_command( + self, + session_id: str, + terminal_id: str, + **kwargs: Any, + ) -> None: + """Kill terminal command - not implemented in sandbox context.""" + raise RequestError.method_not_found("kill_terminal_command not implemented") -class SessionTerminatedError(Exception): - """Raised when the control plane has terminated the session (HTTP 410). + async def release_terminal( + self, + session_id: str, + terminal_id: str, + **kwargs: Any, + ) -> None: + """Release terminal - not implemented in sandbox context.""" + raise RequestError.method_not_found("release_terminal not implemented") - This is a non-recoverable error - the bridge should exit gracefully - rather than retry. The session can be restored via user action (sending - a new prompt), which will trigger snapshot restoration on the control plane. - """ + async def session_update( + self, + session_id: str, + update: Any, + source: str | None = None, + ) -> None: + """Handle session updates from the agent. - pass + This is the main callback for receiving streaming events from Claude Code. + We forward these to the control plane as appropriate event types. + """ + await self.bridge._handle_acp_session_update(session_id, update, source) class AgentBridge: """ - Bridge between sandbox OpenCode instance and control plane. + Bridge between sandbox ACP agent and control plane. 
Handles: - WebSocket connection management with reconnection - Heartbeat for connection health - - Event streaming from OpenCode to control plane + - ACP subprocess management and communication + - Event streaming from ACP to control plane - Command handling (prompt, stop, snapshot, shutdown) - Git identity management per prompt author """ @@ -130,14 +228,7 @@ class AgentBridge: HEARTBEAT_INTERVAL = 30.0 RECONNECT_BACKOFF_BASE = 2.0 RECONNECT_MAX_DELAY = 60.0 - SSE_INACTIVITY_TIMEOUT = 120.0 - SSE_INACTIVITY_TIMEOUT_MIN = 5.0 - SSE_INACTIVITY_TIMEOUT_MAX = 3600.0 - HTTP_CONNECT_TIMEOUT = 30.0 - HTTP_DEFAULT_TIMEOUT = 30.0 - OPENCODE_REQUEST_TIMEOUT = 10.0 PROMPT_MAX_DURATION = 5400.0 - MAX_PENDING_PART_EVENTS = 2000 def __init__( self, @@ -145,14 +236,11 @@ def __init__( session_id: str, control_plane_url: str, auth_token: str, - opencode_port: int = 4096, ): self.sandbox_id = sandbox_id self.session_id = session_id self.control_plane_url = control_plane_url self.auth_token = auth_token - self.opencode_port = opencode_port - self.opencode_base_url = f"http://localhost:{opencode_port}" # Logger self.log = get_logger( @@ -162,24 +250,23 @@ def __init__( session_id=session_id, ) - self.sse_inactivity_timeout = self._resolve_timeout_seconds( - name="BRIDGE_SSE_INACTIVITY_TIMEOUT", - default=self.SSE_INACTIVITY_TIMEOUT, - min_value=self.SSE_INACTIVITY_TIMEOUT_MIN, - max_value=self.SSE_INACTIVITY_TIMEOUT_MAX, - ) - self.ws: ClientConnection | None = None self.shutdown_event = asyncio.Event() self.git_sync_complete = asyncio.Event() - # Session state - self.opencode_session_id: str | None = None - self.session_id_file = Path(tempfile.gettempdir()) / "opencode-session-id" + # ACP state + self.acp_conn: ClientSideConnection | None = None + self.acp_process: asyncio.subprocess.Process | None = None + self.acp_session_id: str | None = None + self._acp_context: Any | None = None # Async context manager for spawn_agent_process + self.acp_session_id_file = 
Path(tempfile.gettempdir()) / "acp-session-id" self.repo_path = Path("/workspace") - # HTTP client for OpenCode API - self.http_client: httpx.AsyncClient | None = None + # Current prompt tracking + self._current_message_id: str | None = None + self._prompt_task: asyncio.Task[None] | None = None + # Cache tool titles by tool_call_id (ToolCallUpdate loses the title) + self._tool_titles: dict[str, str] = {} @property def ws_url(self) -> str: @@ -195,14 +282,7 @@ async def run(self) -> None: """ self.log.info("bridge.run_start") - self.http_client = httpx.AsyncClient( - timeout=httpx.Timeout( - self.HTTP_DEFAULT_TIMEOUT, - connect=self.HTTP_CONNECT_TIMEOUT, - ) - ) await self._load_session_id() - reconnect_attempts = 0 try: @@ -258,8 +338,7 @@ async def run(self) -> None: await asyncio.sleep(delay) finally: - if self.http_client: - await self.http_client.aclose() + await self._cleanup_acp() def _is_fatal_connection_error(self, error_str: str) -> bool: """Check if a connection error is fatal and shouldn't trigger retry. @@ -308,7 +387,7 @@ async def _connect_and_run(self) -> None: { "type": "ready", "sandboxId": self.sandbox_id, - "opencodeSessionId": self.opencode_session_id, + "acpSessionId": self.acp_session_id, } ) @@ -397,6 +476,7 @@ async def _handle_command(self, cmd: dict[str, Any]) -> asyncio.Task[None] | Non if cmd_type == "prompt": message_id = cmd.get("messageId") or cmd.get("message_id", "unknown") task = asyncio.create_task(self._handle_prompt(cmd)) + self._prompt_task = task def handle_task_exception(t: asyncio.Task[None], mid: str = message_id) -> None: if t.cancelled(): @@ -438,8 +518,129 @@ def handle_task_exception(t: asyncio.Task[None], mid: str = message_id) -> None: self.log.debug("bridge.unknown_command", cmd_type=cmd_type) return None + async def _ensure_acp_agent(self) -> None: + """Ensure ACP agent subprocess is running and connected. 
+ + Uses spawn_agent_process from the ACP SDK which handles proper + stdio stream setup for JSON-RPC communication. + """ + if self.acp_conn is not None and self.acp_process is not None: + # Check if process is still alive + if self.acp_process.returncode is None: + return + # Process died, clean up + self.log.warn("acp.process_died", returncode=self.acp_process.returncode) + await self._cleanup_acp() + + self.log.info("acp.spawn_start") + + # Determine working directory + workdir = self.repo_path + repo_dirs = list(self.repo_path.glob("*/.git")) + if repo_dirs: + workdir = repo_dirs[0].parent + + # Build environment for Claude Code ACP + # OAuth token takes precedence over API key (uses user's subscription, much cheaper) + env = {**os.environ} + oauth_token = os.environ.get("CLAUDE_CODE_OAUTH_TOKEN") + if oauth_token: + self.log.info("acp.auth_method", method="oauth_token") + env["CLAUDE_CODE_OAUTH_TOKEN"] = oauth_token + env.pop("ANTHROPIC_API_KEY", None) # Don't use API key when OAuth available + else: + self.log.info("acp.auth_method", method="api_key") + + # Create ACP client for handling callbacks + client = ACPClient(self) + + # ACP command is configurable via ACP_COMMAND env var + # Supports space-separated command + args (e.g., "opencode acp") + # Defaults to "claude-code-acp" + acp_command_str = os.environ.get("ACP_COMMAND", "claude-code-acp") + acp_parts = acp_command_str.split() + self.log.info("acp.command", command=acp_parts) + + # Use spawn_agent_process from ACP SDK - it's an async context manager + # We manually manage the context to keep the connection alive + # Increase stdio buffer limit to 10MB (default 64KB is too small for large responses) + self._acp_context = spawn_agent_process( + client, + acp_parts[0], + *acp_parts[1:], + env=env, + cwd=workdir, + transport_kwargs={"limit": 10 * 1024 * 1024}, + ) + + # Enter the context manually (it's wrapped in @asynccontextmanager) + self.acp_conn, self.acp_process = await 
self._acp_context.__aenter__() + + self.log.info("acp.spawn_complete", pid=self.acp_process.pid) + + # Initialize connection + init_response = await self.acp_conn.initialize( + protocol_version=1, + capabilities={}, + ) + self.log.info( + "acp.initialized", + protocol_version=init_response.protocol_version, + ) + + async def _ensure_acp_session(self) -> str: + """Ensure ACP session exists and return session ID.""" + await self._ensure_acp_agent() + + if self.acp_session_id: + return self.acp_session_id + + if self.acp_conn is None: + raise RuntimeError("ACP connection not initialized") + + # Determine working directory + workdir = self.repo_path + repo_dirs = list(self.repo_path.glob("*/.git")) + if repo_dirs: + workdir = repo_dirs[0].parent + + # Create new session + response: NewSessionResponse = await self.acp_conn.new_session( + cwd=str(workdir), + mcp_servers=[], + ) + + session_id = response.session_id + if not session_id: + raise RuntimeError("ACP session creation failed - no session ID returned") + + self.acp_session_id = session_id + self.log.info("acp.session_created", acp_session_id=self.acp_session_id) + + await self._save_session_id() + return session_id + + async def _cleanup_acp(self) -> None: + """Clean up ACP connection and process. + + Properly exits the spawn_agent_process context manager to ensure + clean shutdown of the subprocess and connection. 
+ """ + # Exit the async context manager properly + if self._acp_context is not None: + try: + # Exit the context manager (it's wrapped in @asynccontextmanager) + await self._acp_context.__aexit__(None, None, None) + except Exception as e: + self.log.debug("acp.context_close_error", exc=e) + self._acp_context = None + + # The context manager should have cleaned up, but ensure conn/process are None + self.acp_conn = None + self.acp_process = None + async def _handle_prompt(self, cmd: dict[str, Any]) -> None: - """Handle prompt command - send to OpenCode and stream response.""" + """Handle prompt command - send to ACP agent and stream response.""" message_id = cmd.get("messageId") or cmd.get("message_id", "unknown") content = cmd.get("content", "") model = cmd.get("model") @@ -453,6 +654,11 @@ async def _handle_prompt(self, cmd: dict[str, Any]) -> None: model=model, ) + # Set current message ID for event forwarding + self._current_message_id = message_id + self._tool_titles = {} # Reset tool title cache for new prompt + + # Configure git identity if provided github_name = author_data.get("githubName") github_email = author_data.get("githubEmail") if github_name and github_email: @@ -463,12 +669,35 @@ async def _handle_prompt(self, cmd: dict[str, Any]) -> None: ) ) - if not self.opencode_session_id: - await self._create_opencode_session() - try: - async for event in self._stream_opencode_response_sse(message_id, content, model): - await self._send_event(event) + # Ensure ACP agent is running and session exists + session_id = await self._ensure_acp_session() + + if self.acp_conn is None: + raise RuntimeError("ACP connection not initialized") + + # Set model if specified (optional - not all ACP agents support this) + if model: + try: + self.log.debug("acp.set_model", model=model) + await self.acp_conn.set_session_model( + session_id=session_id, + model_id=model, + ) + except Exception as e: + # Model setting not supported by this agent - continue without it + 
self.log.debug("acp.set_model_unsupported", model=model, error=str(e)) + + # Send prompt via ACP + response: PromptResponse = await self.acp_conn.prompt( + session_id=session_id, + prompt=[text_block(content)], + ) + + self.log.info( + "prompt.complete", + stop_reason=response.stop_reason, + ) await self._send_event( { @@ -490,6 +719,7 @@ async def _handle_prompt(self, cmd: dict[str, Any]) -> None: } ) finally: + self._current_message_id = None duration_ms = int((time.time() - start_time) * 1000) self.log.info( "prompt.run", @@ -499,537 +729,208 @@ async def _handle_prompt(self, cmd: dict[str, Any]) -> None: duration_ms=duration_ms, ) - async def _create_opencode_session(self) -> None: - """Create a new OpenCode session.""" - if not self.http_client: - raise RuntimeError("HTTP client not initialized") + async def _handle_acp_session_update( + self, + session_id: str, + update: Any, + source: str | None, + ) -> None: + """Handle session updates from ACP agent. - resp = await self.http_client.post( - f"{self.opencode_base_url}/session", - json={}, - timeout=self.OPENCODE_REQUEST_TIMEOUT, - ) - resp.raise_for_status() - data = resp.json() + Maps ACP session updates to control plane event types. 
+ """ + message_id = self._current_message_id + if not message_id: + self.log.warn( + "acp.session_update_dropped", + reason="no_message_id", + update_type=type(update).__name__, + ) + return - self.opencode_session_id = data.get("id") - self.log.info( - "opencode.session.ensure", - opencode_session_id=self.opencode_session_id, - action="created", + # Handle different update types based on session_update field or isinstance + update_type = getattr(update, "session_update", None) + self.log.debug( + "acp.session_update", + update_class=type(update).__name__, + update_type=update_type, + message_id=message_id, ) - await self._save_session_id() + if isinstance(update, AgentMessageChunk) or update_type == "agent_message_chunk": + # Agent is sending a message chunk - content is a single block, not a list + content = update.content + if content: + await self._forward_content_block(message_id, content) + + elif isinstance(update, AgentThoughtChunk) or update_type == "agent_thought_chunk": + # Agent is thinking (internal reasoning) - content is a single block + # Send incremental chunks - frontend concatenates them + content = update.content + if content and hasattr(content, "text") and content.text: + await self._send_event( + { + "type": "token", + "content": content.text, + "messageId": message_id, + "isThought": True, + } + ) - def _transform_part_to_event( - self, - part: dict[str, Any], - message_id: str, - ) -> dict[str, Any] | None: - """Transform a single OpenCode part to a bridge event.""" - part_type = part.get("type") - - if part_type == "text": - text = part.get("text", "") - if text: - return { - "type": "token", - "content": text, + elif isinstance(update, ToolCallStart): + # Tool call is starting - cache the title for later updates + tool_name = update.title or update.kind or "Tool" + self._tool_titles[update.tool_call_id] = tool_name + self.log.debug( + "acp.tool_call_start", + tool_call_id=update.tool_call_id, + title=update.title, + kind=update.kind, + 
status=update.status, + ) + await self._send_event( + { + "type": "tool_call", + "tool": tool_name, + "args": update.raw_input or {}, + "callId": update.tool_call_id, + "status": "pending", + "output": "", "messageId": message_id, } - elif part_type == "tool": - state = part.get("state", {}) - status = state.get("status", "") - tool_input = state.get("input", {}) + ) + elif isinstance(update, ToolCallUpdate | ToolCall): + # Tool call update (progress, completion, etc.) + # Use cached title from ToolCallStart since updates often have title=None + tool_name = update.title or self._tool_titles.get(update.tool_call_id, "Tool") self.log.debug( - "bridge.tool_part", - tool=part.get("tool"), - status=status, + "acp.tool_call_update", + tool_call_id=update.tool_call_id, + title=update.title, + cached_title=self._tool_titles.get(update.tool_call_id), + status=update.status, ) + status = "running" + if update.status == "completed": + status = "completed" + elif update.status == "failed": + status = "error" - if status in ("pending", "") and not tool_input: - return None - - return { + event: dict[str, Any] = { "type": "tool_call", - "tool": part.get("tool", ""), - "args": tool_input, - "callId": part.get("callID", ""), + "tool": tool_name, + "args": update.raw_input or {}, + "callId": update.tool_call_id, "status": status, - "output": state.get("output", ""), - "messageId": message_id, - } - elif part_type == "step-finish": - return { - "type": "step_finish", - "cost": part.get("cost"), - "tokens": part.get("tokens"), - "reason": part.get("reason"), - "messageId": message_id, - } - elif part_type == "step-start": - return { - "type": "step_start", "messageId": message_id, } - return None - - def _build_prompt_request_body( - self, content: str, model: str | None, opencode_message_id: str | None = None - ) -> dict[str, Any]: - """Build request body for OpenCode prompt requests. 
- - Args: - content: The prompt text content - model: Optional model override (e.g., "claude-haiku-4-5" or "anthropic/claude-haiku-4-5") - opencode_message_id: OpenCode-compatible ascending message ID (e.g., "msg_..."). - When provided, OpenCode uses this as the user message ID, - and assistant responses will have parentID pointing to it. - """ - request_body: dict[str, Any] = {"parts": [{"type": "text", "text": content}]} - - if opencode_message_id: - request_body["messageID"] = opencode_message_id - - if model: - if "/" in model: - provider_id, model_id = model.split("/", 1) - else: - provider_id, model_id = "anthropic", model - request_body["model"] = { - "providerID": provider_id, - "modelID": model_id, - } + # Add output if available + if update.raw_output: + event["output"] = update.raw_output + elif update.content: + # Content might be single block or list depending on SDK version + content = update.content + if isinstance(content, list): + output_parts = [] + for c in content: + if hasattr(c, "text") and c.text: + output_parts.append(c.text) + event["output"] = "\n".join(output_parts) + elif hasattr(content, "text") and content.text: + event["output"] = content.text + + await self._send_event(event) - return request_body - - async def _parse_sse_stream( - self, - response: httpx.Response, - timeout_ctx: asyncio.Timeout | None = None, - ) -> AsyncIterator[dict[str, Any]]: - """Parse Server-Sent Events stream from OpenCode. - - SSE format: - data: {"type": "...", "properties": {...}} - - data: {"type": "...", "properties": {...}} + else: + # Log unknown update types for debugging + self.log.debug( + "acp.unknown_update_type", + update_type=type(update).__name__, + ) - Events are separated by double newlines. - If timeout_ctx is provided, the deadline is reset on every chunk received. 
- """ - buffer = "" - async for chunk in response.aiter_text(): - buffer += chunk - if timeout_ctx is not None: - timeout_ctx.reschedule( - asyncio.get_running_loop().time() + self.sse_inactivity_timeout - ) + async def _forward_content_block(self, message_id: str, content: Any) -> None: + """Forward a content block as an appropriate event. - # Process complete events (separated by double newlines) - while "\n\n" in buffer: - event_str, buffer = buffer.split("\n\n", 1) - - # Parse the event lines - data_lines: list[str] = [] - for line in event_str.split("\n"): - if line.startswith("data:"): - # Handle both "data: {...}" and "data:{...}" formats - data_content = line[5:].lstrip() - if data_content: - data_lines.append(data_content) - - # Join multi-line data and parse JSON - if data_lines: - try: - raw_data = "\n".join(data_lines) - event = json.loads(raw_data) - yield event - except json.JSONDecodeError as e: - self.log.debug("bridge.sse_parse_error", exc=e) - - async def _stream_opencode_response_sse( - self, - message_id: str, - content: str, - model: str | None = None, - ) -> AsyncIterator[dict[str, Any]]: - """Stream response from OpenCode using Server-Sent Events. - - Uses messageID-based correlation for reliable event attribution: - 1. Generate an OpenCode-compatible ascending ID for the user message - 2. OpenCode creates assistant messages with parentID = our ascending ID - 3. Filter events to only process parts from our assistant messages - 4. Use control plane's message_id for events sent back - - The ascending ID ensures our user message ID is lexicographically greater - than any previous assistant message IDs, preventing the early exit condition - in OpenCode's prompt loop (lastUser.id < lastAssistant.id). + For text content, we send incremental chunks. The frontend concatenates + these chunks to build the full response for live streaming display. 
""" - if not self.http_client or not self.opencode_session_id: - raise RuntimeError("OpenCode session not initialized") - - opencode_message_id = OpenCodeIdentifier.ascending("message") - request_body = self._build_prompt_request_body(content, model, opencode_message_id) - - sse_url = f"{self.opencode_base_url}/event" - async_url = f"{self.opencode_base_url}/session/{self.opencode_session_id}/prompt_async" - - cumulative_text: dict[str, str] = {} - emitted_tool_states: set[str] = set() - allowed_assistant_msg_ids: set[str] = set() - pending_parts: dict[str, list[tuple[dict[str, Any], Any]]] = {} - pending_parts_total = 0 - pending_drop_logged = False - - start_time = time.time() - loop = asyncio.get_running_loop() - - def buffer_part(oc_msg_id: str, part: dict[str, Any], delta: Any) -> None: - nonlocal pending_parts_total - nonlocal pending_drop_logged - if pending_parts_total >= self.MAX_PENDING_PART_EVENTS: - if not pending_drop_logged: - self.log.warn( - "bridge.pending_parts_dropped", - message_id=message_id, - limit=self.MAX_PENDING_PART_EVENTS, - ) - pending_drop_logged = True + # Handle text content - check both isinstance and duck typing + # The ACP SDK may return different concrete types + is_text = isinstance(content, TextContentBlock) + has_text_attr = hasattr(content, "text") and hasattr(content, "type") + + if is_text or (has_text_attr and getattr(content, "type", None) == "text"): + text = getattr(content, "text", "") + # Skip empty text chunks + if not text: return - pending_parts.setdefault(oc_msg_id, []).append((part, delta)) - pending_parts_total += 1 - - def handle_part(part: dict[str, Any], delta: Any) -> list[dict[str, Any]]: - part_type = part.get("type", "") - part_id = part.get("id", "") - events: list[dict[str, Any]] = [] - - if part_type == "text": - text = part.get("text", "") - if delta: - cumulative_text[part_id] = cumulative_text.get(part_id, "") + delta - else: - cumulative_text[part_id] = text - - if cumulative_text.get(part_id): - 
events.append( - { - "type": "token", - "content": cumulative_text[part_id], - "messageId": message_id, - } - ) - - elif part_type == "tool": - tool_event = self._transform_part_to_event(part, message_id) - if tool_event: - state = part.get("state", {}) - status = state.get("status", "") - call_id = part.get("callID", "") - tool_key = f"tool:{call_id}:{status}" - - if tool_key not in emitted_tool_states: - emitted_tool_states.add(tool_key) - events.append(tool_event) - - elif part_type == "step-start": - events.append( - { - "type": "step_start", - "messageId": message_id, - } - ) - - elif part_type == "step-finish": - events.append( - { - "type": "step_finish", - "cost": part.get("cost"), - "tokens": part.get("tokens"), - "reason": part.get("reason"), - "messageId": message_id, - } - ) - - return events - - try: - deadline = asyncio.get_running_loop().time() + self.sse_inactivity_timeout - async with asyncio.timeout_at(deadline) as timeout_ctx: - async with self.http_client.stream( - "GET", - sse_url, - timeout=httpx.Timeout(None, connect=self.HTTP_CONNECT_TIMEOUT, read=None), - ) as sse_response: - if sse_response.status_code != 200: - raise SSEConnectionError( - f"SSE connection failed: {sse_response.status_code}" - ) - - prompt_start = loop.time() - prompt_response = await self.http_client.post( - async_url, - json=request_body, - timeout=self.OPENCODE_REQUEST_TIMEOUT, - ) - if prompt_response.status_code not in [200, 204]: - error_body = prompt_response.text - self.log.error( - "bridge.prompt_request_error", - status_code=prompt_response.status_code, - error_body=error_body, - ) - raise RuntimeError( - f"Async prompt failed: {prompt_response.status_code} - {error_body}" - ) - - async for event in self._parse_sse_stream(sse_response, timeout_ctx): - event_type = event.get("type") - props = event.get("properties", {}) - - if event_type == "server.connected": - pass - elif event_type != "server.heartbeat": - event_session_id = props.get("sessionID") or 
props.get("part", {}).get( - "sessionID" - ) - if not event_session_id or event_session_id == self.opencode_session_id: - if event_type == "message.updated": - info = props.get("info", {}) - msg_session_id = info.get("sessionID") - if msg_session_id == self.opencode_session_id: - oc_msg_id = info.get("id", "") - parent_id = info.get("parentID", "") - role = info.get("role", "") - finish = info.get("finish", "") - - self.log.debug( - "bridge.message_updated", - role=role, - oc_msg_id=oc_msg_id, - parent_match=(parent_id == opencode_message_id), - ) - - if ( - role == "assistant" - and parent_id == opencode_message_id - and oc_msg_id - ): - allowed_assistant_msg_ids.add(oc_msg_id) - pending = pending_parts.pop(oc_msg_id, []) - if pending: - pending_parts_total -= len(pending) - for part, delta in pending: - for part_event in handle_part(part, delta): - yield part_event - - if finish and finish not in ("tool-calls", ""): - self.log.debug( - "bridge.message_finished", - finish=finish, - ) - - elif event_type == "message.part.updated": - part = props.get("part", {}) - delta = props.get("delta") - oc_msg_id = part.get("messageID", "") - if oc_msg_id in allowed_assistant_msg_ids: - for part_event in handle_part(part, delta): - yield part_event - elif oc_msg_id: - buffer_part(oc_msg_id, part, delta) - - elif event_type == "session.idle": - idle_session_id = props.get("sessionID") - if idle_session_id == self.opencode_session_id: - elapsed = time.time() - start_time - self.log.debug( - "bridge.session_idle", - elapsed_s=round(elapsed, 1), - tracked_msgs=len(allowed_assistant_msg_ids), - ) - async for final_event in self._fetch_final_message_state( - message_id, - opencode_message_id, - cumulative_text, - allowed_assistant_msg_ids, - ): - yield final_event - return - - elif event_type == "session.status": - status_session_id = props.get("sessionID") - status = props.get("status", {}) - if ( - status_session_id == self.opencode_session_id - and status.get("type") == "idle" - 
): - elapsed = time.time() - start_time - self.log.debug( - "bridge.session_status_idle", - elapsed_s=round(elapsed, 1), - tracked_msgs=len(allowed_assistant_msg_ids), - ) - async for final_event in self._fetch_final_message_state( - message_id, - opencode_message_id, - cumulative_text, - allowed_assistant_msg_ids, - ): - yield final_event - return - - elif event_type == "session.error": - error_session_id = props.get("sessionID") - if error_session_id == self.opencode_session_id: - error = props.get("error", {}) - error_msg = ( - error.get("message") - if isinstance(error, dict) - else str(error) - ) - self.log.error("bridge.session_error", error_msg=error_msg) - yield { - "type": "error", - "error": error_msg or "Unknown error", - "messageId": message_id, - } - return - - if loop.time() > prompt_start + self.PROMPT_MAX_DURATION: - elapsed = time.time() - start_time - self.log.error( - "bridge.prompt_max_duration_timeout", - timeout_ms=int(self.PROMPT_MAX_DURATION * 1000), - elapsed_ms=int(elapsed * 1000), - message_id=message_id, - ) - await self._request_opencode_stop(reason="prompt_max_duration_timeout") - async for final_event in self._fetch_final_message_state( - message_id, - opencode_message_id, - cumulative_text, - allowed_assistant_msg_ids, - ): - yield final_event - raise RuntimeError( - f"Prompt exceeded max duration of {self.PROMPT_MAX_DURATION:.0f}s." 
- ) - - except TimeoutError: - elapsed = time.time() - start_time - self.log.error( - "bridge.sse_inactivity_timeout", - timeout_name="sse_inactivity", - timeout_ms=int(self.sse_inactivity_timeout * 1000), - elapsed_ms=int(elapsed * 1000), - operation="bridge.sse", - message_id=message_id, - ) - await self._request_opencode_stop(reason="inactivity_timeout") - async for final_event in self._fetch_final_message_state( - message_id, - opencode_message_id, - cumulative_text, - allowed_assistant_msg_ids, - ): - yield final_event - raise RuntimeError( - f"SSE stream inactive for {self.sse_inactivity_timeout:.0f}s " - f"(no data received). Total elapsed: {elapsed:.0f}s" + # Send incremental chunk - frontend concatenates them + await self._send_event( + { + "type": "token", + "content": text, + "messageId": message_id, + } ) - - except httpx.ReadError as e: - self.log.error("bridge.sse_read_error", exc=e) - raise SSEConnectionError(f"SSE read error: {e}") - - async def _fetch_final_message_state( - self, - message_id: str, - opencode_message_id: str, - cumulative_text: dict[str, str], - tracked_msg_ids: set[str] | None = None, - ) -> AsyncIterator[dict[str, Any]]: - """Fetch final message state from API to ensure complete text. - - This is called after session.idle to capture any text that may have - been missed due to SSE event ordering. It fetches the latest message - state and emits any text that's longer than what we've already sent. - - Args: - message_id: Control plane message ID (used in events sent back) - opencode_message_id: OpenCode ascending ID (used for parentID correlation) - cumulative_text: Text already sent, keyed by part ID - tracked_msg_ids: Assistant message IDs tracked during SSE streaming - - Uses parentID-based correlation if available, falling back to - tracked_msg_ids from SSE streaming if parentID doesn't match. 
- """ - if not self.http_client or not self.opencode_session_id: return - messages_url = f"{self.opencode_base_url}/session/{self.opencode_session_id}/message" - - try: - response = await self.http_client.get( - messages_url, - timeout=self.OPENCODE_REQUEST_TIMEOUT, + if isinstance(content, ToolCall): + # Tool call in progress or completed + # Use cached title from ToolCallStart, or cache new title if available + if content.title: + self._tool_titles[content.tool_call_id] = content.title + tool_name = ( + content.title + or self._tool_titles.get(content.tool_call_id) + or getattr(content, "kind", None) + or "Tool" ) - if response.status_code != 200: - self.log.warn( - "bridge.final_state_fetch_error", - status_code=response.status_code, - ) - return - messages = response.json() - - for msg in messages: - info = msg.get("info", {}) - role = info.get("role", "") - msg_id = info.get("id", "") - parent_id = info.get("parentID", "") - - if role != "assistant": - continue - - parent_matches = parent_id == opencode_message_id - in_tracked_set = tracked_msg_ids and msg_id in tracked_msg_ids - - if not parent_matches and not in_tracked_set: - continue - - parts = msg.get("parts", []) - for part in parts: - part_type = part.get("type", "") - part_id = part.get("id", "") - - if part_type == "text": - text = part.get("text", "") - previously_sent = cumulative_text.get(part_id, "") - if len(text) > len(previously_sent): - self.log.debug( - "bridge.final_text_update", - prev_len=len(previously_sent), - new_len=len(text), - ) - cumulative_text[part_id] = text - yield { - "type": "token", - "content": text, - "messageId": message_id, - } + self.log.debug( + "acp.tool_call_content", + tool_call_id=content.tool_call_id, + title=content.title, + cached_title=self._tool_titles.get(content.tool_call_id), + kind=getattr(content, "kind", None), + status=content.status, + ) + status = "running" + if content.status == "completed": + status = "completed" + elif content.status == "failed": + 
status = "error" + elif content.status == "pending": + status = "pending" - except Exception as e: - self.log.error("bridge.final_state_error", exc=e) + await self._send_event( + { + "type": "tool_call", + "tool": tool_name, + "args": content.raw_input or {}, + "callId": content.tool_call_id, + "status": status, + "output": content.raw_output or "", + "messageId": message_id, + } + ) async def _handle_stop(self) -> None: - """Handle stop command - halt current execution.""" + """Handle stop command - cancel current execution.""" self.log.info("bridge.stop") - await self._request_opencode_stop(reason="command") + + # Cancel current prompt task if running + if self._prompt_task and not self._prompt_task.done(): + self._prompt_task.cancel() + + # Send cancel notification via ACP if session exists + if self.acp_conn and self.acp_session_id: + try: + await self.acp_conn.cancel(session_id=self.acp_session_id) + self.log.info("acp.cancel_sent") + except Exception as e: + self.log.warn("acp.cancel_error", exc=e) async def _handle_snapshot(self) -> None: """Handle snapshot command - prepare for snapshot.""" @@ -1037,7 +938,7 @@ async def _handle_snapshot(self) -> None: await self._send_event( { "type": "snapshot_ready", - "opencodeSessionId": self.opencode_session_id, + "acpSessionId": self.acp_session_id, } ) @@ -1177,104 +1078,24 @@ async def _configure_git_identity(self, user: GitUser) -> None: self.log.error("git.identity_error", exc=e) async def _load_session_id(self) -> None: - """Load OpenCode session ID from file if it exists.""" - if self.session_id_file.exists(): + """Load ACP session ID from file if it exists.""" + if self.acp_session_id_file.exists(): try: - self.opencode_session_id = self.session_id_file.read_text().strip() + self.acp_session_id = self.acp_session_id_file.read_text().strip() self.log.info( - "opencode.session.ensure", - opencode_session_id=self.opencode_session_id, - action="loaded", + "acp.session.loaded", + acp_session_id=self.acp_session_id, 
) - - if self.http_client: - try: - resp = await self.http_client.get( - f"{self.opencode_base_url}/session/{self.opencode_session_id}", - timeout=self.OPENCODE_REQUEST_TIMEOUT, - ) - if resp.status_code != 200: - self.log.info( - "opencode.session.invalid", - opencode_session_id=self.opencode_session_id, - ) - self.opencode_session_id = None - except Exception: - self.opencode_session_id = None - except Exception as e: - self.log.error("opencode.session.load_error", exc=e) + self.log.error("acp.session.load_error", exc=e) async def _save_session_id(self) -> None: - """Save OpenCode session ID to file for persistence.""" - if self.opencode_session_id: + """Save ACP session ID to file for persistence.""" + if self.acp_session_id: try: - self.session_id_file.write_text(self.opencode_session_id) + self.acp_session_id_file.write_text(self.acp_session_id) except Exception as e: - self.log.error("opencode.session.save_error", exc=e) - - async def _request_opencode_stop(self, reason: str) -> bool: - if not self.http_client or not self.opencode_session_id: - return False - - try: - await self.http_client.post( - f"{self.opencode_base_url}/session/{self.opencode_session_id}/stop", - timeout=self.OPENCODE_REQUEST_TIMEOUT, - ) - self.log.info("bridge.stop_requested", reason=reason) - return True - except Exception as e: - self.log.warn("bridge.stop_request_error", exc=e, reason=reason) - return False - - def _resolve_timeout_seconds( - self, - name: str, - default: float, - min_value: float, - max_value: float, - ) -> float: - raw = os.environ.get(name) - if raw is None or raw == "": - value = default - else: - try: - value = float(raw) - except ValueError: - self.log.warn( - "bridge.timeout_invalid", - timeout_name=name, - timeout_ms=int(default * 1000), - detail=f"invalid value '{raw}', using default", - ) - value = default - - if value < min_value: - self.log.warn( - "bridge.timeout_clamped", - timeout_name=name, - timeout_ms=int(min_value * 1000), - detail=f"below min 
({min_value}s), clamped", - ) - value = min_value - elif value > max_value: - self.log.warn( - "bridge.timeout_clamped", - timeout_name=name, - timeout_ms=int(max_value * 1000), - detail=f"above max ({max_value}s), clamped", - ) - value = max_value - - self.log.info( - "bridge.timeout_config", - timeout_name=name, - timeout_ms=int(value * 1000), - min_ms=int(min_value * 1000), - max_ms=int(max_value * 1000), - ) - return value + self.log.error("acp.session.save_error", exc=e) async def main(): @@ -1284,7 +1105,6 @@ async def main(): parser.add_argument("--session-id", required=True, help="Session ID for WebSocket connection") parser.add_argument("--control-plane", required=True, help="Control plane URL") parser.add_argument("--token", required=True, help="Auth token") - parser.add_argument("--opencode-port", type=int, default=4096, help="OpenCode port") args = parser.parse_args() @@ -1293,7 +1113,6 @@ async def main(): session_id=args.session_id, control_plane_url=args.control_plane, auth_token=args.token, - opencode_port=args.opencode_port, ) await bridge.run() diff --git a/packages/modal-infra/src/sandbox/entrypoint.py b/packages/modal-infra/src/sandbox/entrypoint.py index af13bafb..a1bd6b95 100644 --- a/packages/modal-infra/src/sandbox/entrypoint.py +++ b/packages/modal-infra/src/sandbox/entrypoint.py @@ -1,19 +1,20 @@ #!/usr/bin/env python3 """ -Sandbox entrypoint - manages OpenCode server and bridge lifecycle. +Sandbox entrypoint - manages bridge lifecycle and git sync. Runs as PID 1 inside the sandbox. Responsibilities: 1. Perform git sync with latest code -2. Start OpenCode server -3. Start bridge process for control plane communication -4. Monitor processes and restart on crash with exponential backoff -5. Handle graceful shutdown on SIGTERM/SIGINT +2. Start bridge process for control plane communication +3. Monitor bridge process and restart on crash with exponential backoff +4. 
Handle graceful shutdown on SIGTERM/SIGINT + +Note: The ACP agent (Claude Code) is spawned by the bridge on-demand, +not by this entrypoint. This simplifies the startup sequence. """ import asyncio import json import os -import shutil import signal import time from pathlib import Path @@ -31,24 +32,19 @@ class SandboxSupervisor: Manages: - Git synchronization with base branch - - OpenCode server process - Bridge process for control plane communication - Process monitoring with crash recovery """ # Configuration - OPENCODE_PORT = 4096 - HEALTH_CHECK_TIMEOUT = 30.0 MAX_RESTARTS = 5 BACKOFF_BASE = 2.0 BACKOFF_MAX = 60.0 def __init__(self): - self.opencode_process: asyncio.subprocess.Process | None = None self.bridge_process: asyncio.subprocess.Process | None = None self.shutdown_event = asyncio.Event() self.git_sync_complete = asyncio.Event() - self.opencode_ready = asyncio.Event() # Configuration from environment (set by Modal/SandboxManager) self.sandbox_id = os.environ.get("SANDBOX_ID", "unknown") @@ -58,6 +54,10 @@ def __init__(self): self.repo_name = os.environ.get("REPO_NAME", "") self.github_app_token = os.environ.get("GITHUB_APP_TOKEN", "") + # Set GH_TOKEN for GitHub CLI (gh) - uses the same GitHub App token + if self.github_app_token: + os.environ["GH_TOKEN"] = self.github_app_token + # Parse session config if provided session_config_json = os.environ.get("SESSION_CONFIG", "{}") self.session_config = json.loads(session_config_json) @@ -65,7 +65,6 @@ def __init__(self): # Paths self.workspace_path = Path("/workspace") self.repo_path = self.workspace_path / self.repo_name - self.session_id_file = Path("/tmp/opencode-session-id") # Logger session_id = self.session_config.get("session_id", "") @@ -218,115 +217,6 @@ async def perform_git_sync(self) -> bool: self.git_sync_complete.set() # Allow agent to proceed anyway return False - async def start_opencode(self) -> None: - """Start OpenCode server with configuration.""" - self.log.info("opencode.start") - - # 
Build OpenCode config from session settings - # Model format is "provider/model", e.g. "anthropic/claude-sonnet-4-5" - provider = self.session_config.get("provider", "anthropic") - model = self.session_config.get("model", "claude-sonnet-4-5") - opencode_config = { - "model": f"{provider}/{model}", - "permission": { - "*": { - "*": "allow", - }, - }, - } - - # Determine working directory - use repo path if cloned, otherwise /workspace - workdir = self.workspace_path - if self.repo_path.exists() and (self.repo_path / ".git").exists(): - workdir = self.repo_path - - # Set up .opencode directory for custom tools - opencode_dir = workdir / ".opencode" - tool_dest = opencode_dir / "tool" - tool_source = Path("/app/sandbox/inspect-plugin.js") - - if tool_source.exists(): - # Create .opencode/tool directory - tool_dest.mkdir(parents=True, exist_ok=True) - shutil.copy(tool_source, tool_dest / "create-pull-request.js") - - # Create node_modules symlink to global modules so OpenCode doesn't try to install - # and so imports resolve correctly via NODE_PATH - node_modules = opencode_dir / "node_modules" - global_modules = Path("/usr/lib/node_modules") - if not node_modules.exists() and global_modules.exists(): - try: - node_modules.symlink_to(global_modules) - except Exception as e: - self.log.warn("opencode.symlink_error", exc=e) - - # Create a minimal package.json so OpenCode sees this as a configured directory - package_json = opencode_dir / "package.json" - if not package_json.exists(): - package_json.write_text('{"name": "opencode-tools", "type": "module"}') - - env = { - **os.environ, - "OPENCODE_CONFIG_CONTENT": json.dumps(opencode_config), - } - - # Start OpenCode server in the repo directory - self.opencode_process = await asyncio.create_subprocess_exec( - "opencode", - "serve", - "--port", - str(self.OPENCODE_PORT), - "--hostname", - "0.0.0.0", - "--print-logs", # Print logs to stdout for debugging - cwd=workdir, # Start in repo directory - env=env, - 
stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - - # Start log forwarder - asyncio.create_task(self._forward_opencode_logs()) - - # Wait for health check - await self._wait_for_health() - self.opencode_ready.set() - self.log.info("opencode.ready") - - async def _forward_opencode_logs(self) -> None: - """Forward OpenCode stdout to supervisor stdout.""" - if not self.opencode_process or not self.opencode_process.stdout: - return - - try: - async for line in self.opencode_process.stdout: - print(f"[opencode] {line.decode().rstrip()}") - except Exception as e: - print(f"[supervisor] Log forwarding error: {e}") - - async def _wait_for_health(self) -> None: - """Poll health endpoint until server is ready.""" - health_url = f"http://localhost:{self.OPENCODE_PORT}/global/health" - start_time = time.time() - - async with httpx.AsyncClient() as client: - while time.time() - start_time < self.HEALTH_CHECK_TIMEOUT: - if self.shutdown_event.is_set(): - raise RuntimeError("Shutdown requested during startup") - - try: - resp = await client.get(health_url, timeout=2.0) - if resp.status_code == 200: - return - except httpx.ConnectError: - pass - except Exception as e: - self.log.debug("opencode.health_check_error", exc=e) - - await asyncio.sleep(0.5) - - raise RuntimeError("OpenCode server failed to become healthy") - async def start_bridge(self) -> None: """Start the agent bridge process.""" self.log.info("bridge.start") @@ -335,9 +225,6 @@ async def start_bridge(self) -> None: self.log.info("bridge.skip", reason="no_control_plane_url") return - # Wait for OpenCode to be ready - await self.opencode_ready.wait() - # Get session_id from config (required for WebSocket connection) session_id = self.session_config.get("session_id", "") if not session_id: @@ -357,8 +244,6 @@ async def start_bridge(self) -> None: self.control_plane_url, "--token", self.sandbox_token, - "--opencode-port", - str(self.OPENCODE_PORT), env=os.environ, stdout=asyncio.subprocess.PIPE, 
stderr=asyncio.subprocess.STDOUT, @@ -397,44 +282,9 @@ async def _forward_bridge_logs(self) -> None: async def monitor_processes(self) -> None: """Monitor child processes and restart on crash.""" - restart_count = 0 bridge_restart_count = 0 while not self.shutdown_event.is_set(): - # Check OpenCode process - if self.opencode_process and self.opencode_process.returncode is not None: - exit_code = self.opencode_process.returncode - restart_count += 1 - - self.log.error( - "opencode.crash", - exit_code=exit_code, - restart_count=restart_count, - ) - - if restart_count > self.MAX_RESTARTS: - self.log.error( - "opencode.max_restarts", - restart_count=restart_count, - ) - await self._report_fatal_error( - f"OpenCode crashed {restart_count} times, giving up" - ) - self.shutdown_event.set() - break - - # Exponential backoff - delay = min(self.BACKOFF_BASE**restart_count, self.BACKOFF_MAX) - self.log.info( - "opencode.restart", - delay_s=round(delay, 1), - restart_count=restart_count, - ) - - await asyncio.sleep(delay) - self.opencode_ready.clear() - await self.start_opencode() - # Check bridge process if self.bridge_process and self.bridge_process.returncode is not None: exit_code = self.bridge_process.returncode @@ -634,7 +484,6 @@ async def run(self) -> None: loop.add_signal_handler(sig, lambda s=sig: asyncio.create_task(self._handle_signal(s))) git_sync_success = False - opencode_ready = False try: # Phase 1: Git sync if restored_from_snapshot: @@ -649,11 +498,7 @@ async def run(self) -> None: # Phase 2: Configure git identity (if repo was cloned) await self.configure_git_identity() - # Phase 3: Start OpenCode server (in repo directory) - await self.start_opencode() - opencode_ready = True - - # Phase 4: Start bridge (after OpenCode is ready) + # Phase 3: Start bridge (which will spawn ACP agent on-demand) await self.start_bridge() # Emit sandbox.startup wide event @@ -664,12 +509,11 @@ async def run(self) -> None: repo_name=self.repo_name, 
restored_from_snapshot=restored_from_snapshot, git_sync_success=git_sync_success, - opencode_ready=opencode_ready, duration_ms=duration_ms, outcome="success", ) - # Phase 5: Monitor processes + # Phase 4: Monitor processes await self.monitor_processes() except Exception as e: @@ -688,22 +532,14 @@ async def shutdown(self) -> None: """Graceful shutdown of all processes.""" self.log.info("supervisor.shutdown_start") - # Terminate bridge first + # Terminate bridge (which will clean up its ACP subprocess) if self.bridge_process and self.bridge_process.returncode is None: self.bridge_process.terminate() try: - await asyncio.wait_for(self.bridge_process.wait(), timeout=5.0) + await asyncio.wait_for(self.bridge_process.wait(), timeout=10.0) except TimeoutError: self.bridge_process.kill() - # Terminate OpenCode - if self.opencode_process and self.opencode_process.returncode is None: - self.opencode_process.terminate() - try: - await asyncio.wait_for(self.opencode_process.wait(), timeout=10.0) - except TimeoutError: - self.opencode_process.kill() - self.log.info("supervisor.shutdown_complete") diff --git a/packages/modal-infra/src/sandbox/inspect-plugin.js b/packages/modal-infra/src/sandbox/inspect-plugin.js deleted file mode 100644 index 689c8456..00000000 --- a/packages/modal-infra/src/sandbox/inspect-plugin.js +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Create Pull Request Tool for Open-Inspect. - * - * This tool creates a pull request for committed changes. - * Uses tool() helper from @opencode-ai/plugin with tool.schema for Zod compatibility. - */ -import { tool } from "@opencode-ai/plugin" -import { z } from "zod" - -// Debug: Log that the tool was loaded -console.log("[create-pull-request] Tool module loaded") -console.log("[create-pull-request] CONTROL_PLANE_URL:", process.env.CONTROL_PLANE_URL || "") -console.log("[create-pull-request] SANDBOX_AUTH_TOKEN:", process.env.SANDBOX_AUTH_TOKEN ? 
"" : "") -console.log("[create-pull-request] SESSION_CONFIG:", process.env.SESSION_CONFIG ? "" : "") - -// Get bridge configuration from environment -const BRIDGE_URL = process.env.CONTROL_PLANE_URL || "http://localhost:8787" -const BRIDGE_TOKEN = process.env.SANDBOX_AUTH_TOKEN || "" - -// Get session ID from SESSION_CONFIG -function getSessionId() { - try { - const config = JSON.parse(process.env.SESSION_CONFIG || "{}") - console.log("[create-pull-request] Parsed SESSION_CONFIG, sessionId:", config.sessionId || config.session_id || "") - return config.sessionId || config.session_id || "" - } catch (e) { - console.log("[create-pull-request] Failed to parse SESSION_CONFIG:", e.message) - return "" - } -} - -// Use tool() helper - args should be a ZodRawShape (plain object), NOT a ZodObject -// OpenCode wraps it with z.object() internally -export default tool({ - name: "create-pull-request", - description: "Create a pull request for the committed changes. DO NOT use 'gh' CLI - use this tool instead. It handles git push and PR creation automatically with pre-configured authentication. You MUST provide a descriptive title and body that explain what changes were made. Call this after committing your changes.", - args: { - title: z.string().describe("Title of the pull request. Should be concise and descriptive of the changes made."), - body: z.string().describe("Body/description of the pull request. Explain what changes were made and why. Use markdown formatting for clarity."), - baseBranch: z.string().optional().describe("Target branch to merge into. 
Defaults to the repository's default branch (usually 'main')."), - }, - async execute(args, context) { - console.log(`[create-pull-request] execute() called with args:`, JSON.stringify(args)) - const title = args.title || "Changes from OpenCode session" - const body = args.body || "Automated PR created via create-pull-request tool" - const baseBranch = args.baseBranch // undefined if not provided, server will use default - - try { - const sessionId = getSessionId() - console.log(`[create-pull-request] Session ID: ${sessionId || ""}`) - console.log(`[create-pull-request] Bridge URL: ${BRIDGE_URL}`) - console.log(`[create-pull-request] Bridge Token: ${BRIDGE_TOKEN ? "" : ""}`) - - if (!sessionId) { - console.log("[create-pull-request] ERROR: Session ID not found") - return "Failed to create pull request: Session ID not found in environment. Please check that SESSION_CONFIG is set correctly." - } - - // Use the session-specific endpoint - const url = `${BRIDGE_URL}/sessions/${sessionId}/pr` - console.log(`[create-pull-request] Calling PR endpoint: ${url}`) - - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - "Authorization": `Bearer ${BRIDGE_TOKEN}`, - }, - body: JSON.stringify({ - title: title, - body: body, - baseBranch: baseBranch, - timestamp: Date.now(), - }), - }) - - if (!response.ok) { - const errorText = await response.text() - // Try to parse as JSON to get structured error message - let errorMessage = errorText - try { - const errorJson = JSON.parse(errorText) - errorMessage = errorJson.error || errorJson.message || errorText - } catch { - // Use raw text if not JSON - } - - // Provide helpful messages based on status code - let userMessage = `Failed to create pull request: ${errorMessage}` - if (response.status === 401) { - userMessage = `Authentication failed: ${errorMessage}. 
The GitHub token may have expired - please re-authenticate.` - } else if (response.status === 404) { - userMessage = `Session not found: ${errorMessage}. The session may have been deleted or the ID is incorrect.` - } else if (response.status === 409) { - userMessage = `Conflict: ${errorMessage}. A PR may already exist for this branch.` - } - - console.log(`[create-pull-request] ERROR: HTTP ${response.status} - ${errorMessage}`) - return userMessage - } - - const result = await response.json() - console.log(`[create-pull-request] SUCCESS: PR #${result.prNumber} created`) - return `Pull request created successfully!\n\nPR #${result.prNumber}: ${result.prUrl}\n\nThe PR is now ready for review.` - } catch (error) { - const message = error instanceof Error ? error.message : String(error) - console.log(`[create-pull-request] ERROR: ${message}`) - return `Failed to create pull request: ${message}` - } - }, -}) diff --git a/packages/slack-bot/src/completion/extractor.ts b/packages/slack-bot/src/completion/extractor.ts index 3107a98f..691698ad 100644 --- a/packages/slack-bot/src/completion/extractor.ts +++ b/packages/slack-bot/src/completion/extractor.ts @@ -79,8 +79,7 @@ export async function extractAgentResponse( cursor = data.hasMore ? data.cursor : undefined; } while (cursor); - // Get the final text from the last token event - // Token events contain cumulative text (not incremental deltas), so we only need the last one + // Token events contain incremental chunks - concatenate them all to get full text const tokenEvents = allEvents .filter((e): e is EventResponse & { type: "token" } => e.type === "token") .sort((a, b) => { @@ -88,8 +87,7 @@ export async function extractAgentResponse( if (timeDiff !== 0) return timeDiff; return a.id.localeCompare(b.id); // Stable secondary sort }); - const lastToken = tokenEvents[tokenEvents.length - 1]; - const textContent = lastToken ? String(lastToken.data.content ?? 
"") : ""; + const textContent = tokenEvents.map((e) => String(e.data.content ?? "")).join(""); // Extract tool calls const toolCalls: ToolCallSummary[] = allEvents diff --git a/packages/web/src/app/session/[id]/page.tsx b/packages/web/src/app/session/[id]/page.tsx index a2fbe16a..9ae3a7d0 100644 --- a/packages/web/src/app/session/[id]/page.tsx +++ b/packages/web/src/app/session/[id]/page.tsx @@ -131,6 +131,7 @@ export default function SessionPage() { artifacts, currentParticipantId, isProcessing, + streamingContent, sendPrompt, stopExecution, sendTyping, @@ -171,10 +172,10 @@ export default function SessionPage() { const typingTimeoutRef = useRef(null); const modelDropdownRef = useRef(null); - // Scroll to bottom when new content arrives + // Scroll to bottom when new content arrives (events or streaming) useEffect(() => { messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); - }, [events]); + }, [events, streamingContent]); // Redirect if not authenticated useEffect(() => { @@ -245,6 +246,7 @@ export default function SessionPage() { messagesEndRef={messagesEndRef} prompt={prompt} isProcessing={isProcessing} + streamingContent={streamingContent} selectedModel={selectedModel} modelDropdownOpen={modelDropdownOpen} modelDropdownRef={modelDropdownRef} @@ -276,6 +278,7 @@ function SessionContent({ messagesEndRef, prompt, isProcessing, + streamingContent, selectedModel, modelDropdownOpen, modelDropdownRef, @@ -302,6 +305,7 @@ function SessionContent({ messagesEndRef: React.RefObject; prompt: string; isProcessing: boolean; + streamingContent: ReturnType["streamingContent"]; selectedModel: string; modelDropdownOpen: boolean; modelDropdownRef: React.RefObject; @@ -409,7 +413,17 @@ function SessionContent({ /> ) )} - {isProcessing && } + {/* Live streaming content display */} + {streamingContent && ( +
+
+ Assistant + +
+ +
+ )} + {isProcessing && !streamingContent && }
diff --git a/packages/web/src/hooks/use-session-socket.ts b/packages/web/src/hooks/use-session-socket.ts index cc793812..1ab8f289 100644 --- a/packages/web/src/hooks/use-session-socket.ts +++ b/packages/web/src/hooks/use-session-socket.ts @@ -62,6 +62,11 @@ interface Participant { lastSeen: number; } +interface StreamingContent { + content: string; + messageId: string; +} + interface UseSessionSocketReturn { connected: boolean; connecting: boolean; @@ -74,6 +79,7 @@ interface UseSessionSocketReturn { artifacts: Artifact[]; currentParticipantId: string | null; isProcessing: boolean; + streamingContent: StreamingContent | null; sendPrompt: (content: string, model?: string) => void; stopExecution: () => void; sendTyping: () => void; @@ -86,8 +92,8 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { const mountedRef = useRef(true); const subscribedRef = useRef(false); const wsTokenRef = useRef(null); - // Accumulates text during streaming, displayed only on completion to avoid duplicate display. - // Stores only the latest token since token events contain the full accumulated text (not incremental). + // Accumulates text during streaming for final event and tracks streaming state. + // Token events now contain incremental chunks that we concatenate. 
const pendingTextRef = useRef<{ content: string; messageId: string; timestamp: number } | null>( null ); @@ -101,6 +107,8 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { const [participants, setParticipants] = useState([]); const [artifacts, setArtifacts] = useState([]); const [currentParticipantId, setCurrentParticipantId] = useState(null); + // Live streaming content display + const [streamingContent, setStreamingContent] = useState(null); const currentParticipantRef = useRef<{ participantId: string; name: string; @@ -133,6 +141,7 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { setEvents([]); setArtifacts([]); pendingTextRef.current = null; + setStreamingContent(null); if (data.state) { setSessionState(data.state); } @@ -155,12 +164,23 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { const event = data.event; if (event.type === "token" && event.content && event.messageId) { - // Accumulate text but DON'T display yet + // Concatenate incremental chunks for streaming display; a new messageId restarts both content and timestamp + const isNewMessage = pendingTextRef.current?.messageId !== event.messageId; + const newContent = isNewMessage + ? event.content + : pendingTextRef.current!.content + event.content; + pendingTextRef.current = { - content: event.content, + content: newContent, messageId: event.messageId, - timestamp: event.timestamp, + timestamp: isNewMessage ? event.timestamp :
pendingTextRef.current!.timestamp, }; + + // Update live streaming display + setStreamingContent({ + content: newContent, + messageId: event.messageId, + }); } else if (event.type === "execution_complete") { // On completion: Add final text to events using the token's original timestamp if (pendingTextRef.current) { @@ -176,6 +196,8 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { }, ]); } + // Clear live streaming display + setStreamingContent(null); setEvents((prev) => [...prev, event]); } else { // Other events (tool_call, user_message, git_sync, etc.) - add normally @@ -485,6 +507,8 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { }, ]); } + // Clear live streaming display + setStreamingContent(null); wsRef.current.send(JSON.stringify({ type: "stop" })); }, []); @@ -551,6 +575,7 @@ export function useSessionSocket(sessionId: string): UseSessionSocketReturn { artifacts, currentParticipantId, isProcessing, + streamingContent, sendPrompt, stopExecution, sendTyping, diff --git a/terraform/environments/production/main.tf b/terraform/environments/production/main.tf index 0e541525..bcabfd8a 100644 --- a/terraform/environments/production/main.tf +++ b/terraform/environments/production/main.tf @@ -289,9 +289,14 @@ module "modal_app" { secrets = [ { name = "llm-api-keys" - values = { - ANTHROPIC_API_KEY = var.anthropic_api_key - } + # OAuth token takes precedence (subscription billing, much cheaper) + # ACP_COMMAND allows switching between claude-code-acp, opencode-acp, etc. + # Only include non-empty values + values = merge( + var.anthropic_api_key != "" ? { ANTHROPIC_API_KEY = var.anthropic_api_key } : {}, + var.claude_oauth_token != "" ? { CLAUDE_CODE_OAUTH_TOKEN = var.claude_oauth_token } : {}, + var.acp_command != "claude-code-acp" ? 
{ ACP_COMMAND = var.acp_command } : {} + ) }, { name = "github-app" diff --git a/terraform/environments/production/variables.tf b/terraform/environments/production/variables.tf index bca0385f..b56408f1 100644 --- a/terraform/environments/production/variables.tf +++ b/terraform/environments/production/variables.tf @@ -108,9 +108,23 @@ variable "slack_signing_secret" { # ============================================================================= variable "anthropic_api_key" { - description = "Anthropic API key for Claude" + description = "Anthropic API key for Claude (used if claude_oauth_token not set)" type = string sensitive = true + default = "" +} + +variable "claude_oauth_token" { + description = "Claude OAuth token (from 'claude setup-token'). Takes precedence over API key - uses subscription billing instead of pay-per-token." + type = string + sensitive = true + default = "" +} + +variable "acp_command" { + description = "ACP command to spawn (e.g., 'claude-code-acp' or 'opencode acp'). Supports space-separated command + args. Allows switching between different ACP-compatible agents." + type = string + default = "claude-code-acp" } # =============================================================================