Skip to content

Add generic buffer with capacity and timed flushing to sink#13

Open
af-emp-0271 wants to merge 4 commits intomasterfrom
feat-generic-time-bound-buffer-tp2544
Open

Add generic buffer with capacity and timed flushing to sink#13
af-emp-0271 wants to merge 4 commits intomasterfrom
feat-generic-time-bound-buffer-tp2544

Conversation

@af-emp-0271
Copy link

@af-emp-0271 af-emp-0271 commented Mar 6, 2026

Buffer flushes when:

  • capacity is reached
  • flush interval elapses

Allows pluggable sink implementation with normal defaults.

For more info, please look here: https://cloudstuff-team.atlassian.net/browse/TP-2544

Summary by CodeRabbit

  • New Features

    • Generic, concurrent-safe batching buffer with configurable flush policies, backpressure, diagnostics, and graceful shutdown; enqueue, flush, and close semantics exposed.
  • Documentation

    • Comprehensive README with usage examples, shutdown/backpressure guidance, and integration patterns.
  • Tests

    • Extensive test suite and benchmark covering capacity/time-based flushes, overflow, rejection, flush errors, concurrency, backpressure, and shutdown behavior.

@af-emp-0271 af-emp-0271 self-assigned this Mar 6, 2026
@af-emp-0271 af-emp-0271 added the enhancement New feature or request label Mar 6, 2026
@coderabbitai
Copy link

coderabbitai bot commented Mar 6, 2026

Warning

Rate limit exceeded

@af-emp-0271 has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 0 minutes and 7 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3fc5e723-582f-4219-ac23-b747c0e36df1

📥 Commits

Reviewing files that changed from the base of the PR and between dbca8e9 and a2dcccd.

📒 Files selected for processing (2)
  • buffer/buffer.go
  • buffer/buffer_test.go
📝 Walkthrough

Walkthrough

Introduces a new generic, concurrent-safe batching buffer package for Go that collects items from producers into batches and flushes them to a configurable sink. Adds documentation, a configurable Buffer[T] implementation with backpressure and shutdown semantics, and an extensive test suite including benchmarks.

Changes

Cohort / File(s) Summary
Documentation
buffer/README.md
New README documenting features, installation, quick-start, internal workflow, configuration options, shutdown/backpressure semantics, usage patterns, and examples.
Core Implementation
buffer/buffer.go
Adds generic Buffer[T], Config[T], FlushFunc[T], NewBuffer, Add, Close, ItemsInChannel, and Run. Implements channel-backed batching, configurable flush triggers, overflow handling, backpressure, and error callbacks.
Tests & Benchmarks
buffer/buffer_test.go
Adds comprehensive tests covering capacity and interval flushes, Close draining, overflow, rejection and error callbacks, concurrency, backpressure, flush-context behavior, and a parallel throughput benchmark.

Sequence Diagram

sequenceDiagram
    participant Producer as Producer
    participant Buffer as Buffer
    participant RunLoop as "Run Loop"
    participant Sink as "Flush (Sink)"
    participant Handler as "OnFlushError / OnReject"

    Producer->>Buffer: Add(ctx, item)
    activate Buffer
    Buffer->>Buffer: CanAdd(item)?
    alt accepted
        Buffer->>Buffer: enqueue to dataChan
    else rejected
        Buffer->>Handler: OnReject(item)
    end
    deactivate Buffer

    RunLoop->>Buffer: read from dataChan
    activate RunLoop
    loop process items / timer
        RunLoop->>RunLoop: accumulate batch
        RunLoop->>RunLoop: ShouldFlush? or timer elapsed
        alt flush condition
            RunLoop->>RunLoop: WillOverflow(next)?
            RunLoop->>Sink: Flush(ctx, batch)
            activate Sink
            Sink-->>RunLoop: success / error
            deactivate Sink
            alt error
                RunLoop->>Handler: OnFlushError(err, batch)
            end
            RunLoop->>RunLoop: reset batch & timer
        end
    end

    Producer->>Buffer: Close()
    RunLoop->>RunLoop: drain remaining items
    RunLoop->>Sink: Flush final batch
    RunLoop-->>Producer: finished
    deactivate RunLoop
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐰 I gather bytes in gentle heaps,
Batching dreams while everyone sleeps,
A hop, a flush, no panic nor scare,
Backpressure held with patient care,
I close my paws — the stream runs fair.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and accurately describes the main change: introducing a generic buffer that flushes based on capacity and timed intervals.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat-generic-time-bound-buffer-tp2544

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (3)
buffer/buffer.go (1)

348-366: Variable shadowing: err in flush() shadows the named return.

The err variable on line 360 shadows the named return value err from line 348. While harmless here (the function always returns nil), it could cause confusion or bugs if the code evolves. Consider renaming the inner variable.

✨ Suggested rename
 	flush := func() {
 		if len(batch) == 0 {
 			return
 		}
 
-		if err := cfg.Flush(ctx, batch); err != nil {
-			cfg.OnFlushError(err, batch)
+		if flushErr := cfg.Flush(ctx, batch); flushErr != nil {
+			cfg.OnFlushError(flushErr, batch)
 		}
 
 		batch = batch[:0]
 		ticker.Reset(cfg.FlushInterval)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@buffer/buffer.go` around lines 348 - 366, The inner closure flush() in
Buffer.Run shadows the named return variable err by declaring a new err when
calling cfg.Flush; rename the inner variable (e.g., flushErr) or assign the call
result to a blank identifier so the named return isn't shadowed. Locate the
flush closure inside Run on Buffer[T] and change the line that reads "if err :=
cfg.Flush(ctx, batch); err != nil {" to use a non-shadowing name (flushErr) or
use "_ = cfg.Flush(...)" with proper error handling via
cfg.OnFlushError(flushErr, batch).
buffer/buffer_test.go (1)

135-135: Handle NewBuffer errors consistently across all tests.

Multiple tests discard the error from NewBuffer. Consider using a test helper to reduce boilerplate:

✨ Suggested helper
func mustNewBuffer[T any](t *testing.T, cfg Config[T]) *Buffer[T] {
	t.Helper()
	buf, err := NewBuffer(cfg)
	if err != nil {
		t.Fatal(err)
	}
	return buf
}

Then replace all buf, _ := NewBuffer(cfg) with buf := mustNewBuffer(t, cfg).

Also applies to: 168-168, 209-209, 263-263, 301-301, 336-336, 386-386, 424-424, 457-457

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@buffer/buffer_test.go` at line 135, Tests currently ignore the error return
from NewBuffer; add a test helper mustNewBuffer(t, cfg) that calls NewBuffer,
fails the test on error (using t.Helper and t.Fatal(err)), and returns the
buffer, then replace all occurrences of "buf, _ := NewBuffer(cfg)" in tests with
"buf := mustNewBuffer(t, cfg)" (references: NewBuffer and the new mustNewBuffer
helper) to ensure errors are handled consistently across the listed test sites.
buffer/README.md (1)

58-65: Add language specifiers to fenced code blocks.

The static analysis tool flagged two code blocks without language specifiers. For ASCII diagrams and benchmark output, use text to satisfy linters while preserving formatting.

📝 Proposed fix
-```
+```text
 producers                   buffer                      sink
 ─────────                   ──────                      ────
-```
+```text
 goos: darwin
 goarch: arm64

Also applies to: 244-250

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@buffer/README.md` around lines 58 - 65, Update the two fenced code blocks in
README.md that currently lack language specifiers by changing the opening
backticks from ``` to ```text so the ASCII diagram and the benchmark output are
fenced as text; specifically modify the diagram block (the block containing
"producers                   buffer                      sink" and the ticker
diagram) and the benchmark block (the block containing "goos: darwin" / "goarch:
arm64"), and also apply the same change to the other occurrence around lines
244-250 to ensure all code fences use ```text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@buffer/buffer_test.go`:
- Line 94: The call to NewBuffer in the test discards its error return (buf, _
:= NewBuffer(cfg)); update the test to capture the returned error (buf, err :=
NewBuffer(cfg)) and fail the test if err != nil (e.g., t.Fatalf or use your test
helper like require.NoError) so the test properly surfaces construction failures
from NewBuffer.

---

Nitpick comments:
In `@buffer/buffer_test.go`:
- Line 135: Tests currently ignore the error return from NewBuffer; add a test
helper mustNewBuffer(t, cfg) that calls NewBuffer, fails the test on error
(using t.Helper and t.Fatal(err)), and returns the buffer, then replace all
occurrences of "buf, _ := NewBuffer(cfg)" in tests with "buf := mustNewBuffer(t,
cfg)" (references: NewBuffer and the new mustNewBuffer helper) to ensure errors
are handled consistently across the listed test sites.

In `@buffer/buffer.go`:
- Around line 348-366: The inner closure flush() in Buffer.Run shadows the named
return variable err by declaring a new err when calling cfg.Flush; rename the
inner variable (e.g., flushErr) or assign the call result to a blank identifier
so the named return isn't shadowed. Locate the flush closure inside Run on
Buffer[T] and change the line that reads "if err := cfg.Flush(ctx, batch); err
!= nil {" to use a non-shadowing name (flushErr) or use "_ = cfg.Flush(...)"
with proper error handling via cfg.OnFlushError(flushErr, batch).

In `@buffer/README.md`:
- Around line 58-65: Update the two fenced code blocks in README.md that
currently lack language specifiers by changing the opening backticks from ``` to
```text so the ASCII diagram and the benchmark output are fenced as text;
specifically modify the diagram block (the block containing "producers          
buffer                      sink" and the ticker diagram) and the benchmark
block (the block containing "goos: darwin" / "goarch: arm64"), and also apply
the same change to the other occurrence around lines 244-250 to ensure all code
fences use ```text.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6b1d4819-8c68-4dfb-9bb3-ee9ad3cab428

📥 Commits

Reviewing files that changed from the base of the PR and between 4393633 and 5b84361.

📒 Files selected for processing (3)
  • buffer/README.md
  • buffer/buffer.go
  • buffer/buffer_test.go

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@buffer/buffer.go`:
- Around line 218-220: The current default no-op cfg.OnFlushError and
unconditional clearing of b.batch after calling flush() causes silent data loss;
change flush/Run behavior so failures are surfaced before resetting the batch:
make Run return the first non-nil error from flush() (or if you prefer enforce
an explicit handler, panic/return a configuration error when cfg.OnFlushError is
nil) and only clear b.batch after a successful flush, and when you do call
cfg.OnFlushError invoke it with the error and the unmodified batch so callers
can decide retry; update both places noted (the initialization of
cfg.OnFlushError and the code paths in flush()/Run that clear b.batch) to follow
this behavior.
- Around line 252-258: The Add method can still send to b.dataChan if ctx was
already cancelled but the channel has space; fix Buffer.Add by checking the
context before attempting the send: in function Add (receiver *Buffer[T]) call
if err := ctx.Err(); err != nil { return err } immediately before the select, so
a cancelled context is rejected deterministically and the method still falls
through to the existing select for non-cancelled contexts.
- Around line 192-203: Ensure NewBuffer validates Config fields before using
them: after calling Config.withDefaults() (method withDefaults), check that
Capacity and ChanSize are positive and that FlushInterval is > 0; detect
overflow when computing ChanSize as 2*Capacity (e.g., compute as int64 and
ensure result fits and is > 0) and reject if any check fails. Return an explicit
error from NewBuffer (or panic consistently with project conventions) when
Capacity, ChanSize, or FlushInterval are invalid instead of proceeding to
make(chan ...) or time.NewTicker; update callers as needed to handle the error.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7dc74a86-0a67-4bca-920b-da19400f6008

📥 Commits

Reviewing files that changed from the base of the PR and between 5b84361 and dbca8e9.

📒 Files selected for processing (1)
  • buffer/buffer.go

Comment on lines +192 to +203
func (c *Config[T]) withDefaults() Config[T] {
cfg := *c

if cfg.Capacity == 0 {
cfg.Capacity = 100
}
if cfg.ChanSize == 0 {
cfg.ChanSize = 2 * cfg.Capacity
}
if cfg.FlushInterval == 0 {
cfg.FlushInterval = 5 * time.Second
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "buffer.go" -path "*/buffer/*" | head -5

Repository: CloudStuffTech/go-utils

Length of output: 85


🏁 Script executed:

find . -name "buffer.go" | head -10

Repository: CloudStuffTech/go-utils

Length of output: 85


🏁 Script executed:

git ls-files | grep -i buffer

Repository: CloudStuffTech/go-utils

Length of output: 145


🏁 Script executed:

git ls-files | head -30

Repository: CloudStuffTech/go-utils

Length of output: 543


🏁 Script executed:

wc -l buffer/buffer.go

Repository: CloudStuffTech/go-utils

Length of output: 87


🏁 Script executed:

cat -n buffer/buffer.go

Repository: CloudStuffTech/go-utils

Length of output: 15261


🏁 Script executed:

cat -n buffer/buffer_test.go | head -100

Repository: CloudStuffTech/go-utils

Length of output: 3448


🏁 Script executed:

rg -n "NewBuffer" buffer/ -A 5 -B 2

Repository: CloudStuffTech/go-utils

Length of output: 6907


🏁 Script executed:

rg -n "negative\|invalid\|Capacity.*-\|ChanSize.*-\|FlushInterval.*-" buffer/

Repository: CloudStuffTech/go-utils

Length of output: 49


🏁 Script executed:

grep -n "TestNewBuffer\|test.*config\|test.*validation" buffer/buffer_test.go

Repository: CloudStuffTech/go-utils

Length of output: 49


🏁 Script executed:

python3 << 'EOF'
# Verify Go's behavior with invalid values
print("Go runtime behavior for invalid values:")
print("\n1. make([]T, 0, capacity) with negative capacity:")
print("   - Panics: runtime error: negative capacity")
print("\n2. make(chan T, bufsize) with negative bufsize:")
print("   - Panics: runtime error: negative channel buffer size")
print("\n3. time.NewTicker(duration) with duration <= 0:")
print("   - For negative: Panics with 'time: invalid duration'")
print("   - For zero: Also invalid (ticker requires positive duration)")
print("\n4. Integer overflow in 2*Capacity:")
print("   - If Capacity = 2^31 - 1 (max int32), 2*Capacity overflows to negative")
EOF

Repository: CloudStuffTech/go-utils

Length of output: 588


🏁 Script executed:

rg -n "2 \* cfg.Capacity\|2 \* Capacity" buffer/

Repository: CloudStuffTech/go-utils

Length of output: 49


🏁 Script executed:

git log --oneline buffer/buffer.go | head -20

Repository: CloudStuffTech/go-utils

Length of output: 144


🏁 Script executed:

sed -n '230,240p' buffer/buffer.go

Repository: CloudStuffTech/go-utils

Length of output: 314


🏁 Script executed:

sed -n '348,355p' buffer/buffer.go

Repository: CloudStuffTech/go-utils

Length of output: 257


🏁 Script executed:

sed -n '192,203p' buffer/buffer.go

Repository: CloudStuffTech/go-utils

Length of output: 304


🏁 Script executed:

sed -n '230,238p' buffer/buffer.go

Repository: CloudStuffTech/go-utils

Length of output: 311


Add validation in NewBuffer to reject invalid config values.

Negative Capacity, ChanSize, or FlushInterval values currently escape withDefaults() and cause runtime panics in NewBuffer (line 236: make(chan ...) panics on negative ChanSize), or later in Run (line 350: make([]T, 0, Capacity) panics on negative Capacity; line 352: time.NewTicker(FlushInterval) panics on non-positive FlushInterval). Integer overflow in 2*Capacity (line 199) can also wrap to negative. Validate these fields in NewBuffer before constructing the buffer or channel.

Also applies to: 230-238

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@buffer/buffer.go` around lines 192 - 203, Ensure NewBuffer validates Config
fields before using them: after calling Config.withDefaults() (method
withDefaults), check that Capacity and ChanSize are positive and that
FlushInterval is > 0; detect overflow when computing ChanSize as 2*Capacity
(e.g., compute as int64 and ensure result fits and is > 0) and reject if any
check fails. Return an explicit error from NewBuffer (or panic consistently with
project conventions) when Capacity, ChanSize, or FlushInterval are invalid
instead of proceeding to make(chan ...) or time.NewTicker; update callers as
needed to handle the error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant