Event Bus Architecture (Design)
This document defines the unified event bus architecture for Gestalt. It is a design reference for the implementation steps that follow.
Goals
- Provide a single, reusable fan-out primitive for events and streams.
- Keep the bus passive: no internal worker goroutines, synchronous Publish.
- Make the API type-safe and easy to test.
- Support filtered subscriptions and backpressure policies.
- Integrate with metrics without coupling callers to metrics internals.
Non-goals (deferred)
- Event persistence or replay (planned in a later step).
- Priority scheduling between subscribers.
- Delivery guarantees across process boundaries.
Core API sketch
The bus is a concrete struct in internal/event (not an interface).
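```go
package event

import (
	"context"
	"time"
)

// Event is implemented by typed events that carry a type name and timestamp.
type Event interface {
	Type() string
	Timestamp() time.Time
}

// BusOptions configures buffering, backpressure, and limits for one bus.
type BusOptions struct {
	Name                    string        // name used when reporting metrics
	SubscriberBufferSize    int           // per-subscriber channel buffer
	BlockOnFull             bool          // false: drop on full; true: block up to WriteTimeout
	WriteTimeout            time.Duration // block-mode send timeout
	MaxSubscribers          int           // maximum concurrent subscribers
	SlowSubscriberThreshold time.Duration // warn when a send blocks longer than this
}

// Bus fans out each published value to all current subscribers.
type Bus[T any] struct {
	// internal: subscriber map, options, metrics handle, closed state
}

// Signatures only; bodies are omitted in this design sketch.
// The func() returned by Subscribe* cancels that subscription.
func NewBus[T any](ctx context.Context, opts BusOptions) *Bus[T]
func (b *Bus[T]) Subscribe() (<-chan T, func())
func (b *Bus[T]) SubscribeFiltered(filter func(T) bool) (<-chan T, func())
func (b *Bus[T]) Publish(event T)
func (b *Bus[T]) Close()
```
Event-specific helpers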
For T that satisfies event.Event, provide convenience helpers:
- SubscribeType(eventType string)
- SubscribeTypes(eventTypes ...string)
These helpers are thin wrappers around SubscribeFiltered.
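Go methods cannot declare their own type parameters or narrow the receiver's T, so helpers that require T to satisfy event.Event cannot be plain methods on Bus[T]; package-level generic functions are one way to express them. The sketch below assumes a deliberately minimal in-memory bus (drop mode only, a fixed buffer of 8, and a hypothetical newMinimalBus constructor) plus a hypothetical fileEvent type; it is not the internal/event implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event mirrors the interface from the API sketch.
type Event interface {
	Type() string
	Timestamp() time.Time
}

type subscriber[T any] struct {
	ch     chan T
	filter func(T) bool
}

// Bus is a minimal stand-in: drop mode only, fixed buffer of 8.
type Bus[T any] struct {
	mu     sync.Mutex
	nextID int
	subs   map[int]subscriber[T]
}

func newMinimalBus[T any]() *Bus[T] { return &Bus[T]{subs: make(map[int]subscriber[T])} }

func (b *Bus[T]) SubscribeFiltered(filter func(T) bool) (<-chan T, func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.nextID
	b.nextID++
	ch := make(chan T, 8)
	b.subs[id] = subscriber[T]{ch: ch, filter: filter}
	var once sync.Once
	return ch, func() { // cancel: remove and close exactly once
		once.Do(func() {
			b.mu.Lock()
			defer b.mu.Unlock()
			delete(b.subs, id)
			close(ch)
		})
	}
}

func (b *Bus[T]) Publish(event T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, s := range b.subs {
		if s.filter != nil && !s.filter(event) {
			continue // filtered out: no buffer consumed
		}
		select {
		case s.ch <- event:
		default: // buffer full: dropped for this subscriber
		}
	}
}

// SubscribeType is a package-level function because a method on Bus[T]
// cannot add the Event constraint to T.
func SubscribeType[T Event](b *Bus[T], eventType string) (<-chan T, func()) {
	return b.SubscribeFiltered(func(e T) bool { return e.Type() == eventType })
}

// SubscribeTypes accepts any of the listed types via a set lookup.
func SubscribeTypes[T Event](b *Bus[T], eventTypes ...string) (<-chan T, func()) {
	set := make(map[string]struct{}, len(eventTypes))
	for _, t := range eventTypes {
		set[t] = struct{}{}
	}
	return b.SubscribeFiltered(func(e T) bool {
		_, ok := set[e.Type()]
		return ok
	})
}

// fileEvent is a hypothetical event type for the demo.
type fileEvent struct {
	kind string
	at   time.Time
}

func (e fileEvent) Type() string         { return e.kind }
func (e fileEvent) Timestamp() time.Time { return e.at }

func main() {
	bus := newMinimalBus[fileEvent]()
	ch, cancel := SubscribeType(bus, "file_changed")
	defer cancel()
	bus.Publish(fileEvent{kind: "file_deleted", at: time.Now()})
	bus.Publish(fileEvent{kind: "file_changed", at: time.Now()})
	fmt.Println((<-ch).Type()) // file_changed
}
```

Because the helpers only build a filter closure, they inherit every SubscribeFiltered guarantee (filtered events consume no buffer) without duplicating delivery logic.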
Concurrency and lifecycle
- Subscribe, Publish, and Close are safe to call concurrently.
- Publish is synchronous fan-out. No goroutines are spawned per publish.
- Callers can publish asynchronously by calling Publish in their own goroutine.
- NewBus accepts a context; if one is provided, it starts a tiny goroutine that calls Close() when ctx.Done() fires. This is the only internal goroutine.
- After Close, all subscriber channels are closed, and future Subscribe calls return a closed channel and a no-op cancel function.
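A minimal sketch of these lifecycle rules, assuming BusOptions is omitted and the buffer is fixed at the default of 128; the field names (done, subs) are hypothetical. One detail worth handling: context.Background() returns a nil Done channel, so the watcher is started only when ctx.Done() is non-nil, and it also exits when Close is called directly, so it cannot leak.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Bus keeps only the lifecycle state needed for this sketch.
type Bus[T any] struct {
	mu     sync.Mutex
	closed bool
	done   chan struct{} // closed by Close; stops the ctx watcher
	subs   map[chan T]struct{}
}

// NewBus starts at most one goroutine: a watcher that calls Close when ctx is
// canceled. Contexts that can never be canceled (Done() == nil) start none.
func NewBus[T any](ctx context.Context) *Bus[T] {
	b := &Bus[T]{done: make(chan struct{}), subs: make(map[chan T]struct{})}
	if ctx != nil && ctx.Done() != nil {
		go func() {
			select {
			case <-ctx.Done():
				b.Close()
			case <-b.done: // Close was called directly; stop watching
			}
		}()
	}
	return b
}

// Subscribe returns a buffered channel and an idempotent cancel function.
// After Close it returns an already-closed channel and a no-op cancel.
func (b *Bus[T]) Subscribe() (<-chan T, func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan T, 128)
	if b.closed {
		close(ch)
		return ch, func() {}
	}
	b.subs[ch] = struct{}{}
	return ch, func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		if _, ok := b.subs[ch]; ok { // skip if already removed by Close
			delete(b.subs, ch)
			close(ch)
		}
	}
}

// Publish does a synchronous, non-blocking fan-out; after Close the
// subscriber set is empty, so it does nothing.
func (b *Bus[T]) Publish(event T) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for ch := range b.subs {
		select {
		case ch <- event:
		default: // buffer full: drop for this subscriber
		}
	}
}

// Close is idempotent: it closes every subscriber channel exactly once.
func (b *Bus[T]) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	b.closed = true
	close(b.done)
	for ch := range b.subs {
		delete(b.subs, ch)
		close(ch)
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	bus := NewBus[string](ctx)
	ch, _ := bus.Subscribe()
	bus.Publish("hello")
	cancel()              // the watcher goroutine calls Close
	for msg := range ch { // drains "hello", then ends when Close closes ch
		fmt.Println(msg)
	}
	late, unsubscribe := bus.Subscribe()
	unsubscribe() // no-op
	_, ok := <-late
	fmt.Println(ok) // false
}
```

Buffered events survive Close: a subscriber ranging over its channel still drains what was delivered before the channel was closed.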
Backpressure and delivery
The bus supports two delivery policies:
- Drop mode (BlockOnFull=false): non-blocking send. If a subscriber's channel buffer is full, the event is dropped for that subscriber.
- Block mode (BlockOnFull=true): the send blocks for up to WriteTimeout. On timeout, the subscriber is treated as unhealthy and removed; the event is still delivered to the remaining subscribers.
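The two policies can be sketched as a per-subscriber send, assuming hypothetical deliver and publish helpers that operate on plain channels rather than the real Bus internals. Because Publish is synchronous, block mode has a cost worth noting: while one subscriber's send waits for up to WriteTimeout, subscribers later in the loop wait too. In the demo, healthy receives its second event only after full has timed out.

```go
package main

import (
	"fmt"
	"time"
)

// BusOptions holds only the two fields this sketch needs.
type BusOptions struct {
	BlockOnFull  bool
	WriteTimeout time.Duration
}

// deliver sends one event to one subscriber under the configured policy.
// keep=false means the subscriber timed out in block mode and must be removed.
func deliver[T any](ch chan T, event T, opts BusOptions) (delivered, keep bool) {
	select {
	case ch <- event: // fast path: buffer has room
		return true, true
	default:
	}
	if !opts.BlockOnFull {
		return false, true // drop mode: drop for this subscriber only
	}
	timer := time.NewTimer(opts.WriteTimeout)
	defer timer.Stop()
	select {
	case ch <- event:
		return true, true
	case <-timer.C:
		return false, false // block mode: treat as unhealthy
	}
}

// publish fans out synchronously in subscriber order. Timed-out subscribers
// are closed and pruned; the event still reaches the rest.
func publish[T any](subs []chan T, event T, opts BusOptions) (remaining []chan T, dropped int) {
	for _, ch := range subs {
		delivered, keep := deliver(ch, event, opts)
		if !delivered {
			dropped++
		}
		if keep {
			remaining = append(remaining, ch)
		} else {
			close(ch)
		}
	}
	return remaining, dropped
}

func main() {
	full := make(chan int, 1)
	full <- 0 // this subscriber's buffer is already full
	healthy := make(chan int, 1)

	subs, dropped := publish([]chan int{full, healthy}, 1, BusOptions{})
	fmt.Println(len(subs), dropped) // 2 1: drop mode keeps the full subscriber

	<-healthy
	opts := BusOptions{BlockOnFull: true, WriteTimeout: 10 * time.Millisecond}
	subs, dropped = publish(subs, 2, opts)
	fmt.Println(len(subs), dropped) // 1 1: the full subscriber timed out and was removed
	fmt.Println(<-healthy)          // 2
}
```

Trying a non-blocking send first means block mode only starts a timer when a buffer is actually full.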
Defaults:
- SubscriberBufferSize: 128.
- BlockOnFull: false.
- WriteTimeout: 0 (ignored when BlockOnFull=false).
- SlowSubscriberThreshold: optional. If set, log a warning when a send blocks longer than the threshold.
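A hypothetical withDefaults helper shows how these defaults could be applied when NewBus receives zero values. The helper names (withDefaults, shouldWarnSlow) and the choice to zero WriteTimeout in drop mode are assumptions for illustration, not part of the design above.

```go
package main

import (
	"fmt"
	"time"
)

type BusOptions struct {
	Name                    string
	SubscriberBufferSize    int
	BlockOnFull             bool
	WriteTimeout            time.Duration
	MaxSubscribers          int
	SlowSubscriberThreshold time.Duration
}

// DefaultSubscriberBufferSize is the documented default buffer size.
const DefaultSubscriberBufferSize = 128

// withDefaults fills zero values. BlockOnFull=false and a zero (disabled)
// SlowSubscriberThreshold are already Go zero values, so only the buffer
// size and WriteTimeout need handling.
func withDefaults(opts BusOptions) BusOptions {
	if opts.SubscriberBufferSize <= 0 {
		opts.SubscriberBufferSize = DefaultSubscriberBufferSize
	}
	if !opts.BlockOnFull {
		opts.WriteTimeout = 0 // ignored in drop mode
	}
	return opts
}

// shouldWarnSlow reports whether a send that took elapsed should be logged.
func shouldWarnSlow(elapsed time.Duration, opts BusOptions) bool {
	return opts.SlowSubscriberThreshold > 0 && elapsed > opts.SlowSubscriberThreshold
}

func main() {
	opts := withDefaults(BusOptions{Name: "config", WriteTimeout: time.Second})
	fmt.Println(opts.SubscriberBufferSize, opts.BlockOnFull, opts.WriteTimeout) // 128 false 0s
	fmt.Println(shouldWarnSlow(50*time.Millisecond, opts))                      // false
}
```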
Bus policies per use case
- Filesystem/config/agent/workflow events: drop mode; losing some events is acceptable and avoids head-of-line blocking.
- Logs: block mode with short timeout (e.g., 100ms). Do not drop log entries; remove slow subscribers on timeout.
- Terminal output stream: block mode with a long timeout (3 minutes) and a larger buffer (256 or more). Preserve stream ordering and keep slow-subscriber warnings enabled.
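The per-use-case policies might translate into option sets like the following. The bus names, the explicit 128 buffer for the event and log buses, and the one-second SlowSubscriberThreshold for terminal output are illustrative assumptions; the design only fixes the drop/block choice, the 100ms and 3-minute timeouts, and the 256+ terminal buffer.

```go
package main

import (
	"fmt"
	"time"
)

type BusOptions struct {
	Name                    string
	SubscriberBufferSize    int
	BlockOnFull             bool
	WriteTimeout            time.Duration
	MaxSubscribers          int
	SlowSubscriberThreshold time.Duration
}

// Hypothetical option sets mirroring the per-use-case policies.
var (
	// Filesystem/config/agent/workflow events: drop mode.
	fsEventsOpts = BusOptions{Name: "fs_events", SubscriberBufferSize: 128}

	// Logs: block briefly, then remove the slow subscriber.
	logOpts = BusOptions{Name: "logs", SubscriberBufferSize: 128, BlockOnFull: true, WriteTimeout: 100 * time.Millisecond}

	// Terminal output: long timeout, larger buffer, slow-subscriber warnings on.
	terminalOpts = BusOptions{
		Name:                    "terminal_output",
		SubscriberBufferSize:    256,
		BlockOnFull:             true,
		WriteTimeout:            3 * time.Minute,
		SlowSubscriberThreshold: time.Second,
	}
)

func main() {
	for _, o := range []BusOptions{fsEventsOpts, logOpts, terminalOpts} {
		fmt.Printf("%s block=%v timeout=%v buffer=%d\n", o.Name, o.BlockOnFull, o.WriteTimeout, o.SubscriberBufferSize)
	}
}
```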
Filtering
SubscribeFiltered attaches a filter function to a subscription.
- If the filter returns false, the event is not sent and does not consume buffer space.
- Filter panics are recovered; the subscriber is removed and a warning is logged.
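A sketch of filter evaluation with panic recovery, assuming a hypothetical subscription record and drop-mode delivery. Since Publish is synchronous, filters run on the publisher's goroutine, so they should stay cheap; the recover keeps one faulty filter from crashing the publisher.

```go
package main

import (
	"fmt"
	"log"
)

// subscription is a hypothetical per-subscriber record.
type subscription[T any] struct {
	ch     chan T
	filter func(T) bool // nil means "accept everything"
}

// matches runs the filter and turns a panic into ok=false, which tells the
// caller to remove the subscriber instead of crashing the publisher.
func matches[T any](s subscription[T], event T) (match, ok bool) {
	if s.filter == nil {
		return true, true
	}
	defer func() {
		if r := recover(); r != nil {
			log.Printf("event bus: filter panicked, removing subscriber: %v", r)
			match, ok = false, false
		}
	}()
	return s.filter(event), true
}

// fanOut delivers to matching subscribers in drop mode and returns survivors.
func fanOut[T any](subs []subscription[T], event T) []subscription[T] {
	var kept []subscription[T]
	for _, s := range subs {
		match, ok := matches(s, event)
		if !ok {
			close(s.ch) // removed subscribers see their channel close
			continue
		}
		kept = append(kept, s)
		if !match {
			continue // filtered out: nothing sent, no buffer space used
		}
		select {
		case s.ch <- event:
		default: // buffer full: drop for this subscriber
		}
	}
	return kept
}

func main() {
	evens := subscription[int]{ch: make(chan int, 4), filter: func(n int) bool { return n%2 == 0 }}
	broken := subscription[int]{ch: make(chan int, 4), filter: func(int) bool { panic("boom") }}
	subs := []subscription[int]{evens, broken}
	for _, n := range []int{1, 2, 3, 4} {
		subs = fanOut(subs, n)
	}
	fmt.Println(len(subs), len(evens.ch)) // 1 2
}
```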
Error handling
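- When T is []byte, Publish treats a zero-length payload as a no-op.
- If a send targets a subscriber channel that is already closed, Publish recovers from the send panic, removes the subscriber, and continues. Subscribers hold a receive-only channel and cannot close it themselves (closing a receive-only channel does not compile in Go), so this guards against bus-internal races rather than subscriber misuse.
- Publish does not return errors; metrics and logs capture drop and timeout events.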
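The two special cases above can be sketched as follows, assuming hypothetical trySend and publish helpers over plain channels. In Go, a send on a closed channel panics even inside a select that has a default case, which is why recover is needed. The []byte check is a type assertion on any(event), so it matches T = []byte exactly; a named type such as type Chunk []byte would need its own check.

```go
package main

import (
	"fmt"
	"log"
)

// trySend sends without blocking and reports whether the channel was already
// closed. The send panic is recovered and turned into a removal signal.
func trySend[T any](ch chan T, event T) (closed bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("event bus: send on closed subscriber channel, removing: %v", r)
			closed = true
		}
	}()
	select {
	case ch <- event:
	default: // buffer full: dropped (drop mode)
	}
	return false
}

// publish skips empty []byte payloads and prunes closed channels. It returns
// the surviving subscribers only so the demo can inspect them; the real
// Publish returns nothing.
func publish[T any](subs []chan T, event T) []chan T {
	if p, ok := any(event).([]byte); ok && len(p) == 0 {
		return subs // zero-length byte payload: no-op
	}
	var kept []chan T
	for _, ch := range subs {
		if trySend(ch, event) {
			continue
		}
		kept = append(kept, ch)
	}
	return kept
}

func main() {
	live := make(chan []byte, 2)
	dead := make(chan []byte, 2)
	close(dead) // simulates a channel closed by a bus-internal race
	subs := publish([]chan []byte{live, dead}, []byte{})
	fmt.Println(len(subs), len(live)) // 2 0: empty payload skipped
	subs = publish(subs, []byte("out"))
	fmt.Println(len(subs), len(live)) // 1 1: closed channel removed
}
```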
Metrics
Each bus reports to metrics.Registry using its Name:
- events_published_total
- events_dropped_total
- subscribers_active
For buses carrying event.Event, also track counts per event.Type().
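One way to keep callers decoupled from metrics internals is for the bus to depend on a narrow interface that an adapter over metrics.Registry implements. The Metrics interface, the memMetrics fake, and the label-style key used for per-type counts are all hypothetical; the actual metrics.Registry API is not described in this document.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Event mirrors the interface from the API sketch.
type Event interface {
	Type() string
	Timestamp() time.Time
}

// Metrics is a hypothetical narrow interface the bus would depend on.
type Metrics interface {
	Add(bus, name string, delta int)
	Set(bus, name string, value int)
}

// memMetrics is an in-memory fake suitable for tests.
type memMetrics struct {
	mu     sync.Mutex
	values map[string]int
}

func (m *memMetrics) Add(bus, name string, delta int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.values[bus+"/"+name] += delta
}

func (m *memMetrics) Set(bus, name string, value int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.values[bus+"/"+name] = value
}

// recordPublish is what Publish might call after each fan-out. The per-type
// counter is recorded only when T implements Event.
func recordPublish[T any](m Metrics, bus string, event T, dropped, active int) {
	m.Add(bus, "events_published_total", 1)
	m.Add(bus, "events_dropped_total", dropped)
	m.Set(bus, "subscribers_active", active)
	if e, ok := any(event).(Event); ok {
		m.Add(bus, "events_published_total{type="+e.Type()+"}", 1)
	}
}

// agentEvent is a hypothetical event type for the demo.
type agentEvent struct{ kind string }

func (e agentEvent) Type() string         { return e.kind }
func (e agentEvent) Timestamp() time.Time { return time.Time{} }

func main() {
	m := &memMetrics{values: map[string]int{}}
	recordPublish(m, "agent", agentEvent{"started"}, 1, 3)
	recordPublish(m, "logs", "plain payload", 0, 2)
	fmt.Println(m.values["agent/events_dropped_total"])                  // 1
	fmt.Println(m.values["agent/events_published_total{type=started}"]) // 1
	fmt.Println(m.values["logs/subscribers_active"])                    // 2
}
```

Tests can then assert on the fake directly instead of standing up a real registry.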
Usage guidelines
Use the event bus when:
- You need fan-out to multiple independent consumers.
- You want to decouple producers and consumers.
- You can tolerate async delivery or per-subscriber drop behavior.
Use direct function calls when:
- The caller depends on a synchronous response or error propagation.
- You need strict ordering with caller-controlled backpressure.
- The coupling is intentional and unlikely to change.
Invariants
- The bus is a fan-out mechanism only; it does not store history.
- Output/log buffers remain separate from the bus.
- The frontend and backend ship together; WebSocket event formats are verified via contract tests in later steps.