From 1e4b2d31ba0b3a237569f691030e2041b5bd5966 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 7 Feb 2026 10:46:22 +0100 Subject: [PATCH 1/4] fix: send JSONRPCError instead of bare exceptions in streamable HTTP client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the streamable HTTP client encountered errors (unexpected content type, JSON parse failure, SSE parse failure), it sent bare Exception objects that never resolved the pending send_request() — causing the caller to hang indefinitely until timeout. Send JSONRPCError with the request's id so the response stream unblocks immediately, following the pattern already used by the 404 handler. --- src/mcp/client/streamable_http.py | 59 +++--- src/mcp/types/jsonrpc.py | 2 +- tests/client/test_notification_response.py | 204 +++++++++++---------- 3 files changed, 141 insertions(+), 124 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index cbb611419..57fa6261f 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -18,6 +18,8 @@ from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( + INVALID_REQUEST, + PARSE_ERROR, ErrorData, InitializeResult, JSONRPCError, @@ -163,6 +165,11 @@ async def _handle_sse_event( except Exception as exc: # pragma: no cover logger.exception("Error parsing SSE message") + if original_request_id is not None: + error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=original_request_id, error=error_data)) + await read_stream_writer.send(error_msg) + return True await read_stream_writer.send(exc) return False else: # pragma: no cover @@ -260,7 +267,9 @@ async def _handle_post_request(self, ctx: RequestContext) -> None: if response.status_code == 404: # pragma: no branch if isinstance(message, JSONRPCRequest): # pragma: no branch - await self._send_session_terminated_error(ctx.read_stream_writer, message.id) + error_data = ErrorData(code=INVALID_REQUEST, message="Session terminated") + session_message = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + await ctx.read_stream_writer.send(session_message) return response.raise_for_status() @@ -272,20 +281,24 @@ async def _handle_post_request(self, ctx: RequestContext) -> None: if isinstance(message, JSONRPCRequest): content_type = response.headers.get("content-type", "").lower() if content_type.startswith("application/json"): - await self._handle_json_response(response, ctx.read_stream_writer, is_initialization) + await self._handle_json_response( + response, ctx.read_stream_writer, is_initialization, request_id=message.id + ) elif content_type.startswith("text/event-stream"): await self._handle_sse_response(response, ctx, is_initialization) else: - await self._handle_unexpected_content_type( # pragma: no cover - content_type, # pragma: no cover - ctx.read_stream_writer, # pragma: no cover - ) # pragma: no cover + logger.error(f"Unexpected content type: {content_type}") + error_data = ErrorData(code=INVALID_REQUEST, message=f"Unexpected content type: {content_type}") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=message.id, error=error_data)) + await ctx.read_stream_writer.send(error_msg) async def _handle_json_response( self, response: httpx.Response, read_stream_writer: StreamWriter, is_initialization: bool = False, + *, + request_id: RequestId, ) -> None: """Handle JSON response from the server.""" try: @@ -298,9 +311,11 @@ async def _handle_json_response( session_message = SessionMessage(message) await read_stream_writer.send(session_message) - except Exception as exc: # pragma: no cover + except Exception as exc: logger.exception("Error parsing JSON response") - await read_stream_writer.send(exc) + error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse JSON response: {exc}") + error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data)) + await read_stream_writer.send(error_msg) async def _handle_sse_response( self, @@ -312,6 +327,11 @@ async def _handle_sse_response( last_event_id: str | None = None retry_interval_ms: int | None = None + # The caller (_handle_post_request) only reaches here inside + # isinstance(message, JSONRPCRequest), so this is always a JSONRPCRequest. + assert isinstance(ctx.session_message.message, JSONRPCRequest) + original_request_id = ctx.session_message.message.id + try: event_source = EventSource(response) async for sse in event_source.aiter_sse(): # pragma: no branch @@ -326,6 +346,7 @@ async def _handle_sse_response( is_complete = await self._handle_sse_event( sse, ctx.read_stream_writer, + original_request_id=original_request_id, resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), is_initialization=is_initialization, ) @@ -334,8 +355,8 @@ async def _handle_sse_response( if is_complete: await response.aclose() return # Normal completion, no reconnect needed - except Exception as e: - logger.debug(f"SSE stream ended: {e}") # pragma: no cover + except Exception: + logger.debug("SSE stream ended", exc_info=True) # pragma: no cover # Stream ended without response - reconnect if we received an event with ID if last_event_id is not None: # pragma: no branch @@ -400,24 +421,6 @@ async def _handle_reconnection( # Try to reconnect again if we still have an event ID await self._handle_reconnection(ctx, last_event_id, retry_interval_ms, attempt + 1) - async def _handle_unexpected_content_type( - self, content_type: str, read_stream_writer: StreamWriter - ) -> None: # pragma: no cover - """Handle unexpected content type in response.""" - error_msg = f"Unexpected content type: {content_type}" # pragma: no cover - logger.error(error_msg) # pragma: no cover - await read_stream_writer.send(ValueError(error_msg)) # pragma: no cover - - async def _send_session_terminated_error(self, read_stream_writer: StreamWriter, request_id: RequestId) -> None: - """Send a session terminated error response.""" - jsonrpc_error = JSONRPCError( - jsonrpc="2.0", - id=request_id, - error=ErrorData(code=32600, message="Session terminated"), - ) - session_message = SessionMessage(jsonrpc_error) - await read_stream_writer.send(session_message) - async def post_writer( self, client: httpx.AsyncClient, diff --git a/src/mcp/types/jsonrpc.py b/src/mcp/types/jsonrpc.py index 897e2450c..0cfdc993a 100644 --- a/src/mcp/types/jsonrpc.py +++ b/src/mcp/types/jsonrpc.py @@ -75,7 +75,7 @@ class JSONRPCError(BaseModel): """A response to a request that indicates an error occurred.""" jsonrpc: Literal["2.0"] - id: str | int + id: RequestId error: ErrorData diff --git a/tests/client/test_notification_response.py b/tests/client/test_notification_response.py index b36f2fe34..faabd8014 100644 --- a/tests/client/test_notification_response.py +++ b/tests/client/test_notification_response.py @@ -5,129 +5,92 @@ """ import json -import multiprocessing -import socket -from collections.abc import Generator +import httpx import pytest -import uvicorn from starlette.applications import Starlette from starlette.requests import Request from starlette.responses import JSONResponse, Response from starlette.routing import Route -from mcp import ClientSession, types +from mcp import ClientSession, MCPError, types from mcp.client.streamable_http import streamable_http_client from mcp.shared.session import RequestResponder from mcp.types import RootsListChangedNotification -from tests.test_helpers import wait_for_server +pytestmark = pytest.mark.anyio -def create_non_sdk_server_app() -> Starlette: # pragma: no cover +INIT_RESPONSE = { + "serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"}, + "protocolVersion": "2024-11-05", + "capabilities": {}, +} + + +def _init_json_response(data: dict[str, object]) -> JSONResponse: + return JSONResponse({"jsonrpc": "2.0", "id": data["id"], "result": INIT_RESPONSE}) + + +def _create_non_sdk_server_app() -> Starlette: """Create a minimal server that doesn't follow SDK conventions.""" async def handle_mcp_request(request: Request) -> Response: - """Handle MCP requests with non-standard responses.""" - try: - body = await request.body() - data = json.loads(body) - - # Handle initialize request normally - if data.get("method") == "initialize": - response_data = { - "jsonrpc": "2.0", - "id": data["id"], - "result": { - "serverInfo": {"name": "test-non-sdk-server", "version": "1.0.0"}, - "protocolVersion": "2024-11-05", - "capabilities": {}, - }, - } - return JSONResponse(response_data) - - # For notifications, return 204 No Content (non-SDK behavior) - if "id" not in data: - return Response(status_code=204, headers={"Content-Type": "application/json"}) - - # Default response for other requests - return JSONResponse( - {"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}} - ) - - except Exception as e: - return JSONResponse({"error": f"Server error: {str(e)}"}, status_code=500) - - app = Starlette( - debug=True, - routes=[ - Route("/mcp", handle_mcp_request, methods=["POST"]), - ], - ) - return app - - -def run_non_sdk_server(port: int) -> None: # pragma: no cover - """Run the non-SDK server in a separate process.""" - app = create_non_sdk_server_app() - config = uvicorn.Config( - app=app, - host="127.0.0.1", - port=port, - log_level="error", # Reduce noise in tests - ) - server = uvicorn.Server(config=config) - server.run() - - -@pytest.fixture -def non_sdk_server_port() -> int: - """Get an available port for the test server.""" - with socket.socket() as s: - s.bind(("127.0.0.1", 0)) - return s.getsockname()[1] - - -@pytest.fixture -def non_sdk_server(non_sdk_server_port: int) -> Generator[None, None, None]: - """Start a non-SDK server for testing.""" - proc = multiprocessing.Process(target=run_non_sdk_server, kwargs={"port": non_sdk_server_port}, daemon=True) - proc.start() - - # Wait for server to be ready - try: - wait_for_server(non_sdk_server_port, timeout=10.0) - except TimeoutError: # pragma: no cover - proc.kill() - proc.join(timeout=2) - pytest.fail("Server failed to start within 10 seconds") - - yield - - proc.kill() - proc.join(timeout=2) - - -@pytest.mark.anyio -async def test_non_compliant_notification_response(non_sdk_server: None, non_sdk_server_port: int) -> None: - """This test verifies that the client ignores unexpected responses to notifications: the spec states they should - either be 202 + no response body, or 4xx + optional error body + body = await request.body() + data = json.loads(body) + + if data.get("method") == "initialize": + return _init_json_response(data) + + # For notifications, return 204 No Content (non-SDK behavior) + if "id" not in data: + return Response(status_code=204, headers={"Content-Type": "application/json"}) + + return JSONResponse( + {"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}} + ) + + return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])]) + + +def _create_unexpected_content_type_app() -> Starlette: + """Create a server that returns an unexpected content type for requests.""" + + async def handle_mcp_request(request: Request) -> Response: + body = await request.body() + data = json.loads(body) + + if data.get("method") == "initialize": + return _init_json_response(data) + + if "id" not in data: + return Response(status_code=202) + + # Return text/plain for all other requests — an unexpected content type. + return Response(content="this is plain text, not json or sse", status_code=200, media_type="text/plain") + + return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])]) + + +async def test_non_compliant_notification_response() -> None: + """Verify the client ignores unexpected responses to notifications. + + The spec states notifications should get either 202 + no response body, or 4xx + optional error body (https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server), but some servers wrongly return other 2xx codes (e.g. 204). For now we simply ignore unexpected responses (aligning behaviour w/ the TS SDK). """ - server_url = f"http://127.0.0.1:{non_sdk_server_port}/mcp" returned_exception = None async def message_handler( # pragma: no cover message: RequestResponder[types.ServerRequest, types.ClientResult] | types.ServerNotification | Exception, - ): + ) -> None: nonlocal returned_exception if isinstance(message, Exception): returned_exception = message - async with streamable_http_client(server_url) as (read_stream, write_stream): + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_non_sdk_server_app())) + async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: - # Initialize should work normally await session.initialize() # The test server returns a 204 instead of the expected 202 @@ -135,3 +98,54 @@ async def message_handler( # pragma: no cover if returned_exception: # pragma: no cover pytest.fail(f"Server encountered an exception: {returned_exception}") + + +async def test_unexpected_content_type_sends_jsonrpc_error() -> None: + """Verify unexpected content types unblock the pending request with an MCPError. + + When a server returns a content type that is neither application/json nor text/event-stream, + the client should send a JSONRPCError so the pending request resolves immediately + instead of hanging until timeout. + """ + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_unexpected_content_type_app())) + async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + with pytest.raises(MCPError, match="Unexpected content type: text/plain"): + await session.list_tools() + + +def _create_invalid_json_response_app() -> Starlette: + """Create a server that returns invalid JSON for requests.""" + + async def handle_mcp_request(request: Request) -> Response: + body = await request.body() + data = json.loads(body) + + if data.get("method") == "initialize": + return _init_json_response(data) + + if "id" not in data: + return Response(status_code=202) + + # Return application/json content type but with invalid JSON body. + return Response(content="not valid json{{{", status_code=200, media_type="application/json") + + return Starlette(debug=True, routes=[Route("/mcp", handle_mcp_request, methods=["POST"])]) + + +async def test_invalid_json_response_sends_jsonrpc_error() -> None: + """Verify invalid JSON responses unblock the pending request with an MCPError. + + When a server returns application/json with an unparseable body, the client + should send a JSONRPCError so the pending request resolves immediately + instead of hanging until timeout. + """ + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_invalid_json_response_app())) + async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + + with pytest.raises(MCPError, match="Failed to parse JSON response"): + await session.list_tools() From b6e62f170a865d97b1123677bc5b6d825343567b Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 7 Feb 2026 10:51:58 +0100 Subject: [PATCH 2/4] fix: add missing pragma no cover annotations for CI coverage --- src/mcp/client/streamable_http.py | 4 ++-- tests/client/test_notification_response.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 57fa6261f..fbc9c4d30 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -470,8 +470,8 @@ async def handle_request_async(): else: await handle_request_async() - except Exception: - logger.exception("Error in post_writer") # pragma: no cover + except Exception: # pragma: no cover + logger.exception("Error in post_writer") finally: await read_stream_writer.aclose() await write_stream.aclose() diff --git a/tests/client/test_notification_response.py b/tests/client/test_notification_response.py index faabd8014..92939f540 100644 --- a/tests/client/test_notification_response.py +++ b/tests/client/test_notification_response.py @@ -45,7 +45,7 @@ async def handle_mcp_request(request: Request) -> Response: if "id" not in data: return Response(status_code=204, headers={"Content-Type": "application/json"}) - return JSONResponse( + return JSONResponse( # pragma: no cover {"jsonrpc": "2.0", "id": data.get("id"), "error": {"code": -32601, "message": "Method not found"}} ) From 605832e429420f61fd62028ac37e5e85caf18d39 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 7 Feb 2026 11:09:46 +0100 Subject: [PATCH 3/4] fix: narrow exception handling in _handle_json_response Catch (httpx.StreamError, ValidationError) instead of bare Exception. --- src/mcp/client/streamable_http.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index fbc9c4d30..1060c97fc 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -13,6 +13,7 @@ from anyio.abc import TaskGroup from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse +from pydantic import ValidationError from mcp.client._transport import TransportStreams from mcp.shared._httpx_utils import create_mcp_http_client @@ -311,7 +312,7 @@ async def _handle_json_response( session_message = SessionMessage(message) await read_stream_writer.send(session_message) - except Exception as exc: + except (httpx.StreamError, ValidationError) as exc: logger.exception("Error parsing JSON response") error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse JSON response: {exc}") error_msg = SessionMessage(JSONRPCError(jsonrpc="2.0", id=request_id, error=error_data)) From 458950c4ccc4385e6134ae109bd1a18a91840944 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Sat, 7 Feb 2026 11:43:25 +0100 Subject: [PATCH 4/4] fix: properly close httpx.AsyncClient in tests --- tests/client/test_notification_response.py | 36 +++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/client/test_notification_response.py b/tests/client/test_notification_response.py index 92939f540..a6cb44273 100644 --- a/tests/client/test_notification_response.py +++ b/tests/client/test_notification_response.py @@ -88,13 +88,13 @@ async def message_handler( # pragma: no cover if isinstance(message, Exception): returned_exception = message - client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_non_sdk_server_app())) - async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: - await session.initialize() + async with httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_non_sdk_server_app())) as client: + async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream, message_handler=message_handler) as session: + await session.initialize() - # The test server returns a 204 instead of the expected 202 - await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed")) + # The test server returns a 204 instead of the expected 202 + await session.send_notification(RootsListChangedNotification(method="notifications/roots/list_changed")) if returned_exception: # pragma: no cover pytest.fail(f"Server encountered an exception: {returned_exception}") @@ -107,13 +107,13 @@ async def test_unexpected_content_type_sends_jsonrpc_error() -> None: the client should send a JSONRPCError so the pending request resolves immediately instead of hanging until timeout. """ - client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_unexpected_content_type_app())) - async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() + async with httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_unexpected_content_type_app())) as client: + async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() - with pytest.raises(MCPError, match="Unexpected content type: text/plain"): - await session.list_tools() + with pytest.raises(MCPError, match="Unexpected content type: text/plain"): + await session.list_tools() def _create_invalid_json_response_app() -> Starlette: @@ -142,10 +142,10 @@ async def test_invalid_json_response_sends_jsonrpc_error() -> None: should send a JSONRPCError so the pending request resolves immediately instead of hanging until timeout. """ - client = httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_invalid_json_response_app())) - async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - await session.initialize() + async with httpx.AsyncClient(transport=httpx.ASGITransport(app=_create_invalid_json_response_app())) as client: + async with streamable_http_client("http://localhost/mcp", http_client=client) as (read_stream, write_stream): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() - with pytest.raises(MCPError, match="Failed to parse JSON response"): - await session.list_tools() + with pytest.raises(MCPError, match="Failed to parse JSON response"): + await session.list_tools()