Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 52 additions & 38 deletions differential-dataflow/src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,12 @@ use crate::trace::{Batcher, Builder, Description};
pub struct MergeBatcher<Input, C, M: Merger> {
/// Transforms input streams to chunks of sorted, consolidated data.
chunker: C,
/// A sequence of power-of-two length lists of sorted, consolidated containers.
///
/// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
chains: Vec<Vec<M::Chunk>>,
/// Stash of empty chunks, recycled through the merging process.
stash: Vec<M::Chunk>,
/// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
merger: M,
/// Current lower frontier, we sealed up to here.
lower: Antichain<M::Time>,
/// Core merginging engine.
engine: MergeEngine<M>,
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<M::Time>,
/// Logger for size accounting.
logger: Option<Logger>,
/// Timely operator ID.
operator_id: usize,
/// Current lower frontier, we sealed up to here.
lower: Antichain<M::Time>,
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
_marker: PhantomData<Input>,
}
Expand All @@ -57,12 +47,14 @@ where

fn new(logger: Option<Logger>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
chunker: C::default(),
merger: M::default(),
chains: Vec::new(),
stash: Vec::new(),
engine: MergeEngine {
logger,
operator_id,
merger: M::default(),
chains: Vec::new(),
stash: Vec::new(),
},
frontier: Antichain::new(),
lower: Antichain::from_elem(M::Time::minimum()),
_marker: PhantomData,
Expand All @@ -75,7 +67,7 @@ where
self.chunker.push_into(container);
while let Some(chunk) = self.chunker.extract() {
let chunk = std::mem::take(chunk);
self.insert_chain(vec![chunk]);
self.engine.insert_chain(vec![chunk]);
}
}

Expand All @@ -87,9 +79,44 @@ where
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
self.insert_chain(vec![chunk]);
self.engine.insert_chain(vec![chunk]);
}

self.frontier.clear();
let mut readied = self.engine.extract_ready(&mut self.frontier, upper.borrow());

let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
let seal = B::seal(&mut readied, description);
self.lower = upper;
seal
}

/// The frontier of elements remaining after the most recent call to `self.seal`.
#[inline]
fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
self.frontier.borrow()
}
}

/// Chain-based consolidating merge engine.
pub struct MergeEngine<M: Merger> {
/// A sequence of power-of-two length lists of sorted, consolidated containers.
///
/// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
pub chains: Vec<Vec<M::Chunk>>,
/// Stash of empty chunks, recycled through the merging process.
pub stash: Vec<M::Chunk>,
/// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
pub merger: M,
/// Logger for size accounting.
pub logger: Option<Logger>,
/// Timely operator ID.
pub operator_id: usize,
}

impl<M: Merger> MergeEngine<M> {
/// Extracts all updates at times not greater or equal to `upper`, recording the lower bound of remaining times in `frontier`.
pub fn extract_ready(&mut self, frontier: &mut Antichain<M::Time>, upper: AntichainRef<M::Time>) -> Vec<M::Chunk> {
// Merge all remaining chains into a single chain.
while self.chains.len() > 1 {
let list1 = self.chain_pop().unwrap();
Expand All @@ -102,33 +129,20 @@ where
// Extract readied data.
let mut kept = Vec::new();
let mut readied = Vec::new();
self.frontier.clear();

self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
self.merger.extract(merged, upper, frontier, &mut readied, &mut kept, &mut self.stash);

if !kept.is_empty() {
self.chain_push(kept);
}

self.stash.clear();

let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
let seal = B::seal(&mut readied, description);
self.lower = upper;
seal
}

/// The frontier of elements remaining after the most recent call to `self.seal`.
#[inline]
fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
self.frontier.borrow()
readied
}
}

impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
/// by decreasing length.
fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
pub fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
if !chain.is_empty() {
self.chain_push(chain);
while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
Expand Down Expand Up @@ -189,7 +203,7 @@ impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
}
}

impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
impl<M: Merger> Drop for MergeEngine<M> {
fn drop(&mut self) {
// Cleanup chain to retract accounting information.
while self.chain_pop().is_some() {}
Expand Down
Loading