A lightweight, type-safe event bus for Go with generics support. Build decoupled applications with compile-time type safety.
ebu stands for Event BUs - a simple, powerful event bus implementation for Go.
- π Type-safe - Full compile-time type safety with generics
- β‘ Fast - Zero allocations in hot paths, optimized for performance
- π Async support - Built-in async handlers with optional sequential processing
- π― Simple API - Clean, intuitive API with options pattern
- π§΅ Thread-safe - Safe for concurrent use across goroutines
- π Context support - First-class context support for cancellation and tracing
- π‘οΈ Panic recovery - Handlers are isolated from each other's panics
- π Zero dependencies - Pure Go standard library (core package)
- πΎ Event persistence - Built-in support for event storage and replay
- π Remote storage - Native support for remote backends like durable-streams
- π Event upcasting - Seamless event schema migration and versioning
- β 100% test coverage - Thoroughly tested for reliability
go get github.com/jilio/ebupackage main
import (
"fmt"
"time"
eventbus "github.com/jilio/ebu"
)
// Define your event types
type UserLoginEvent struct {
UserID string
Timestamp time.Time
}
type OrderCreatedEvent struct {
OrderID string
Amount float64
}
func main() {
// Create a new event bus
bus := eventbus.New()
// Subscribe to events with type-safe handlers
eventbus.Subscribe(bus, func(event UserLoginEvent) {
fmt.Printf("User %s logged in at %v\n", event.UserID, event.Timestamp)
})
eventbus.Subscribe(bus, func(event OrderCreatedEvent) {
fmt.Printf("Order %s created for $%.2f\n", event.OrderID, event.Amount)
})
// Publish events - compile-time type safety!
eventbus.Publish(bus, UserLoginEvent{
UserID: "user123",
Timestamp: time.Now(),
})
eventbus.Publish(bus, OrderCreatedEvent{
OrderID: "order456",
Amount: 99.99,
})
}// Simple subscription
eventbus.Subscribe(bus, func(event UserEvent) {
// Handle event
})
// With options
eventbus.Subscribe(bus, func(event EmailEvent) {
sendEmail(event)
}, eventbus.Async(), eventbus.Once())
// Publish events
eventbus.Publish(bus, UserEvent{UserID: "123"})// Parallel async processing (default)
eventbus.Subscribe(bus, func(event EmailEvent) {
sendEmail(event) // Each email sent in parallel
}, eventbus.Async())
// Sequential async processing (preserves order)
eventbus.Subscribe(bus, func(event PaymentEvent) {
processPayment(event) // Processed one at a time
}, eventbus.Async(), eventbus.Sequential())
// Wait for all async handlers
bus.Wait()Shutdown the event bus gracefully, waiting for async handlers to complete with timeout support:
// Shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := bus.Shutdown(ctx); err != nil {
log.Printf("Shutdown timed out: %v", err)
}The Shutdown method:
- Waits for all async handlers to complete
- Respects context timeout and cancellation
- Returns
context.DeadlineExceededif handlers don't finish in time - Returns
nilon successful graceful shutdown
// Context-aware handlers
eventbus.SubscribeContext(bus, func(ctx context.Context, event RequestEvent) {
traceID := ctx.Value("traceID")
// Handle with context
})
// Publish with context
ctx := context.WithTimeout(context.Background(), 5*time.Second)
eventbus.PublishContext(bus, ctx, RequestEvent{Path: "/api/users"})eventbus.Subscribe(bus, func(event PriceEvent) {
fmt.Printf("Alert: Price changed %.2f%%\n", event.Change)
}, eventbus.WithFilter(func(event PriceEvent) bool {
return math.Abs(event.Change) > 5.0 // Only large changes
}))// Check for handlers
if eventbus.HasHandlers[UserEvent](bus) {
eventbus.Publish(bus, UserEvent{})
}
// Unsubscribe a handler
handler := func(event UserEvent) { /* ... */ }
eventbus.Subscribe(bus, handler)
eventbus.Unsubscribe(bus, handler)
// Clear all handlers for a type
eventbus.Clear[UserEvent](bus)
// Clear all handlers
bus.ClearAll()Store and replay events for event sourcing, audit logs, and resumable subscriptions:
// Create persistent bus with in-memory store
store := eventbus.NewMemoryStore()
bus := eventbus.New(eventbus.WithStore(store))
// Events are automatically persisted
eventbus.Publish(bus, UserCreatedEvent{UserID: "123"})
// Replay events from the beginning
bus.Replay(ctx, eventbus.OffsetOldest, func(event *eventbus.StoredEvent) error {
// Process stored event
return nil
})
// Subscribe with automatic offset tracking
eventbus.SubscribeWithReplay(bus, "email-sender",
func(event EmailEvent) {
sendEmail(event)
// Offset saved automatically after success
})See Persistence Guide for custom stores and advanced patterns.
Use remote storage backends for distributed event persistence. ebu supports Durable Streams - an HTTP protocol for reliable, resumable, real-time data streaming developed by Electric:
import (
eventbus "github.com/jilio/ebu"
"github.com/jilio/ebu/stores/durablestream"
)
// Connect to a durable-streams server
store, err := durablestream.New(
"http://localhost:4437/v1/stream", // server base URL
"mystream", // stream name
durablestream.WithTimeout(30*time.Second),
)
if err != nil {
log.Fatal(err)
}
// Use with event bus - same API as local storage
bus := eventbus.New(eventbus.WithStore(store))
// Events are now persisted to the remote durable-streams server
eventbus.Publish(bus, OrderCreatedEvent{OrderID: "123", Amount: 99.99})Available storage backends:
- MemoryStore - Built-in in-memory store for development
- SQLite -
stores/sqlite- Persistent local storage - Durable-Streams -
stores/durablestream- Remote HTTP-based storage (protocol spec)
See Persistence Guide for all storage options.
Add metrics and distributed tracing with OpenTelemetry:
import (
eventbus "github.com/jilio/ebu"
"github.com/jilio/ebu/otel"
)
// Create observability implementation
obs, err := otel.New(
otel.WithTracerProvider(tracerProvider),
otel.WithMeterProvider(meterProvider),
)
// Create bus with observability
bus := eventbus.New(eventbus.WithObservability(obs))
// Events, handlers, and persistence are automatically tracked
eventbus.Publish(bus, UserCreatedEvent{UserID: "123"})The otel package provides:
- Metrics: Event counts, handler duration, error rates, persistence metrics
- Tracing: Distributed tracing with spans for publish, handlers, and persistence
- Zero overhead: Optional - no performance impact if not used
- Vendor-neutral: Built on OpenTelemetry standards
See examples/observability for a complete example.
Migrate event schemas seamlessly without breaking existing handlers:
// V1 event
type UserCreatedV1 struct {
UserID string
Name string
}
// V2 event with split name
type UserCreatedV2 struct {
UserID string
FirstName string
LastName string
}
// Register upcast transformation
eventbus.RegisterUpcast(bus, func(v1 UserCreatedV1) UserCreatedV2 {
parts := strings.Split(v1.Name, " ")
return UserCreatedV2{
UserID: v1.UserID,
FirstName: parts[0],
LastName: strings.Join(parts[1:], " "),
}
})
// Old events automatically transformed when replayed
eventbus.SubscribeWithReplay(bus, "processor", func(event UserCreatedV2) {
// Receives V2 format even for old V1 events
})Upcasting supports:
- Automatic chain resolution (V1βV2βV3)
- Circular dependency detection
- Type-safe transformations
- Error handling hooks
Handlers are isolated - one panic won't affect others:
bus.SetPanicHandler(func(event any, handlerType reflect.Type, panicValue any) {
log.Printf("Handler panic: %v", panicValue)
})
eventbus.Subscribe(bus, func(e Event) { panic("error") })
eventbus.Subscribe(bus, func(e Event) { /* Still runs! */ })Intercept all events for logging, metrics, or tracing:
bus.SetBeforePublishHook(func(eventType reflect.Type, event any) {
log.Printf("Publishing %s", eventType.Name())
})
bus.SetAfterPublishHook(func(eventType reflect.Type, event any) {
metrics.Increment("events." + eventType.Name())
})Control event type naming explicitly with the TypeNamer interface for stable names across refactoring:
type UserCreatedEvent struct {
UserID string
}
// Implement TypeNamer for explicit type control
func (e UserCreatedEvent) EventTypeName() string {
return "user.created.v1"
}
// Now EventType() returns "user.created.v1" instead of package-qualified name
eventbus.Publish(bus, UserCreatedEvent{UserID: "123"})Benefits:
- Stable event names across package reorganization
- Version control for event schema evolution
- External compatibility with other event systems
See TypeNamer examples for versioning and migration patterns.
- π Complete Examples - Comprehensive usage examples
- πΎ Persistence Guide - Event storage and replay patterns
- π API Reference - Complete API documentation
| Backend | Package | Description |
|---|---|---|
| MemoryStore | github.com/jilio/ebu |
In-memory store for development/testing |
| SQLite | github.com/jilio/ebu/stores/sqlite |
Local persistent storage with WAL mode |
| Durable-Streams | github.com/jilio/ebu/stores/durablestream |
Remote HTTP-based storage |
See Persistence Guide for detailed usage.
The optional state package implements the Durable Streams State Protocol for database-style state synchronization:
import (
eventbus "github.com/jilio/ebu"
"github.com/jilio/ebu/state"
)
// Define entity type
type User struct {
Name string `json:"name"`
Email string `json:"email"`
}
// Create and publish state changes
bus := eventbus.New(eventbus.WithStore(eventbus.NewMemoryStore()))
insertMsg, _ := state.Insert("user:1", User{Name: "Alice", Email: "alice@example.com"})
eventbus.Publish(bus, insertMsg)
updateMsg, _ := state.Update("user:1", User{Name: "Alice Smith"}, state.WithTxID("tx-123"))
eventbus.Publish(bus, updateMsg)
// Materialize state from events
mat := state.NewMaterializer()
users := state.NewTypedCollection[User](state.NewMemoryStore[User]())
state.RegisterCollection(mat, users)
mat.Replay(ctx, bus, eventbus.OffsetOldest)
// Access materialized state
user, ok := users.Get("user:1") // User{Name: "Alice Smith", ...}Features:
- Type-safe helpers:
Insert,Update,Deletewith Go generics - Options pattern:
WithTxID,WithTimestamp,WithEntityType - Materializer: Build typed state from event streams
- Control messages:
SnapshotStart,SnapshotEnd,Reset - JSON interoperability: Compatible with durable-streams ecosystem
- Define clear event types - Use descriptive structs with meaningful fields
- Keep events immutable - Don't modify events after publishing
- Handle errors gracefully - Prefer returning errors over panicking
- Use async for I/O - Keep synchronous handlers fast
- Leverage context - Use
PublishContextfor cancellable operations - Set panic handlers - Monitor and log handler failures in production
- Test concurrency - The bus is thread-safe, but test your handlers
- Type-based routing with zero reflection for direct handlers
- Zero allocations in hot paths
- Efficient sharding reduces lock contention
- Async handlers run in separate goroutines
Contributions are welcome! Submit a Pull Request or open an Issue for bugs, features, or improvements.
MIT License - see LICENSE file for details.
- π Documentation
- π Issues
- π¬ Discussions