From 5b84361c09fe37b49189bff83edd5eadd744b9aa Mon Sep 17 00:00:00 2001 From: Manan Singh Date: Fri, 6 Mar 2026 14:19:46 +0530 Subject: [PATCH 1/4] Add generic buffer with capacity and timed flushing to sink --- buffer/README.md | 256 ++++++++++++++++++++++ buffer/buffer.go | 399 +++++++++++++++++++++++++++++++++++ buffer/buffer_test.go | 481 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1136 insertions(+) create mode 100644 buffer/README.md create mode 100644 buffer/buffer.go create mode 100644 buffer/buffer_test.go diff --git a/buffer/README.md b/buffer/README.md new file mode 100644 index 0000000..edcf9e2 --- /dev/null +++ b/buffer/README.md @@ -0,0 +1,256 @@ +# buffer + +A generic, batching buffer for Go. It accumulates items from concurrent producers into batches and flushes them to a sink — by count, by size, or on a time interval. + +Designed for high-throughput pipelines where writing one item at a time is too expensive: Kinesis, BigQuery, Kafka, HTTP bulk APIs, and similar backends. + +--- + +## Features + +- **Generic** — works with any type `T` +- **Concurrent-safe** — multiple producers can call `Add` simultaneously +- **Flexible flush triggers** — batch size, payload size, elapsed time, or any custom condition +- **Overflow protection** — pre-emptively flush before a backend hard limit is breached +- **Backpressure** — `Add` blocks naturally when the internal channel is full +- **Graceful shutdown** — `Close` drains all buffered items before returning + +--- + +## Installation + +```bash +go get github.com/CloudStuffTech/go-utils/buffer +``` + +--- + +## Quick Start + +```go +buf, err := buffer.NewBuffer(buffer.Config[MyMsg]{ + Capacity: 500, + Flush: func(ctx context.Context, batch []MyMsg) error { + return kinesis.PutRecords(ctx, batch) + }, +}) +if err != nil { + log.Fatal(err) +} + +ctx, cancel := context.WithCancel(context.Background()) + +// Start the processing loop in the background. 
+go buf.Run(ctx) + +// Produce items from anywhere. +buf.Add(ctx, msg) + +// Shutdown: stop producers first, then close the buffer. +cancel() +buf.Close() +``` + +--- + +## How It Works + +``` +producers buffer sink +───────── ────── ──── +Add(item) ──→ [ dataChan ] ──→ Run() loop ──→ Flush(batch) +Add(item) ──→ [ dataChan ] ↑ +Add(item) ──→ [ dataChan ] ticker fires + (interval) +``` + +`Run` is a single goroutine that owns the batch. It reads from the internal channel and accumulates items. A flush is triggered when any of the following occur: + +- `ShouldFlush` returns true after an item is appended (default: `len(batch) >= Capacity`) +- `WillOverflow` returns true before an item is appended (pre-emptive flush) +- The `FlushInterval` ticker fires +- `Close()` is called (final drain) + +Because only `Run` ever reads from the channel and mutates the batch, no internal locking is needed. + +--- + +## Configuration + +Only `Flush` is required. All other fields have sensible defaults. + +```go +buf, err := buffer.NewBuffer(buffer.Config[MyMsg]{ + // Required: called with each completed batch. + Flush: func(ctx context.Context, batch []MyMsg) error { + return kinesis.PutRecords(ctx, batch) + }, + + // Target batch size. Also the default flush trigger. + // Default: 100. + Capacity: 500, + + // Internal channel buffer. Larger values absorb bursts better. + // Default: 2 * Capacity. + ChanSize: 2000, + + // Maximum time between flushes, even if Capacity isn't reached. + // Default: 5 seconds. + FlushInterval: 2 * time.Second, + + // Pre-emptive flush when adding an item would breach a hard limit. + // The existing batch is flushed first; then the item is re-evaluated. + WillOverflow: func(batch []MyMsg, item MyMsg) bool { + return totalBytes(batch)+len(item.Data) > 5*1024*1024 + }, + + // Per-item admission check. Return false to reject an item. 
+ CanAdd: func(batch []MyMsg, item MyMsg) bool { + return len(item.Data) <= 1*1024*1024 // reject items over 1MB + }, + + // Called for every rejected item. + OnReject: func(item MyMsg) { + log.Printf("rejected oversized message: %s", item.ID) + }, + + // Custom flush condition. Overrides the default Capacity check. + ShouldFlush: func(batch []MyMsg) bool { + return len(batch) >= 500 || totalBytes(batch) >= 4*1024*1024 + }, + + // Called when Flush returns an error. Batch is already reset. + OnFlushError: func(err error, batch []MyMsg) { + metrics.FlushErrors.Inc() + deadLetterQueue.Send(batch) + }, +}) +``` + +--- + +## Shutdown + +The buffer separates two concerns that are easy to conflate: + +| Concern | Mechanism | +|---|---| +| Stop the Flush from hitting a cancelled backend | `ctx` passed to `Run` | +| Stop `Run` itself and drain remaining items | `buf.Close()` | + +**Cancelling `ctx` does not stop `Run`.** This is intentional — the buffer has no visibility into whether producers have finished calling `Add`. Stopping `Run` on context cancellation alone would risk silently dropping items that are already in the channel. + +The correct shutdown sequence is always: + +1. **Stop all producers** — ensure no further `Add` calls will be made +2. **Call `buf.Close()`** — signals `Run` that input is exhausted +3. **Wait for `Run` to return** — it will drain and flush everything first + +```go +// Correct shutdown pattern with a WaitGroup. +var producerWg sync.WaitGroup + +producerWg.Add(1) +go func() { + defer producerWg.Done() + produceItems(ctx, buf) +}() + +// Signal producers to stop. +cancel() + +// Wait until the last Add() call has returned. +producerWg.Wait() + +// Now safe to close — no concurrent Add() calls are possible. +buf.Close() +``` + +### With Pub/Sub (Google Cloud) + +`sub.Receive` handles producer coordination for you. 
It blocks until `ctx` is cancelled **and** all in-flight message handlers have returned — guaranteeing no further `Add` calls before it exits. + +```go +go buf.Run(ctx) + +sub.Receive(ctx, func(msgCtx context.Context, m *pubsub.Message) { + if err := buf.Add(msgCtx, m); err != nil { + m.Nack() + } +}) + +// sub.Receive has fully stopped — safe to close. +buf.Close() +``` + +--- + +## Flush Context and Graceful Shutdown + +When `buf.Close()` is called during shutdown, the final drain flush fires with the original `ctx` — which may already be cancelled. If your `Flush` implementation passes that context directly to a backend call, the call will fail immediately. + +Shield against this with `context.WithoutCancel`: + +```go +Flush: func(ctx context.Context, batch []MyMsg) error { + // Detach from cancellation, but enforce a hard deadline + // so the flush can't block indefinitely. + flushCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) + defer cancel() + return backend.Send(flushCtx, batch) +}, +``` + +--- + +## Backpressure + +`Add` blocks if the internal channel is full. This is intentional — it propagates backpressure to producers rather than silently dropping items or growing memory unboundedly. + +To bound how long `Add` may block, pass a context with a timeout or deadline: + +```go +addCtx, cancel := context.WithTimeout(ctx, 200*time.Millisecond) +defer cancel() + +if err := buf.Add(addCtx, item); err != nil { + // Channel was full for too long — handle accordingly. + item.Nack() +} +``` + +Tune `ChanSize` to control how much burst the buffer can absorb before producers start to feel backpressure. A good starting point is `2–4x Capacity`. + +--- + +## Batch Ownership + +The batch slice passed to `Flush` must not be retained after `Flush` returns. The buffer reuses the underlying array for the next batch. 
+ +If you need to hold onto the batch — for example, to hand it off to a goroutine — copy it first: + +```go +Flush: func(ctx context.Context, batch []MyMsg) error { + owned := append([]MyMsg(nil), batch...) // copy before returning + go process(owned) + return nil +}, +``` + +--- + +## Benchmarks + +``` +goos: darwin +goarch: arm64 +pkg: github.com/CloudStuffTech/go-utils/buffer +cpu: Apple M4 Pro +BenchmarkBuffer_ParallelThroughput-12 12784041 185.5 ns/op 0 B/op 0 allocs/op +``` + +Run yourself: + +```bash +go test -bench=. -benchmem ./... +``` \ No newline at end of file diff --git a/buffer/buffer.go b/buffer/buffer.go new file mode 100644 index 0000000..d6d98cd --- /dev/null +++ b/buffer/buffer.go @@ -0,0 +1,399 @@ +package buffer + +import ( + "context" + "fmt" + "time" +) + +// FlushFunc processes a batch of accumulated items. +// +// It is called when: +// - ShouldFlush returns true after an item is appended +// - The flush interval ticker fires +// - dataChan is closed (graceful shutdown) +// +// The context passed is the exact same one given to Run(). During a +// graceful shutdown, it is highly likely this context has already been +// cancelled by the parent application. Therefore, implementations must +// shield the backend call to ensure the final drain succeeds: +// +// Flush: func(ctx context.Context, batch []MyMsg) error { +// // Shield the context from parent cancellation but enforce a hard timeout +// flushCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) +// defer cancel() +// return backend.Send(flushCtx, batch) +// }, +// +// Returning a non-nil error triggers OnFlushError if set. The batch is +// reset regardless — the buffer does not retry. Retry logic, if needed, +// belongs inside FlushFunc or OnFlushError. +// +// The batch slice must not be retained after FlushFunc returns. The buffer +// will reuse the underlying array. 
Copy if you need to hold on to items: +// +// Flush: func(ctx context.Context, batch []MyMsg) error { +// owned := append([]MyMsg(nil), batch...) // safe to retain +// go process(owned) +// return nil +// }, +type FlushFunc[T any] func(ctx context.Context, batch []T) error + +// Config holds all behavioural knobs for a Buffer. +// +// Only Flush is required. All other fields have sensible defaults and +// can be omitted. +// +// Example minimal configuration: +// +// buf, err := buffer.NewBuffer(buffer.Config[MyMsg]{ +// Capacity: 500, +// Flush: func(ctx context.Context, batch []MyMsg) error { +// return kinesis.PutRecords(ctx, batch) +// }, +// }) +// +// Example full configuration: +// +// buf, err := buffer.NewBuffer(buffer.Config[MyMsg]{ +// Capacity: 500, +// ChanSize: 2000, +// FlushInterval: 2 * time.Second, +// +// Flush: func(ctx context.Context, batch []MyMsg) error { +// return kinesis.PutRecords(ctx, batch) +// }, +// CanAdd: func(batch []MyMsg, item MyMsg) bool { +// return totalBytes(batch)+len(item.Data) <= 5*1024*1024 +// }, +// OnReject: func(item MyMsg) { +// log.Printf("rejected oversized message: %v", item.ID) +// }, +// ShouldFlush: func(batch []MyMsg) bool { +// return len(batch) >= 500 || totalBytes(batch) >= 4*1024*1024 +// }, +// OnFlushError: func(err error, batch []MyMsg) { +// metrics.FlushErrors.Inc() +// deadLetterQueue.Send(batch) +// }, +// }) +type Config[T any] struct { + // Flush is the only required field. + // + // It is called with the current batch every time a flush is triggered. + // See FlushFunc for full semantics. + Flush FlushFunc[T] + + // WillOverflow is called before an item is added to the current batch to check + // if appending it would breach a strict backend limit (e.g., a 5MB payload size). + // + // If it returns true, the buffer immediately flushes the *existing* batch + // to make room, and then evaluates the new item for the next empty batch. 
+ // + // This is crucial for byte-size constraints where adding an item might + // create an invalid payload that the backend (like AWS Kinesis) would reject. + // + // WillOverflow: func(batch []MyMsg, item MyMsg) bool { + // return totalBytes(batch)+len(item.Data) > 5*1024*1024 // 5MB limit + // }, + // + // This function MUST NOT mutate the batch. + // + // Default: always returns false (no preemptive size-based flushing). + WillOverflow func(batch []T, item T) bool + + // CanAdd is called before appending each incoming item to the batch. + // Return false to reject the item — OnReject will be called instead. + // + // Useful for enforcing constraints that go beyond simple count limits, + // such as total payload size (e.g. Kinesis 5MB per PutRecords call): + // + // CanAdd: func(batch []MyMsg, item MyMsg) bool { + // return totalBytes(batch)+len(item.Data) <= 5*1024*1024 + // }, + // + // This function MUST NOT mutate batch. + // + // Default: always returns true (all items are accepted). + CanAdd func(batch []T, item T) bool + + // OnReject is called when CanAdd returns false for an item. + // + // Use this to log rejections, forward to a dead-letter queue, + // or record metrics. It must not block indefinitely, as it is + // called synchronously inside Run(). + // + // Default: no-op. + OnReject func(item T) + + // ShouldFlush is called after each successful append. + // Return true to trigger an immediate flush. + // + // This is where you define your primary flush condition. + // Common strategies: + // - Flush by count: len(batch) >= maxRecords + // - Flush by size: totalBytes(batch) >= maxBytes + // - Flush by both: either condition above + // + // Time-based flushing is handled separately by FlushInterval — + // you do not need to track time here. + // + // This function must be side-effect free. + // + // Default: flush when len(batch) >= Capacity. + ShouldFlush func(batch []T) bool + + // OnFlushError is called when Flush returns a non-nil error. 
+ // + // The batch passed here is the one that failed. The buffer has + // already reset — it will not retry. If you need retry or + // dead-lettering, implement it here or inside Flush. + // + // Note: if Flush spawns a goroutine and returns nil optimistically, + // errors from that goroutine are outside the buffer's visibility — + // handle them within the goroutine itself. + // + // Default: no-op (errors are silently ignored). + OnFlushError func(err error, batch []T) + + // Capacity is the target batch size: the number of items to accumulate + // before ShouldFlush (default) triggers a flush. + // + // It also controls the initial allocation of the internal batch slice, + // so setting it close to your expected batch size avoids reallocations. + // + // Default: 100. + Capacity int + + // ChanSize is the buffer size of the internal dataChan. + // + // A larger value decouples producers from the flush cycle, reducing + // the chance that Add() blocks under bursty load. However, it increases + // memory usage and the number of in-flight unprocessed items on shutdown. + // + // Rule of thumb: set to at least Capacity, ideally 2–4x Capacity. + // + // Default: 2 * Capacity. + ChanSize int + + // FlushInterval is the maximum time between flushes, regardless of + // whether ShouldFlush has fired. + // + // This acts as a safety net to ensure items don't sit indefinitely + // in a partially filled batch during low-traffic periods. + // + // Setting this too low increases flush frequency and backend pressure. + // Setting this too high increases latency for tail items. + // + // Default: 5 seconds. 
+ FlushInterval time.Duration +} + +func (c *Config[T]) withDefaults() Config[T] { + cfg := *c + + if cfg.Capacity == 0 { + cfg.Capacity = 100 + } + if cfg.ChanSize == 0 { + cfg.ChanSize = 2 * cfg.Capacity + } + if cfg.FlushInterval == 0 { + cfg.FlushInterval = 5 * time.Second + } + if cfg.CanAdd == nil { + cfg.CanAdd = func([]T, T) bool { return true } + } + if cfg.WillOverflow == nil { + cfg.WillOverflow = func([]T, T) bool { return false } + } + if cfg.OnReject == nil { + cfg.OnReject = func(T) {} + } + if cfg.ShouldFlush == nil { + cfg.ShouldFlush = func(batch []T) bool { + return len(batch) >= cfg.Capacity + } + } + if cfg.OnFlushError == nil { + cfg.OnFlushError = func(error, []T) {} + } + + return cfg +} + +type Buffer[T any] struct { + dataChan chan T + cfg Config[T] +} + +func NewBuffer[T any](cfg Config[T]) (*Buffer[T], error) { + if cfg.Flush == nil { + return nil, fmt.Errorf("buffer: Flush is required") + } + c := cfg.withDefaults() + return &Buffer[T]{ + dataChan: make(chan T, c.ChanSize), + cfg: c, + }, nil +} + +// Add sends an item to the buffer's internal channel for processing by Run. +// +// It blocks if the channel is full, providing natural backpressure to the +// producer. Pass a context with a timeout or deadline to bound how long Add +// may block before giving up. +// +// Add returns ctx.Err() if the context is cancelled before the item could +// be queued. The item is not added in that case — the caller is responsible +// for handling it (e.g. nacking a Pub/Sub message). +// +// Add must not be called after Close. Doing so will panic. +func (b *Buffer[T]) Add(ctx context.Context, item T) error { + select { + case b.dataChan <- item: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Close signals Run that no more items will be produced. +// +// It closes the internal channel, which causes Run to drain all remaining +// buffered items, perform a final flush, and return nil. 
+// +// # Caller responsibility +// +// Close must only be called once all producers have finished calling Add. +// Calling Close while a producer might still call Add will cause a panic +// (send on closed channel). The caller is responsible for coordinating this, +// typically with a sync.WaitGroup: +// +// var wg sync.WaitGroup +// wg.Add(1) +// go func() { +// defer wg.Done() +// produceItems(ctx, buf) +// }() +// +// // Shutdown sequence: wait for producers, then signal the buffer. +// wg.Wait() +// buf.Close() +// +// Close is the only mechanism that stops Run. Cancelling the context passed +// to Run does NOT stop it — see Run for details. +func (b *Buffer[T]) Close() { + close(b.dataChan) +} + +// ItemsInChannel returns the number of items currently sitting in the +// internal channel, waiting to be picked up by Run. +// +// This is a snapshot value — it may be stale by the time the caller +// acts on it. Intended for monitoring and diagnostics only. +func (b *Buffer[T]) ItemsInChannel() int { + return len(b.dataChan) +} + +// Run is the buffer's main processing loop. It must be called exactly once, +// in its own goroutine, before any calls to Add. +// +// Run blocks until the internal channel is closed via [Buffer.Close], at +// which point it drains any remaining items, performs a final flush, and +// returns nil. +// +// # Context +// +// The context controls the lifecycle of Flush calls, not the lifecycle of +// Run itself. Cancelling ctx does NOT stop Run — it only causes that +// cancelled context to be forwarded to each subsequent Flush invocation. +// +// This is an intentional design decision. The buffer cannot safely stop on +// context cancellation alone because it has no visibility into whether +// producers have finished calling Add. Stopping Run prematurely could +// cause items that are already in the channel to be silently dropped. +// +// The correct shutdown sequence is always: +// +// 1. 
Stop all producers (ensure no further Add calls will be made). +// 2. Call buf.Close() to signal Run that the input is exhausted. +// 3. Wait for Run to return (it will drain and flush everything first). +// +// Example with a Pub/Sub subscriber as the producer: +// +// go buf.Run(ctx) +// +// // sub.Receive blocks until ctx is cancelled AND all in-flight +// // message handlers have returned — guaranteeing no more Add calls. +// sub.Receive(ctx, func(msgCtx context.Context, m *pubsub.Message) { +// if err := buf.Add(msgCtx, m); err != nil { +// m.Nack() +// } +// }) +// +// // Safe to close: sub.Receive has exited, no more Add calls possible. +// buf.Close() +// +// # Flush context and graceful shutdown +// +// Because ctx is likely already cancelled by the time the final drain flush +// fires, Flush implementations should shield their backend calls: +// +// Flush: func(ctx context.Context, batch []MyMsg) error { +// flushCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Second) +// defer cancel() +// return backend.Send(flushCtx, batch) +// }, +func (b *Buffer[T]) Run(ctx context.Context) (err error) { + cfg := b.cfg + batch := make([]T, 0, cfg.Capacity) + + ticker := time.NewTicker(cfg.FlushInterval) + defer ticker.Stop() + + flush := func() { + if len(batch) == 0 { + return + } + + if err := cfg.Flush(ctx, batch); err != nil { + cfg.OnFlushError(err, batch) + } + + batch = batch[:0] + ticker.Reset(cfg.FlushInterval) + } + + processItem := func(item T) { + if cfg.WillOverflow(batch, item) { + if len(batch) > 0 { + flush() + } + } + + if !cfg.CanAdd(batch, item) { + cfg.OnReject(item) + return + } + + batch = append(batch, item) + + if cfg.ShouldFlush(batch) { + flush() + } + } + + for { + select { + case item, ok := <-b.dataChan: + if !ok { + flush() + return nil + } + processItem(item) + case <-ticker.C: + flush() + } + } +} diff --git a/buffer/buffer_test.go b/buffer/buffer_test.go new file mode 100644 index 0000000..79a3ce0 --- /dev/null 
+++ b/buffer/buffer_test.go @@ -0,0 +1,481 @@ +package buffer + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" +) + +func BenchmarkBuffer_ParallelThroughput(b *testing.B) { + // 1. Setup mock telemetry to verify everything processed + var totalFlushed atomic.Int64 + var batchCount atomic.Int64 + + // 2. Configure the buffer for maximum speed + // We use a dummy integer instead of a complex struct to isolate the buffer's pure mechanical overhead. + cfg := Config[int]{ + Capacity: 500, + FlushInterval: 1 * time.Second, + // Mock Flush: Instantly returns without doing any I/O + Flush: func(ctx context.Context, batch []int) error { + totalFlushed.Add(int64(len(batch))) + batchCount.Add(1) + return nil + }, + } + + buf, err := NewBuffer(cfg) + if err != nil { + b.Fatal(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 3. Start the consumer loop in the background + errChan := make(chan error, 1) + go func() { + errChan <- buf.Run(ctx) + }() + + // 4. Reset the timer and track memory allocations! + // We don't want the setup time to affect the nanosecond score. + b.ResetTimer() + b.ReportAllocs() + + // 5. THE CRASH TEST + // b.RunParallel spins up multiple goroutines based on your CPU cores + // and hammers the Add() function concurrently, exactly like Pub/Sub. + b.RunParallel(func(pb *testing.PB) { + item := 42 // Dummy payload + for pb.Next() { + if err := buf.Add(ctx, item); err != nil { + b.Errorf("Failed to add item: %v", err) + } + } + }) + + // 6. Stop the timer to isolate the pure Add/Batch speed + b.StopTimer() + + // 7. 
Cleanly shut down to ensure no deadlocks + buf.Close() + cancel() + + if err := <-errChan; err != nil { + b.Fatalf("Buffer run exited with error: %v", err) + } + + // Optional: Print the results to verify no messages were dropped + b.Logf("Processed %d items across %d batches.", totalFlushed.Load(), batchCount.Load()) +} + +// TestBuffer_Basic verifies that the buffer flushes exactly when Capacity is reached. +func TestBuffer_Basic(t *testing.T) { + var mu sync.Mutex + var received [][]int + + cfg := Config[int]{ + Capacity: 3, + Flush: func(ctx context.Context, batch []int) error { + mu.Lock() + // We MUST copy the slice because the buffer reuses the underlying array + cb := make([]int, len(batch)) + copy(cb, batch) + received = append(received, cb) + mu.Unlock() + return nil + }, + } + + buf, _ := NewBuffer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) + + go buf.Run(ctx) + + // Send 5 items. With Capacity 3, we expect 1 flush of [1, 2, 3]. + // The items [4, 5] should still be in the buffer. + for i := 1; i <= 5; i++ { + buf.Add(ctx, i) + } + + // Give the goroutine a tiny bit of time to process + time.Sleep(50 * time.Millisecond) + + mu.Lock() + if len(received) != 1 { + t.Errorf("expected 1 flush, got %d", len(received)) + } + if len(received[0]) != 3 { + t.Errorf("expected batch size 3, got %d", len(received[0])) + } + mu.Unlock() +} + +// TestBuffer_FlushInterval verifies that items are flushed even if Capacity isn't met. 
+func TestBuffer_FlushInterval(t *testing.T) { + var mu sync.Mutex + var received [][]int + + cfg := Config[int]{ + Capacity: 100, + FlushInterval: 100 * time.Millisecond, + Flush: func(ctx context.Context, batch []int) error { + mu.Lock() + received = append(received, append([]int(nil), batch...)) + mu.Unlock() + return nil + }, + } + + buf, _ := NewBuffer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) + + go buf.Run(ctx) + + buf.Add(ctx, 42) // Only one item, won't trigger Capacity flush + + // Wait longer than FlushInterval + time.Sleep(200 * time.Millisecond) + + mu.Lock() + if len(received) != 1 { + t.Errorf("expected timer-based flush, but got %d flushes", len(received)) + } + mu.Unlock() +} + +// TestBuffer_Close verifies that closing the channel drains the remaining items. +func TestBuffer_Close(t *testing.T) { + var mu sync.Mutex + var received [][]int + + cfg := Config[int]{ + Capacity: 10, + Flush: func(ctx context.Context, batch []int) error { + mu.Lock() + received = append(received, append([]int(nil), batch...)) + mu.Unlock() + return nil + }, + } + + buf, _ := NewBuffer(cfg) + + // We use a channel to signal when Run() is finished + done := make(chan struct{}) + go func() { + buf.Run(context.Background()) + close(done) + }() + + buf.Add(context.Background(), 1) + buf.Add(context.Background(), 2) + + buf.Close() // This should trigger a flush of [1, 2] + <-done + + if len(received) != 1 || len(received[0]) != 2 { + t.Errorf("expected final drain flush of 2 items, got %v", received) + } +} + +func TestBuffer_WillOverflow(t *testing.T) { + var mu sync.Mutex + var received [][]int + + cfg := Config[int]{ + Capacity: 100, + WillOverflow: func(batch []int, item int) bool { + sum := 0 + for _, v := range batch { + sum += v + } + return sum+item > 10 + }, + Flush: func(ctx context.Context, batch []int) error { + mu.Lock() + received = append(received, 
append([]int(nil), batch...)) + mu.Unlock() + return nil + }, + } + + buf, _ := NewBuffer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) + + done := make(chan struct{}) + go func() { + buf.Run(ctx) + close(done) + }() + + buf.Add(ctx, 4) + buf.Add(ctx, 5) + buf.Add(ctx, 3) + + // Close the buffer and wait for Run() to fully exit before asserting. + // This guarantees the final batch containing [3] has been flushed. + buf.Close() + <-done + + mu.Lock() + defer mu.Unlock() + + if len(received) != 2 { + t.Fatalf("expected 2 flushes, got %d", len(received)) + } + if len(received[0]) != 2 || received[0][0] != 4 || received[0][1] != 5 { + t.Errorf("expected first batch [4, 5], got %v", received[0]) + } + // This is the assertion that was missing — verifies the overflow-triggering + // item was not silently dropped and correctly landed in the next batch. + if len(received[1]) != 1 || received[1][0] != 3 { + t.Errorf("expected second batch [3], got %v", received[1]) + } +} + +func TestBuffer_OnReject(t *testing.T) { + var mu sync.Mutex + var rejectedItem int + var rejectCalled bool + + cfg := Config[int]{ + // CanAdd: Only allow even numbers + CanAdd: func(batch []int, item int) bool { + return item%2 == 0 + }, + OnReject: func(item int) { + mu.Lock() + defer mu.Unlock() + rejectedItem = item + rejectCalled = true + }, + Flush: func(ctx context.Context, batch []int) error { return nil }, + } + + buf, _ := NewBuffer(cfg) + ctx := context.Background() + + // Start the buffer and immediately add an odd number + go buf.Run(ctx) + buf.Add(ctx, 7) + + // Small sleep for the goroutine to process the internal channel + time.Sleep(20 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if !rejectCalled { + t.Error("expected OnReject to be called for odd number") + } + if rejectedItem != 7 { + t.Errorf("expected rejected item 7, got %d", rejectedItem) + } +} + +func 
TestBuffer_OnFlushError(t *testing.T) { + var mu sync.Mutex + var capturedErr error + var capturedBatch []int + + cfg := Config[int]{ + Capacity: 2, + Flush: func(ctx context.Context, batch []int) error { + return fmt.Errorf("kinesis service unavailable") + }, + OnFlushError: func(err error, batch []int) { + mu.Lock() + defer mu.Unlock() + capturedErr = err + capturedBatch = append([]int(nil), batch...) + }, + } + + buf, _ := NewBuffer(cfg) + ctx := context.Background() + go buf.Run(ctx) + + // Trigger a flush + buf.Add(ctx, 1) + buf.Add(ctx, 2) + + time.Sleep(20 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if capturedErr == nil || capturedErr.Error() != "kinesis service unavailable" { + t.Errorf("expected specific error, got %v", capturedErr) + } + if len(capturedBatch) != 2 { + t.Errorf("expected batch of 2 to be passed to error handler, got %d", len(capturedBatch)) + } +} + +func TestBuffer_ConcurrencyCorrectness(t *testing.T) { + var flushCount atomic.Int64 + var itemCount atomic.Int64 + const totalItems = 1000 + const capacity = 100 + + cfg := Config[int]{ + Capacity: capacity, + Flush: func(ctx context.Context, batch []int) error { + flushCount.Add(1) + itemCount.Add(int64(len(batch))) + return nil + }, + } + + buf, _ := NewBuffer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) + + doneChan := make(chan struct{}) + + go func() { + buf.Run(ctx) + close(doneChan) + }() + + // Hammer the buffer from 10 different goroutines + var wg sync.WaitGroup + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + for i := range 100 { + buf.Add(ctx, i) + } + }() + } + wg.Wait() + + // Wait for the final items to be processed + time.Sleep(100 * time.Millisecond) + buf.Close() + <-doneChan + + if itemCount.Load() != totalItems { + t.Errorf("expected %d total items, got %d", totalItems, itemCount.Load()) + } + // Note: flushCount might be slightly higher than 10 if 
timer flushes + // happened, but it should be at least 10. + if flushCount.Load() < 10 { + t.Errorf("expected at least 10 flushes, got %d", flushCount.Load()) + } +} + +func TestBuffer_Backpressure(t *testing.T) { + // 1. Create a buffer with a tiny channel and a slow Flush + cfg := Config[int]{ + Capacity: 1, + ChanSize: 1, // Tiny channel + Flush: func(ctx context.Context, batch []int) error { + time.Sleep(100 * time.Millisecond) // Artificial slowness + return nil + }, + } + + buf, _ := NewBuffer(cfg) + ctx := context.Background() + go buf.Run(ctx) + + // 2. Fill the channel + buf.Add(ctx, 1) // Item 1: Processing in Flush + buf.Add(ctx, 2) // Item 2: Sitting in the channel + + // 3. The 3rd Add should block because the channel is full and Flush is sleeping + start := time.Now() + timeoutCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond) + defer cancel() + + err := buf.Add(timeoutCtx, 3) + + if err == nil { + t.Error("expected Add to block and return context deadline exceeded") + } + if time.Since(start) < 40*time.Millisecond { + t.Error("Add returned too fast; it should have been blocked by the full channel") + } +} + +func TestBuffer_FlushContextCancellation(t *testing.T) { + var mu sync.Mutex + var ctxCanceledDuringFlush bool + + cfg := Config[int]{ + Capacity: 1, + Flush: func(ctx context.Context, batch []int) error { + <-ctx.Done() // Wait for the context to be canceled + mu.Lock() + defer mu.Unlock() + ctxCanceledDuringFlush = true + return ctx.Err() + }, + } + + buf, _ := NewBuffer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + + go buf.Run(ctx) + + buf.Add(ctx, 1) // Triggers flush, which hangs waiting for cancel + + time.Sleep(20 * time.Millisecond) + cancel() // Cancel the master context + + time.Sleep(50 * time.Millisecond) + + mu.Lock() + defer mu.Unlock() + if !ctxCanceledDuringFlush { + t.Error("expected Flush to notice context cancellation") + } +} + +func TestBuffer_NoEmptyFlushes(t *testing.T) { + var mu sync.Mutex + 
var flushCount int + + cfg := Config[int]{ + FlushInterval: 10 * time.Millisecond, + Flush: func(ctx context.Context, batch []int) error { + mu.Lock() + defer mu.Unlock() + flushCount++ + return nil + }, + } + + buf, _ := NewBuffer(cfg) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) + + done := make(chan struct{}) + go func() { + buf.Run(ctx) + close(done) + }() + + // Wait for several ticks of the FlushInterval with an empty buffer. + time.Sleep(50 * time.Millisecond) + + // Wait for Run() to fully exit before asserting, so we're + // not racing against a flush that hasn't been counted yet. + buf.Close() + <-done + + mu.Lock() + defer mu.Unlock() + + if flushCount > 0 { + t.Errorf("expected 0 flushes for empty buffer, got %d", flushCount) + } +} From dbca8e9f330901e543e4ccc2dd684d2f3ff738b1 Mon Sep 17 00:00:00 2001 From: Manan Singh Date: Fri, 6 Mar 2026 14:34:25 +0530 Subject: [PATCH 2/4] Fix variable shadowing of err in Run() function --- buffer/buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buffer/buffer.go b/buffer/buffer.go index d6d98cd..e6d82f7 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -345,7 +345,7 @@ func (b *Buffer[T]) ItemsInChannel() int { // defer cancel() // return backend.Send(flushCtx, batch) // }, -func (b *Buffer[T]) Run(ctx context.Context) (err error) { +func (b *Buffer[T]) Run(ctx context.Context) error { cfg := b.cfg batch := make([]T, 0, cfg.Capacity) From 415b5510c8483bc9445a49e515c307e1a2fdd1ad Mon Sep 17 00:00:00 2001 From: Manan Singh Date: Fri, 6 Mar 2026 14:41:11 +0530 Subject: [PATCH 3/4] Add a mustNewBuffer helper to explicitly catch initialization errors. 
--- buffer/buffer_test.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/buffer/buffer_test.go b/buffer/buffer_test.go index 79a3ce0..190c851 100644 --- a/buffer/buffer_test.go +++ b/buffer/buffer_test.go @@ -9,6 +9,17 @@ import ( "time" ) +// mustNewBuffer is a test helper that fails the test/benchmark immediately +// if the buffer fails to initialize. +func mustNewBuffer[T any](tb testing.TB, cfg Config[T]) *Buffer[T] { + tb.Helper() // Marks this as a helper so error line numbers point to the test, not here! + buf, err := NewBuffer(cfg) + if err != nil { + tb.Fatalf("failed to initialize buffer: %v", err) + } + return buf +} + func BenchmarkBuffer_ParallelThroughput(b *testing.B) { // 1. Setup mock telemetry to verify everything processed var totalFlushed atomic.Int64 @@ -27,10 +38,7 @@ func BenchmarkBuffer_ParallelThroughput(b *testing.B) { }, } - buf, err := NewBuffer(cfg) - if err != nil { - b.Fatal(err) - } + buf := mustNewBuffer(b, cfg) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -91,7 +99,7 @@ func TestBuffer_Basic(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) @@ -132,7 +140,7 @@ func TestBuffer_FlushInterval(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) @@ -165,7 +173,7 @@ func TestBuffer_Close(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) // We use a channel to signal when Run() is finished done := make(chan struct{}) @@ -206,7 +214,7 @@ func TestBuffer_WillOverflow(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx, cancel := context.WithCancel(context.Background()) 
t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) @@ -260,7 +268,7 @@ func TestBuffer_OnReject(t *testing.T) { Flush: func(ctx context.Context, batch []int) error { return nil }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx := context.Background() // Start the buffer and immediately add an odd number @@ -298,7 +306,7 @@ func TestBuffer_OnFlushError(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx := context.Background() go buf.Run(ctx) @@ -333,7 +341,7 @@ func TestBuffer_ConcurrencyCorrectness(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) @@ -383,7 +391,7 @@ func TestBuffer_Backpressure(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx := context.Background() go buf.Run(ctx) @@ -421,7 +429,7 @@ func TestBuffer_FlushContextCancellation(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx, cancel := context.WithCancel(context.Background()) go buf.Run(ctx) @@ -454,7 +462,7 @@ func TestBuffer_NoEmptyFlushes(t *testing.T) { }, } - buf, _ := NewBuffer(cfg) + buf := mustNewBuffer(t, cfg) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) // Automatically fires when the test finishes (or panics) From a2dcccd1b8e02d8ae940c87b55d81c3580707e12 Mon Sep 17 00:00:00 2001 From: Manan Singh Date: Fri, 6 Mar 2026 14:51:46 +0530 Subject: [PATCH 4/4] Fix Add() to return in case ctx is cancelled. --- buffer/buffer.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/buffer/buffer.go b/buffer/buffer.go index e6d82f7..bba346f 100644 --- a/buffer/buffer.go +++ b/buffer/buffer.go @@ -250,6 +250,12 @@ func NewBuffer[T any](cfg Config[T]) (*Buffer[T], error) { // // Add must not be called after Close. Doing so will panic. 
func (b *Buffer[T]) Add(ctx context.Context, item T) error {
+	// Fast path: if ctx is already cancelled, return its error without
+	// attempting the send, so the select below never enqueues the item.
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+
 	select {
 	case b.dataChan <- item:
 		return nil