Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions codex-rs/core/src/agent/control.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::agent::AgentStatus;
use crate::agent::guards::AgentMetadata;
use crate::agent::guards::Guards;
use crate::agent::registry::AgentMetadata;
use crate::agent::registry::AgentRegistry;
use crate::agent::role::DEFAULT_ROLE_NAME;
use crate::agent::role::resolve_role_config;
use crate::agent::status::is_final;
Expand Down Expand Up @@ -80,14 +80,14 @@ fn agent_nickname_candidates(
/// spawn new agents and the inter-agent communication layer.
/// An `AgentControl` instance is intended to be created at most once per root thread/session
/// tree. That same `AgentControl` is then shared with every sub-agent spawned from that root,
/// which keeps the guards scoped to that root thread rather than the entire `ThreadManager`.
/// which keeps the registry scoped to that root thread rather than the entire `ThreadManager`.
#[derive(Clone, Default)]
pub(crate) struct AgentControl {
/// Weak handle back to the global thread registry/state.
/// This is `Weak` to avoid reference cycles and shadow persistence of the form
/// `ThreadManagerState -> CodexThread -> Session -> SessionServices -> ThreadManagerState`.
manager: Weak<ThreadManagerState>,
state: Arc<Guards>,
state: Arc<AgentRegistry>,
}

impl AgentControl {
Expand Down Expand Up @@ -686,7 +686,7 @@ impl AgentControl {
#[allow(clippy::too_many_arguments)]
fn prepare_thread_spawn(
&self,
reservation: &mut crate::agent::guards::SpawnReservation,
reservation: &mut crate::agent::registry::SpawnReservation,
config: &crate::config::Config,
parent_thread_id: ThreadId,
depth: i32,
Expand Down
6 changes: 3 additions & 3 deletions codex-rs/core/src/agent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
pub(crate) mod agent_resolver;
pub(crate) mod control;
mod guards;
mod registry;
pub(crate) mod role;
pub(crate) mod status;

pub(crate) use codex_protocol::protocol::AgentStatus;
pub(crate) use control::AgentControl;
pub(crate) use guards::exceeds_thread_spawn_depth_limit;
pub(crate) use guards::next_thread_spawn_depth;
pub(crate) use registry::exceeds_thread_spawn_depth_limit;
pub(crate) use registry::next_thread_spawn_depth;
pub(crate) use status::agent_status_from_event;
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::atomic::Ordering;
/// This structure is shared by all agents in the same user session (because the `AgentControl`
/// is).
#[derive(Default)]
pub(crate) struct Guards {
pub(crate) struct AgentRegistry {
active_agents: Mutex<ActiveAgents>,
total_count: AtomicUsize,
}
Expand Down Expand Up @@ -75,7 +75,7 @@ pub(crate) fn exceeds_thread_spawn_depth_limit(depth: i32, max_depth: i32) -> bo
depth > max_depth
}

impl Guards {
impl AgentRegistry {
pub(crate) fn reserve_spawn_slot(
self: &Arc<Self>,
max_threads: Option<usize>,
Expand Down Expand Up @@ -263,7 +263,7 @@ impl Guards {
}

pub(crate) struct SpawnReservation {
state: Arc<Guards>,
state: Arc<AgentRegistry>,
active: bool,
reserved_agent_nickname: Option<String>,
reserved_agent_path: Option<AgentPath>,
Expand Down Expand Up @@ -311,5 +311,5 @@ impl Drop for SpawnReservation {
}

#[cfg(test)]
#[path = "guards_tests.rs"]
#[path = "registry_tests.rs"]
mod tests;
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,22 @@ fn non_thread_spawn_subagents_default_to_depth_zero() {

#[test]
fn reservation_drop_releases_slot() {
let guards = Arc::new(Guards::default());
let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot");
let registry = Arc::new(AgentRegistry::default());
let reservation = registry.reserve_spawn_slot(Some(1)).expect("reserve slot");
drop(reservation);

let reservation = guards.reserve_spawn_slot(Some(1)).expect("slot released");
let reservation = registry.reserve_spawn_slot(Some(1)).expect("slot released");
drop(reservation);
}

#[test]
fn commit_holds_slot_until_release() {
let guards = Arc::new(Guards::default());
let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot");
let registry = Arc::new(AgentRegistry::default());
let reservation = registry.reserve_spawn_slot(Some(1)).expect("reserve slot");
let thread_id = ThreadId::new();
reservation.commit(agent_metadata(thread_id));

let err = match guards.reserve_spawn_slot(Some(1)) {
let err = match registry.reserve_spawn_slot(Some(1)) {
Ok(_) => panic!("limit should be enforced"),
Err(err) => err,
};
Expand All @@ -76,23 +76,23 @@ fn commit_holds_slot_until_release() {
};
assert_eq!(max_threads, 1);

guards.release_spawned_thread(thread_id);
let reservation = guards
registry.release_spawned_thread(thread_id);
let reservation = registry
.reserve_spawn_slot(Some(1))
.expect("slot released after thread removal");
drop(reservation);
}

#[test]
fn release_ignores_unknown_thread_id() {
let guards = Arc::new(Guards::default());
let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot");
let registry = Arc::new(AgentRegistry::default());
let reservation = registry.reserve_spawn_slot(Some(1)).expect("reserve slot");
let thread_id = ThreadId::new();
reservation.commit(agent_metadata(thread_id));

guards.release_spawned_thread(ThreadId::new());
registry.release_spawned_thread(ThreadId::new());

let err = match guards.reserve_spawn_slot(Some(1)) {
let err = match registry.reserve_spawn_slot(Some(1)) {
Ok(_) => panic!("limit should still be enforced"),
Err(err) => err,
};
Expand All @@ -101,29 +101,29 @@ fn release_ignores_unknown_thread_id() {
};
assert_eq!(max_threads, 1);

guards.release_spawned_thread(thread_id);
let reservation = guards
registry.release_spawned_thread(thread_id);
let reservation = registry
.reserve_spawn_slot(Some(1))
.expect("slot released after real thread removal");
drop(reservation);
}

#[test]
fn release_is_idempotent_for_registered_threads() {
let guards = Arc::new(Guards::default());
let reservation = guards.reserve_spawn_slot(Some(1)).expect("reserve slot");
let registry = Arc::new(AgentRegistry::default());
let reservation = registry.reserve_spawn_slot(Some(1)).expect("reserve slot");
let first_id = ThreadId::new();
reservation.commit(agent_metadata(first_id));

guards.release_spawned_thread(first_id);
registry.release_spawned_thread(first_id);

let reservation = guards.reserve_spawn_slot(Some(1)).expect("slot reused");
let reservation = registry.reserve_spawn_slot(Some(1)).expect("slot reused");
let second_id = ThreadId::new();
reservation.commit(agent_metadata(second_id));

guards.release_spawned_thread(first_id);
registry.release_spawned_thread(first_id);

let err = match guards.reserve_spawn_slot(Some(1)) {
let err = match registry.reserve_spawn_slot(Some(1)) {
Ok(_) => panic!("limit should still be enforced"),
Err(err) => err,
};
Expand All @@ -132,24 +132,24 @@ fn release_is_idempotent_for_registered_threads() {
};
assert_eq!(max_threads, 1);

guards.release_spawned_thread(second_id);
let reservation = guards
registry.release_spawned_thread(second_id);
let reservation = registry
.reserve_spawn_slot(Some(1))
.expect("slot released after second thread removal");
drop(reservation);
}

#[test]
fn failed_spawn_keeps_nickname_marked_used() {
let guards = Arc::new(Guards::default());
let mut reservation = guards.reserve_spawn_slot(None).expect("reserve slot");
let registry = Arc::new(AgentRegistry::default());
let mut reservation = registry.reserve_spawn_slot(None).expect("reserve slot");
let agent_nickname = reservation
.reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None)
.expect("reserve agent name");
assert_eq!(agent_nickname, "alpha");
drop(reservation);

let mut reservation = guards.reserve_spawn_slot(None).expect("reserve slot");
let mut reservation = registry.reserve_spawn_slot(None).expect("reserve slot");
let agent_nickname = reservation
.reserve_agent_nickname_with_preference(&["alpha", "beta"], /*preferred*/ None)
.expect("unused name should still be preferred");
Expand All @@ -158,23 +158,25 @@ fn failed_spawn_keeps_nickname_marked_used() {

#[test]
fn agent_nickname_resets_used_pool_when_exhausted() {
let guards = Arc::new(Guards::default());
let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot");
let registry = Arc::new(AgentRegistry::default());
let mut first = registry
.reserve_spawn_slot(None)
.expect("reserve first slot");
let first_name = first
.reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None)
.expect("reserve first agent name");
let first_id = ThreadId::new();
first.commit(agent_metadata(first_id));
assert_eq!(first_name, "alpha");

let mut second = guards
let mut second = registry
.reserve_spawn_slot(None)
.expect("reserve second slot");
let second_name = second
.reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None)
.expect("name should be reused after pool reset");
assert_eq!(second_name, "alpha the 2nd");
let active_agents = guards
let active_agents = registry
.active_agents
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
Expand All @@ -183,19 +185,21 @@ fn agent_nickname_resets_used_pool_when_exhausted() {

#[test]
fn released_nickname_stays_used_until_pool_reset() {
let guards = Arc::new(Guards::default());
let registry = Arc::new(AgentRegistry::default());

let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot");
let mut first = registry
.reserve_spawn_slot(None)
.expect("reserve first slot");
let first_name = first
.reserve_agent_nickname_with_preference(&["alpha"], /*preferred*/ None)
.expect("reserve first agent name");
let first_id = ThreadId::new();
first.commit(agent_metadata(first_id));
assert_eq!(first_name, "alpha");

guards.release_spawned_thread(first_id);
registry.release_spawned_thread(first_id);

let mut second = guards
let mut second = registry
.reserve_spawn_slot(None)
.expect("reserve second slot");
let second_name = second
Expand All @@ -204,15 +208,17 @@ fn released_nickname_stays_used_until_pool_reset() {
assert_eq!(second_name, "beta");
let second_id = ThreadId::new();
second.commit(agent_metadata(second_id));
guards.release_spawned_thread(second_id);
registry.release_spawned_thread(second_id);

let mut third = guards.reserve_spawn_slot(None).expect("reserve third slot");
let mut third = registry
.reserve_spawn_slot(None)
.expect("reserve third slot");
let third_name = third
.reserve_agent_nickname_with_preference(&["alpha", "beta"], /*preferred*/ None)
.expect("pool reset should permit a duplicate");
let expected_names = HashSet::from(["alpha the 2nd".to_string(), "beta the 2nd".to_string()]);
assert!(expected_names.contains(&third_name));
let active_agents = guards
let active_agents = registry
.active_agents
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
Expand All @@ -221,18 +227,20 @@ fn released_nickname_stays_used_until_pool_reset() {

#[test]
fn repeated_resets_advance_the_ordinal_suffix() {
let guards = Arc::new(Guards::default());
let registry = Arc::new(AgentRegistry::default());

let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot");
let mut first = registry
.reserve_spawn_slot(None)
.expect("reserve first slot");
let first_name = first
.reserve_agent_nickname_with_preference(&["Plato"], /*preferred*/ None)
.expect("reserve first agent name");
let first_id = ThreadId::new();
first.commit(agent_metadata(first_id));
assert_eq!(first_name, "Plato");
guards.release_spawned_thread(first_id);
registry.release_spawned_thread(first_id);

let mut second = guards
let mut second = registry
.reserve_spawn_slot(None)
.expect("reserve second slot");
let second_name = second
Expand All @@ -241,14 +249,16 @@ fn repeated_resets_advance_the_ordinal_suffix() {
let second_id = ThreadId::new();
second.commit(agent_metadata(second_id));
assert_eq!(second_name, "Plato the 2nd");
guards.release_spawned_thread(second_id);
registry.release_spawned_thread(second_id);

let mut third = guards.reserve_spawn_slot(None).expect("reserve third slot");
let mut third = registry
.reserve_spawn_slot(None)
.expect("reserve third slot");
let third_name = third
.reserve_agent_nickname_with_preference(&["Plato"], /*preferred*/ None)
.expect("reserve third agent name");
assert_eq!(third_name, "Plato the 3rd");
let active_agents = guards
let active_agents = registry
.active_agents
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
Expand All @@ -257,27 +267,29 @@ fn repeated_resets_advance_the_ordinal_suffix() {

#[test]
fn register_root_thread_indexes_root_path() {
let guards = Arc::new(Guards::default());
let registry = Arc::new(AgentRegistry::default());
let root_thread_id = ThreadId::new();

guards.register_root_thread(root_thread_id);
registry.register_root_thread(root_thread_id);

assert_eq!(
guards.agent_id_for_path(&AgentPath::root()),
registry.agent_id_for_path(&AgentPath::root()),
Some(root_thread_id)
);
}

#[test]
fn reserved_agent_path_is_released_when_spawn_fails() {
let guards = Arc::new(Guards::default());
let mut first = guards.reserve_spawn_slot(None).expect("reserve first slot");
let registry = Arc::new(AgentRegistry::default());
let mut first = registry
.reserve_spawn_slot(None)
.expect("reserve first slot");
first
.reserve_agent_path(&agent_path("/root/researcher"))
.expect("reserve first path");
drop(first);

let mut second = guards
let mut second = registry
.reserve_spawn_slot(None)
.expect("reserve second slot");
second
Expand All @@ -287,9 +299,9 @@ fn reserved_agent_path_is_released_when_spawn_fails() {

#[test]
fn committed_agent_path_is_indexed_until_release() {
let guards = Arc::new(Guards::default());
let registry = Arc::new(AgentRegistry::default());
let thread_id = ThreadId::new();
let mut reservation = guards.reserve_spawn_slot(None).expect("reserve slot");
let mut reservation = registry.reserve_spawn_slot(None).expect("reserve slot");
reservation
.reserve_agent_path(&agent_path("/root/researcher"))
.expect("reserve path");
Expand All @@ -300,13 +312,13 @@ fn committed_agent_path_is_indexed_until_release() {
});

assert_eq!(
guards.agent_id_for_path(&agent_path("/root/researcher")),
registry.agent_id_for_path(&agent_path("/root/researcher")),
Some(thread_id)
);

guards.release_spawned_thread(thread_id);
registry.release_spawned_thread(thread_id);
assert_eq!(
guards.agent_id_for_path(&agent_path("/root/researcher")),
registry.agent_id_for_path(&agent_path("/root/researcher")),
None
);
}
Loading