Skip to content

Commit d8db985

Browse files
committed
refactor code
1 parent c54d3bb commit d8db985

File tree

5 files changed

+314
-285
lines changed

5 files changed

+314
-285
lines changed

python/packages/azurefunctions/agent_framework_azurefunctions/_app.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,16 @@
3737
RunRequest,
3838
)
3939

40+
from ._context import CapturingRunnerContext
4041
from ._entities import create_agent_entity
4142
from ._errors import IncomingRequestError
4243
from ._orchestration import AgentOrchestrationContextType, AgentTask, AzureFunctionsAgentExecutor
43-
from ._utils import (
44-
CapturingRunnerContext,
45-
_execute_hitl_response_handler,
44+
from ._serialization import (
4645
deserialize_value,
4746
reconstruct_message_for_handler,
4847
serialize_message,
4948
)
50-
from ._workflow import run_workflow_orchestrator
49+
from ._workflow import execute_hitl_response_handler, run_workflow_orchestrator
5150

5251
logger = get_logger("agent_framework.azurefunctions")
5352

@@ -303,7 +302,7 @@ async def run() -> dict[str, Any]:
303302

304303
if is_hitl_response:
305304
# Handle HITL response by calling the executor's @response_handler
306-
await _execute_hitl_response_handler(
305+
await execute_hitl_response_handler(
307306
executor=executor,
308307
hitl_message=message_data,
309308
shared_state=shared_state,
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
# Copyright (c) Microsoft. All rights reserved.
2+
3+
"""Runner context for Azure Functions activity execution.
4+
5+
This module provides the CapturingRunnerContext class that captures messages
6+
and events produced during executor execution within Azure Functions activities.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import asyncio
12+
from typing import Any
13+
14+
from agent_framework import (
15+
CheckpointStorage,
16+
Message,
17+
RequestInfoEvent,
18+
RunnerContext,
19+
SharedState,
20+
WorkflowCheckpoint,
21+
WorkflowEvent,
22+
)
23+
24+
25+
class CapturingRunnerContext(RunnerContext):
26+
"""A RunnerContext implementation that captures messages and events for Azure Functions activities.
27+
28+
This context is designed for executing standard Executors within Azure Functions activities.
29+
It captures all messages and events produced during execution without requiring durable
30+
entity storage, allowing the results to be returned to the orchestrator.
31+
32+
Unlike the full InProcRunnerContext, this implementation:
33+
- Does NOT support checkpointing (always returns False for has_checkpointing)
34+
- Does NOT support streaming (always returns False for is_streaming)
35+
- Captures messages and events in memory for later retrieval
36+
37+
The orchestrator manages state coordination; this context just captures execution output.
38+
"""
39+
40+
def __init__(self) -> None:
41+
"""Initialize the capturing runner context."""
42+
self._messages: dict[str, list[Message]] = {}
43+
self._event_queue: asyncio.Queue[WorkflowEvent] = asyncio.Queue()
44+
self._pending_request_info_events: dict[str, RequestInfoEvent] = {}
45+
self._workflow_id: str | None = None
46+
self._streaming: bool = False
47+
48+
# region Messaging
49+
50+
async def send_message(self, message: Message) -> None:
51+
"""Capture a message sent by an executor."""
52+
self._messages.setdefault(message.source_id, [])
53+
self._messages[message.source_id].append(message)
54+
55+
async def drain_messages(self) -> dict[str, list[Message]]:
56+
"""Drain and return all captured messages."""
57+
from copy import copy
58+
59+
messages = copy(self._messages)
60+
self._messages.clear()
61+
return messages
62+
63+
async def has_messages(self) -> bool:
64+
"""Check if there are any captured messages."""
65+
return bool(self._messages)
66+
67+
# endregion Messaging
68+
69+
# region Events
70+
71+
async def add_event(self, event: WorkflowEvent) -> None:
72+
"""Capture an event produced during execution."""
73+
await self._event_queue.put(event)
74+
75+
async def drain_events(self) -> list[WorkflowEvent]:
76+
"""Drain all currently queued events without blocking."""
77+
events: list[WorkflowEvent] = []
78+
while True:
79+
try:
80+
events.append(self._event_queue.get_nowait())
81+
except asyncio.QueueEmpty:
82+
break
83+
return events
84+
85+
async def has_events(self) -> bool:
86+
"""Check if there are any queued events."""
87+
return not self._event_queue.empty()
88+
89+
async def next_event(self) -> WorkflowEvent:
90+
"""Wait for and return the next event."""
91+
return await self._event_queue.get()
92+
93+
# endregion Events
94+
95+
# region Checkpointing (not supported in activity context)
96+
97+
def has_checkpointing(self) -> bool:
98+
"""Checkpointing is not supported in activity context."""
99+
return False
100+
101+
def set_runtime_checkpoint_storage(self, storage: CheckpointStorage) -> None:
102+
"""No-op: checkpointing not supported in activity context."""
103+
pass
104+
105+
def clear_runtime_checkpoint_storage(self) -> None:
106+
"""No-op: checkpointing not supported in activity context."""
107+
pass
108+
109+
async def create_checkpoint(
110+
self,
111+
shared_state: SharedState,
112+
iteration_count: int,
113+
metadata: dict[str, Any] | None = None,
114+
) -> str:
115+
"""Checkpointing not supported in activity context."""
116+
raise NotImplementedError("Checkpointing is not supported in Azure Functions activity context")
117+
118+
async def load_checkpoint(self, checkpoint_id: str) -> WorkflowCheckpoint | None:
119+
"""Checkpointing not supported in activity context."""
120+
raise NotImplementedError("Checkpointing is not supported in Azure Functions activity context")
121+
122+
async def apply_checkpoint(self, checkpoint: WorkflowCheckpoint) -> None:
123+
"""Checkpointing not supported in activity context."""
124+
raise NotImplementedError("Checkpointing is not supported in Azure Functions activity context")
125+
126+
# endregion Checkpointing
127+
128+
# region Workflow Configuration
129+
130+
def set_workflow_id(self, workflow_id: str) -> None:
131+
"""Set the workflow ID."""
132+
self._workflow_id = workflow_id
133+
134+
def reset_for_new_run(self) -> None:
135+
"""Reset the context for a new run."""
136+
self._messages.clear()
137+
self._event_queue = asyncio.Queue()
138+
self._pending_request_info_events.clear()
139+
self._streaming = False
140+
141+
def set_streaming(self, streaming: bool) -> None:
142+
"""Set streaming mode (not used in activity context)."""
143+
self._streaming = streaming
144+
145+
def is_streaming(self) -> bool:
146+
"""Check if streaming mode is enabled (always False in activity context)."""
147+
return self._streaming
148+
149+
# endregion Workflow Configuration
150+
151+
# region Request Info Events
152+
153+
async def add_request_info_event(self, event: RequestInfoEvent) -> None:
154+
"""Add a RequestInfoEvent and track it for correlation."""
155+
self._pending_request_info_events[event.request_id] = event
156+
await self.add_event(event)
157+
158+
async def send_request_info_response(self, request_id: str, response: Any) -> None:
159+
"""Send a response correlated to a pending request.
160+
161+
Note: This is not supported in activity context since human-in-the-loop
162+
scenarios require orchestrator-level coordination.
163+
"""
164+
raise NotImplementedError(
165+
"send_request_info_response is not supported in Azure Functions activity context. "
166+
"Human-in-the-loop scenarios should be handled at the orchestrator level."
167+
)
168+
169+
async def get_pending_request_info_events(self) -> dict[str, RequestInfoEvent]:
170+
"""Get the mapping of request IDs to their corresponding RequestInfoEvent."""
171+
return dict(self._pending_request_info_events)
172+
173+
# endregion Request Info Events

0 commit comments

Comments
 (0)