# Architecture
Understanding capitan's internals helps you reason about performance, ordering, and failure modes.
## Overview
```
┌─────────────────────────────────────────────────────────────────┐
│                             Capitan                             │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                         Registry                         │   │
│  │               Signal → []*Listener mapping               │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  Signal A   │  │  Signal B   │  │  Signal C   │  ...         │
│  │             │  │             │  │             │              │
│  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │              │
│  │ │ Worker  │ │  │ │ Worker  │ │  │ │ Worker  │ │              │
│  │ │ Gorout. │ │  │ │ Gorout. │ │  │ │ Gorout. │ │              │
│  │ └────┬────┘ │  │ └────┬────┘ │  │ └────┬────┘ │              │
│  │      │      │  │      │      │  │      │      │              │
│  │ ┌────▼────┐ │  │ ┌────▼────┐ │  │ ┌────▼────┐ │              │
│  │ │ Buffer  │ │  │ │ Buffer  │ │  │ │ Buffer  │ │              │
│  │ │ (chan)  │ │  │ │ (chan)  │ │  │ │ (chan)  │ │              │
│  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │              │
│  │             │  │             │  │             │              │
│  │ Listeners:  │  │ Listeners:  │  │ Listeners:  │              │
│  │ - handler1  │  │ - handler3  │  │ - handler5  │              │
│  │ - handler2  │  │ - handler4  │  │             │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                        Observers                         │   │
│  │         Attached to all signals (cross-cutting)          │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
```
## Per-Signal Workers
Each signal gets its own worker goroutine and buffered channel. This provides:

- **Isolation**: a slow listener on Signal A doesn't block Signal B. Each signal processes independently.
- **Ordering**: events to the same signal are processed in emission order. The worker drains its queue sequentially.
- **Lazy creation**: workers spawn on first emission, not at signal definition. Unused signals have no upfront cost.
## Worker Lifecycle
- The first `Emit` to a signal creates the worker goroutine
- The worker reads from its buffered channel in a loop
- For each event, the worker invokes all registered listeners
- The worker exits when:
  - The last listener closes (the signal has no subscribers)
  - `Shutdown` is called (global termination)
```go
// Simplified worker loop
func (c *Capitan) processEvents(signal Signal, state *workerState) {
	for {
		select {
		case event := <-state.events:
			c.processEvent(signal, event)
		case <-state.done:
			c.drainEvents(signal, state.events)
			return
		case <-c.shutdown:
			c.drainEvents(signal, state.events)
			return
		}
	}
}
```
## Buffered Channels
Each signal's worker has a buffered channel (default: 16 events).
```go
events: make(chan *Event, c.bufferSize)
```
### Backpressure
When the buffer fills, Emit blocks until space is available. This provides natural backpressure:
- Slow listeners cause their signal's buffer to fill
- Emitters to that signal slow down
- Other signals are unaffected
This prevents unbounded memory growth while maintaining throughput for healthy signals.
### Tuning Buffer Size

```go
capitan.Configure(capitan.WithBufferSize(128))
```
Larger buffers absorb bursts but use more memory. Smaller buffers apply backpressure sooner.
## Event Pooling
Events are pooled using sync.Pool to reduce allocations:
```go
var eventPool = sync.Pool{
	New: func() any {
		return &Event{
			fields:   make(map[string]Field),
			severity: SeverityInfo,
		}
	},
}
```
### Pool Lifecycle
- `Emit` acquires an event from the pool
- The event is populated with signal, timestamp, and fields
- The event is queued to the worker
- The worker processes the event and invokes listeners
- The event is returned to the pool
Events are reused across emissions. Don't hold references to events outside listener scope.
## Listener Invocation
When processing an event, the worker:

- Checks whether the listener list has changed (via a version counter)
- Copies the listener slice only if the version differs (under a read lock)
- Invokes each listener with panic recovery
- Returns the event to the pool
### Version-Based Caching
Each signal maintains a listener version counter that increments whenever listeners are added or removed. Workers cache the listener slice and only re-copy when the version changes:
```go
// workerState caches listeners between events
type workerState struct {
	// ...
	cachedListeners []*Listener
	listenerVersion uint64
}

func (c *Capitan) processWorkerEvent(signal Signal, state *workerState, event *Event) {
	// Skip if context canceled
	if event.ctx.Err() != nil {
		eventPool.Put(event)
		return
	}

	// Copy listener slice only if version changed
	c.mu.RLock()
	if state.listenerVersion != c.listenerVersions[signal] {
		state.cachedListeners = make([]*Listener, len(c.registry[signal]))
		copy(state.cachedListeners, c.registry[signal])
		state.listenerVersion = c.listenerVersions[signal]
	}
	listeners := state.cachedListeners
	c.mu.RUnlock()

	// Invoke with panic recovery
	for _, listener := range listeners {
		func() {
			defer func() {
				if r := recover(); r != nil && c.panicHandler != nil {
					c.panicHandler(signal, r)
				}
			}()
			listener.callback(event.ctx, event)
		}()
	}

	eventPool.Put(event)
}
```
This optimisation eliminates allocations in steady state—listeners typically change rarely compared to event frequency. The version counter is incremented by Hook, Close, and Observe operations.
Note: Sync mode (used primarily for testing) copies on every event for simplicity.
### Panic Recovery
Listener panics are caught and recovered. If a panic handler is configured, it's called with the signal and recovered value. Other listeners still execute.
## Context Handling
Events carry the context passed at emission:
```go
capitan.Emit(ctx, signal, fields...)
```
Context is checked at two points:
- **Before queueing**: if the context is canceled while waiting for buffer space, the event is dropped
- **Before processing**: if the context was canceled while queued, the event is skipped
This prevents processing stale events from canceled requests.
## Locking Strategy
Capitan uses a single sync.RWMutex for the registry:
| Operation | Lock Type |
|---|---|
| `Hook` (register listener) | Write |
| `Emit` (check/create worker) | Read → Write |
| `processEvent` (version check, conditional copy) | Read |
| `Close` (unregister listener) | Write |
| `Stats` (read metrics) | Read |
The hot path (Emit with existing worker) uses only a read lock. Write locks occur only during setup and teardown. Event processing holds the read lock briefly for version checking—the actual listener invocation happens outside the lock.
## Observer Attachment
Observers maintain a list of underlying listeners—one per signal they observe.
When a new signal is first used:
- Capitan checks for active observers
- For each observer (respecting whitelists), creates a listener
- Attaches listener to the signal's registry
This allows observers registered early to receive events from signals created later.
## Shutdown Sequence
`Shutdown` initiates graceful termination:

- Close the shutdown channel (under write lock)
- Workers detect closure and drain their queues
- `wg.Wait()` blocks until all workers exit
```go
func (c *Capitan) Shutdown() {
	c.shutdownOnce.Do(func() {
		c.mu.Lock()
		close(c.shutdown)
		c.mu.Unlock()
	})
	c.wg.Wait()
}
```
Events queued before shutdown are processed. Events emitted after shutdown may be dropped (workers won't accept new events).
## Runtime Metrics
Stats() provides visibility into internal state:
```go
stats := capitan.Stats()

stats.ActiveWorkers   // Number of running worker goroutines
stats.SignalCount     // Number of registered signals
stats.DroppedEvents   // Events dropped (no listeners)
stats.QueueDepths     // Current buffer usage per signal
stats.ListenerCounts  // Listeners registered per signal
stats.EmitCounts      // Total emissions per signal
stats.FieldSchemas    // Field keys from first emission per signal
```
Use these for monitoring and debugging in production.
## Concurrency Guarantees
| Guarantee | Scope |
|---|---|
| Ordered processing | Within a single signal |
| No ordering | Across different signals |
| Thread-safe | All public API methods |
| Panic isolation | Per listener invocation |
Events emitted to the same signal process in order. Events to different signals may interleave arbitrarily.