Conversation
3663ba2 to
c54d3bb
Compare
d8db985 to
b0e7c16
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds comprehensive workflow orchestration support to Azure Functions, enabling multi-agent workflows with conditional routing, parallel execution, and human-in-the-loop (HITL) patterns using Azure Durable Functions.
Changes:
- New workflow orchestration engine in
_workflow.pythat executes MAF Workflows using Durable Functions' generator-based model - Serialization utilities in
_serialization.pyfor cross-activity message passing - Capturing runner context in
_context.pyfor activity execution - Extended
AgentFunctionAppwith workflow parameter and auto-registration of agents - Four new samples (09-12) demonstrating shared state, stateless, parallel, and HITL workflow patterns
- Comprehensive unit and integration tests (152 unit tests, 14 integration tests)
- Bug fix: Updated
chat_client.as_agent()usage withdefault_optionsparameter
Reviewed changes
Copilot reviewed 47 out of 47 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
_workflow.py |
Main orchestration engine with HITL support, parallel execution, and edge group routing |
_serialization.py |
Serialization/deserialization for dataclasses, Pydantic models, and MAF types |
_context.py |
CapturingRunnerContext for activity execution without durable storage |
_app.py |
Extended AgentFunctionApp with workflow support and HTTP endpoints |
_agent_executor.py |
Added agent property to expose underlying agent |
| Samples 09-12 | Four new workflow samples with requirements, configs, and README files |
| Test files | Unit tests for workflow utilities and integration tests for all samples |
pyproject.toml |
Increased test timeout from 120s to 300s for workflow tests |
| Sample 07 | Fixed type annotations for Azure Functions worker compatibility |
Comments suppressed due to low confidence (1)
python/packages/azurefunctions/agent_framework_azurefunctions/_app.py:1056
- This import of module asyncio is redundant, as it was previously imported on line 9.
import asyncio
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
python/packages/azurefunctions/tests/integration_tests/testutils.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/07_single_agent_orchestration_hitl/demo.http
Outdated
Show resolved
Hide resolved
| import json | ||
|
|
There was a problem hiding this comment.
The json module is imported inside a try-except block on line 722. However, json is already imported at the module level (line 22). This redundant import should be removed, and the module-level import should be used instead.
| import json |
There was a problem hiding this comment.
+1 to this comment, unresolving it here since the comment is still relevant
python/packages/azurefunctions/agent_framework_azurefunctions/_serialization.py
Outdated
Show resolved
Hide resolved
python/samples/getting_started/azure_functions/10_workflow_no_shared_state/function_app.py
Outdated
Show resolved
Hide resolved
b0e7c16 to
668d98a
Compare
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
5edfd45 to
3c3c325
Compare
| logger.debug("[AgentFunctionApp] Extracting agents from workflow") | ||
| for executor in workflow.executors.values(): | ||
| if isinstance(executor, AgentExecutor): | ||
| agents.append(executor.agent) |
There was a problem hiding this comment.
If the same agent is registered in both "agents" and "workflow" will that agent be re-registered?
| Note: We use str type annotations instead of dict to work around | ||
| Azure Functions worker type validation issues with dict[str, Any]. | ||
| """ | ||
| import json as json_module |
There was a problem hiding this comment.
NIT: Move all imports to top of the file
| data = json_module.loads(inputData) | ||
| message_data = data["message"] | ||
| shared_state_snapshot = data.get("shared_state_snapshot", {}) | ||
| source_executor_ids = data.get("source_executor_ids", ["__orchestrator__"]) |
There was a problem hiding this comment.
Out of curiosity, why defaulting to __orchestrator__? Is that always going to be there?
| runner_context = CapturingRunnerContext() | ||
| shared_state = SharedState() | ||
|
|
||
| # Deserialize shared state values to reconstruct dataclasses/Pydantic models |
There was a problem hiding this comment.
We already have some logic for this in
, can we reuse same classes (feel free to expand on a single class and use it everywhere)| try: | ||
| req_body = req.get_json() | ||
| except ValueError: | ||
| return func.HttpResponse( |
There was a problem hiding this comment.
Can we make this a generic function to return a 400 back (I see it being used in multiple places here)
| It captures all messages and events produced during execution without requiring durable | ||
| entity storage, allowing the results to be returned to the orchestrator. | ||
|
|
||
| Unlike the full InProcRunnerContext, this implementation: |
There was a problem hiding this comment.
NIT: Is this remnants of a previous implementation? If so, would be good to cleanup
|
|
||
| async def drain_messages(self) -> dict[str, list[Message]]: | ||
| """Drain and return all captured messages.""" | ||
| from copy import copy |
There was a problem hiding this comment.
NIT: Move to top (for all files)
| @@ -0,0 +1,378 @@ | |||
| # Copyright (c) Microsoft. All rights reserved. | |||
|
|
|||
| """Serialization and deserialization utilities for workflow execution. | |||
There was a problem hiding this comment.
Not sure if you already looked, but agent-framework already has a very comprehensive class for thsi - https://github.com/microsoft/agent-framework/blob/main/python/packages/core/agent_framework/_serialization.py
It would be ideal if we can reuse code from here and not introduce new overrides for similar experiences
| # ============================================================================ | ||
|
|
||
|
|
||
| def reconstruct_agent_executor_request(data: dict[str, Any]) -> AgentExecutorRequest: |
There was a problem hiding this comment.
We have some similar logic (to deserialize-from and serialize-to MAF constructs) in this file -
Given that this is doing something similar, it would be a good idea to combine all of them in a single place
| ) | ||
| else: | ||
| # Timeout occurred | ||
| logger.warning("HITL request %s timed out after %s hours", request_id, hitl_timeout_hours) |
There was a problem hiding this comment.
Does the approval_task need to be canceled in this case?
| # ============================================================================ | ||
|
|
||
|
|
||
| def _evaluate_edge_condition_sync(edge: Any, message: Any) -> bool: |
There was a problem hiding this comment.
can we have stricter type-checks so that we dont need to add code like getattr(edge, "_condition", None) everywhere
| if structured_response: | ||
| final_text = json.dumps(structured_response) | ||
|
|
||
| assistant_message = ChatMessage(role="assistant", text=final_text) |
There was a problem hiding this comment.
NIT:
| assistant_message = ChatMessage(role="assistant", text=final_text) | |
| assistant_message = ChatMessage(role=Role.ASSISTANT, text=final_text) |
| List of workflow outputs collected from executor activities | ||
| """ | ||
| pending_messages: dict[str, list[tuple[Any, str]]] = { | ||
| workflow.start_executor_id: [(initial_message, "__workflow_start__")] |
There was a problem hiding this comment.
I would recommend making all these markers ("workflow_start", "hitl_response", etc) as constants and using them everywhere to avoid chances of confusion or typos
| import json | ||
|
|
There was a problem hiding this comment.
+1 to this comment, unresolving it here since the comment is still relevant
| """Extract text content from serialized message dictionaries.""" | ||
| message_content = "" | ||
|
|
||
| if message.get("messages"): |
There was a problem hiding this comment.
This seems very fragile and easy to break. Can we use existing to_dict and from_dict methods provided by MAF here instead? If not, consider having a similar appraoch to https://github.com/microsoft/agent-framework/blob/907654a489a4f41bd4b44618ce5daa994383cb3c/python/packages/durabletask/agent_framework_durabletask/_durable_agent_state.py to use inheritance to make this serialization/deserialization more cleaner and less specific to how it is today.
| if response_type_str: | ||
| try: | ||
| module_name, class_name = response_type_str.rsplit(":", 1) | ||
| import importlib |
Motivation and Context
Adds workflow orchestration support to Azure Functions, enabling multi-agent workflows with conditional routing, parallel execution, and human-in-the-loop (HITL) patterns using Azure Durable Functions.
Description
Core Package Changes (
agent_framework_azurefunctions):_workflow.py - Orchestration engine that executes MAF Workflows using Durable Functions' generator-based model:
wait_for_external_eventwith configurable timeouts_context.py-CapturingRunnerContextfor activity execution (captures messages/events without durable storage)_serialization.py- Serialization utilities for cross-activity message passing (dataclasses, Pydantic models, ChatMessage)_app.py - Extended
AgentFunctionAppwith:workflowparameter accepting aWorkflowinstancePOST /workflow/run,GET /workflow/status/{id},POST /workflow/respond/{id}/{requestId}New Samples (09-12):
09_workflow_shared_state10_workflow_no_shared_state11_workflow_parallel12_workflow_hitlTests:
_serialization.py,_context.py, and _app.py workflow features (152 total)API Fix:
chat_client.as_agent()withdefault_options={"response_format": ...}instead of non-existentcreate_agent()Contribution Checklist