Add generic buffer with capacity and timed flushing to sink#13
Add generic buffer with capacity and timed flushing to sink#13af-emp-0271 wants to merge 4 commits intomasterfrom
Conversation
|
Warning Rate limit exceeded
⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughIntroduces a new generic, concurrent-safe batching buffer package for Go that collects items from producers into batches and flushes them to a configurable sink. Adds documentation, a configurable Buffer[T] implementation with backpressure and shutdown semantics, and an extensive test suite including benchmarks. Changes
Sequence DiagramsequenceDiagram
participant Producer as Producer
participant Buffer as Buffer
participant RunLoop as "Run Loop"
participant Sink as "Flush (Sink)"
participant Handler as "OnFlushError / OnReject"
Producer->>Buffer: Add(ctx, item)
activate Buffer
Buffer->>Buffer: CanAdd(item)?
alt accepted
Buffer->>Buffer: enqueue to dataChan
else rejected
Buffer->>Handler: OnReject(item)
end
deactivate Buffer
RunLoop->>Buffer: read from dataChan
activate RunLoop
loop process items / timer
RunLoop->>RunLoop: accumulate batch
RunLoop->>RunLoop: ShouldFlush? or timer elapsed
alt flush condition
RunLoop->>RunLoop: WillOverflow(next)?
RunLoop->>Sink: Flush(ctx, batch)
activate Sink
Sink-->>RunLoop: success / error
deactivate Sink
alt error
RunLoop->>Handler: OnFlushError(err, batch)
end
RunLoop->>RunLoop: reset batch & timer
end
end
Producer->>Buffer: Close()
RunLoop->>RunLoop: drain remaining items
RunLoop->>Sink: Flush final batch
RunLoop-->>Producer: finished
deactivate RunLoop
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (3)
buffer/buffer.go (1)
348-366: Variable shadowing:errinflush()shadows the named return.The
errvariable on line 360 shadows the named return valueerrfrom line 348. While harmless here (the function always returnsnil), it could cause confusion or bugs if the code evolves. Consider renaming the inner variable.✨ Suggested rename
flush := func() { if len(batch) == 0 { return } - if err := cfg.Flush(ctx, batch); err != nil { - cfg.OnFlushError(err, batch) + if flushErr := cfg.Flush(ctx, batch); flushErr != nil { + cfg.OnFlushError(flushErr, batch) } batch = batch[:0] ticker.Reset(cfg.FlushInterval) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@buffer/buffer.go` around lines 348 - 366, The inner closure flush() in Buffer.Run shadows the named return variable err by declaring a new err when calling cfg.Flush; rename the inner variable (e.g., flushErr) or assign the call result to a blank identifier so the named return isn't shadowed. Locate the flush closure inside Run on Buffer[T] and change the line that reads "if err := cfg.Flush(ctx, batch); err != nil {" to use a non-shadowing name (flushErr) or use "_ = cfg.Flush(...)" with proper error handling via cfg.OnFlushError(flushErr, batch).buffer/buffer_test.go (1)
135-135: HandleNewBuffererrors consistently across all tests.Multiple tests discard the error from
NewBuffer. Consider using a test helper to reduce boilerplate:✨ Suggested helper
func mustNewBuffer[T any](t *testing.T, cfg Config[T]) *Buffer[T] { t.Helper() buf, err := NewBuffer(cfg) if err != nil { t.Fatal(err) } return buf }Then replace all
buf, _ := NewBuffer(cfg)withbuf := mustNewBuffer(t, cfg).Also applies to: 168-168, 209-209, 263-263, 301-301, 336-336, 386-386, 424-424, 457-457
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@buffer/buffer_test.go` at line 135, Tests currently ignore the error return from NewBuffer; add a test helper mustNewBuffer(t, cfg) that calls NewBuffer, fails the test on error (using t.Helper and t.Fatal(err)), and returns the buffer, then replace all occurrences of "buf, _ := NewBuffer(cfg)" in tests with "buf := mustNewBuffer(t, cfg)" (references: NewBuffer and the new mustNewBuffer helper) to ensure errors are handled consistently across the listed test sites.buffer/README.md (1)
58-65: Add language specifiers to fenced code blocks.The static analysis tool flagged two code blocks without language specifiers. For ASCII diagrams and benchmark output, use
textto satisfy linters while preserving formatting.📝 Proposed fix
-``` +```text producers buffer sink ───────── ────── ────-``` +```text goos: darwin goarch: arm64Also applies to: 244-250
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@buffer/README.md` around lines 58 - 65, Update the two fenced code blocks in README.md that currently lack language specifiers by changing the opening backticks from ``` to ```text so the ASCII diagram and the benchmark output are fenced as text; specifically modify the diagram block (the block containing "producers buffer sink" and the ticker diagram) and the benchmark block (the block containing "goos: darwin" / "goarch: arm64"), and also apply the same change to the other occurrence around lines 244-250 to ensure all code fences use ```text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@buffer/buffer_test.go`:
- Line 94: The call to NewBuffer in the test discards its error return (buf, _
:= NewBuffer(cfg)); update the test to capture the returned error (buf, err :=
NewBuffer(cfg)) and fail the test if err != nil (e.g., t.Fatalf or use your test
helper like require.NoError) so the test properly surfaces construction failures
from NewBuffer.
---
Nitpick comments:
In `@buffer/buffer_test.go`:
- Line 135: Tests currently ignore the error return from NewBuffer; add a test
helper mustNewBuffer(t, cfg) that calls NewBuffer, fails the test on error
(using t.Helper and t.Fatal(err)), and returns the buffer, then replace all
occurrences of "buf, _ := NewBuffer(cfg)" in tests with "buf := mustNewBuffer(t,
cfg)" (references: NewBuffer and the new mustNewBuffer helper) to ensure errors
are handled consistently across the listed test sites.
In `@buffer/buffer.go`:
- Around line 348-366: The inner closure flush() in Buffer.Run shadows the named
return variable err by declaring a new err when calling cfg.Flush; rename the
inner variable (e.g., flushErr) or assign the call result to a blank identifier
so the named return isn't shadowed. Locate the flush closure inside Run on
Buffer[T] and change the line that reads "if err := cfg.Flush(ctx, batch); err
!= nil {" to use a non-shadowing name (flushErr) or use "_ = cfg.Flush(...)"
with proper error handling via cfg.OnFlushError(flushErr, batch).
In `@buffer/README.md`:
- Around line 58-65: Update the two fenced code blocks in README.md that
currently lack language specifiers by changing the opening backticks from ``` to
```text so the ASCII diagram and the benchmark output are fenced as text;
specifically modify the diagram block (the block containing "producers
buffer sink" and the ticker diagram) and the benchmark
block (the block containing "goos: darwin" / "goarch: arm64"), and also apply
the same change to the other occurrence around lines 244-250 to ensure all code
fences use ```text.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6b1d4819-8c68-4dfb-9bb3-ee9ad3cab428
📒 Files selected for processing (3)
buffer/README.mdbuffer/buffer.gobuffer/buffer_test.go
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@buffer/buffer.go`:
- Around line 218-220: The current default no-op cfg.OnFlushError and
unconditional clearing of b.batch after calling flush() causes silent data loss;
change flush/Run behavior so failures are surfaced before resetting the batch:
make Run return the first non-nil error from flush() (or if you prefer enforce
an explicit handler, panic/return a configuration error when cfg.OnFlushError is
nil) and only clear b.batch after a successful flush, and when you do call
cfg.OnFlushError invoke it with the error and the unmodified batch so callers
can decide retry; update both places noted (the initialization of
cfg.OnFlushError and the code paths in flush()/Run that clear b.batch) to follow
this behavior.
- Around line 252-258: The Add method can still send to b.dataChan if ctx was
already cancelled but the channel has space; fix Buffer.Add by checking the
context before attempting the send: in function Add (receiver *Buffer[T]) call
if err := ctx.Err(); err != nil { return err } immediately before the select, so
a cancelled context is rejected deterministically and the method still falls
through to the existing select for non-cancelled contexts.
- Around line 192-203: Ensure NewBuffer validates Config fields before using
them: after calling Config.withDefaults() (method withDefaults), check that
Capacity and ChanSize are positive and that FlushInterval is > 0; detect
overflow when computing ChanSize as 2*Capacity (e.g., compute as int64 and
ensure result fits and is > 0) and reject if any check fails. Return an explicit
error from NewBuffer (or panic consistently with project conventions) when
Capacity, ChanSize, or FlushInterval are invalid instead of proceeding to
make(chan ...) or time.NewTicker; update callers as needed to handle the error.
| func (c *Config[T]) withDefaults() Config[T] { | ||
| cfg := *c | ||
|
|
||
| if cfg.Capacity == 0 { | ||
| cfg.Capacity = 100 | ||
| } | ||
| if cfg.ChanSize == 0 { | ||
| cfg.ChanSize = 2 * cfg.Capacity | ||
| } | ||
| if cfg.FlushInterval == 0 { | ||
| cfg.FlushInterval = 5 * time.Second | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -name "buffer.go" -path "*/buffer/*" | head -5Repository: CloudStuffTech/go-utils
Length of output: 85
🏁 Script executed:
find . -name "buffer.go" | head -10Repository: CloudStuffTech/go-utils
Length of output: 85
🏁 Script executed:
git ls-files | grep -i bufferRepository: CloudStuffTech/go-utils
Length of output: 145
🏁 Script executed:
git ls-files | head -30Repository: CloudStuffTech/go-utils
Length of output: 543
🏁 Script executed:
wc -l buffer/buffer.goRepository: CloudStuffTech/go-utils
Length of output: 87
🏁 Script executed:
cat -n buffer/buffer.goRepository: CloudStuffTech/go-utils
Length of output: 15261
🏁 Script executed:
cat -n buffer/buffer_test.go | head -100Repository: CloudStuffTech/go-utils
Length of output: 3448
🏁 Script executed:
rg -n "NewBuffer" buffer/ -A 5 -B 2Repository: CloudStuffTech/go-utils
Length of output: 6907
🏁 Script executed:
rg -n "negative\|invalid\|Capacity.*-\|ChanSize.*-\|FlushInterval.*-" buffer/Repository: CloudStuffTech/go-utils
Length of output: 49
🏁 Script executed:
grep -n "TestNewBuffer\|test.*config\|test.*validation" buffer/buffer_test.goRepository: CloudStuffTech/go-utils
Length of output: 49
🏁 Script executed:
python3 << 'EOF'
# Verify Go's behavior with invalid values
print("Go runtime behavior for invalid values:")
print("\n1. make([]T, 0, capacity) with negative capacity:")
print(" - Panics: runtime error: negative capacity")
print("\n2. make(chan T, bufsize) with negative bufsize:")
print(" - Panics: runtime error: negative channel buffer size")
print("\n3. time.NewTicker(duration) with duration <= 0:")
print(" - For negative: Panics with 'time: invalid duration'")
print(" - For zero: Also invalid (ticker requires positive duration)")
print("\n4. Integer overflow in 2*Capacity:")
print(" - If Capacity = 2^31 - 1 (max int32), 2*Capacity overflows to negative")
EOFRepository: CloudStuffTech/go-utils
Length of output: 588
🏁 Script executed:
rg -n "2 \* cfg.Capacity\|2 \* Capacity" buffer/Repository: CloudStuffTech/go-utils
Length of output: 49
🏁 Script executed:
git log --oneline buffer/buffer.go | head -20Repository: CloudStuffTech/go-utils
Length of output: 144
🏁 Script executed:
sed -n '230,240p' buffer/buffer.goRepository: CloudStuffTech/go-utils
Length of output: 314
🏁 Script executed:
sed -n '348,355p' buffer/buffer.goRepository: CloudStuffTech/go-utils
Length of output: 257
🏁 Script executed:
sed -n '192,203p' buffer/buffer.goRepository: CloudStuffTech/go-utils
Length of output: 304
🏁 Script executed:
sed -n '230,238p' buffer/buffer.goRepository: CloudStuffTech/go-utils
Length of output: 311
Add validation in NewBuffer to reject invalid config values.
Negative Capacity, ChanSize, or FlushInterval values currently escape withDefaults() and cause runtime panics in NewBuffer (line 236: make(chan ...) panics on negative ChanSize), or later in Run (line 350: make([]T, 0, Capacity) panics on negative Capacity; line 352: time.NewTicker(FlushInterval) panics on non-positive FlushInterval). Integer overflow in 2*Capacity (line 199) can also wrap to negative. Validate these fields in NewBuffer before constructing the buffer or channel.
Also applies to: 230-238
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@buffer/buffer.go` around lines 192 - 203, Ensure NewBuffer validates Config
fields before using them: after calling Config.withDefaults() (method
withDefaults), check that Capacity and ChanSize are positive and that
FlushInterval is > 0; detect overflow when computing ChanSize as 2*Capacity
(e.g., compute as int64 and ensure result fits and is > 0) and reject if any
check fails. Return an explicit error from NewBuffer (or panic consistently with
project conventions) when Capacity, ChanSize, or FlushInterval are invalid
instead of proceeding to make(chan ...) or time.NewTicker; update callers as
needed to handle the error.
Buffer flushes when:
Allows pluggable sink implementation with normal defaults.
For more info, please look here: https://cloudstuff-team.atlassian.net/browse/TP-2544
Summary by CodeRabbit
New Features
Documentation
Tests