
feat(cold): add stream_logs for incremental log streaming#30

Merged
prestwich merged 20 commits into main from feat/cold-stream-logs on Feb 15, 2026

Conversation


@prestwich prestwich commented Feb 14, 2026

Summary

  • Add stream_logs method to the ColdStorage trait for incremental log streaming via bounded mpsc channels, avoiding full result materialization into Vec<T>
  • Implement for all three backends: in-memory (MemColdBackend), MDBX (MdbxColdBackend), and SQL (SqlColdBackend — SQLite + PostgreSQL)
  • Add StreamLogs request variant, handle method, and runner dispatch for the task-based architecture
  • Add streaming conformance tests and two new error variants (StreamDeadlineExceeded, ReorgDetected)
  • Bump workspace version to 0.6.0 (new required trait method)
  • Make DatabaseEnv cheaply cloneable (derive Clone)
  • Eliminate per-row Vec<u8> heap allocations in SQL row extraction (borrow &[u8] from rows)
  • Borrow filter params as &[u8] from the Filter struct instead of cloning into Vec<Vec<u8>>
  • Remove get_block_hash and get_logs_block from the ColdStorage trait (replaced by existing get_header/get_logs)
  • Fix TooManyLogs bug: MDBX and default streaming reported remaining instead of max_logs in error, causing wrong limit values when logs span multiple blocks
  • Introduce StreamParams struct: bundles from, to, max_logs, sender, deadline to reduce produce_log_stream from 7 parameters to 2
  • Add try_stream! macros: reduce repeated match/send-error/return boilerplate in MDBX and SQL streaming implementations
  • Borrow Filter in get_logs: change from filter: Filter to filter: &Filter, eliminating per-block clone in the default streaming helper (N clones → 1)

Design

Each stream_logs call spawns a producer task that sends RpcLog items through a bounded mpsc channel. The consumer receives a ReceiverStream<ColdResult<RpcLog>> (concrete type, no boxing).

Key properties:

  • Active deadline enforcement — producer checks deadline at block boundaries and uses timeout_at on channel sends, guaranteeing resource release within stream_deadline regardless of consumer behavior
  • MVCC snapshot consistency — MDBX uses a single read transaction via spawn_blocking; PostgreSQL uses REPEATABLE READ isolation
  • Reorg detection — default implementation anchors to the to block hash at stream start, re-checked before every block; snapshot backends skip this (unnecessary under MVCC)
  • Concurrent stream limiting — backend-owned semaphore (8 permits), held by the producer task and released on exit
  • Clean cancellation — dropping the stream causes send to fail, producer exits immediately
  • Minimal allocations — SQL row extraction borrows &[u8] from rows; filter params borrow from the Filter struct; DatabaseEnv clones cheaply without Arc wrapper; get_logs borrows &Filter avoiding ownership transfer
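The producer/consumer shape above can be sketched in blocking std terms, mirroring the bounded-channel, deadline-at-block-boundary, and drop-to-cancel properties without pulling in tokio. `Log`, `StreamError`, and `stream_logs` here are hypothetical stand-ins for the PR's `RpcLog`, error type, and trait method, not the real API:

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical stand-ins for RpcLog and the cold-storage error type.
type Log = u64;
#[derive(Debug, PartialEq)]
enum StreamError {
    DeadlineExceeded,
}

/// Spawn a producer that pushes logs through a bounded channel.
/// The deadline is checked at every block boundary, and a failed
/// send (consumer dropped the receiver) exits the producer at once.
fn stream_logs(blocks: Vec<Vec<Log>>, deadline: Instant) -> Receiver<Result<Log, StreamError>> {
    let (tx, rx) = sync_channel(16); // bounded: consumer backpressure
    thread::spawn(move || {
        for block in blocks {
            // Active deadline enforcement at the block boundary.
            if Instant::now() >= deadline {
                let _ = tx.send(Err(StreamError::DeadlineExceeded));
                return;
            }
            for log in block {
                // Clean cancellation: send fails once the receiver is gone.
                if tx.send(Ok(log)).is_err() {
                    return;
                }
            }
        }
    });
    rx
}
```

The real implementation additionally wraps sends in `tokio::time::timeout_at` and holds a semaphore permit for the life of the producer task; the std sketch keeps only the channel and deadline mechanics.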

Test plan

  • cargo clippy -p signet-cold --all-features --all-targets
  • cargo clippy -p signet-cold --no-default-features --all-targets
  • cargo clippy -p signet-cold-mdbx --all-features --all-targets
  • cargo clippy -p signet-cold-mdbx --no-default-features --all-targets
  • cargo clippy -p signet-cold-sql --all-features --all-targets
  • cargo clippy -p signet-cold-sql --no-default-features --all-targets
  • cargo clippy -p signet-hot-mdbx --all-features --all-targets
  • cargo clippy -p signet-hot-mdbx --no-default-features --all-targets
  • cargo +nightly fmt
  • cargo t -p signet-hot-mdbx (44 passed)
  • cargo t -p signet-cold (2 passed)
  • cargo t -p signet-cold-mdbx (4 passed)
  • cargo t -p signet-cold-sql --all-features (2 passed — SQLite + PG conformance)
  • ./scripts/test-postgres.sh (PostgreSQL conformance via Docker)

🤖 Generated with Claude Code

Add `stream_logs` to the `ColdStorage` trait, enabling incremental
streaming of log results via bounded `mpsc` channels instead of
materializing entire result sets into `Vec<T>`. This bounds memory
usage, limits MDBX read transaction and SQL connection hold times,
and enables active deadline enforcement independent of consumer
polling behavior.

Key design decisions:
- Spawned producer task + bounded channel (`ReceiverStream`)
- Per-block resource acquisition (short-lived MDBX txs / SQL queries)
- Fixed anchor hash on the `to` block, re-checked every block for
  reorg detection
- `tokio::time::timeout_at` on sends for active deadline enforcement
- Backend-owned semaphore (8 permits) limiting concurrent streams

Implements for all three backends: in-memory, MDBX, and SQL (SQLite
+ PostgreSQL). Includes streaming conformance tests.

Bumps workspace version to 0.6.0 (new required trait method).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use fully qualified `crate::LogStream` in doc comment to fix
`-D rustdoc::broken-intra-doc-links` in CI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@prestwich prestwich left a comment


generally review for unnecessary allocations

…anup

Move the streaming loop from individual backends into the shared
ColdStorageTask, replacing `stream_logs` on the ColdStorage trait with
two simpler primitives (`get_block_hash`, `get_logs_block`). This
eliminates ~200 lines of duplicated streaming logic across backends.

- Rewrite collect_logs_block in MDBX backend functionally
- Extract check_block_hash helper in MDBX backend
- Extract append_filter_clause utility in SQL backend
- Remove unnecessary collect in SQL get_logs_block
- Remove unused tokio/tokio-stream deps from backend crates
- Update conformance tests to run through ColdStorageHandle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@prestwich

[Claude Code]

Addressed all review comments in 2e3b4f1:

  1. Functional rewrite of collect_logs_block (comment 1) — Collect the fallible MDBX iterator first, then `flat_map` → `filter_map` → `collect` for the inner log iteration.

  2. Extract check_block_hash helper (comment 2) — Flattens 3 levels of match nesting into a `?` chain. Also serves as the MDBX get_block_hash trait impl.

  3. DRY streaming into shared task (comments 3 + 6) — Removed stream_logs from the ColdStorage trait entirely. Replaced with two simpler primitives (get_block_hash, get_logs_block). The streaming loop (semaphore, deadline, reorg detection, channel sends) now lives solely in ColdStorageTaskInner::handle_stream_logs. Eliminates ~200 lines of duplicated code across 3 backends.

  4. Extract append_filter_clause utility (comments 4 + 5) — Deduplicated the single-vs-multi-value SQL clause pattern into a shared function. build_log_filter_clause now calls it for both address and topic filters.

  5. Remove unnecessary collect (comment 7) — SQL get_logs_block checks rows.len() > remaining before mapping, avoiding an intermediate allocation.

  6. Allocation review (general comment) — Removed unused tokio and tokio-stream dependencies from both backend crates. Conformance tests now run through ColdStorageHandle (spawning a ColdStorageTask internally).

All clippy (both --all-features and --no-default-features), tests, docs, and test-postgres.sh pass.

prestwich and others added 2 commits February 14, 2026 19:36
…tency

Move the streaming loop from the task runner into ColdStorage::produce_log_stream
with a default implementation using get_block_hash + get_logs_block per block.
MDBX and SQL backends override with single-transaction implementations for
MVCC consistency and fewer round-trips. Add caller-supplied deadline clamped
to the task's configured maximum.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove anchor_hash from produce_log_stream trait; backends that hold
  a consistent snapshot (MDBX Tx<Ro>, PG REPEATABLE READ) no longer
  need external reorg detection. Extract produce_log_stream_default
  for backends without snapshot semantics.

- Add first_log_index column to receipts, replacing the O(n*k)
  correlated subquery with an O(1) JOIN for block_log_index.

- Split SQL produce_log_stream by backend: PostgreSQL uses
  REPEATABLE READ with row-level streaming; SQLite delegates to the
  default implementation to avoid single-connection starvation.

- Document partial-delivery semantics on LogStream and stream_logs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
prestwich and others added 13 commits February 14, 2026 20:28
- MDBX: iterate receipt cursors inline instead of collecting into
  intermediate Vec before processing. Both collect_logs_block and
  produce_log_stream_blocking now process receipts as they are read.

- SQL: write filter placeholders directly into the clause string
  instead of collect/join. Accept iterators in append_filter_clause
  to avoid intermediate Vec<&[u8]> in build_log_filter_clause.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…rait

These per-block methods were only used by the default
produce_log_stream_default implementation. Replace them with existing
get_header and get_logs calls, reducing the trait surface area.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Derive Clone on DatabaseEnv (all fields already cheaply cloneable)
- Remove Arc<DatabaseEnv> wrapper from MdbxColdBackend, clone env
  directly into spawn_blocking for produce_log_stream
- Uncomment MDBX produce_log_stream override (single-txn MVCC path)
- Change blob/opt_blob helpers to return borrowed &[u8] instead of
  Vec<u8>, eliminating per-row heap allocations for fixed-size fields
- Add b256_col helper for direct B256 extraction from rows
- Update decode_u128_required and decode_access_list_or_empty to
  accept Option<&[u8]> instead of &Option<Vec<u8>>

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
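The `Option<&[u8]>` decoding idea can be illustrated with a minimal, hypothetical helper (the real `decode_u128_required` works against sqlx rows; this sketch only shows the borrowed-slice signature that avoids a per-row `Vec<u8>`):

```rust
/// Hypothetical sketch: accept `Option<&[u8]>` borrowed from the row
/// instead of `&Option<Vec<u8>>`, so fixed-size columns are decoded
/// without any per-row heap allocation.
fn decode_u128_required(bytes: Option<&[u8]>) -> Result<u128, String> {
    // Missing column is an error for a required field.
    let bytes = bytes.ok_or_else(|| "missing column".to_string())?;
    // Fixed-size check, then a direct big-endian decode of the slice.
    let arr: [u8; 16] = bytes.try_into().map_err(|_| "expected 16 bytes".to_string())?;
    Ok(u128::from_be_bytes(arr))
}
```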
Filter addresses and topics are already fixed-size slices living in
the Filter struct. Borrow them as &[u8] instead of copying each one
into a fresh Vec<u8>.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Make produce_log_stream a required trait method — every backend must
explicitly choose its streaming strategy. The reorg-detecting helper
moves from traits.rs to stream.rs and remains exported as
produce_log_stream_default for non-snapshot backends (mem, SQLite).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The MDBX streaming and default streaming implementations reported
the per-block `remaining` count instead of the original `max_logs`
in TooManyLogs errors. When logs spanned multiple blocks, this
caused the error to report a smaller limit than what the caller
configured. Add a multi-block conformance test that catches this.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
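A minimal sketch of the fixed behavior, with a hypothetical `ColdError` shape: when the cap is exceeded mid-stream, the error must carry the caller's original `max_logs`, not the decremented per-block `remaining`:

```rust
#[derive(Debug, PartialEq)]
enum ColdError {
    TooManyLogs { max: usize }, // hypothetical error shape
}

/// Collect logs across blocks under a global cap. The bug fixed in
/// the PR: after earlier blocks consume part of the budget, the
/// error must still report the configured `max_logs`.
fn collect_capped(blocks: &[Vec<u64>], max_logs: usize) -> Result<Vec<u64>, ColdError> {
    let mut out = Vec::new();
    let mut remaining = max_logs;
    for block in blocks {
        if block.len() > remaining {
            // Correct: report the original limit, not `remaining`.
            return Err(ColdError::TooManyLogs { max: max_logs });
        }
        remaining -= block.len();
        out.extend_from_slice(block);
    }
    Ok(out)
}
```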
Bundle from, to, max_logs, sender, and deadline into a StreamParams
struct, reducing the trait method from 7 parameters to 2 (filter +
params). Remove #[allow(clippy::too_many_arguments)] annotations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
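The parameter bundle might look roughly like this; field types are illustrative stand-ins (the real struct carries the PR's block-number, mpsc-sender, and deadline types):

```rust
use std::time::Instant;

// Stand-in for the bounded mpsc sender carried by the real struct.
struct Sender;

/// Bundle the streaming arguments so the trait method signature
/// becomes `(filter, params)` instead of seven positional parameters.
struct StreamParams {
    from: u64,
    to: u64,
    max_logs: usize,
    sender: Sender,
    deadline: Instant,
}
```

Grouping the arguments also lets future knobs be added without touching every backend's method signature.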
… macro

Add try_stream! macros in the MDBX (blocking) and SQL (async) streaming
implementations to replace repeated match/send-error/return blocks.
Also thread StreamParams through produce_log_stream_pg directly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
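A hypothetical reconstruction of the macro pattern, using a std channel for illustration: on `Err`, forward the error through the channel and return from the producer; on `Ok`, yield the value inline:

```rust
use std::sync::mpsc::SyncSender;

// Illustrative shape only; the real macros target the PR's channel
// and error types in the MDBX (blocking) and SQL (async) paths.
macro_rules! try_stream {
    ($expr:expr, $tx:expr) => {
        match $expr {
            Ok(v) => v,
            Err(e) => {
                // Best-effort error forwarding, then exit the producer.
                let _ = $tx.send(Err(e));
                return;
            }
        }
    };
}

fn produce(tx: SyncSender<Result<u64, String>>, input: Result<u64, String>) {
    let v = try_stream!(input, tx); // one line instead of a match block
    let _ = tx.send(Ok(v));
}
```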
Change ColdStorage::get_logs from `filter: Filter` to `filter: &Filter`
since no implementation needs ownership. In the default streaming
helper, clone the filter once before the loop and mutate block_option
per iteration instead of cloning the full filter (address + topics
arrays) on every block.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
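The clone-once-then-mutate idea can be sketched with a minimal, hypothetical `Filter` (the real one carries alloy address and topic arrays plus a block selector):

```rust
/// Hypothetical minimal filter: addresses are the expensive-to-clone
/// part, `block` is the cheap per-iteration selector.
#[derive(Clone)]
struct Filter {
    addresses: Vec<[u8; 20]>,
    block: Option<u64>,
}

/// Clone the filter once before the loop, then rewrite only the
/// block selector per iteration: N clones become 1.
fn stream_blocks(filter: &Filter, from: u64, to: u64) -> Vec<(u64, usize)> {
    let mut per_block = filter.clone(); // the single clone
    let mut out = Vec::new();
    for n in from..=to {
        per_block.block = Some(n); // cheap field update, no array clone
        out.push((n, per_block.addresses.len()));
    }
    out
}
```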
- Merge duplicate alloy::consensus import paths in conformance tests
- Use mut parameter instead of let-rebinding in collect_stream
- Unwrap directly in tests instead of checking is_some() first
- Replace closures with function references in header_from_row
- Remove duplicate doc comments on append_filter_clause
- Condense truncate_above with loop over table names

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add alloy-primitives with the sqlx feature to get native sqlx
Type/Encode/Decode impls for Address, B256, Bytes, and FixedBytes<N>.
This eliminates manual from_slice/copy_from_slice calls on the read
path and removes the b256_col helper and from_address converter.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Annotate the block-iteration loops, channel sends, deadline checks,
reorg detection, and snapshot-isolation setup in both
produce_log_stream_pg and produce_log_stream_default.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Annotate the blocking log stream method with comments covering MVCC
snapshot semantics, cursor reuse, block iteration, filter matching,
log limit enforcement, and blocking channel sends.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@prestwich prestwich merged commit f9ccf3a into main Feb 15, 2026
6 checks passed
