[BREAKING] Python: Refactor SharedState to State with sync methods and superstep caching#3667
[BREAKING] Python: Refactor SharedState to State with sync methods and superstep caching#3667moonbox3 wants to merge 3 commits intomicrosoft:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements a breaking refactoring of workflow state management, renaming SharedState to State, converting all async methods to sync, and implementing superstep caching semantics to align with .NET behavior.
Changes:
- Renamed
SharedStateclass toStateand file_shared_state.pyto_state.py - Converted all state methods from async to sync (removing asyncio.Lock)
- Implemented superstep caching: writes go to pending buffer, committed at boundaries via
commit() - Added
get(key, default=None)API andhas(key)method - Renamed
WorkflowCheckpoint.shared_state→WorkflowCheckpoint.state - Updated all references across codebase (workflow context, runner, executors, tests, samples)
Reviewed changes
Copilot reviewed 47 out of 47 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
_state.py |
New State class with sync methods and superstep caching implementation |
_workflow.py |
Updated to use State with commit() call after kwargs initialization |
_runner.py |
Integrated commit() at superstep boundaries |
_workflow_context.py |
Changed to sync get_state/set_state methods |
_checkpoint.py |
Renamed shared_state field to state |
__init__.py |
Updated exports (but State still exported - should be removed) |
| Test files | Updated all test mocks and assertions to use State |
| Sample files | Updated comments and documentation references |
| Frontend | Updated TypeScript interfaces and references |
python/packages/devui/frontend/src/components/features/workflow/checkpoint-info-modal.tsx
Outdated
Show resolved
Hide resolved
| new_email = Email(email_id=str(uuid4()), email_content=email_text) | ||
| await ctx.set_shared_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email) | ||
| await ctx.set_shared_state(CURRENT_EMAIL_ID_KEY, new_email.email_id) | ||
| ctx.set_state(f"{EMAIL_STATE_PREFIX}{new_email.email_id}", new_email) |
There was a problem hiding this comment.
is this prefix needed to demonstrate the concept?
| messages: Messages exchanged between executors | ||
| shared_state: Complete shared state including user data and executor states. | ||
| Executor states are stored under the reserved key '_executor_state'. | ||
| state: Complete workflow state including user data and executor states. |
There was a problem hiding this comment.
maybe clarify that this is committed state only
| The value will be visible to subsequent `get()` calls but won't be | ||
| committed to the actual state until `commit()` is called. | ||
| """ | ||
| self._pending[key] = value |
There was a problem hiding this comment.
should we deal with two pending updates for the same key? maybe from two different executors?
| The complete AgentResponse, or None if waiting for user input. | ||
| """ | ||
| run_kwargs: dict[str, Any] = await ctx.get_shared_state(WORKFLOW_RUN_KWARGS_KEY) | ||
| run_kwargs: dict[str, Any] = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY) or {} |
There was a problem hiding this comment.
| run_kwargs: dict[str, Any] = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY) or {} | |
| run_kwargs: dict[str, Any] = ctx.get_state(WORKFLOW_RUN_KWARGS_KEY, {}) |
| @@ -459,41 +459,37 @@ def get_yielded_outputs(self) -> list[Any]: | |||
|
|
|||
| @deprecated( | |||
There was a problem hiding this comment.
Should we remove these deprecated methods?
Motivation and Context
Refactors the workflow state management system with breaking changes:
SharedState->Stateclass and_shared_state.py->_state.pycommit(), which is now consistent with the .NET behavior.get(key, default=None)API that returns default instead of raising KeyErrorhas(key)method for existence checksWorkflowCheckpoint.shared_state→WorkflowCheckpoint.stateStatefrom public exports (internal implementation detail)Breaking Changes
SharedStateStatectx.shared_statectx.stateawait ctx.get_shared_state(key)ctx.get_state(key, default=None)await ctx.set_shared_state(key, value)ctx.set_state(key, value)checkpoint.shared_statecheckpoint.stateDescription
Contribution Checklist