-
Notifications
You must be signed in to change notification settings - Fork 507
improve session resilience #1612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
fb3d207
781d07b
8ae1de7
d2aaab3
d62800f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -157,6 +157,97 @@ async def delete_session(self, *, app_name: str, user_id: str, session_id: str) | |
| ) | ||
| response.raise_for_status() | ||
|
|
||
| async def _recreate_session(self, session: Session) -> None: | ||
| """Recreate a session that was not found (404). | ||
|
|
||
| This handles the case where a session expires or is cleaned up | ||
| during a long-running operation. | ||
|
|
||
| Session State Preservation: | ||
| - session_id: Preserved (same ID used for recreation) | ||
| - user_id: Preserved | ||
| - agent_ref: Preserved | ||
| - session_name: Preserved (from session.state["session_name"]) | ||
| - source: Preserved (from session.state["source"]) | ||
| - Other session.state fields: NOT preserved (lost on recreation) | ||
|
|
||
| If additional state fields are added in the future, they must be | ||
| explicitly preserved here and added to _PRESERVED_STATE_FIELDS. | ||
|
|
||
| Args: | ||
| session: The session object to recreate | ||
|
|
||
| Raises: | ||
| httpx.HTTPStatusError: If recreation fails | ||
| """ | ||
| _PRESERVED_STATE_FIELDS = {"session_name", "source"} | ||
|
|
||
| request_data = { | ||
| "id": session.id, | ||
| "user_id": session.user_id, | ||
| "agent_ref": session.app_name, | ||
| } | ||
| if session.state and session.state.get("session_name"): | ||
| request_data["name"] = session.state["session_name"] | ||
| if session.state and session.state.get("source"): | ||
| request_data["source"] = session.state["source"] | ||
|
|
||
| # Warn if session has unknown state fields that won't be preserved | ||
| if session.state: | ||
| extra_fields = [k for k in session.state.keys() if k not in _PRESERVED_STATE_FIELDS] | ||
| if extra_fields: | ||
| logger.warning( | ||
| "Session %s has additional state fields that will not be preserved during recreation: %s. " | ||
| "Update _recreate_session() if these fields are critical.", | ||
| session.id, | ||
| extra_fields, | ||
| ) | ||
|
|
||
| response = await self.client.post( | ||
| "/api/sessions", | ||
| json=request_data, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Comment left by Claude on behalf of @EItanya Task fetching here adds latency and leaks UI concerns into the backend. This block adds an extra HTTP round-trip on every session recreation purely for informational logging. The log messages reference "UI should resubscribe" which is a cross-layer concern — the backend session service shouldn't be aware of or advise the UI. The UI handles resubscription independently via its own pending task detection. Consider removing this block entirely, or gating it behind |
||
| headers={"X-User-ID": session.user_id}, | ||
| ) | ||
| if response.status_code == 409: | ||
| # Session was already recreated by a concurrent call — treat as success. | ||
| logger.info( | ||
| "Session %s already exists (409 Conflict) during recreation, " | ||
| "likely recreated by a concurrent request. Proceeding with retry.", | ||
| session.id, | ||
| ) | ||
| else: | ||
| response.raise_for_status() | ||
| logger.info("Successfully recreated session %s", session.id) | ||
|
|
||
| # Fetch existing tasks for this session to log in-flight work for observability | ||
| tasks_response = await self.client.get( | ||
| f"/api/sessions/{session.id}/tasks?user_id={session.user_id}", | ||
| headers={"X-User-ID": session.user_id}, | ||
| ) | ||
| if tasks_response.status_code == 200: | ||
| tasks_data = tasks_response.json() | ||
| if tasks_data.get("data"): | ||
| logger.debug( | ||
| "Session %s has %d existing task(s) after recreation", | ||
| session.id, | ||
| len(tasks_data["data"]), | ||
| ) | ||
| for task in tasks_data["data"]: | ||
| task_state = task.get("status", {}).get("state", "unknown") | ||
| if task_state in ("working", "submitted"): | ||
| logger.debug( | ||
| "Session %s has in-flight task %s in state '%s'", | ||
| session.id, | ||
| task.get("id"), | ||
| task_state, | ||
| ) | ||
| else: | ||
| logger.warning( | ||
| "Failed to fetch tasks for recreated session %s (HTTP %d)", | ||
| session.id, | ||
| tasks_response.status_code, | ||
| ) | ||
|
|
||
| @override | ||
| async def append_event(self, session: Session, event: Event) -> Event: | ||
| if event.partial: | ||
|
|
@@ -174,6 +265,29 @@ async def append_event(self, session: Session, event: Event) -> Event: | |
| json=event_data, | ||
| headers={"X-User-ID": session.user_id}, | ||
| ) | ||
|
|
||
| # Handle 404 by recreating session and retrying once | ||
| if response.status_code == 404: | ||
| logger.warning( | ||
| "Session %s not found (404), attempting to recreate before retry", | ||
| session.id, | ||
| ) | ||
| try: | ||
| await self._recreate_session(session) | ||
| except Exception as e: | ||
| raise RuntimeError( | ||
| f"Session {session.id} not found (404) and recreation failed" | ||
| ) from e | ||
|
|
||
| # Retry the append ONCE. If this retry also fails (including another 404), | ||
| # raise_for_status() below will propagate the error without further attempts. | ||
| # This prevents infinite recursion while allowing recovery from transient deletion. | ||
| response = await self.client.post( | ||
| f"/api/sessions/{session.id}/events?user_id={session.user_id}", | ||
| json=event_data, | ||
| headers={"X-User-ID": session.user_id}, | ||
| ) | ||
|
|
||
| response.raise_for_status() | ||
|
|
||
| # TODO: potentially pull and update the session from the server | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment left by Claude on behalf of @EItanya
sourcefield not preserved during recreation.create_session(line 46) handles thesourcefield from session state, but_recreate_sessiondoes not. Sessions with asourcewill lose it on recreation.Also, the state loss warning (
len(session.state) > 1) only excludessession_name— a session withsourcein state would trigger a misleading warning even thoughsourceis a known field. Consider maintaining aPRESERVED_STATE_FIELDSset for both the preservation logic and the warning filter.