From b3a0d482b6810613b11273356d6bac43802aef73 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 30 Mar 2026 21:52:35 -0400 Subject: [PATCH 1/3] Less quadratic half_join --- dogsdogsdogs/src/operators/half_join2.rs | 384 +++++++++++++++++++++++ dogsdogsdogs/src/operators/mod.rs | 4 +- 2 files changed, 387 insertions(+), 1 deletion(-) create mode 100644 dogsdogsdogs/src/operators/half_join2.rs diff --git a/dogsdogsdogs/src/operators/half_join2.rs b/dogsdogsdogs/src/operators/half_join2.rs new file mode 100644 index 000000000..b7eac072c --- /dev/null +++ b/dogsdogsdogs/src/operators/half_join2.rs @@ -0,0 +1,384 @@ +/// Streaming asymmetric join between updates (K,V1) and an arrangement on (K,V2). +/// +/// The asymmetry is that the join only responds to streamed updates, not to changes in the arrangement. +/// Streamed updates join only with matching arranged updates at lesser times *in the total order*, and +/// subject to a predicate supplied by the user (roughly: strictly less, or not). +/// +/// This behavior can ensure that any pair of matching updates interact exactly once. +/// +/// There are various forms of this operator with tangled closures about how to emit the outputs and +/// wrangle the logical compaction frontier in order to preserve the distinctions around times that are +/// strictly less (conventional compaction logic would collapse unequal times to the frontier, and lose +/// the distinction). +/// +/// The methods also carry an auxiliary time next to the value, which is used to advance the joined times. +/// This is .. a byproduct of wanting to allow advancing times a la `join_function`, without breaking the +/// coupling by total order on "initial time". +/// +/// The doccomments for individual methods are a bit of a mess. Sorry. 
+ +use std::collections::VecDeque; +use std::ops::Mul; + +use timely::ContainerBuilder; +use timely::container::CapacityContainerBuilder; +use timely::dataflow::{Scope, ScopeParent, Stream}; +use timely::dataflow::channels::pact::{Pipeline, Exchange}; +use timely::dataflow::operators::{Capability, Operator, generic::Session}; +use timely::PartialOrder; +use timely::progress::{Antichain, ChangeBatch, Timestamp}; +use timely::progress::frontier::MutableAntichain; + +use differential_dataflow::{ExchangeData, VecCollection, AsCollection, Hashable}; +use differential_dataflow::difference::{Monoid, Semigroup}; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::operators::arrange::Arranged; +use differential_dataflow::trace::{Cursor, TraceReader}; +use differential_dataflow::consolidation::{consolidate, consolidate_updates}; +use differential_dataflow::trace::implementations::BatchContainer; + +use timely::dataflow::operators::CapabilitySet; + +/// A binary equijoin that responds to updates on only its first input. +/// +/// This operator responds to inputs of the form +/// +/// ```ignore +/// ((key, val1, time1), initial_time, diff1) +/// ``` +/// +/// where `initial_time` is less or equal to `time1`, and produces as output +/// +/// ```ignore +/// ((output_func(key, val1, val2), lub(time1, time2)), initial_time, diff1 * diff2) +/// ``` +/// +/// for each `((key, val2), time2, diff2)` present in `arrangement`, where +/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. +/// This last constraint is important to ensure that we correctly produce +/// all pairs of output updates across multiple `half_join` operators. +/// +/// Notice that the time is hoisted up into data. The expectation is that +/// once out of the "delta flow region", the updates will be `delay`d to the +/// times specified in the payloads. 
+pub fn half_join( + stream: VecCollection, + arrangement: Arranged, + frontier_func: FF, + comparison: CF, + mut output_func: S, +) -> VecCollection>::Output> +where + G: Scope, + K: Hashable + ExchangeData, + V: ExchangeData, + R: ExchangeData + Monoid, + Tr: TraceReader+Clone+'static, + R: Mul, + FF: Fn(&G::Timestamp, &mut Antichain) + 'static, + CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, + DOut: Clone+'static, + S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static, +{ + let output_func = move |session: &mut SessionFor, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| { + for (time, diff2) in output.drain(..) { + let diff = diff1.clone() * diff2.clone(); + let dout = (output_func(k, v1, v2), time.clone()); + session.give((dout, initial.clone(), diff)); + } + }; + half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func) + .as_collection() +} + +/// A session with lifetime `'a` in a scope `G` with a container builder `CB`. +/// +/// This is a shorthand primarily for the reson of readability. +type SessionFor<'a, 'b, G, CB> = + Session<'a, 'b, + ::Timestamp, + CB, + Capability<::Timestamp>, + >; + +/// An unsafe variant of `half_join` where the `output_func` closure takes +/// additional arguments a vector of `time` and `diff` tuples as input and +/// writes its outputs at a container builder. The container builder +/// can, but isn't required to, accept `(data, time, diff)` triplets. +/// This allows for more flexibility, but is more error-prone. 
+/// +/// This operator responds to inputs of the form +/// +/// ```ignore +/// ((key, val1, time1), initial_time, diff1) +/// ``` +/// +/// where `initial_time` is less or equal to `time1`, and produces as output +/// +/// ```ignore +/// output_func(session, key, val1, val2, initial_time, diff1, &[lub(time1, time2), diff2]) +/// ``` +/// +/// for each `((key, val2), time2, diff2)` present in `arrangement`, where +/// `time2` is less than `initial_time` *UNDER THE TOTAL ORDER ON TIMES*. +/// +/// The `yield_function` allows the caller to indicate when the operator should +/// yield control, as a function of the elapsed time and the number of matched +/// records. Note this is not the number of *output* records, owing mainly to +/// the number of matched records being easiest to record with low overhead. +pub fn half_join_internal_unsafe( + stream: VecCollection, + mut arrangement: Arranged, + frontier_func: FF, + comparison: CF, + yield_function: Y, + mut output_func: S, +) -> Stream +where + G: Scope, + K: Hashable + ExchangeData, + V: ExchangeData, + R: ExchangeData + Monoid, + Tr: for<'a> TraceReader+Clone+'static, + FF: Fn(&G::Timestamp, &mut Antichain) + 'static, + CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, + Y: Fn(std::time::Instant, usize) -> bool + 'static, + S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, + CB: ContainerBuilder, +{ + // No need to block physical merging for this operator. + arrangement.trace.set_physical_compaction(Antichain::new().borrow()); + let mut arrangement_trace = Some(arrangement.trace); + let arrangement_stream = arrangement.stream; + + let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into()); + + // Stash for (time, diff) accumulation. + let mut output_buffer = Vec::new(); + + // Stage 1: Stuck blobs sorted by (initial, data). 
Nibbled from the front + // as the arrangement frontier advances to determine eligible records. + let mut stuck: Vec> = Vec::new(); + // Stage 2: Ready blobs sorted by (data, initial). Consumed from the front + // one record at a time, yield-safe. + let mut ready: Vec> = Vec::new(); + // Buffer for new arrivals, stored as (initial, data, diff) for direct consolidation. + let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new(); + + let scope = stream.scope(); + stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| { + + // Acquire an activator to reschedule the operator when it has unfinished work. + let activator = scope.activator_for(info.address); + + move |(input1, frontier1), (input2, frontier2), output| { + + // Drain all input for this activation into a single buffer. + let mut caps = CapabilitySet::new(); + input1.for_each(|capability, data| { + caps.insert(capability.retain(0)); + arriving.extend(data.drain(..).map(|(d, t, r)| (t, d, r))); + }); + + // Form a new blob from this activation's arrivals. + if !arriving.is_empty() { + consolidate_updates(&mut arriving); + + if !arriving.is_empty() { + let mut lower = MutableAntichain::new(); + // TODO: consider implementing `update_iter_ref` to avoid clone. + lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1))); + + caps.downgrade(&lower.frontier()); + + stuck.push(StuckBlob { + caps, + lower, + data: std::mem::take(&mut arriving).into(), + }); + } + } + + // Drain input batches; although we do not observe them, we want access to the input + // to observe the frontier and to drive scheduling. + input2.for_each(|_, _| { }); + + // Local variables to track if and when we should exit early. + let mut yielded = false; + let timer = std::time::Instant::now(); + let mut work = 0; + + if let Some(ref mut trace) = arrangement_trace { + + let frontier = frontier2.frontier(); + + // Stage 2 first: drain ready piles (key-sorted, yield-safe). 
+ for pile in ready.iter_mut() { + if yielded { break; } + + let (mut cursor, storage) = trace.cursor(); + let mut key_con = Tr::KeyContainer::with_capacity(1); + let mut removals: ChangeBatch = ChangeBatch::new(); + // Cache a delayed capability. Reusable for any `initial` that is + // greater or equal to the cached time in the partial order. + let mut cached_cap: Option> = None; + + while let Some(((ref key, ref val1, ref time), ref initial, ref diff1)) = pile.data.front() { + yielded = yielded || yield_function(timer, work); + if yielded { break; } + + // Reuse cached capability if its time is <= initial. + if !cached_cap.as_ref().is_some_and(|cap| cap.time().less_equal(initial)) { + cached_cap = Some(pile.caps.delayed(initial)); + } + let cap = cached_cap.as_ref().unwrap(); + + key_con.clear(); key_con.push_own(&key); + cursor.seek_key(&storage, key_con.index(0)); + if cursor.get_key(&storage) == key_con.get(0) { + while let Some(val2) = cursor.get_val(&storage) { + cursor.map_times(&storage, |t, d| { + if comparison(t, initial) { + let mut t = Tr::owned_time(t); + t.join_assign(time); + output_buffer.push((t, Tr::owned_diff(d))) + } + }); + consolidate(&mut output_buffer); + work += output_buffer.len(); + // TODO: Worry about how to avoid reconstructing sessions so often. + output_func(&mut output.session_with_builder(&cap), key, val1, val2, initial, diff1, &mut output_buffer); + output_buffer.clear(); + cursor.step_val(&storage); + } + cursor.rewind_vals(&storage); + } + + let (_, initial, _) = pile.data.pop_front().unwrap(); + removals.update(initial, -1); + } + + // Apply all removals in bulk and downgrade once. + if !removals.is_empty() { + pile.lower.update_iter(removals.drain()); + pile.caps.downgrade(&pile.lower.frontier()); + } + } + ready.retain(|pile| !pile.data.is_empty()); + + // Stage 1: nibble blobs to produce new ready piles. + if !yielded { + // Put the total-order minimum of the frontier into a TimeContainer + // so we can call `comparison`. 
Since `comparison` is monotone with + // the total order, only the minimum matters. + let mut time_con = Tr::TimeContainer::with_capacity(1); + if let Some(min_time) = frontier.iter().min() { + time_con.push_own(min_time); + } + + // Collect all eligible records across all blobs into one pile. + let mut eligible = Vec::new(); + let mut eligible_caps = CapabilitySet::new(); + + for blob in stuck.iter_mut() { + // Pop eligible records from the front. The deque is sorted by + // initial time in total order, and `comparison` is monotone, + // so we stop at the first ineligible record. + let before = eligible.len(); + while let Some((initial, _, _)) = blob.data.front() { + if (0..time_con.len()).any(|i| comparison(time_con.index(i), initial)) { + break; + } + eligible.push(blob.data.pop_front().unwrap()); + } + + if eligible.len() > before { + // Grab caps for the eligible records before downgrading the blob. + let mut frontier = Antichain::new(); + for (initial, _, _) in eligible[before..].iter() { + frontier.insert(initial.clone()); + } + for time in frontier.iter() { + eligible_caps.insert(blob.caps.delayed(time)); + } + + // Remove eligible times from the blob's antichain and downgrade. + blob.lower.update_iter(eligible[before..].iter().map(|(t, _, _)| (t.clone(), -1))); + blob.caps.downgrade(&blob.lower.frontier()); + } + } + + stuck.retain(|blob| !blob.data.is_empty()); + + if !eligible.is_empty() { + // Rearrange to (data, initial, diff) and consolidate. + // consolidate_updates sorts by (data, initial) which is + // the order we want for the active blob. 
+ let mut active_data: Vec<_> = eligible.into_iter() + .map(|(t, d, r)| (d, t, r)) + .collect(); + consolidate_updates(&mut active_data); + + if !active_data.is_empty() { + let mut pile_lower = MutableAntichain::new(); + pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1))); + eligible_caps.downgrade(&pile_lower.frontier()); + + ready.push(ReadyBlob { + caps: eligible_caps, + lower: pile_lower, + data: VecDeque::from(active_data), + }); + } + } + } + } + + // Re-activate if we have ready piles to process. + if !ready.is_empty() { + activator.activate(); + } + + // The logical merging frontier depends on input1, stuck, and ready blobs. + let mut frontier = Antichain::new(); + for time in frontier1.frontier().iter() { + frontier_func(time, &mut frontier); + } + for blob in stuck.iter() { + for cap in blob.caps.iter() { + frontier_func(cap.time(), &mut frontier); + } + } + for blob in ready.iter() { + for cap in blob.caps.iter() { + frontier_func(cap.time(), &mut frontier); + } + } + arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow())); + + if frontier1.is_empty() && stuck.is_empty() && ready.is_empty() { + arrangement_trace = None; + } + } + }) +} + +/// Stuck work sorted by `(initial, data)`. Nibbled from the front as the +/// arrangement frontier advances to determine eligible records. +struct StuckBlob { + caps: CapabilitySet, + lower: MutableAntichain, + data: VecDeque<(T, D, R)>, +} + +/// Ready work sorted by `(data, initial)`. Consumed from the front one record +/// at a time during trace lookups. Yield-safe: can stop and resume at any point, +/// because the work can be resumed without re-sorting. Strictly speaking we will +/// do a MutableAntichain rebuild, which could be linear time, but fixing that is +/// future work. +// TODO: Fix the thing in the comments. 
+struct ReadyBlob { + caps: CapabilitySet, + lower: MutableAntichain, + data: VecDeque<(D, T, R)>, +} diff --git a/dogsdogsdogs/src/operators/mod.rs b/dogsdogsdogs/src/operators/mod.rs index 1a9f30415..fe15c5283 100644 --- a/dogsdogsdogs/src/operators/mod.rs +++ b/dogsdogsdogs/src/operators/mod.rs @@ -1,4 +1,5 @@ pub mod half_join; +pub mod half_join2; pub mod lookup_map; pub mod count; @@ -6,7 +7,8 @@ pub mod propose; pub mod validate; pub use self::half_join::half_join; +pub use self::half_join2::half_join as half_join2; pub use self::lookup_map::lookup_map; pub use self::count::count; pub use self::propose::{propose, propose_distinct}; -pub use self::validate::validate; \ No newline at end of file +pub use self::validate::validate; From f803be3db4ec97bc36cbace1b59ca82716004e31 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 31 Mar 2026 15:55:40 -0400 Subject: [PATCH 2/3] Re-order stages and improve session batching --- dogsdogsdogs/src/operators/half_join2.rs | 233 +++++++++++++---------- 1 file changed, 135 insertions(+), 98 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join2.rs b/dogsdogsdogs/src/operators/half_join2.rs index b7eac072c..385ab7169 100644 --- a/dogsdogsdogs/src/operators/half_join2.rs +++ b/dogsdogsdogs/src/operators/half_join2.rs @@ -22,9 +22,9 @@ use std::ops::Mul; use timely::ContainerBuilder; use timely::container::CapacityContainerBuilder; -use timely::dataflow::{Scope, ScopeParent, Stream}; +use timely::dataflow::{Scope, Stream}; use timely::dataflow::channels::pact::{Pipeline, Exchange}; -use timely::dataflow::operators::{Capability, Operator, generic::Session}; +use timely::dataflow::operators::Operator; use timely::PartialOrder; use timely::progress::{Antichain, ChangeBatch, Timestamp}; use timely::progress::frontier::MutableAntichain; @@ -80,27 +80,18 @@ where DOut: Clone+'static, S: FnMut(&K, &V, Tr::Val<'_>)->DOut+'static, { - let output_func = move |session: &mut SessionFor, k: &K, v1: &V, v2: Tr::Val<'_>, 
initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| { + let output_func = move |builder: &mut CapacityContainerBuilder>, k: &K, v1: &V, v2: Tr::Val<'_>, initial: &G::Timestamp, diff1: &R, output: &mut Vec<(G::Timestamp, Tr::Diff)>| { for (time, diff2) in output.drain(..) { let diff = diff1.clone() * diff2.clone(); let dout = (output_func(k, v1, v2), time.clone()); - session.give((dout, initial.clone(), diff)); + use timely::container::PushInto; + builder.push_into((dout, initial.clone(), diff)); } }; half_join_internal_unsafe::<_, _, _, _, _, _,_,_,_, CapacityContainerBuilder>>(stream, arrangement, frontier_func, comparison, |_timer, _count| false, output_func) .as_collection() } -/// A session with lifetime `'a` in a scope `G` with a container builder `CB`. -/// -/// This is a shorthand primarily for the reson of readability. -type SessionFor<'a, 'b, G, CB> = - Session<'a, 'b, - ::Timestamp, - CB, - Capability<::Timestamp>, - >; - /// An unsafe variant of `half_join` where the `output_func` closure takes /// additional arguments a vector of `time` and `diff` tuples as input and /// writes its outputs at a container builder. The container builder @@ -143,7 +134,7 @@ where FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, - S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, + S: FnMut(&mut CB, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, CB: ContainerBuilder, { // No need to block physical merging for this operator. @@ -180,25 +171,6 @@ where arriving.extend(data.drain(..).map(|(d, t, r)| (t, d, r))); }); - // Form a new blob from this activation's arrivals. 
- if !arriving.is_empty() { - consolidate_updates(&mut arriving); - - if !arriving.is_empty() { - let mut lower = MutableAntichain::new(); - // TODO: consider implementing `update_iter_ref` to avoid clone. - lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1))); - - caps.downgrade(&lower.frontier()); - - stuck.push(StuckBlob { - caps, - lower, - data: std::mem::take(&mut arriving).into(), - }); - } - } - // Drain input batches; although we do not observe them, we want access to the input // to observe the frontier and to drive scheduling. input2.for_each(|_, _| { }); @@ -212,90 +184,92 @@ where let frontier = frontier2.frontier(); - // Stage 2 first: drain ready piles (key-sorted, yield-safe). - for pile in ready.iter_mut() { - if yielded { break; } - - let (mut cursor, storage) = trace.cursor(); - let mut key_con = Tr::KeyContainer::with_capacity(1); - let mut removals: ChangeBatch = ChangeBatch::new(); - // Cache a delayed capability. Reusable for any `initial` that is - // greater or equal to the cached time in the partial order. - let mut cached_cap: Option> = None; - - while let Some(((ref key, ref val1, ref time), ref initial, ref diff1)) = pile.data.front() { - yielded = yielded || yield_function(timer, work); - if yielded { break; } + // Determine the total-order minimum of the arrangement frontier, + // used to partition arrivals into immediately-eligible vs stuck. + let mut time_con = Tr::TimeContainer::with_capacity(1); + if let Some(min_time) = frontier.iter().min() { + time_con.push_own(min_time); + } + let eligible = |initial: &G::Timestamp| -> bool { + !(0..time_con.len()).any(|i| comparison(time_con.index(i), initial)) + }; - // Reuse cached capability if its time is <= initial. - if !cached_cap.as_ref().is_some_and(|cap| cap.time().less_equal(initial)) { - cached_cap = Some(pile.caps.delayed(initial)); + // Form new blobs from this activation's arrivals, partitioning + // immediately-eligible records from those that must wait. 
+ if !arriving.is_empty() { + consolidate_updates(&mut arriving); + + if !arriving.is_empty() { + // Partition: eligible records go straight to a ready blob, + // the rest go to a stuck blob. `arriving` is sorted by + // (initial, data) after consolidation, and eligibility is + // monotone in total-order initial time, so we can split + // with a linear scan. + let split = arriving.iter().position(|&(ref t, _, _)| !eligible(t)).unwrap_or(arriving.len()); + + // Stuck portion (tail). + if split < arriving.len() { + let stuck_data: Vec<_> = arriving.drain(split..).collect(); + let mut lower = MutableAntichain::new(); + lower.update_iter(stuck_data.iter().map(|(t, _, _)| (t.clone(), 1))); + let mut stuck_caps = CapabilitySet::new(); + for time in lower.frontier().iter() { + stuck_caps.insert(caps.delayed(time)); + } + stuck.push(StuckBlob { + caps: stuck_caps, + lower, + data: stuck_data.into(), + }); } - let cap = cached_cap.as_ref().unwrap(); - key_con.clear(); key_con.push_own(&key); - cursor.seek_key(&storage, key_con.index(0)); - if cursor.get_key(&storage) == key_con.get(0) { - while let Some(val2) = cursor.get_val(&storage) { - cursor.map_times(&storage, |t, d| { - if comparison(t, initial) { - let mut t = Tr::owned_time(t); - t.join_assign(time); - output_buffer.push((t, Tr::owned_diff(d))) - } + // Eligible portion (head) — rearrange to (data, initial, diff) + // and consolidate for key-sorted cursor traversal. + if !arriving.is_empty() { + let mut active_data: Vec<_> = arriving.drain(..) 
+ .map(|(t, d, r)| (d, t, r)) + .collect(); + consolidate_updates(&mut active_data); + + if !active_data.is_empty() { + let mut pile_lower = MutableAntichain::new(); + pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1))); + let mut ready_caps = CapabilitySet::new(); + for time in pile_lower.frontier().iter() { + ready_caps.insert(caps.delayed(time)); + } + ready.push(ReadyBlob { + caps: ready_caps, + lower: pile_lower, + data: VecDeque::from(active_data), }); - consolidate(&mut output_buffer); - work += output_buffer.len(); - // TODO: Worry about how to avoid reconstructing sessions so often. - output_func(&mut output.session_with_builder(&cap), key, val1, val2, initial, diff1, &mut output_buffer); - output_buffer.clear(); - cursor.step_val(&storage); } - cursor.rewind_vals(&storage); } - - let (_, initial, _) = pile.data.pop_front().unwrap(); - removals.update(initial, -1); - } - - // Apply all removals in bulk and downgrade once. - if !removals.is_empty() { - pile.lower.update_iter(removals.drain()); - pile.caps.downgrade(&pile.lower.frontier()); } } - ready.retain(|pile| !pile.data.is_empty()); - - // Stage 1: nibble blobs to produce new ready piles. - if !yielded { - // Put the total-order minimum of the frontier into a TimeContainer - // so we can call `comparison`. Since `comparison` is monotone with - // the total order, only the minimum matters. - let mut time_con = Tr::TimeContainer::with_capacity(1); - if let Some(min_time) = frontier.iter().min() { - time_con.push_own(min_time); - } + // Stage 1: nibble stuck blobs to produce new ready piles. + { // Collect all eligible records across all blobs into one pile. - let mut eligible = Vec::new(); + let mut eligible_vec = Vec::new(); let mut eligible_caps = CapabilitySet::new(); for blob in stuck.iter_mut() { // Pop eligible records from the front. The deque is sorted by // initial time in total order, and `comparison` is monotone, // so we stop at the first ineligible record. 
- let before = eligible.len(); + let before = eligible_vec.len(); while let Some((initial, _, _)) = blob.data.front() { - if (0..time_con.len()).any(|i| comparison(time_con.index(i), initial)) { + if !eligible(initial) { break; } - eligible.push(blob.data.pop_front().unwrap()); + eligible_vec.push(blob.data.pop_front().unwrap()); } - if eligible.len() > before { + if eligible_vec.len() > before { // Grab caps for the eligible records before downgrading the blob. let mut frontier = Antichain::new(); - for (initial, _, _) in eligible[before..].iter() { + for (initial, _, _) in eligible_vec[before..].iter() { frontier.insert(initial.clone()); } for time in frontier.iter() { @@ -303,18 +277,18 @@ where } // Remove eligible times from the blob's antichain and downgrade. - blob.lower.update_iter(eligible[before..].iter().map(|(t, _, _)| (t.clone(), -1))); + blob.lower.update_iter(eligible_vec[before..].iter().map(|(t, _, _)| (t.clone(), -1))); blob.caps.downgrade(&blob.lower.frontier()); } } stuck.retain(|blob| !blob.data.is_empty()); - if !eligible.is_empty() { + if !eligible_vec.is_empty() { // Rearrange to (data, initial, diff) and consolidate. // consolidate_updates sorts by (data, initial) which is // the order we want for the active blob. - let mut active_data: Vec<_> = eligible.into_iter() + let mut active_data: Vec<_> = eligible_vec.into_iter() .map(|(t, d, r)| (d, t, r)) .collect(); consolidate_updates(&mut active_data); @@ -332,6 +306,69 @@ where } } } + + // Stage 2: drain ready piles (key-sorted, yield-safe). + for pile in ready.iter_mut() { + if yielded { break; } + + // For each pile, we'll set up container builders for each distinct capability in the capset. + // The closure gets invoked on a container builder, and if there are containers to extract we + // ship them with the associated capability. We flush at the end. 
+ + let mut builders = (0..pile.caps.len()).map(|_| CB::default()).collect::>(); + + let (mut cursor, storage) = trace.cursor(); + let mut key_con = Tr::KeyContainer::with_capacity(1); + let mut removals: ChangeBatch = ChangeBatch::new(); + + while let Some(((ref key, ref val1, ref time), ref initial, ref diff1)) = pile.data.front() { + yielded = yielded || yield_function(timer, work); + if yielded { break; } + + let builder_idx = pile.caps.iter().position(|c| c.time().less_equal(initial)).unwrap(); + + key_con.clear(); key_con.push_own(&key); + cursor.seek_key(&storage, key_con.index(0)); + if cursor.get_key(&storage) == key_con.get(0) { + while let Some(val2) = cursor.get_val(&storage) { + cursor.map_times(&storage, |t, d| { + if comparison(t, initial) { + let mut t = Tr::owned_time(t); + t.join_assign(time); + output_buffer.push((t, Tr::owned_diff(d))) + } + }); + consolidate(&mut output_buffer); + work += output_buffer.len(); + // TODO: Worry about how to avoid reconstructing sessions so often. + output_func(&mut builders[builder_idx], key, val1, val2, initial, diff1, &mut output_buffer); + output_buffer.clear(); + cursor.step_val(&storage); + } + cursor.rewind_vals(&storage); + } + + while let Some(container) = builders[builder_idx].extract() { + output.session(&pile.caps[builder_idx]).give_container(container); + } + + let (_, initial, _) = pile.data.pop_front().unwrap(); + removals.update(initial, -1); + } + + for builder_idx in 0 .. pile.caps.len() { + while let Some(container) = builders[builder_idx].finish() { + output.session(&pile.caps[builder_idx]).give_container(container); + } + } + + // Apply all removals in bulk and downgrade once. + if !removals.is_empty() { + pile.lower.update_iter(removals.drain()); + pile.caps.downgrade(&pile.lower.frontier()); + } + } + ready.retain(|pile| !pile.data.is_empty()); } // Re-activate if we have ready piles to process. 
From 399cc22a3cfdfeab32b04f029dd03947d2ed75a3 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 1 Apr 2026 08:54:07 -0400 Subject: [PATCH 3/3] Consolidate storage types to avoid double allocations --- dogsdogsdogs/src/operators/half_join2.rs | 251 +++++++++-------------- 1 file changed, 97 insertions(+), 154 deletions(-) diff --git a/dogsdogsdogs/src/operators/half_join2.rs b/dogsdogsdogs/src/operators/half_join2.rs index 385ab7169..650dc433d 100644 --- a/dogsdogsdogs/src/operators/half_join2.rs +++ b/dogsdogsdogs/src/operators/half_join2.rs @@ -147,14 +147,10 @@ where // Stash for (time, diff) accumulation. let mut output_buffer = Vec::new(); - // Stage 1: Stuck blobs sorted by (initial, data). Nibbled from the front - // as the arrangement frontier advances to determine eligible records. - let mut stuck: Vec> = Vec::new(); - // Stage 2: Ready blobs sorted by (data, initial). Consumed from the front - // one record at a time, yield-safe. - let mut ready: Vec> = Vec::new(); - // Buffer for new arrivals, stored as (initial, data, diff) for direct consolidation. - let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new(); + // Unified blobs: each blob holds data in (T, D, R) order, with a stuck_count + // tracking how many elements at the back are not yet eligible for processing. + // The ready prefix is sorted by (D, T, R) for cursor traversal. + let mut blobs: Vec> = Vec::new(); let scope = stream.scope(); stream.inner.binary_frontier(arrangement_stream, exchange, Pipeline, "HalfJoin", move |_,info| { @@ -164,7 +160,8 @@ where move |(input1, frontier1), (input2, frontier2), output| { - // Drain all input for this activation into a single buffer. + // Drain all input into a single buffer. 
+ let mut arriving: Vec<(G::Timestamp, (K, V, G::Timestamp), R)> = Vec::new(); let mut caps = CapabilitySet::new(); input1.for_each(|capability, data| { caps.insert(capability.retain(0)); @@ -194,138 +191,90 @@ where !(0..time_con.len()).any(|i| comparison(time_con.index(i), initial)) }; - // Form new blobs from this activation's arrivals, partitioning - // immediately-eligible records from those that must wait. + // Form a new blob from arrivals. + // consolidate_updates sorts by (T, D, R) — the stuck order. + consolidate_updates(&mut arriving); + if !arriving.is_empty() { - consolidate_updates(&mut arriving); - - if !arriving.is_empty() { - // Partition: eligible records go straight to a ready blob, - // the rest go to a stuck blob. `arriving` is sorted by - // (initial, data) after consolidation, and eligibility is - // monotone in total-order initial time, so we can split - // with a linear scan. - let split = arriving.iter().position(|&(ref t, _, _)| !eligible(t)).unwrap_or(arriving.len()); - - // Stuck portion (tail). - if split < arriving.len() { - let stuck_data: Vec<_> = arriving.drain(split..).collect(); - let mut lower = MutableAntichain::new(); - lower.update_iter(stuck_data.iter().map(|(t, _, _)| (t.clone(), 1))); - let mut stuck_caps = CapabilitySet::new(); - for time in lower.frontier().iter() { - stuck_caps.insert(caps.delayed(time)); - } - stuck.push(StuckBlob { - caps: stuck_caps, - lower, - data: stuck_data.into(), - }); - } + let mut lower = MutableAntichain::new(); + lower.update_iter(arriving.iter().map(|(t, _, _)| (t.clone(), 1))); + let mut blob_caps = CapabilitySet::new(); + for time in lower.frontier().iter() { + blob_caps.insert(caps.delayed(time)); + } - // Eligible portion (head) — rearrange to (data, initial, diff) - // and consolidate for key-sorted cursor traversal. - if !arriving.is_empty() { - let mut active_data: Vec<_> = arriving.drain(..) 
- .map(|(t, d, r)| (d, t, r)) - .collect(); - consolidate_updates(&mut active_data); - - if !active_data.is_empty() { - let mut pile_lower = MutableAntichain::new(); - pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1))); - let mut ready_caps = CapabilitySet::new(); - for time in pile_lower.frontier().iter() { - ready_caps.insert(caps.delayed(time)); - } - ready.push(ReadyBlob { - caps: ready_caps, - lower: pile_lower, - data: VecDeque::from(active_data), - }); - } - } + // Determine how many records are stuck (ineligible). + // Data is sorted by (T, D, R) and eligibility is monotone in T, + // so stuck records form a suffix. + let stuck_count = arriving.iter().rev() + .take_while(|(t, _, _)| !eligible(t)) + .count(); + + let mut data: VecDeque<_> = arriving.into(); + + // Sort the ready prefix by (D, T, R) for cursor traversal. + let ready_len = data.len() - stuck_count; + if ready_len > 0 { + // VecDeque slices: make_contiguous then sort the prefix. + let slice = data.make_contiguous(); + slice[..ready_len].sort_by(|(t1, d1, r1), (t2, d2, r2)| { + (d1, t1, r1).cmp(&(d2, t2, r2)) + }); } + + blobs.push(Blob { + caps: blob_caps, + lower, + data, + stuck_count, + }); } - // Stage 1: nibble stuck blobs to produce new ready piles. - { - // Collect all eligible records across all blobs into one pile. - let mut eligible_vec = Vec::new(); - let mut eligible_caps = CapabilitySet::new(); - - for blob in stuck.iter_mut() { - // Pop eligible records from the front. The deque is sorted by - // initial time in total order, and `comparison` is monotone, - // so we stop at the first ineligible record. - let before = eligible_vec.len(); - while let Some((initial, _, _)) = blob.data.front() { - if !eligible(initial) { - break; - } - eligible_vec.push(blob.data.pop_front().unwrap()); - } + // Nibble: only when all ready elements have been drained (stuck_count == len), + // check if stuck records have become eligible and promote them. 
+ for blob in blobs.iter_mut().filter(|b| b.stuck_count == b.data.len()) { - if eligible_vec.len() > before { - // Grab caps for the eligible records before downgrading the blob. - let mut frontier = Antichain::new(); - for (initial, _, _) in eligible_vec[before..].iter() { - frontier.insert(initial.clone()); - } - for time in frontier.iter() { - eligible_caps.insert(blob.caps.delayed(time)); - } + // Count how many stuck records (from the front, which has the + // lowest initial times) are now eligible. + let newly_ready = blob.data.iter().take_while(|(t, _, _)| eligible(t)).count(); - // Remove eligible times from the blob's antichain and downgrade. - blob.lower.update_iter(eligible_vec[before..].iter().map(|(t, _, _)| (t.clone(), -1))); - blob.caps.downgrade(&blob.lower.frontier()); - } - } + if newly_ready > 0 { + blob.stuck_count -= newly_ready; - stuck.retain(|blob| !blob.data.is_empty()); - - if !eligible_vec.is_empty() { - // Rearrange to (data, initial, diff) and consolidate. - // consolidate_updates sorts by (data, initial) which is - // the order we want for the active blob. - let mut active_data: Vec<_> = eligible_vec.into_iter() - .map(|(t, d, r)| (d, t, r)) - .collect(); - consolidate_updates(&mut active_data); - - if !active_data.is_empty() { - let mut pile_lower = MutableAntichain::new(); - pile_lower.update_iter(active_data.iter().map(|(_, t, _)| (t.clone(), 1))); - eligible_caps.downgrade(&pile_lower.frontier()); - - ready.push(ReadyBlob { - caps: eligible_caps, - lower: pile_lower, - data: VecDeque::from(active_data), - }); - } + // Sort the newly-ready prefix by (D, T, R) for cursor traversal. + let slice = blob.data.make_contiguous(); + slice[..newly_ready].sort_by(|(t1, d1, r1), (t2, d2, r2)| { + (d1, t1, r1).cmp(&(d2, t2, r2)) + }); + + // Downgrade capabilities. 
+                    let mut new_lower = MutableAntichain::new();
+                    new_lower.update_iter(blob.data.iter().map(|(t, _, _)| (t.clone(), 1)));
+                    blob.lower = new_lower;
+                    blob.caps.downgrade(&blob.lower.frontier());
                 }
             }
 
-            // Stage 2: drain ready piles (key-sorted, yield-safe).
-            for pile in ready.iter_mut() {
+            // Process ready elements from blobs.
+            for blob in blobs.iter_mut().filter(|b| b.data.len() > b.stuck_count) {
 
                 if yielded { break; }
 
-                // For each pile, we'll set up container builders for each distinct capability in the capset.
-                // The closure gets invoked on a container builder, and if there are containers to extract we
-                // ship them with the associated capability. We flush at the end.
-
-                let mut builders = (0..pile.caps.len()).map(|_| CB::default()).collect::<Vec<_>>();
+                let mut builders = (0..blob.caps.len()).map(|_| CB::default()).collect::<Vec<_>>();
                 let (mut cursor, storage) = trace.cursor();
 
                 let mut key_con = Tr::KeyContainer::with_capacity(1);
 
                 let mut removals: ChangeBatch<G::Timestamp> = ChangeBatch::new();
 
-                while let Some(((ref key, ref val1, ref time), ref initial, ref diff1)) = pile.data.front() {
+                // Process ready elements from the front.
+                while blob.data.len() > blob.stuck_count {
 
                     yielded = yielded || yield_function(timer, work);
                     if yielded { break; }
 
-                    let builder_idx = pile.caps.iter().position(|c| c.time().less_equal(initial)).unwrap();
+                    // Peek at the front element. It's in (T, D, R) storage order,
+                    // but the ready prefix has been sorted by (D, T, R).
+                    let (ref initial, (ref key, ref val1, ref time), ref diff1) = blob.data[0];
+
+                    let builder_idx = blob.caps.iter().position(|c| c.time().less_equal(initial)).unwrap();
 
                     key_con.clear(); key_con.push_own(&key);
                     cursor.seek_key(&storage, key_con.index(0));
@@ -340,7 +289,6 @@ where
                                 });
                                 consolidate(&mut output_buffer);
                                 work += output_buffer.len();
-                                // TODO: Worry about how to avoid reconstructing sessions so often.
output_func(&mut builders[builder_idx], key, val1, val2, initial, diff1, &mut output_buffer); output_buffer.clear(); cursor.step_val(&storage); @@ -349,73 +297,68 @@ where } while let Some(container) = builders[builder_idx].extract() { - output.session(&pile.caps[builder_idx]).give_container(container); + output.session(&blob.caps[builder_idx]).give_container(container); } - let (_, initial, _) = pile.data.pop_front().unwrap(); + let (initial, _, _) = blob.data.pop_front().unwrap(); removals.update(initial, -1); } - for builder_idx in 0 .. pile.caps.len() { + for builder_idx in 0 .. blob.caps.len() { while let Some(container) = builders[builder_idx].finish() { - output.session(&pile.caps[builder_idx]).give_container(container); + output.session(&blob.caps[builder_idx]).give_container(container); } } // Apply all removals in bulk and downgrade once. - if !removals.is_empty() { - pile.lower.update_iter(removals.drain()); - pile.caps.downgrade(&pile.lower.frontier()); + if blob.data.is_empty() { + // Eagerly release the blob's resources. + blob.lower = MutableAntichain::new(); + blob.caps = CapabilitySet::new(); + blob.data = VecDeque::default(); + } else { + blob.lower.update_iter(removals.drain()); + blob.caps.downgrade(&blob.lower.frontier()); } } - ready.retain(|pile| !pile.data.is_empty()); + + // Remove fully-consumed blobs. + blobs.retain(|blob| !blob.data.is_empty()); } - // Re-activate if we have ready piles to process. - if !ready.is_empty() { + // Re-activate if we have blobs with ready elements to process. + if blobs.iter().any(|b| b.data.len() > b.stuck_count) { activator.activate(); } - // The logical merging frontier depends on input1, stuck, and ready blobs. + // The logical merging frontier depends on input1 and all blobs. 
             let mut frontier = Antichain::new();
             for time in frontier1.frontier().iter() {
                 frontier_func(time, &mut frontier);
             }
-            for blob in stuck.iter() {
-                for cap in blob.caps.iter() {
-                    frontier_func(cap.time(), &mut frontier);
-                }
-            }
-            for blob in ready.iter() {
+            for blob in blobs.iter() {
                 for cap in blob.caps.iter() {
                     frontier_func(cap.time(), &mut frontier);
                 }
             }
             arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));
 
-            if frontier1.is_empty() && stuck.is_empty() && ready.is_empty() {
+            if frontier1.is_empty() && blobs.is_empty() {
                 arrangement_trace = None;
             }
         }
     })
 }
 
-/// Stuck work sorted by `(initial, data)`. Nibbled from the front as the
-/// arrangement frontier advances to determine eligible records.
-struct StuckBlob<T: Timestamp, D, R> {
+/// A unified blob of updates. Data is stored as `(T, D, R)` tuples in a VecDeque.
+/// The last `stuck_count` elements are stuck (not yet eligible for processing),
+/// sorted by `(T, D, R)` from consolidation. The ready prefix (everything before
+/// the stuck tail) is sorted by `(D, T, R)` for efficient cursor traversal.
+/// Ready elements are consumed from the front via `pop_front`.
+struct Blob<T: Timestamp, D, R> {
     caps: CapabilitySet<T>,
     lower: MutableAntichain<T>,
     data: VecDeque<(T, D, R)>,
-}
-
-/// Ready work sorted by `(data, initial)`. Consumed from the front one record
-/// at a time during trace lookups. Yield-safe: can stop and resume at any point,
-/// because the work can be resumed without re-sorting. Strictly speaking we will
-/// do a MutableAntichain rebuild, which could be linear time, but fixing that is
-/// future work.
-// TODO: Fix the thing in the comments.
-struct ReadyBlob<T: Timestamp, D, R> {
-    caps: CapabilitySet<T>,
-    lower: MutableAntichain<T>,
-    data: VecDeque<(D, T, R)>,
+    /// Number of stuck (ineligible) elements at the back of `data`.
+    stuck_count: usize,
 }