Lock-free sensor processing pipeline for robotics
4-stage pipeline · SPSC ring buffers · 2.2B items/sec · ~20ns latency
- Lock-free SPSC — Wait-free ring buffers with cache-line padding, no mutexes in the hot path
- 4-stage pipeline — Ingestion → Filter → Aggregation → Output, each on its own thread
- Zero-copy —
ObjectPool,BufferPool, andArc-basedSharedDataminimize allocations - Adaptive backpressure — Block, drop, or sample strategies with a hysteresis controller
- Rich metrics — HDR latency histograms, jitter tracking, per-stage dashboards
no_std-compatible core — buffer, error, sensor, and stage modules compile withoutstd
| Metric | Result |
|---|---|
| Pipeline throughput | >1M items/sec |
| Stage processing | 2.2B items/sec |
| Channel latency | ~20ns |
| Ring buffer push | 0.3ns |
| Ring buffer pop | 9ns |
Numbers from cargo bench on an Apple M-series chip. Your results may vary.
[dependencies]
sensor-bridge = "0.1"use sensor_bridge::pipeline::{MultiStagePipelineBuilder, PipelineConfig};
use sensor_bridge::stage::{Filter, Identity, Map};
use std::time::Duration;
let mut pipeline = MultiStagePipelineBuilder::<i32, i32, _, _, _, _>::new()
.config(PipelineConfig::default().channel_capacity(1024))
.ingestion(Map::new(|x: i32| x + 1)) // normalize
.filtering(Filter::new(|x: &i32| *x > 0)) // drop negatives
.aggregation(Map::new(|x: i32| x * 2)) // scale
.output(Identity::new()) // pass through
.build();
pipeline.send(5).unwrap();
std::thread::sleep(Duration::from_millis(10));
if let Some(result) = pipeline.try_recv() {
println!("result: {result}"); // 12
}
pipeline.shutdown();
pipeline.join().unwrap();[dependencies]
# Full standard-library build (channels, metrics, pipeline):
sensor-bridge = "0.1"
# no_std core only (buffer, error, sensor, stage):
sensor-bridge = { version = "0.1", default-features = false }| Example | Description |
|---|---|
simple_imu |
Hello-world: mock IMU → ring buffer → moving-average filter |
multi_sensor |
IMU + barometer fusion with timestamp synchronisation |
full_pipeline |
All 4 stages, realistic sensor data, throughput report |
metrics_demo |
Live dashboard, latency percentiles, performance targets |
benchmark_latency |
End-to-end latency measurement across scenarios |
cargo run --example simple_imu
cargo run --example multi_sensor
cargo run --example full_pipeline
cargo run --example metrics_demo
cargo run --example benchmark_latency --release# Full suite with HTML reports in target/criterion/
cargo bench
# Quick compile/run smoke-check
cargo bench -- --testBenchmark groups: ring_buffer_*, stage_processing, pipeline_chain, channels/*, allocations/*.
Producer thread(s)
│
▼
┌─────────────┐ ┌────────────┐ ┌─────────────┐ ┌────────────┐
│ Ingestion │────▶│ Filtering │────▶│ Aggregation │────▶│ Output │
│ (thread 1) │ │ (thread 2)│ │ (thread 3) │ │ (thread 4)│
└─────────────┘ └────────────┘ └─────────────┘ └────────────┘
│ │ │ │
bounded bounded bounded bounded
channel channel channel channel
│
▼
Consumer thread(s)
Each stage runs its own thread and communicates via crossbeam bounded channels.
The MultiStagePipelineBuilder wires everything together and manages lifetimes.
For single-threaded use, PipelineBuilder + PipelineRunner give a composable
zero-copy pipeline with map, filter, and then combinators.
Contributions are welcome! Please read CONTRIBUTING.md for:
- How to run tests and benchmarks
- Clippy / fmt requirements
- PR process and commit message style
Licensed under either of:
- MIT license (
LICENSE) - Apache License, Version 2.0
at your option.
