From e5d60a768a2fd42c776b9a9b733038da46667aa5 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Fri, 3 Apr 2026 11:20:57 -0400
Subject: [PATCH] Arc based event iterator

---
 timely/examples/logging_replay.rs             | 81 +++++++++++++++++
 .../dataflow/operators/core/capture/event.rs  | 94 +++++++++++++++++++
 2 files changed, 175 insertions(+)
 create mode 100644 timely/examples/logging_replay.rs

diff --git a/timely/examples/logging_replay.rs b/timely/examples/logging_replay.rs
new file mode 100644
index 000000000..cdeb71d51
--- /dev/null
+++ b/timely/examples/logging_replay.rs
@@ -0,0 +1,81 @@
+//! Demonstrates cross-thread capture and replay of timely logging events.
+//!
+//! A source timely instance (2 workers) runs a simple dataflow and captures its
+//! logging events using thread-safe `link_sync::EventLink`s. A sink timely instance
+//! (1 worker) replays those events and prints them.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use timely::dataflow::operators::{Exchange, Inspect, ToStream};
+use timely::dataflow::operators::capture::event::link_sync::EventLink;
+use timely::dataflow::operators::capture::Replay;
+use timely::logging::{BatchLogger, TimelyEventBuilder, TimelyEvent};
+
+fn main() {
+
+    let source_workers = 2usize;
+    let sink_workers = 1usize;
+
+    // One EventLink per source worker, shared between source (writer) and sink (reader).
+    let event_links: Vec<_> = (0..source_workers)
+        .map(|_| Arc::new(EventLink::<Duration, Vec<(Duration, TimelyEvent)>>::new()))
+        .collect();
+
+    // Clone reader handles (they start at the head; the writer will advance past them).
+    let readers: Vec<_> = event_links.iter().map(Arc::clone).collect();
+
+    std::thread::scope(|scope| {
+
+        // --- Source instance: 2 workers producing logging events ---
+        let source = scope.spawn(move || {
+            timely::execute(timely::Config::process(source_workers), move |worker| {
+
+                // Install logging: capture timely events into our shared EventLink.
+                let link = event_links[worker.index()].clone();
+                let mut logger = BatchLogger::new(link);
+                worker.log_register()
+                    .unwrap()
+                    .insert::<TimelyEventBuilder,_>("timely", move |time, data| {
+                        logger.publish_batch(time, data);
+                    });
+
+                // A trivial dataflow to generate some logging activity.
+                worker.dataflow::<u64,_,_>(|scope| {
+                    (0..100u64)
+                        .to_stream(scope)
+                        .container::<Vec<_>>()
+                        .exchange(|&x| x)
+                        .inspect(|_x| { });
+                });
+
+            }).expect("source execution failed");
+        });
+
+        // --- Sink instance: 1 worker replaying the captured logs ---
+        let sink = scope.spawn(move || {
+            timely::execute(timely::Config::process(sink_workers), move |worker| {
+
+                // Each sink worker replays a disjoint subset of the source streams.
+                let replayers: Vec<_> = readers.iter().enumerate()
+                    .filter(|(i, _)| i % worker.peers() == worker.index())
+                    .map(|(_, r)| Arc::clone(r))
+                    .collect();
+
+                worker.dataflow::<Duration,_,_>(|scope| {
+                    replayers
+                        .replay_into(scope)
+                        .inspect(|event| {
+                            println!(" {:?}", event);
+                        });
+                });
+
+            }).expect("sink execution failed");
+        });
+
+        source.join().expect("source panicked");
+        sink.join().expect("sink panicked");
+    });
+
+    println!("Done.");
+}
diff --git a/timely/src/dataflow/operators/core/capture/event.rs b/timely/src/dataflow/operators/core/capture/event.rs
index 7fff34af0..3bd68b9c1 100644
--- a/timely/src/dataflow/operators/core/capture/event.rs
+++ b/timely/src/dataflow/operators/core/capture/event.rs
@@ -128,6 +128,100 @@ pub mod link {
     }
 }
 
+/// A thread-safe linked-list event pusher and iterator.
+pub mod link_sync {
+
+    use std::borrow::Cow;
+    use std::sync::{Arc, Mutex};
+
+    use super::{Event, EventPusher, EventIterator};
+
+    /// A linked list of `Event<T, C>` usable across threads.
+    pub struct EventLink<T, C> {
+        /// An event, if one exists.
+        ///
+        /// An event might not exist, if either we want to insert a `None` and have the output iterator pause,
+        /// or in the case of the very first linked list element, which has no event when constructed.
+        pub event: Option<Event<T, C>>,
+        /// The next event, if it exists.
+        pub next: Mutex<Option<Arc<EventLink<T, C>>>>,
+    }
+
+    impl<T, C> EventLink<T, C> {
+        /// Allocates a new `EventLink`.
+        pub fn new() -> EventLink<T, C> {
+            EventLink { event: None, next: Mutex::new(None) }
+        }
+    }
+
+    impl<T, C> EventPusher<T, C> for Arc<EventLink<T, C>> {
+        /// Links `event` in after the current element, then advances `self` to the new element.
+        ///
+        /// Panics if the `next` mutex is poisoned.
+        fn push(&mut self, event: Event<T, C>) {
+            let next = Arc::new(EventLink { event: Some(event), next: Mutex::new(None) });
+            // The guard is a temporary: it is released at the end of this statement,
+            // before `*self` is overwritten.
+            *self.next.lock().unwrap() = Some(Arc::clone(&next));
+            *self = next;
+        }
+    }
+
+    impl<T: Clone, C: Clone> EventIterator<T, C> for Arc<EventLink<T, C>> {
+        /// Advances `self` to the next element and returns that element's event, if it has one.
+        ///
+        /// Returns `None` without advancing when no next element has been linked in yet.
+        /// The event is moved out if `self` is the only handle to the element, and borrowed otherwise.
+        /// Panics if the `next` mutex is poisoned.
+        fn next(&mut self) -> Option<Cow<'_, Event<T, C>>> {
+            // Clone the successor under a single lock acquisition. The guard is a temporary:
+            // it is released at the end of this statement, before `*self` is overwritten.
+            let next = self.next.lock().unwrap().clone()?;
+            *self = next;
+            if let Some(this) = Arc::get_mut(self) {
+                this.event.take().map(Cow::Owned)
+            }
+            else {
+                self.event.as_ref().map(Cow::Borrowed)
+            }
+        }
+    }
+
+    // Drop implementation to prevent stack overflow through naive drop impl.
+    impl<T, C> Drop for EventLink<T, C> {
+        fn drop(&mut self) {
+            // Detach the successor; if we held its only reference, move it into `self` so that
+            // its own successor is handled by the next iteration rather than a nested `drop`.
+            while let Some(link) = self.next.get_mut().unwrap().take() {
+                if let Ok(head) = Arc::try_unwrap(link) {
+                    *self = head;
+                }
+                // Otherwise another handle still references `link`, and the loop ends here.
+            }
+        }
+    }
+
+    impl<T, C> Default for EventLink<T, C> {
+        fn default() -> Self {
+            Self::new()
+        }
+    }
+
+    // Builds a long list with a second handle kept at the head, then drops both handles.
+    #[test]
+    fn avoid_stack_overflow_in_drop() {
+        #[cfg(miri)]
+        let limit = 1_000;
+        #[cfg(not(miri))]
+        let limit = 1_000_000;
+        let mut event1 = Arc::new(EventLink::<(),()>::new());
+        let _event2 = Arc::clone(&event1);
+        for _ in 0 .. limit {
+            event1.push(Event::Progress(vec![]));
+        }
+    }
+}
+
 /// A binary event pusher and iterator.
 pub mod binary {
 