From a82a501d37c62d9d023500515be3a7cee2b3cd21 Mon Sep 17 00:00:00 2001
From: James Ross
Date: Sat, 28 Mar 2026 06:57:01 -0700
Subject: [PATCH 01/15] docs: review parallel merge and shard optimizations
---
...allel-merge-and-footprint-design-review.md | 353 ++++++++++++++++++
...allel-merge-and-footprint-optimizations.md | 12 +
2 files changed, 365 insertions(+)
create mode 100644 docs/plans/parallel-merge-and-footprint-design-review.md
diff --git a/docs/plans/parallel-merge-and-footprint-design-review.md b/docs/plans/parallel-merge-and-footprint-design-review.md
new file mode 100644
index 00000000..c8855a5a
--- /dev/null
+++ b/docs/plans/parallel-merge-and-footprint-design-review.md
@@ -0,0 +1,353 @@
+# Parallel Merge & Footprint Optimization Design Review
+
+- **Status:** Review complete; no implementation approved yet
+- **Date:** 2026-03-28
+- **Idea Note:** [Parallel Merge & Footprint Scheduling Optimizations](parallel-merge-and-footprint-optimizations.md)
+
+## Purpose
+
+The earlier optimization note records two attractive ideas:
+
+1. replace parallel delta flatten-and-sort with a k-way merge, and
+2. skip footprint checks for cross-shard rewrites.
+
+This document answers a narrower set of questions:
+
+- what is actually true in Echo today,
+- what would have to be proven before either optimization is safe,
+- which idea is still worth investigating, and
+- which idea should be treated as suspect until stronger evidence exists.
+
+## Executive Summary
+
+1. The **k-way merge** idea is still plausible, but the current note overstates
+ why it works. The current executor returns **per-worker unsorted deltas**,
+ not per-shard canonical runs, so the required sorted-run invariant is not
+ yet established.
+2. The **shard-aware footprint skip** idea is much weaker against the current
+ implementation. The default scheduler is already the `GenSet`-based
+ `RadixScheduler`, whose reservation path scales like `O(m)` in candidate
+ footprint size rather than `O(k×m)` in the number of previously admitted
+ rewrites.
+3. The cross-shard independence claim is **not currently proven**. Shard routing
+ is by the scope node's `NodeId`, while footprint conflicts are checked over
+ warp-scoped nodes, edges, attachments, and ports. Those are not the same
+ keyspace, and current runtime enforcement does not prove they always align.
+4. Recommendation:
+ - keep investigating k-way merge, but only behind an explicit sorted-run
+ proof obligation and benchmark plan
+ - do **not** implement cross-shard footprint skipping until a stronger
+ locality invariant is proven
+
+## Current Code Reality
+
+### Parallel delta merge
+
+Today the merge path is:
+
+1. execute work in parallel
+2. collect one `TickDelta` per worker
+3. flatten all worker deltas
+4. sort globally by `(WarpOpKey, OpOrigin)`
+
+Relevant code:
+
+- `execute_parallel_sharded()` returns one `TickDelta` per worker in
+ `crates/warp-core/src/parallel/exec.rs`
+- `merge_deltas()` in `crates/warp-core/src/parallel/merge.rs` flattens all
+ worker outputs and sorts the combined vector
+- `TickDelta::into_parts_unsorted()` in `crates/warp-core/src/tick_delta.rs`
+ explicitly exposes unsorted emission order
+
+So the current implementation does **not** already materialize the
+"per-shard pre-sorted run" structure that the idea note assumes.
+
+### Scheduler complexity
+
+The default scheduler is the `RadixScheduler`, not the legacy
+`Vec` frontier scan.
+
+- `RadixScheduler::reserve()` uses generation-stamped sets (`GenSet`) for
+ membership checks in `crates/warp-core/src/scheduler.rs`
+- `docs/scheduler-warp-core.md` already documents the default path as `O(m)`,
+ where `m` is the candidate footprint size
+- the legacy pairwise frontier scan still exists as `LegacyScheduler`, but it
+ is not the default hot path
+
+This matters because the shard-aware footprint idea primarily helps pairwise
+all-frontier overlap checks. That is no longer the main scheduler algorithm.
+
+### Footprint keys vs shard keys
+
+Shard routing and footprint conflict detection are based on different data:
+
+- shard routing uses the scoped node's `NodeId` low bits in
+ `crates/warp-core/src/parallel/shard.rs`
+- footprint conflicts are checked over warp-scoped nodes, edges,
+ attachments, and ports in `crates/warp-core/src/footprint.rs` and
+ `crates/warp-core/src/scheduler.rs`
+
+Current runtime enforcement proves footprints are **warp-local**, not
+**shard-local**:
+
+- `FootprintGuard::new()` in `crates/warp-core/src/footprint_guard.rs`
+ asserts against cross-warp entries
+- it does not assert that every touched slot maps to the same shard as
+ `scope(r)`
+
+That distinction is exactly why the cross-shard skip needs a proof instead of a
+performance intuition.
+
+## 1. K-Way Merge Assessment
+
+### What would have to be true
+
+For a k-way merge to be a correct replacement for the current global sort, we
+need a family of runs `R1..Rk` such that:
+
+- each `Ri` is already sorted by the exact canonical order
+ `(WarpOpKey, OpOrigin)`, and
+- the current merged output is
+ `sort(flatten(R1..Rk))`
+
+Under those conditions, a standard heap-based merge is correct:
+
+```text
+kway_merge(R1..Rk) == sort(flatten(R1..Rk))
+```
+
+This is the real proof obligation. The current note skips straight to the
+conclusion without establishing that the merge inputs satisfy the premise.
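
Under that premise, the equality can be sketched with a heap-based merge over
toy keys. This is a sketch, not Echo code: the `Key` tuple below is a
hypothetical stand-in for `(WarpOpKey, OpOrigin)`, and the real types are richer.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical stand-in for the canonical sort key `(WarpOpKey, OpOrigin)`.
type Key = (u32, u32);

/// Heap-based k-way merge; correct only if every run is pre-sorted by `Key`.
fn kway_merge(runs: &[Vec<Key>]) -> Vec<Key> {
    let mut heap = BinaryHeap::new();
    for (ri, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            // Reverse turns the max-heap into a min-heap on (key, run, index).
            heap.push(Reverse((first, ri, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((key, ri, ix))) = heap.pop() {
        out.push(key);
        if let Some(&next) = runs[ri].get(ix + 1) {
            heap.push(Reverse((next, ri, ix + 1)));
        }
    }
    out
}

/// The current strategy: flatten everything, then sort globally.
fn sort_flatten(runs: &[Vec<Key>]) -> Vec<Key> {
    let mut all: Vec<Key> = runs.iter().flatten().copied().collect();
    all.sort();
    all
}
```

The proof obligation is precisely that these two functions agree on every
family of pre-sorted runs; on unsorted inputs only `sort_flatten` is correct.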
+
+### What is true today
+
+What we actually have today is weaker:
+
+- the executor returns one `TickDelta` per worker, not per shard
+- workers may process many shards
+- `TickDelta` collects operations in emission order, not canonical order
+- canonical ordering is imposed later by `merge_deltas()`
+
+So "shard assignment exists" does **not** imply "merge inputs are sorted runs."
+
+### What about the 1-core / 1-worker case?
+
+That case is important because it exposes the missing invariant cleanly.
+
+If `k = 1`:
+
+- a k-way merge only helps if the single input run is already sorted by the
+ canonical key
+- otherwise we still need to sort, and the optimization collapses back into a
+ normal sort path
+
+So the "what if there is only one shard of work because there is only one CPU
+core" question has a direct answer:
+
+- if the single worker delta is unsorted, the k-way merge idea provides no
+ algorithmic win
+- if the single worker delta is already canonically sorted, then the merge is
+ effectively a linear pass, but that is a stronger invariant than the current
+ implementation documents
+
+### Recommendation
+
+The k-way merge idea remains worth investigating, but only in this order:
+
+1. decide whether Echo should produce **per-shard canonical runs** or
+ **per-worker canonical runs**
+2. prove or enforce that each run is already sorted by `(WarpOpKey, OpOrigin)`
+3. benchmark:
+ - current flatten-and-sort
+ - sort-each-run-plus-merge
+ - true pre-sorted k-way merge
+4. only keep the optimization if the canonical-output equality is explicit and
+ the benchmark win survives review
+
+## 2. Shard-Aware Footprint Skip Assessment
+
+### The claimed invariant
+
+The idea note assumes:
+
+```text
+shard(r1) != shard(r2) => footprint(r1) and footprint(r2) are disjoint
+```
+
+That is the key claim. Without it, skipping cross-shard overlap checks is not
+conservative, and the optimization is unsound.
+
+### Why the claim is not currently established
+
+The current code only guarantees:
+
+- the rewrite has a scoped node
+- shard routing is a deterministic function of that scope node
+- footprint slots are warp-scoped
+- footprint guards reject cross-warp entries
+
+It does **not** currently prove:
+
+- every node touched by the rewrite hashes to the same shard as the scope node
+- every edge touched by the rewrite hashes to that same shard
+- every attachment touched by the rewrite belongs to resources in that same shard
+- every boundary port touched by the rewrite belongs to that same shard
+
+That is enough to reject the note's current "structurally disjoint by
+construction" wording.
+
+### Could there still be overlapping footprints?
+
+Yes. Unless we prove a stronger locality invariant, the answer is plainly yes.
+
+The dangerous pattern is:
+
+```text
+rewrite r has scope node A
+shard(scope(A)) = s1
+rewrite body touches some other slot X
+shard(slot(X)) = s2
+```
+
+If `X` can differ from the scope shard, then two rewrites can land in different
+scope shards and still overlap through some footprinted slot.
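
The shape of that failure can be made concrete with toy values. This is a
sketch only: `shard_of` stands in for the real `NodeId` low-bit routing, and
the slot numbers are hypothetical.

```rust
const NUM_SHARDS: u64 = 256;

// Stand-in for routing by the scope node's low bits.
fn shard_of(slot: u64) -> u64 {
    slot % NUM_SHARDS
}

// Naive overlap check over footprint slot lists.
fn footprints_overlap(a: &[u64], b: &[u64]) -> bool {
    a.iter().any(|x| b.contains(x))
}
```

A shard-gated scheduler would skip the overlap check for two rewrites whose
scope slots land in different shards, even if their footprints share a slot.
That admitted pair is exactly the unsoundness to rule out.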
+
+That possibility is enough to block the optimization until the invariant is
+settled.
+
+### Does the "1 / shard_count" math still matter?
+
+Only for a pairwise overlap algorithm.
+
+If we were still using the old pairwise frontier scan, then under a uniform
+distribution:
+
+- probability of same-shard pair: `1 / S`
+- expected candidate pairs surviving the shard gate:
+ `C(N, 2) / S`
+
+That math is fine as a performance estimate for the **legacy** pairwise model.
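
As a numeric sanity check of that estimate (purely illustrative arithmetic,
uniform hashing assumed):

```rust
/// Expected candidate pairs surviving a same-shard gate under uniform
/// hashing: C(n, 2) / s.
fn expected_same_shard_pairs(n: u64, s: u64) -> f64 {
    let pairs = (n * (n - 1) / 2) as f64; // C(n, 2)
    pairs / s as f64
}
```

For `n = 1000` candidates and `s = 256` shards this predicts roughly 1951
surviving pairs, but again, that number is only meaningful for the legacy
pairwise scan.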
+
+But the current default scheduler is already `O(m)` with `GenSet`s, not
+`O(k×m)` over frontier size, so there is no honest shard-count crossover to
+claim against today's default path without fresh benchmarks.
+
+### What about the bloom-filter idea?
+
+The bloom-style same-shard prefilter is the least controversial part.
+
+If two footprints share a real slot and the filter is built from those slots,
+then they must share at least one set bit. Therefore:
+
+- `filter_a & filter_b == 0` implies no shared slot represented in the filter
+- false positives are possible
+- false negatives are not acceptable
+
+So the same-shard prefilter is conceptually fine as a conservative
+implementation detail. The problem is the earlier step: the current note has
+not earned the right to skip **cross-shard** checks yet.
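
A minimal conservative prefilter looks like this. The hash constant and the
64-bit filter width are assumptions for illustration, not the real footprint
representation.

```rust
// Map a slot to one bit of a 64-bit filter via a multiplicative hash.
fn slot_bit(slot: u64) -> u64 {
    1u64 << (slot.wrapping_mul(0x9E37_79B9_7F4A_7C15) >> 58)
}

// OR together the bits of every footprinted slot.
fn filter_of(slots: &[u64]) -> u64 {
    slots.iter().fold(0u64, |f, &s| f | slot_bit(s))
}

// Nonzero intersection = "maybe overlaps" (false positives possible);
// zero intersection = provably no shared slot represented in the filter.
fn maybe_overlaps(a: u64, b: u64) -> bool {
    a & b != 0
}
```

Only the zero-intersection direction is trusted; a nonzero intersection falls
through to the precise footprint check, which keeps the filter conservative.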
+
+## Proof Obligations
+
+### K-way merge
+
+Before implementation, prove or enforce:
+
+1. every merge input run is sorted by `(WarpOpKey, OpOrigin)`
+2. the k-way merge produces byte-for-byte identical canonical output to the
+ current flatten-and-sort path
+3. dedupe and conflict detection semantics are unchanged
+
+Acceptable proof styles:
+
+- a direct design proof over sorted runs
+- property tests comparing `kway_merge` against `sort(flatten(...))`
+- deterministic regression tests across worker counts and shard layouts
+
+### Cross-shard footprint skip
+
+Before implementation, prove:
+
+```text
+for every rewrite r and every footprinted slot x in r,
+shard(x) == shard(scope(r))
+```
+
+Then derive:
+
+```text
+shard(r1) != shard(r2) => independent(r1, r2)
+```
+
+Until that implication is proved, the optimization should be treated as unsafe.
+
+Acceptable proof styles:
+
+- a written invariant tied to the rewrite API and enforced by runtime guards
+- property tests that generate rewrites and verify footprint slots stay on the
+ scope shard
+- bounded model checking if a sufficiently small executable model exists
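
The property those tests would exercise can be sketched with assumed stand-ins:
routing by low bits as a proxy for the `NodeId` low-bit routing in `shard.rs`,
and a rewrite modeled as a scope slot plus the slots it touches.

```rust
const NUM_SHARDS: u64 = 256;

// Stand-in for shard routing by the scope node's low bits.
fn shard_of(slot: u64) -> u64 {
    slot % NUM_SHARDS
}

// Toy model of a rewrite: its scope slot plus every footprinted slot.
struct Rewrite {
    scope: u64,
    touched: Vec<u64>,
}

/// The invariant to prove: every footprinted slot is on the scope's shard.
fn is_shard_local(r: &Rewrite) -> bool {
    r.touched.iter().all(|&x| shard_of(x) == shard_of(r.scope))
}
```

A property test would generate rewrites through the real rewrite API and assert
`is_shard_local` for each; one counterexample is enough to kill the skip.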
+
+Formal-methods note:
+
+- a tool like Kani or a separate executable model could help for bounded cases
+- but the first useful step is still to write down the invariant precisely
+- without that, "use a formal tool" just formalizes an ambiguous claim
+
+## Benchmark Plan
+
+### K-way merge
+
+Benchmark only after the sorted-run invariant is explicit.
+
+Compare:
+
+1. current flatten-and-sort
+2. sort-each-run then k-way merge
+3. true pre-sorted k-way merge
+
+Measure:
+
+- total merge wall time
+- allocation count / bytes
+- sensitivity to worker count
+- sensitivity to skewed shard distributions
+
+### Shard-aware footprint skipping
+
+Do not benchmark first. Prove the invariant first.
+
+If the invariant is ever proven, then benchmark against the current
+`RadixScheduler`, not against the legacy pairwise scheduler alone.
+
+## Kill Criteria
+
+### K-way merge
+
+Reject the optimization if any of these are true:
+
+- the merge inputs cannot be made individually canonical without an equivalent
+ sorting cost
+- the implementation complicates determinism reasoning materially
+- benchmarks do not show a real win on representative shard distributions
+
+### Shard-aware footprint skip
+
+Reject the optimization if any of these are true:
+
+- a rewrite can touch any slot outside the scope shard
+- runtime enforcement cannot cheaply verify the required locality invariant
+- the benchmark only beats the legacy scheduler but not the current default
+ `GenSet` scheduler
+
+## Final Recommendation
+
+Treat the two ideas differently.
+
+- **K-way merge:** keep alive as a plausible optimization candidate, but convert
+ it into a real design with explicit sorted-run obligations.
+- **Shard-aware footprint skip:** downgrade from "optimization candidate" to
+ "hypothesis requiring a proof." Until the stronger shard-locality invariant is
+ stated and enforced, it should not move toward implementation.
diff --git a/docs/plans/parallel-merge-and-footprint-optimizations.md b/docs/plans/parallel-merge-and-footprint-optimizations.md
index 37aa2842..f332d66d 100644
--- a/docs/plans/parallel-merge-and-footprint-optimizations.md
+++ b/docs/plans/parallel-merge-and-footprint-optimizations.md
@@ -5,6 +5,18 @@
**Status:** Ideas — not yet designed or scheduled
+See also the stricter review note:
+[Parallel Merge & Footprint Optimization Design Review](parallel-merge-and-footprint-design-review.md).
+
+Current disposition after code review:
+
+- k-way merge remains plausible, but only if merge inputs can be proven or
+ enforced to be individually sorted by the canonical `(WarpOpKey, OpOrigin)`
+ order
+- shard-aware cross-shard footprint skipping is **not** currently proven safe
+ against the default scheduler and should be treated as a hypothesis, not an
+ implementation-ready optimization
+
Two optimization opportunities for the parallel execution pipeline, both
exploiting structure that already exists in the shard-based architecture.
From 55f40217fe397549cec512d47f929424089cb76b Mon Sep 17 00:00:00 2001
From: James Ross
Date: Sat, 28 Mar 2026 07:36:44 -0700
Subject: [PATCH 02/15] feat(parallel): add shard policy benchmark matrix
---
crates/warp-benches/benches/README.md | 11 +
.../warp-benches/benches/parallel_baseline.rs | 69 +++-
crates/warp-core/src/lib.rs | 5 +-
crates/warp-core/src/parallel/exec.rs | 365 +++++++++++++++++-
crates/warp-core/src/parallel/mod.rs | 6 +-
5 files changed, 441 insertions(+), 15 deletions(-)
diff --git a/crates/warp-benches/benches/README.md b/crates/warp-benches/benches/README.md
index b84676aa..ee5da88a 100644
--- a/crates/warp-benches/benches/README.md
+++ b/crates/warp-benches/benches/README.md
@@ -24,6 +24,16 @@ results. This README summarizes how to run them and read the output.
- Throughput “elements” = rule applications (`n`). Uses `BatchSize::PerIteration`
so engine construction is excluded from timing.
+- `parallel_baseline.rs`
+ - Compares serial execution, the current shard-parallel baseline, the Phase 6B
+ work-queue pipeline, worker-count scaling, and the shard-policy matrix.
+ - The policy matrix compares:
+ - dynamic shard claiming + per-worker deltas
+ - dynamic shard claiming + per-shard deltas
+ - static round-robin shard assignment + per-worker deltas
+ - static round-robin shard assignment + per-shard deltas
+ - Throughput “elements” = executed items in the synthetic independent workload.
+
## Run
Run the full benches suite:
@@ -37,6 +47,7 @@ Run a single bench target (faster dev loop):
```sh
cargo bench -p warp-benches --bench snapshot_hash
cargo bench -p warp-benches --bench scheduler_drain
+cargo bench -p warp-benches --bench parallel_baseline
```
Criterion HTML reports are written under `target/criterion/<bench-name>/report/index.html`.
diff --git a/crates/warp-benches/benches/parallel_baseline.rs b/crates/warp-benches/benches/parallel_baseline.rs
index 93e2284a..76682c99 100644
--- a/crates/warp-benches/benches/parallel_baseline.rs
+++ b/crates/warp-benches/benches/parallel_baseline.rs
@@ -20,14 +20,15 @@
//! - `serial_vs_parallel_N`: Compare parallel sharded execution vs serial baseline
//! - `work_queue_pipeline_N`: Full Phase 6B pipeline (build_work_units → execute_work_queue)
//! - `worker_scaling_100`: How throughput scales with worker count (1, 2, 4, 8, 16)
+//! - `policy_matrix_1000`: Compare shard assignment and delta accumulation policies directly
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use std::collections::BTreeMap;
use std::time::Duration;
use warp_core::parallel::{build_work_units, execute_work_queue, WorkerResult};
use warp_core::{
- execute_parallel, execute_serial, make_node_id, make_type_id, make_warp_id, AtomPayload,
- AttachmentKey, AttachmentValue, ExecItem, GraphStore, GraphView, NodeId, NodeKey, NodeRecord,
- OpOrigin, TickDelta, WarpId, WarpOp,
+ execute_parallel, execute_parallel_with_policy, execute_serial, make_node_id, make_type_id,
+ make_warp_id, AtomPayload, AttachmentKey, AttachmentValue, ExecItem, GraphStore, GraphView,
+ NodeId, NodeKey, NodeRecord, OpOrigin, ParallelExecutionPolicy, TickDelta, WarpId, WarpOp,
};
/// Simple executor that sets an attachment on the scope node.
@@ -297,10 +298,70 @@ fn bench_worker_scaling(c: &mut Criterion) {
group.finish();
}
+// =============================================================================
+// Policy matrix comparison
+// =============================================================================
+
+fn policy_label(policy: ParallelExecutionPolicy) -> &'static str {
+ match policy {
+ ParallelExecutionPolicy::DYNAMIC_PER_WORKER => "dynamic_per_worker",
+ ParallelExecutionPolicy::DYNAMIC_PER_SHARD => "dynamic_per_shard",
+ ParallelExecutionPolicy::STATIC_PER_WORKER => "static_per_worker",
+ ParallelExecutionPolicy::STATIC_PER_SHARD => "static_per_shard",
+ }
+}
+
+/// Compares shard assignment and delta accumulation strategies directly.
+fn bench_policy_matrix(c: &mut Criterion) {
+ let mut group = c.benchmark_group("policy_matrix_1000");
+ group
+ .warm_up_time(Duration::from_secs(2))
+ .measurement_time(Duration::from_secs(5))
+ .sample_size(50);
+
+ const WORKLOAD_SIZE: usize = 1_000;
+ group.throughput(Throughput::Elements(WORKLOAD_SIZE as u64));
+
+ let policies = [
+ ParallelExecutionPolicy::DYNAMIC_PER_WORKER,
+ ParallelExecutionPolicy::DYNAMIC_PER_SHARD,
+ ParallelExecutionPolicy::STATIC_PER_WORKER,
+ ParallelExecutionPolicy::STATIC_PER_SHARD,
+ ];
+
+ for &workers in &[1usize, 4, 8] {
+ for policy in policies {
+ group.bench_with_input(
+ BenchmarkId::new(policy_label(policy), workers),
+ &workers,
+ |b, &workers| {
+ b.iter_batched(
+ || {
+ let (store, nodes) = make_test_store(WORKLOAD_SIZE);
+ let items = make_exec_items(&nodes);
+ (store, items)
+ },
+ |(store, items)| {
+ let view = GraphView::new(&store);
+ let deltas =
+ execute_parallel_with_policy(view, &items, workers, policy);
+ criterion::black_box(deltas)
+ },
+ BatchSize::SmallInput,
+ );
+ },
+ );
+ }
+ }
+
+ group.finish();
+}
+
criterion_group!(
benches,
bench_serial_vs_parallel,
bench_work_queue,
- bench_worker_scaling
+ bench_worker_scaling,
+ bench_policy_matrix
);
criterion_main!(benches);
diff --git a/crates/warp-core/src/lib.rs b/crates/warp-core/src/lib.rs
index 0ba4494c..595808c2 100644
--- a/crates/warp-core/src/lib.rs
+++ b/crates/warp-core/src/lib.rs
@@ -167,8 +167,9 @@ pub use ident::{
TypeId, WarpId,
};
pub use parallel::{
- execute_parallel, execute_parallel_sharded, execute_serial, shard_of, ExecItem, MergeConflict,
- PoisonedDelta, NUM_SHARDS,
+ execute_parallel, execute_parallel_sharded, execute_parallel_sharded_with_policy,
+ execute_parallel_with_policy, execute_serial, shard_of, DeltaAccumulationPolicy, ExecItem,
+ MergeConflict, ParallelExecutionPolicy, PoisonedDelta, ShardAssignmentPolicy, NUM_SHARDS,
};
/// Delta merging functions, only available with `delta_validate` feature.
///
diff --git a/crates/warp-core/src/parallel/exec.rs b/crates/warp-core/src/parallel/exec.rs
index a8655e7f..374cda80 100644
--- a/crates/warp-core/src/parallel/exec.rs
+++ b/crates/warp-core/src/parallel/exec.rs
@@ -20,6 +20,71 @@ use crate::NodeId;
use super::shard::{partition_into_shards, NUM_SHARDS};
+/// How virtual shards are assigned to workers during parallel execution.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum ShardAssignmentPolicy {
+ /// Workers claim shards dynamically via an atomic counter.
+ DynamicSteal,
+ /// Shards are assigned deterministically to workers by `shard_id % workers`.
+ StaticRoundRobin,
+}
+
+/// How worker execution outputs are grouped into `TickDelta`s.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum DeltaAccumulationPolicy {
+ /// Each worker accumulates all claimed shards into one `TickDelta`.
+ PerWorker,
+ /// Each non-empty shard produces its own `TickDelta`.
+ PerShard,
+}
+
+/// Execution policy for the shard-based parallel executor.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct ParallelExecutionPolicy {
+ /// How shards are assigned to workers.
+ pub assignment: ShardAssignmentPolicy,
+ /// How execution outputs are grouped into deltas.
+ pub accumulation: DeltaAccumulationPolicy,
+}
+
+impl ParallelExecutionPolicy {
+ /// Current default execution policy used by `execute_parallel()`.
+ pub const DEFAULT: Self = Self {
+ assignment: ShardAssignmentPolicy::DynamicSteal,
+ accumulation: DeltaAccumulationPolicy::PerWorker,
+ };
+
+ /// Dynamic shard claiming with one output delta per worker.
+ pub const DYNAMIC_PER_WORKER: Self = Self {
+ assignment: ShardAssignmentPolicy::DynamicSteal,
+ accumulation: DeltaAccumulationPolicy::PerWorker,
+ };
+
+ /// Dynamic shard claiming with one output delta per non-empty shard.
+ pub const DYNAMIC_PER_SHARD: Self = Self {
+ assignment: ShardAssignmentPolicy::DynamicSteal,
+ accumulation: DeltaAccumulationPolicy::PerShard,
+ };
+
+ /// Deterministic round-robin shard assignment with one output delta per worker.
+ pub const STATIC_PER_WORKER: Self = Self {
+ assignment: ShardAssignmentPolicy::StaticRoundRobin,
+ accumulation: DeltaAccumulationPolicy::PerWorker,
+ };
+
+ /// Deterministic round-robin shard assignment with one output delta per non-empty shard.
+ pub const STATIC_PER_SHARD: Self = Self {
+ assignment: ShardAssignmentPolicy::StaticRoundRobin,
+ accumulation: DeltaAccumulationPolicy::PerShard,
+ };
+}
+
+impl Default for ParallelExecutionPolicy {
+ fn default() -> Self {
+ Self::DEFAULT
+ }
+}
+
/// Classification of an executor for footprint enforcement.
///
/// System items (engine-internal inbox rules) may emit instance-level ops
@@ -171,7 +236,12 @@ pub fn execute_parallel(view: GraphView<'_>, items: &[ExecItem], workers: usize)
// Cap workers at NUM_SHARDS - no point spawning 512 threads for 256 shards
let capped_workers = workers.min(NUM_SHARDS);
- execute_parallel_sharded(view, items, capped_workers)
+ execute_parallel_sharded_with_policy(
+ view,
+ items,
+ capped_workers,
+ ParallelExecutionPolicy::DEFAULT,
+ )
}
/// Parallel execution with virtual shard partitioning (Phase 6B).
@@ -198,6 +268,23 @@ pub fn execute_parallel_sharded(
view: GraphView<'_>,
items: &[ExecItem],
workers: usize,
+) -> Vec<TickDelta> {
+ execute_parallel_sharded_with_policy(view, items, workers, ParallelExecutionPolicy::DEFAULT)
+}
+
+/// Parallel execution with an explicit shard assignment and delta accumulation policy.
+///
+/// This exposes the execution-policy matrix for benchmarking and experimentation
+/// while preserving `execute_parallel()` as the stable default entrypoint.
+///
+/// # Panics
+///
+/// Panics if `workers` is 0.
+pub fn execute_parallel_sharded_with_policy(
+ view: GraphView<'_>,
+ items: &[ExecItem],
+ workers: usize,
+ policy: ParallelExecutionPolicy,
) -> Vec<TickDelta> {
assert!(workers > 0, "workers must be > 0");
@@ -208,6 +295,52 @@ pub fn execute_parallel_sharded(
// Partition into virtual shards by scope
let shards = partition_into_shards(items);
+ match (policy.assignment, policy.accumulation) {
+ (ShardAssignmentPolicy::DynamicSteal, DeltaAccumulationPolicy::PerWorker) => {
+ execute_dynamic_per_worker(view, &shards, workers)
+ }
+ (ShardAssignmentPolicy::DynamicSteal, DeltaAccumulationPolicy::PerShard) => {
+ execute_dynamic_per_shard(view, &shards, workers)
+ }
+ (ShardAssignmentPolicy::StaticRoundRobin, DeltaAccumulationPolicy::PerWorker) => {
+ execute_static_per_worker(view, &shards, workers)
+ }
+ (ShardAssignmentPolicy::StaticRoundRobin, DeltaAccumulationPolicy::PerShard) => {
+ execute_static_per_shard(view, &shards, workers)
+ }
+ }
+}
+
+/// Parallel execution entry point with an explicit policy and worker cap.
+///
+/// This mirrors `execute_parallel()` but exposes the policy seam for benchmarks.
+///
+/// # Panics
+///
+/// Panics if `workers == 0`.
+pub fn execute_parallel_with_policy(
+ view: GraphView<'_>,
+ items: &[ExecItem],
+ workers: usize,
+ policy: ParallelExecutionPolicy,
+) -> Vec<TickDelta> {
+ assert!(workers >= 1, "need at least one worker");
+ let capped_workers = workers.min(NUM_SHARDS);
+ execute_parallel_sharded_with_policy(view, items, capped_workers, policy)
+}
+
+fn execute_shard_into_delta(view: GraphView<'_>, items: &[ExecItem], delta: &mut TickDelta) {
+ for item in items {
+ let mut scoped = delta.scoped(item.origin);
+ (item.exec)(view, &item.scope, scoped.inner_mut());
+ }
+}
+
+fn execute_dynamic_per_worker(
+ view: GraphView<'_>,
+ shards: &[super::shard::VirtualShard],
+ workers: usize,
+) -> Vec<TickDelta> {
let next_shard = AtomicUsize::new(0);
std::thread::scope(|s| {
@@ -219,21 +352,90 @@ pub fn execute_parallel_sharded(
s.spawn(move || {
let mut delta = TickDelta::new();
-
- // Work-stealing loop: claim shards until none remain
loop {
let shard_id = next_shard.fetch_add(1, Ordering::Relaxed);
if shard_id >= NUM_SHARDS {
break;
}
+ execute_shard_into_delta(view_copy, &shards[shard_id].items, &mut delta);
+ }
+ delta
+ })
+ })
+ .collect();
+
+ handles
+ .into_iter()
+ .map(|h| match h.join() {
+ Ok(delta) => delta,
+ Err(e) => std::panic::resume_unwind(e),
+ })
+ .collect()
+ })
+}
- // Execute all items in this shard (cache locality)
- for item in &shards[shard_id].items {
- let mut scoped = delta.scoped(item.origin);
- (item.exec)(view_copy, &item.scope, scoped.inner_mut());
+fn execute_dynamic_per_shard(
+ view: GraphView<'_>,
+ shards: &[super::shard::VirtualShard],
+ workers: usize,
+) -> Vec<TickDelta> {
+ let next_shard = AtomicUsize::new(0);
+
+ std::thread::scope(|s| {
+ let handles: Vec<_> = (0..workers)
+ .map(|_| {
+ let view_copy = view;
+ let shards = &shards;
+ let next_shard = &next_shard;
+
+ s.spawn(move || {
+ let mut deltas: Vec<(usize, TickDelta)> = Vec::new();
+ loop {
+ let shard_id = next_shard.fetch_add(1, Ordering::Relaxed);
+ if shard_id >= NUM_SHARDS {
+ break;
+ }
+ let items = &shards[shard_id].items;
+ if items.is_empty() {
+ continue;
}
+ let mut delta = TickDelta::new();
+ execute_shard_into_delta(view_copy, items, &mut delta);
+ deltas.push((shard_id, delta));
}
+ deltas
+ })
+ })
+ .collect();
+ let mut deltas: Vec<(usize, TickDelta)> = handles
+ .into_iter()
+ .flat_map(|h| match h.join() {
+ Ok(worker_deltas) => worker_deltas,
+ Err(e) => std::panic::resume_unwind(e),
+ })
+ .collect();
+ deltas.sort_by_key(|(shard_id, _)| *shard_id);
+ deltas.into_iter().map(|(_, delta)| delta).collect()
+ })
+}
+
+fn execute_static_per_worker(
+ view: GraphView<'_>,
+ shards: &[super::shard::VirtualShard],
+ workers: usize,
+) -> Vec<TickDelta> {
+ std::thread::scope(|s| {
+ let handles: Vec<_> = (0..workers)
+ .map(|worker_ix| {
+ let view_copy = view;
+ let shards = &shards;
+
+ s.spawn(move || {
+ let mut delta = TickDelta::new();
+ for shard_id in (worker_ix..NUM_SHARDS).step_by(workers) {
+ execute_shard_into_delta(view_copy, &shards[shard_id].items, &mut delta);
+ }
delta
})
})
@@ -249,6 +451,45 @@ pub fn execute_parallel_sharded(
})
}
+fn execute_static_per_shard(
+ view: GraphView<'_>,
+ shards: &[super::shard::VirtualShard],
+ workers: usize,
+) -> Vec<TickDelta> {
+ std::thread::scope(|s| {
+ let handles: Vec<_> = (0..workers)
+ .map(|worker_ix| {
+ let view_copy = view;
+ let shards = &shards;
+
+ s.spawn(move || {
+ let mut deltas: Vec<(usize, TickDelta)> = Vec::new();
+ for shard_id in (worker_ix..NUM_SHARDS).step_by(workers) {
+ let items = &shards[shard_id].items;
+ if items.is_empty() {
+ continue;
+ }
+ let mut delta = TickDelta::new();
+ execute_shard_into_delta(view_copy, items, &mut delta);
+ deltas.push((shard_id, delta));
+ }
+ deltas
+ })
+ })
+ .collect();
+
+ let mut deltas: Vec<(usize, TickDelta)> = handles
+ .into_iter()
+ .flat_map(|h| match h.join() {
+ Ok(worker_deltas) => worker_deltas,
+ Err(e) => std::panic::resume_unwind(e),
+ })
+ .collect();
+ deltas.sort_by_key(|(shard_id, _)| *shard_id);
+ deltas.into_iter().map(|(_, delta)| delta).collect()
+ })
+}
+
// =============================================================================
// Cross-Warp Parallelism (Phase 6B+)
// =============================================================================
@@ -529,3 +770,113 @@ fn execute_item_enforced(
Ok(delta)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::{
+ execute_parallel_with_policy, DeltaAccumulationPolicy, ExecItem, ParallelExecutionPolicy,
+ ShardAssignmentPolicy,
+ };
+ use crate::{
+ make_type_id, merge_deltas_ok, AtomPayload, AttachmentKey, AttachmentValue, GraphStore,
+ GraphView, NodeId, NodeKey, NodeRecord, OpOrigin, TickDelta, WarpOp,
+ };
+
+ fn test_executor(view: GraphView<'_>, scope: &NodeId, delta: &mut TickDelta) {
+ let payload = AtomPayload::new(
+ make_type_id("parallel/policy-test"),
+ bytes::Bytes::from_static(b"ok"),
+ );
+ let key = AttachmentKey::node_alpha(NodeKey {
+ warp_id: view.warp_id(),
+ local_id: *scope,
+ });
+ delta.push(WarpOp::SetAttachment {
+ key,
+ value: Some(AttachmentValue::Atom(payload)),
+ });
+ }
+
+ fn make_store_and_items(count: usize) -> (GraphStore, Vec<ExecItem>) {
+ let mut store = GraphStore::default();
+ let node_ty = make_type_id("parallel/policy-node");
+ let mut items = Vec::with_capacity(count);
+ for i in 0..count {
+ let mut bytes = [0u8; 32];
+ bytes[0] = i as u8;
+ let scope = NodeId(bytes);
+ store.insert_node(scope, NodeRecord { ty: node_ty });
+ items.push(ExecItem::new(
+ test_executor,
+ scope,
+ OpOrigin {
+ intent_id: i as u64,
+ rule_id: 1,
+ match_ix: 0,
+ op_ix: 0,
+ },
+ ));
+ }
+ (store, items)
+ }
+
+ #[test]
+ fn all_parallel_policies_preserve_merged_ops() {
+ let policies = [
+ ParallelExecutionPolicy::DYNAMIC_PER_WORKER,
+ ParallelExecutionPolicy::DYNAMIC_PER_SHARD,
+ ParallelExecutionPolicy::STATIC_PER_WORKER,
+ ParallelExecutionPolicy::STATIC_PER_SHARD,
+ ];
+ let (store, items) = make_store_and_items(32);
+ let view = GraphView::new(&store);
+ let baseline = merge_deltas_ok(execute_parallel_with_policy(
+ view,
+ &items,
+ 4,
+ ParallelExecutionPolicy::DYNAMIC_PER_WORKER,
+ ))
+ .expect("baseline merge failed");
+
+ for policy in policies {
+ let deltas = execute_parallel_with_policy(view, &items, 4, policy);
+ let merged = merge_deltas_ok(deltas).expect("policy merge failed");
+ assert_eq!(merged, baseline, "policy {policy:?} changed merged ops");
+ }
+ }
+
+ #[test]
+ fn per_shard_policy_emits_more_than_one_delta_when_one_worker_sees_many_shards() {
+ let (store, items) = make_store_and_items(8);
+ let view = GraphView::new(&store);
+
+ let per_worker = execute_parallel_with_policy(
+ view,
+ &items,
+ 1,
+ ParallelExecutionPolicy {
+ assignment: ShardAssignmentPolicy::DynamicSteal,
+ accumulation: DeltaAccumulationPolicy::PerWorker,
+ },
+ );
+ let per_shard = execute_parallel_with_policy(
+ view,
+ &items,
+ 1,
+ ParallelExecutionPolicy {
+ assignment: ShardAssignmentPolicy::DynamicSteal,
+ accumulation: DeltaAccumulationPolicy::PerShard,
+ },
+ );
+
+ assert_eq!(
+ per_worker.len(),
+ 1,
+ "per-worker policy should emit one delta"
+ );
+ assert!(
+ per_shard.len() > 1,
+ "per-shard policy should emit multiple deltas when one worker processes multiple shards"
+ );
+ }
+}
diff --git a/crates/warp-core/src/parallel/mod.rs b/crates/warp-core/src/parallel/mod.rs
index 6f4ec66b..cc7ceaa3 100644
--- a/crates/warp-core/src/parallel/mod.rs
+++ b/crates/warp-core/src/parallel/mod.rs
@@ -12,8 +12,10 @@ pub mod shard;
#[cfg(not(feature = "unsafe_graph"))]
pub(crate) use exec::ExecItemKind;
pub use exec::{
- build_work_units, execute_parallel, execute_parallel_sharded, execute_serial,
- execute_work_queue, ExecItem, PoisonedDelta, WorkUnit, WorkerResult,
+ build_work_units, execute_parallel, execute_parallel_sharded,
+ execute_parallel_sharded_with_policy, execute_parallel_with_policy, execute_serial,
+ execute_work_queue, DeltaAccumulationPolicy, ExecItem, ParallelExecutionPolicy, PoisonedDelta,
+ ShardAssignmentPolicy, WorkUnit, WorkerResult,
};
#[cfg(not(any(test, feature = "delta_validate")))]
pub(crate) use merge::check_write_to_new_warp;
From f6cf00dbc43dc8bd6deee2601ed691d70e82bfbf Mon Sep 17 00:00:00 2001
From: James Ross
Date: Sat, 28 Mar 2026 11:03:31 -0700
Subject: [PATCH 03/15] feat(bench): add shard policy matrix study
---
Makefile | 26 +-
crates/warp-benches/benches/README.md | 3 +
.../warp-benches/benches/parallel_baseline.rs | 62 +-
crates/warp-core/src/parallel/exec.rs | 49 ++
docs/benchmarks/PARALLEL_POLICY_MATRIX.md | 75 ++
.../parallel-policy-matrix-inline.html | 770 ++++++++++++++++++
docs/benchmarks/parallel-policy-matrix.html | 376 +++++++++
docs/benchmarks/parallel-policy-matrix.json | 395 +++++++++
scripts/bench_parallel_policy_bake.py | 164 ++++
9 files changed, 1902 insertions(+), 18 deletions(-)
create mode 100644 docs/benchmarks/PARALLEL_POLICY_MATRIX.md
create mode 100644 docs/benchmarks/parallel-policy-matrix-inline.html
create mode 100644 docs/benchmarks/parallel-policy-matrix.html
create mode 100644 docs/benchmarks/parallel-policy-matrix.json
create mode 100644 scripts/bench_parallel_policy_bake.py
diff --git a/Makefile b/Makefile
index 58a7d031..2c9e35b2 100644
--- a/Makefile
+++ b/Makefile
@@ -147,7 +147,7 @@ bench-stop:
echo "[bench] No PID file at target/bench_http.pid"; \
fi
-.PHONY: bench-bake bench-open-inline
+.PHONY: bench-bake bench-open-inline bench-policy-bake bench-policy-export bench-policy-open-inline
# Bake a standalone HTML with inline data that works over file://
bench-bake: vendor-d3
@@ -161,6 +161,30 @@ bench-bake: vendor-d3
bench-open-inline:
@open docs/benchmarks/report-inline.html
+bench-policy-export: vendor-d3
+ @echo "Exporting parallel policy matrix JSON + inline HTML..."
+ @python3 scripts/bench_parallel_policy_bake.py \
+ --json-out docs/benchmarks/parallel-policy-matrix.json \
+ --html-out docs/benchmarks/parallel-policy-matrix-inline.html
+ @pnpm exec prettier --write docs/benchmarks/parallel-policy-matrix-inline.html >/dev/null
+
+bench-policy-bake: vendor-d3
+ @echo "Running parallel policy matrix benchmarks..."
+ cargo bench -p warp-benches --bench parallel_baseline -- parallel_policy_matrix
+ @$(MAKE) bench-policy-export
+ @if [ -n "$(OPEN)" ]; then \
+ $(OPEN) docs/benchmarks/parallel-policy-matrix-inline.html >/dev/null 2>&1 || echo "Open file: docs/benchmarks/parallel-policy-matrix-inline.html" ; \
+ else \
+ echo "Open file: docs/benchmarks/parallel-policy-matrix-inline.html" ; \
+ fi
+
+bench-policy-open-inline:
+ @if [ -n "$(OPEN)" ]; then \
+ $(OPEN) docs/benchmarks/parallel-policy-matrix-inline.html >/dev/null 2>&1 || echo "Open file: docs/benchmarks/parallel-policy-matrix-inline.html" ; \
+ else \
+ echo "Open file: docs/benchmarks/parallel-policy-matrix-inline.html" ; \
+ fi
+
# Spec-000 (WASM) helpers
.PHONY: spec-000-dev spec-000-build
diff --git a/crates/warp-benches/benches/README.md b/crates/warp-benches/benches/README.md
index ee5da88a..b204c844 100644
--- a/crates/warp-benches/benches/README.md
+++ b/crates/warp-benches/benches/README.md
@@ -32,6 +32,9 @@ results. This README summarizes how to run them and read the output.
- dynamic shard claiming + per-shard deltas
- static round-robin shard assignment + per-worker deltas
- static round-robin shard assignment + per-shard deltas
+ - dedicated one-worker-per-shard + one-delta-per-shard
+ - The policy matrix runs across loads `100`, `1000`, and `10000`, with worker
+ counts `1`, `4`, and `8` where the policy uses a worker pool.
- Throughput “elements” = executed items in the synthetic independent workload.
## Run
diff --git a/crates/warp-benches/benches/parallel_baseline.rs b/crates/warp-benches/benches/parallel_baseline.rs
index 76682c99..c931cde6 100644
--- a/crates/warp-benches/benches/parallel_baseline.rs
+++ b/crates/warp-benches/benches/parallel_baseline.rs
@@ -20,11 +20,14 @@
//! - `serial_vs_parallel_N`: Compare parallel sharded execution vs serial baseline
//! - `work_queue_pipeline_N`: Full Phase 6B pipeline (build_work_units → execute_work_queue)
//! - `worker_scaling_100`: How throughput scales with worker count (1, 2, 4, 8, 16)
-//! - `policy_matrix_1000`: Compare shard assignment and delta accumulation policies directly
+//! - `parallel_policy_matrix`: Compare shard assignment and delta accumulation policies across loads
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use std::collections::BTreeMap;
use std::time::Duration;
-use warp_core::parallel::{build_work_units, execute_work_queue, WorkerResult};
+use warp_core::parallel::{
+ build_work_units, execute_work_queue, DeltaAccumulationPolicy, ShardAssignmentPolicy,
+ WorkerResult,
+};
use warp_core::{
execute_parallel, execute_parallel_with_policy, execute_serial, make_node_id, make_type_id,
make_warp_id, AtomPayload, AttachmentKey, AttachmentValue, ExecItem, GraphStore, GraphView,
@@ -308,49 +311,74 @@ fn policy_label(policy: ParallelExecutionPolicy) -> &'static str {
ParallelExecutionPolicy::DYNAMIC_PER_SHARD => "dynamic_per_shard",
ParallelExecutionPolicy::STATIC_PER_WORKER => "static_per_worker",
ParallelExecutionPolicy::STATIC_PER_SHARD => "static_per_shard",
+ ParallelExecutionPolicy {
+ assignment: ShardAssignmentPolicy::DedicatedPerShard,
+ accumulation: DeltaAccumulationPolicy::PerWorker,
+ } => "dedicated_per_worker",
+ ParallelExecutionPolicy::DEDICATED_PER_SHARD => "dedicated_per_shard",
}
}
/// Compares shard assignment and delta accumulation strategies directly.
fn bench_policy_matrix(c: &mut Criterion) {
- let mut group = c.benchmark_group("policy_matrix_1000");
+ let mut group = c.benchmark_group("parallel_policy_matrix");
group
.warm_up_time(Duration::from_secs(2))
.measurement_time(Duration::from_secs(5))
- .sample_size(50);
-
- const WORKLOAD_SIZE: usize = 1_000;
- group.throughput(Throughput::Elements(WORKLOAD_SIZE as u64));
+ .sample_size(40);
let policies = [
ParallelExecutionPolicy::DYNAMIC_PER_WORKER,
ParallelExecutionPolicy::DYNAMIC_PER_SHARD,
ParallelExecutionPolicy::STATIC_PER_WORKER,
ParallelExecutionPolicy::STATIC_PER_SHARD,
+ ParallelExecutionPolicy::DEDICATED_PER_SHARD,
];
- for &workers in &[1usize, 4, 8] {
+ for &n in &[100usize, 1_000, 10_000] {
+ group.throughput(Throughput::Elements(n as u64));
for policy in policies {
- group.bench_with_input(
- BenchmarkId::new(policy_label(policy), workers),
- &workers,
- |b, &workers| {
+ if policy == ParallelExecutionPolicy::DEDICATED_PER_SHARD {
+ group.bench_with_input(BenchmarkId::new(policy_label(policy), n), &n, |b, &n| {
b.iter_batched(
|| {
- let (store, nodes) = make_test_store(WORKLOAD_SIZE);
+ let (store, nodes) = make_test_store(n);
let items = make_exec_items(&nodes);
(store, items)
},
|(store, items)| {
let view = GraphView::new(&store);
- let deltas =
- execute_parallel_with_policy(view, &items, workers, policy);
+ let deltas = execute_parallel_with_policy(view, &items, 1, policy);
criterion::black_box(deltas)
},
BatchSize::SmallInput,
);
- },
- );
+ });
+ continue;
+ }
+
+ for &workers in &[1usize, 4, 8] {
+ group.bench_with_input(
+ BenchmarkId::new(format!("{}/{}w", policy_label(policy), workers), n),
+ &n,
+ |b, &n| {
+ b.iter_batched(
+ || {
+ let (store, nodes) = make_test_store(n);
+ let items = make_exec_items(&nodes);
+ (store, items)
+ },
+ |(store, items)| {
+ let view = GraphView::new(&store);
+ let deltas =
+ execute_parallel_with_policy(view, &items, workers, policy);
+ criterion::black_box(deltas)
+ },
+ BatchSize::SmallInput,
+ );
+ },
+ );
+ }
}
}
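The reworked loop above produces an asymmetric case matrix: pooled policies cross loads with worker counts, while `DEDICATED_PER_SHARD` crosses loads only. A small self-contained sketch of that enumeration (the label strings mirror `policy_label`; the `id` format itself is illustrative, not Criterion's exact naming):

```rust
// Sketch: enumerate the cases the policy matrix registers.
// Pooled policies get a worker-count axis; dedicated_per_shard does not.
fn matrix_cases() -> Vec<String> {
    let pooled = [
        "dynamic_per_worker",
        "dynamic_per_shard",
        "static_per_worker",
        "static_per_shard",
    ];
    let loads = [100usize, 1_000, 10_000];
    let workers = [1usize, 4, 8];

    let mut ids = Vec::new();
    for &n in &loads {
        // dedicated_per_shard ignores the worker knob: one case per load.
        ids.push(format!("dedicated_per_shard/{n}"));
        for policy in pooled {
            for &w in &workers {
                ids.push(format!("{policy}/{w}w/{n}"));
            }
        }
    }
    ids
}

fn main() {
    let ids = matrix_cases();
    // 4 pooled policies x 3 worker counts x 3 loads, plus 3 dedicated cases.
    assert_eq!(ids.len(), 4 * 3 * 3 + 3);
    println!("{} benchmark cases", ids.len());
}
```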
diff --git a/crates/warp-core/src/parallel/exec.rs b/crates/warp-core/src/parallel/exec.rs
index 374cda80..c939a2a2 100644
--- a/crates/warp-core/src/parallel/exec.rs
+++ b/crates/warp-core/src/parallel/exec.rs
@@ -27,6 +27,12 @@ pub enum ShardAssignmentPolicy {
DynamicSteal,
/// Shards are assigned deterministically to workers by `shard_id % workers`.
StaticRoundRobin,
+ /// Each non-empty shard gets its own worker thread.
+ ///
+ /// This is primarily a benchmarking / comparison policy, not the default
+ /// engine topology. It intentionally maximizes scheduling isolation at the
+ /// cost of spawning up to one thread per non-empty shard.
+ DedicatedPerShard,
}
/// How worker execution outputs are grouped into `TickDelta`s.
@@ -77,6 +83,12 @@ impl ParallelExecutionPolicy {
assignment: ShardAssignmentPolicy::StaticRoundRobin,
accumulation: DeltaAccumulationPolicy::PerShard,
};
+
+ /// One worker per non-empty shard with one output delta per shard.
+ pub const DEDICATED_PER_SHARD: Self = Self {
+ assignment: ShardAssignmentPolicy::DedicatedPerShard,
+ accumulation: DeltaAccumulationPolicy::PerShard,
+ };
}
impl Default for ParallelExecutionPolicy {
@@ -308,6 +320,10 @@ pub fn execute_parallel_sharded_with_policy(
(ShardAssignmentPolicy::StaticRoundRobin, DeltaAccumulationPolicy::PerShard) => {
execute_static_per_shard(view, &shards, workers)
}
+ (
+ ShardAssignmentPolicy::DedicatedPerShard,
+ DeltaAccumulationPolicy::PerWorker | DeltaAccumulationPolicy::PerShard,
+ ) => execute_dedicated_per_shard(view, &shards),
}
}
@@ -490,6 +506,38 @@ fn execute_static_per_shard(
})
}
+fn execute_dedicated_per_shard(
+ view: GraphView<'_>,
+ shards: &[super::shard::VirtualShard],
+) -> Vec<TickDelta> {
+ std::thread::scope(|s| {
+ let handles: Vec<_> = shards
+ .iter()
+ .enumerate()
+ .filter(|(_, shard)| !shard.items.is_empty())
+ .map(|(shard_id, shard)| {
+ let view_copy = view;
+ let items = &shard.items;
+ s.spawn(move || {
+ let mut delta = TickDelta::new();
+ execute_shard_into_delta(view_copy, items, &mut delta);
+ (shard_id, delta)
+ })
+ })
+ .collect();
+
+ let mut deltas: Vec<(usize, TickDelta)> = handles
+ .into_iter()
+        .map(|h| match h.join() {
+            Ok(pair) => pair,
+            Err(e) => std::panic::resume_unwind(e),
+        })
+ .collect();
+ deltas.sort_by_key(|(shard_id, _)| *shard_id);
+ deltas.into_iter().map(|(_, delta)| delta).collect()
+ })
+}
+
// =============================================================================
// Cross-Warp Parallelism (Phase 6B+)
// =============================================================================
@@ -827,6 +875,7 @@ mod tests {
ParallelExecutionPolicy::DYNAMIC_PER_SHARD,
ParallelExecutionPolicy::STATIC_PER_WORKER,
ParallelExecutionPolicy::STATIC_PER_SHARD,
+ ParallelExecutionPolicy::DEDICATED_PER_SHARD,
];
let (store, items) = make_store_and_items(32);
let view = GraphView::new(&store);
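Stripped of engine types, `execute_dedicated_per_shard` is a scoped-thread fan-out: spawn one thread per non-empty shard, join them all, then restore deterministic output order by shard id. A self-contained sketch of the same shape, with plain `Vec<i32>` shards standing in for `VirtualShard` and a sum standing in for `execute_shard_into_delta`:

```rust
use std::thread;

// Sketch: one scoped thread per non-empty shard; results are re-sorted by
// shard id after the joins so the output order is deterministic.
fn dedicated_per_shard(shards: &[Vec<i32>]) -> Vec<(usize, i32)> {
    thread::scope(|s| {
        let handles: Vec<_> = shards
            .iter()
            .enumerate()
            .filter(|(_, shard)| !shard.is_empty())
            .map(|(shard_id, shard)| {
                s.spawn(move || {
                    // Stand-in for execute_shard_into_delta: fold the shard.
                    (shard_id, shard.iter().sum::<i32>())
                })
            })
            .collect();

        let mut results: Vec<(usize, i32)> = handles
            .into_iter()
            .map(|h| h.join().expect("shard worker panicked"))
            .collect();
        // Handles are joined in spawn order already; sorting by shard id
        // makes the ordering contract explicit rather than incidental.
        results.sort_by_key(|(shard_id, _)| *shard_id);
        results
    })
}

fn main() {
    let shards = vec![vec![1, 2], vec![], vec![3], vec![4, 5, 6]];
    let out = dedicated_per_shard(&shards);
    // Empty shard 1 spawns no thread and emits no result.
    assert_eq!(out, vec![(0, 3), (2, 3), (3, 15)]);
    println!("{out:?}");
}
```

Collecting all handles before joining any of them is what makes the fan-out actually parallel; joining inside the `map` would serialize the shards.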
diff --git a/docs/benchmarks/PARALLEL_POLICY_MATRIX.md b/docs/benchmarks/PARALLEL_POLICY_MATRIX.md
new file mode 100644
index 00000000..2f89e8bd
--- /dev/null
+++ b/docs/benchmarks/PARALLEL_POLICY_MATRIX.md
@@ -0,0 +1,75 @@
+
+
+
+# Parallel Policy Matrix Benchmark
+
+## Purpose
+
+This benchmark compares shard execution topology choices, not just raw worker
+count:
+
+- dynamic shard claiming + one delta per worker
+- dynamic shard claiming + one delta per shard
+- static round-robin shard assignment + one delta per worker
+- static round-robin shard assignment + one delta per shard
+- dedicated one-worker-per-shard + one delta per shard
+
+The point is to answer narrower questions than "is parallel good?":
+
+- which shard assignment policy is cheaper,
+- which delta grouping policy is cheaper, and
+- whether "one worker = one shard = one delta" is ever worth the overhead.
+
+## Loads
+
+The benchmark currently runs at:
+
+- `100`
+- `1000`
+- `10000`
+
+For pooled-worker policies, it also varies worker counts:
+
+- `1`
+- `4`
+- `8`
+
+The dedicated per-shard policy intentionally ignores the worker-count knob and
+spawns one thread per non-empty shard.
+
+## Outputs
+
+Running the dedicated bake target produces:
+
+- raw JSON: [parallel-policy-matrix.json](parallel-policy-matrix.json)
+- baked static HTML:
+  [parallel-policy-matrix-inline.html](parallel-policy-matrix-inline.html)
+
+Criterion's original raw estimates remain under `target/criterion/parallel_policy_matrix/`.
+
+## Commands
+
+Run the targeted policy study and bake outputs:
+
+```sh
+make bench-policy-bake
+```
+
+If benchmark results already exist and you only want to regenerate JSON + HTML:
+
+```sh
+make bench-policy-export
+```
+
+To inspect the registered benchmark cases without running them:
+
+```sh
+cargo bench -p warp-benches --bench parallel_baseline -- --list
+```
+
+## Notes
+
+- The benchmark measures execution topology overhead on a synthetic independent
+ workload. It is not a substitute for end-to-end engine traces.
+- The dedicated per-shard policy is primarily a comparison tool. It is expected
+ to pay substantial thread-spawn overhead, especially at higher shard counts.
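The thread-spawn overhead called out above scales with the number of non-empty shards, not with any configured worker count. A self-contained sketch that makes the spawn count observable (an `AtomicUsize` counter stands in for real profiling; the shard layout is illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Sketch: the dedicated policy spawns one thread per non-empty shard,
// so spawn count tracks shard occupancy rather than a worker-pool knob.
fn count_dedicated_spawns(shards: &[Vec<u64>]) -> usize {
    let spawns = AtomicUsize::new(0);
    thread::scope(|s| {
        for shard in shards.iter().filter(|shard| !shard.is_empty()) {
            spawns.fetch_add(1, Ordering::Relaxed);
            s.spawn(move || {
                // Stand-in workload: touch every item in the shard.
                shard.iter().copied().sum::<u64>()
            });
        }
    });
    spawns.load(Ordering::Relaxed)
}

fn main() {
    // 256 shards, only 5 occupied: 5 threads, regardless of any worker knob.
    let mut shards = vec![Vec::new(); 256];
    for i in 0..5 {
        shards[i * 50].push(i as u64);
    }
    assert_eq!(count_dedicated_spawns(&shards), 5);
    println!("spawned {} threads", count_dedicated_spawns(&shards));
}
```

This is the inverse of the pooled policies, where the thread count is fixed by the worker knob and a densely occupied shard space just means more work per thread.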
diff --git a/docs/benchmarks/parallel-policy-matrix-inline.html b/docs/benchmarks/parallel-policy-matrix-inline.html
new file mode 100644
index 00000000..57377e29
--- /dev/null
+++ b/docs/benchmarks/parallel-policy-matrix-inline.html
@@ -0,0 +1,770 @@
+
+
+
+
+
+
+
+ Echo Parallel Policy Matrix
+
+
+
+
+
Parallel Policy Matrix
+
+ Compare shard assignment and delta accumulation strategies
+ across multiple loads. Raw data lives in
+ parallel-policy-matrix.json; this page is the
+ lightweight static view.
+
+
+
+
+
+
What This Compares
+
+
+ dynamic_per_worker: dynamic shard stealing,
+ one delta per worker
+
+
+ dynamic_per_shard: dynamic shard stealing,
+ one delta per shard
+
+
+ static_per_worker: round-robin shard
+ assignment, one delta per worker
+
+
+ static_per_shard: round-robin shard
+ assignment, one delta per shard
+
+
+ dedicated_per_shard: one worker thread per
+ non-empty shard, one delta per shard
+
One page for the runtime story and the policy dogfight
+
+ This report keeps the core scheduler-overhead view and the
+ newer parallel policy study in one static page. The core tab
+ tracks the steady-state budget story; the policy tab
+ compares sharding and delta accumulation strategies under
+ the same synthetic independent workload.
+
+
+
+
+
+
+
+
+
Deterministic scheduler overhead
+
+ This tab measures the stable runtime path: snapshot
+ hashing and scheduler drain behavior as input size
+ grows. Lower is better. The budget view keeps the
+ frame-budget context visible, while the complexity view
+ makes scaling trends easier to see.
+
+
+ Loading benchmark data...
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Breakout tables
+
+
+
+
Missing data
+
+
+
+
+
+
+
+
Goal, method, and metric
+
+ The policy study isolates topology overhead inside
+ execute_parallel_with_policy. Every case
+ runs the same independent workload, so the only moving
+ parts are shard assignment, worker shape, and delta
+ accumulation policy.
+
+
+
+
What each case actually does
+
+
+ Build a fresh graph store with independent
+ benchmark nodes.
+
+
+ Create one ExecItem per node
+ with the same executor.
+
+
+ Run the exact same item set under one policy
+ at one load.
+
+
+ Black-box the returned deltas so Criterion
+ measures the real path.
+
+
+
+
+
How it is measured
+
+
+ Harness: Criterion group
+ parallel_policy_matrix
+
+
+ Loads: 100, 1000,
+ 10000
+
+
Warmup: 2s
+
Measurement window: 5s
+
Sample size: 40
+
+ Primary metric: mean runtime per invocation
+
+
+ Hover tooltips show exact mean and
+ confidence interval
+
+
+
+
+
+ The graph below draws every measured policy/load point.
+ Color tracks policy family. Line style tracks worker
+ shape. The English commentary and comparison matrix
+ intentionally avoid numbers so the performance story is
+ readable at a glance; the raw table keeps the exact
+ timings.
+
+
+
+
+
+
Plain-English play-by-play
+
+
+
+
Comparison matrix in English
+
+
+
+
+
+
All policy data on one chart
+
+ Every series is shown here. Hover any point for exact
+ details. Dedicated-per-shard stays on the same axes as
+ the pooled-worker cases so you can see whether it ever
+ catches up.
+