Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 203 additions & 22 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::resolve_role_config;
use crate::agent::status::is_final;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::context_manager::is_user_turn_boundary;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::event_mapping::parse_turn_item;
use crate::find_archived_thread_path_by_id_str;
use crate::find_thread_path_by_id_str;
use crate::rollout::RolloutRecorder;
Expand All @@ -18,7 +20,10 @@ use crate::thread_manager::ThreadManagerState;
use codex_features::Feature;
use codex_protocol::AgentPath;
use codex_protocol::ThreadId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::InterAgentCommunication;
Expand All @@ -29,6 +34,7 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
Expand All @@ -51,6 +57,13 @@ pub(crate) struct LiveAgent {
pub(crate) status: AgentStatus,
}

#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
pub(crate) struct ListedAgent {
pub(crate) agent_name: String,
pub(crate) agent_status: AgentStatus,
pub(crate) last_task_message: Option<String>,
}

fn default_agent_nickname_list() -> Vec<&'static str> {
AGENT_NAMES
.lines()
Expand Down Expand Up @@ -456,21 +469,28 @@ impl AgentControl {
agent_id: ThreadId,
items: Vec<UserInput>,
) -> CodexResult<String> {
let last_task_message = render_input_preview(&items);
let state = self.upgrade()?;
self.handle_thread_request_result(
agent_id,
&state,
state
.send_op(
agent_id,
Op::UserInput {
items,
final_output_json_schema: None,
},
)
.await,
)
.await
let result = self
.handle_thread_request_result(
agent_id,
&state,
state
.send_op(
agent_id,
Op::UserInput {
items,
final_output_json_schema: None,
},
)
.await,
)
.await;
if result.is_ok() {
self.state
.update_last_task_message(agent_id, last_task_message);
}
result
}

/// Append a prebuilt message to an existing agent thread outside the normal user-input path.
Expand All @@ -494,15 +514,22 @@ impl AgentControl {
agent_id: ThreadId,
communication: InterAgentCommunication,
) -> CodexResult<String> {
let last_task_message = communication.content.clone();
let state = self.upgrade()?;
self.handle_thread_request_result(
agent_id,
&state,
state
.send_op(agent_id, Op::InterAgentCommunication { communication })
.await,
)
.await
let result = self
.handle_thread_request_result(
agent_id,
&state,
state
.send_op(agent_id, Op::InterAgentCommunication { communication })
.await,
)
.await;
if result.is_ok() {
self.state
.update_last_task_message(agent_id, last_task_message);
}
result
}

/// Interrupt the current task for an existing agent thread.
Expand Down Expand Up @@ -680,6 +707,70 @@ impl AgentControl {
.join("\n")
}

pub(crate) async fn list_agents(
&self,
current_session_source: &SessionSource,
path_prefix: Option<&str>,
) -> CodexResult<Vec<ListedAgent>> {
let state = self.upgrade()?;
let resolved_prefix = path_prefix
.map(|prefix| {
current_session_source
.get_agent_path()
.unwrap_or_else(AgentPath::root)
.resolve(prefix)
.map_err(CodexErr::UnsupportedOperation)
})
.transpose()?;

let mut live_agents = self.state.live_agents();
live_agents.sort_by(|left, right| {
left.agent_path
.as_deref()
.unwrap_or_default()
.cmp(right.agent_path.as_deref().unwrap_or_default())
.then_with(|| {
left.agent_id
.map(|id| id.to_string())
.unwrap_or_default()
.cmp(&right.agent_id.map(|id| id.to_string()).unwrap_or_default())
})
});

let mut agents = Vec::with_capacity(live_agents.len());
for metadata in live_agents {
let Some(thread_id) = metadata.agent_id else {
continue;
};
if resolved_prefix
.as_ref()
.is_some_and(|prefix| !agent_matches_prefix(metadata.agent_path.as_ref(), prefix))
{
continue;
}

let Ok(thread) = state.get_thread(thread_id).await else {
continue;
};
let agent_name = metadata
.agent_path
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| thread_id.to_string());
let last_task_message = match metadata.last_task_message.clone() {
Some(last_task_message) => Some(last_task_message),
None => last_task_message_for_thread(thread.as_ref()).await,
};
agents.push(ListedAgent {
agent_name,
agent_status: thread.agent_status().await,
last_task_message,
});
}

Ok(agents)
}

/// Starts a detached watcher for sub-agents spawned from another thread.
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
Expand Down Expand Up @@ -800,6 +891,7 @@ impl AgentControl {
agent_path,
agent_nickname,
agent_role,
last_task_message: None,
};
Ok((session_source, agent_metadata))
}
Expand Down Expand Up @@ -963,6 +1055,95 @@ fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option<Threa
}
}

fn agent_matches_prefix(agent_path: Option<&AgentPath>, prefix: &AgentPath) -> bool {
if prefix.is_root() {
return true;
}

agent_path.is_some_and(|agent_path| {
agent_path == prefix
|| agent_path
.as_str()
.strip_prefix(prefix.as_str())
.is_some_and(|suffix| suffix.starts_with('/'))
})
}

async fn last_task_message_for_thread(thread: &crate::CodexThread) -> Option<String> {
let pending_input = thread.codex.session.pending_input_snapshot().await;
if let Some(message) = pending_input
.iter()
.rev()
.find_map(last_task_message_from_input_item)
{
return Some(message);
}

let queued_input = thread
.codex
.session
.queued_response_items_for_next_turn_snapshot()
.await;
if let Some(message) = queued_input
.iter()
.rev()
.find_map(last_task_message_from_input_item)
{
return Some(message);
}

let history = thread.codex.session.clone_history().await;
history
.raw_items()
.iter()
.rev()
.find_map(last_task_message_from_item)
}

fn last_task_message_from_input_item(item: &ResponseInputItem) -> Option<String> {
let response_item: ResponseItem = item.clone().into();
last_task_message_from_item(&response_item)
}

fn last_task_message_from_item(item: &ResponseItem) -> Option<String> {
if !is_user_turn_boundary(item) {
return None;
}

match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
let Some(TurnItem::UserMessage(message)) = parse_turn_item(item) else {
return None;
};
Some(render_input_preview(&message.content))
}
ResponseItem::Message { content, .. } => match content.as_slice() {
[ContentItem::InputText { text }] | [ContentItem::OutputText { text }] => {
serde_json::from_str::<InterAgentCommunication>(text)
.ok()
.map(|communication| communication.content)
}
_ => None,
},
_ => None,
}
}

fn render_input_preview(items: &[UserInput]) -> String {
items
.iter()
.map(|item| match item {
UserInput::Text { text, .. } => text.clone(),
UserInput::Image { .. } => "[image]".to_string(),
UserInput::LocalImage { path } => format!("[local_image:{}]", path.display()),
UserInput::Skill { name, path } => format!("[skill:${name}]({})", path.display()),
UserInput::Mention { name, path } => format!("[mention:${name}]({path})"),
_ => "[input]".to_string(),
})
.collect::<Vec<_>>()
.join("\n")
}

fn thread_spawn_depth(session_source: &SessionSource) -> Option<i32> {
match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { depth, .. }) => Some(*depth),
Expand Down
13 changes: 8 additions & 5 deletions codex-rs/core/src/agent/control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,7 +965,6 @@ async fn spawn_child_completion_notifies_parent_history() {
}

#[tokio::test]
#[ignore = "flaky on: rust-ci / Tests — windows-arm64 - aarch64-pc-windows-msvc"]
async fn multi_agent_v2_completion_sends_inter_agent_message_to_direct_parent() {
let harness = AgentControlHarness::new().await;
let (root_thread_id, _) = harness.start_thread().await;
Expand Down Expand Up @@ -1022,6 +1021,13 @@ async fn multi_agent_v2_completion_sends_inter_agent_message_to_direct_parent()
)
.await;

let expected = InterAgentCommunication::new(
tester_path.clone(),
worker_path.clone(),
Vec::new(),
"done".to_string(),
);

timeout(MULTI_AGENT_EVENTUAL_TIMEOUT, async {
loop {
let delivered = harness
Expand All @@ -1033,10 +1039,7 @@ async fn multi_agent_v2_completion_sends_inter_agent_message_to_direct_parent()
&& matches!(
op,
Op::InterAgentCommunication { communication }
if communication.author == tester_path
&& communication.recipient == worker_path
&& communication.other_recipients.is_empty()
&& communication.content == "done"
if communication == expected
)
});
if delivered {
Expand Down
29 changes: 29 additions & 0 deletions codex-rs/core/src/agent/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(crate) struct AgentMetadata {
pub(crate) agent_path: Option<AgentPath>,
pub(crate) agent_nickname: Option<String>,
pub(crate) agent_role: Option<String>,
pub(crate) last_task_message: Option<String>,
}

fn format_agent_nickname(name: &str, nickname_reset_count: usize) -> String {
Expand Down Expand Up @@ -151,6 +152,34 @@ impl AgentRegistry {
.cloned()
}

pub(crate) fn live_agents(&self) -> Vec<AgentMetadata> {
self.active_agents
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.agent_tree
.values()
.filter(|metadata| {
metadata.agent_id.is_some()
&& !metadata.agent_path.as_ref().is_some_and(AgentPath::is_root)
})
.cloned()
.collect()
}

pub(crate) fn update_last_task_message(&self, thread_id: ThreadId, last_task_message: String) {
let mut active_agents = self
.active_agents
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(metadata) = active_agents
.agent_tree
.values_mut()
.find(|metadata| metadata.agent_id == Some(thread_id))
{
metadata.last_task_message = Some(last_task_message);
}
}

fn register_spawned_thread(&self, agent_metadata: AgentMetadata) {
let Some(thread_id) = agent_metadata.agent_id else {
return;
Expand Down
Loading