Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions python/packages/kagent-adk/src/kagent/adk/_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,97 @@ async def delete_session(self, *, app_name: str, user_id: str, session_id: str)
)
response.raise_for_status()

async def _recreate_session(self, session: Session) -> None:
    """Recreate a session that was not found (404).

    This handles the case where a session expires or is cleaned up
    during a long-running operation.

    Session State Preservation:
    - session_id: Preserved (same ID used for recreation)
    - user_id: Preserved
    - agent_ref: Preserved
    - session_name: Preserved (from session.state["session_name"])
    - source: Preserved (from session.state["source"])
    - Other session.state fields: NOT preserved (lost on recreation)

    If additional state fields are added in the future, they must be
    explicitly preserved here and added to _PRESERVED_STATE_FIELDS.

    Args:
        session: The session object to recreate

    Raises:
        httpx.HTTPStatusError: If recreation fails
    """
    # Keep the preserved-field set in one place so the preservation logic
    # and the lost-state warning below can never drift apart.
    _PRESERVED_STATE_FIELDS = {"session_name", "source"}

    request_data = {
        "id": session.id,
        "user_id": session.user_id,
        "agent_ref": session.app_name,
    }
    state = session.state or {}
    if state.get("session_name"):
        request_data["name"] = state["session_name"]
    if state.get("source"):
        request_data["source"] = state["source"]

    # Warn if session has unknown state fields that won't be preserved.
    extra_fields = [k for k in state if k not in _PRESERVED_STATE_FIELDS]
    if extra_fields:
        logger.warning(
            "Session %s has additional state fields that will not be preserved during recreation: %s. "
            "Update _recreate_session() if these fields are critical.",
            session.id,
            extra_fields,
        )

    response = await self.client.post(
        "/api/sessions",
        json=request_data,
        headers={"X-User-ID": session.user_id},
    )
    if response.status_code == 409:
        # Session was already recreated by a concurrent call — treat as success.
        logger.info(
            "Session %s already exists (409 Conflict) during recreation, "
            "likely recreated by a concurrent request. Proceeding with retry.",
            session.id,
        )
    else:
        response.raise_for_status()
        logger.info("Successfully recreated session %s", session.id)

    # Observability only: list any in-flight tasks attached to the recreated
    # session. All the per-task messages are debug-level, so only pay for the
    # extra HTTP round-trip when debug logging is actually enabled — otherwise
    # every recreation would incur a request whose output is discarded.
    import logging  # function-scope: harmless even if also imported at file top

    if logger.isEnabledFor(logging.DEBUG):
        tasks_response = await self.client.get(
            f"/api/sessions/{session.id}/tasks?user_id={session.user_id}",
            headers={"X-User-ID": session.user_id},
        )
        if tasks_response.status_code == 200:
            tasks_data = tasks_response.json()
            if tasks_data.get("data"):
                logger.debug(
                    "Session %s has %d existing task(s) after recreation",
                    session.id,
                    len(tasks_data["data"]),
                )
                for task in tasks_data["data"]:
                    task_state = task.get("status", {}).get("state", "unknown")
                    if task_state in ("working", "submitted"):
                        logger.debug(
                            "Session %s has in-flight task %s in state '%s'",
                            session.id,
                            task.get("id"),
                            task_state,
                        )
        else:
            # Best-effort diagnostic fetch — failure is not an error condition.
            logger.debug(
                "Failed to fetch tasks for recreated session %s (HTTP %d)",
                session.id,
                tasks_response.status_code,
            )

@override
async def append_event(self, session: Session, event: Event) -> Event:
if event.partial:
Expand All @@ -174,6 +265,29 @@ async def append_event(self, session: Session, event: Event) -> Event:
json=event_data,
headers={"X-User-ID": session.user_id},
)

# Handle 404 by recreating session and retrying once
if response.status_code == 404:
logger.warning(
"Session %s not found (404), attempting to recreate before retry",
session.id,
)
try:
await self._recreate_session(session)
except Exception as e:
raise RuntimeError(
f"Session {session.id} not found (404) and recreation failed"
) from e

# Retry the append ONCE. If this retry also fails (including another 404),
# raise_for_status() below will propagate the error without further attempts.
# This prevents infinite recursion while allowing recovery from transient deletion.
response = await self.client.post(
f"/api/sessions/{session.id}/events?user_id={session.user_id}",
json=event_data,
headers={"X-User-ID": session.user_id},
)

response.raise_for_status()

# TODO: potentially pull and update the session from the server
Expand Down
Loading
Loading