zoobzio December 8, 2025

Architecture

Understanding capitan's internals helps you reason about performance, ordering, and failure modes.

Overview

┌─────────────────────────────────────────────────────────────────┐
│                          Capitan                                │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                      Registry                            │   │
│  │         Signal → []*Listener mapping                     │   │
│  └──────────────────────────────────────────────────────────┘   │
│                                                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
│  │  Signal A   │  │  Signal B   │  │  Signal C   │    ...       │
│  │             │  │             │  │             │              │
│  │ ┌─────────┐ │  │ ┌─────────┐ │  │ ┌─────────┐ │              │
│  │ │ Worker  │ │  │ │ Worker  │ │  │ │ Worker  │ │              │
│  │ │ Gorout. │ │  │ │ Gorout. │ │  │ │ Gorout. │ │              │
│  │ └────┬────┘ │  │ └────┬────┘ │  │ └────┬────┘ │              │
│  │      │      │  │      │      │  │      │      │              │
│  │ ┌────▼────┐ │  │ ┌────▼────┐ │  │ ┌────▼────┐ │              │
│  │ │ Buffer  │ │  │ │ Buffer  │ │  │ │ Buffer  │ │              │
│  │ │ (chan)  │ │  │ │ (chan)  │ │  │ │ (chan)  │ │              │
│  │ └─────────┘ │  │ └─────────┘ │  │ └─────────┘ │              │
│  │             │  │             │  │             │              │
│  │ Listeners:  │  │ Listeners:  │  │ Listeners:  │              │
│  │  - handler1 │  │  - handler3 │  │  - handler5 │              │
│  │  - handler2 │  │  - handler4 │  │             │              │
│  └─────────────┘  └─────────────┘  └─────────────┘              │
│                                                                 │
│  ┌──────────────────────────────────────────────────────────┐   │
│  │                      Observers                           │   │
│  │       Attached to all signals (cross-cutting)            │   │
│  └──────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

Per-Signal Workers

Each signal gets its own worker goroutine and buffered channel. This provides:

Isolation - A slow listener on Signal A doesn't block Signal B. Each signal processes independently.

Ordering - Events to the same signal process in emission order. The worker processes its queue sequentially.

Lazy creation - Workers spawn on first emission, not at signal definition. No upfront cost for unused signals.
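The lazy-creation pattern can be sketched in plain Go. Names like `dispatcher` here are illustrative only, not capitan's actual internals: on first emission to a signal, the channel and worker goroutine are created under a lock; later emissions find the existing channel.

```go
package main

import (
	"fmt"
	"sync"
)

type signal string

// dispatcher lazily spawns one worker goroutine per signal on first
// emission. Illustrative sketch, not capitan's code.
type dispatcher struct {
	mu      sync.Mutex
	workers map[signal]chan string
	wg      sync.WaitGroup
	out     sync.Map // records processed events, for demonstration
}

func newDispatcher() *dispatcher {
	return &dispatcher{workers: make(map[signal]chan string)}
}

func (d *dispatcher) emit(s signal, payload string) {
	d.mu.Lock()
	ch, ok := d.workers[s]
	if !ok {
		// First emission: create the buffered channel and the worker.
		ch = make(chan string, 16)
		d.workers[s] = ch
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for p := range ch {
				d.out.Store(string(s)+":"+p, true)
			}
		}()
	}
	d.mu.Unlock()
	ch <- payload
}

func (d *dispatcher) shutdown() {
	d.mu.Lock()
	for _, ch := range d.workers {
		close(ch)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

func main() {
	d := newDispatcher()
	d.emit("user.login", "alice")
	d.shutdown()
	_, ok := d.out.Load("user.login:alice")
	fmt.Println(ok)
}
```

An unused signal in this sketch costs nothing: no channel, no goroutine, until something is emitted to it.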

Worker Lifecycle

  1. First Emit to a signal creates the worker goroutine
  2. Worker reads from its buffered channel in a loop
  3. For each event, worker invokes all registered listeners
  4. Worker exits when:
    • The last listener closes (signal has no subscribers)
    • Shutdown is called (global termination)

// Simplified worker loop
func (c *Capitan) processEvents(signal Signal, state *workerState) {
    for {
        select {
        case event := <-state.events:
            c.processEvent(signal, event)
        case <-state.done:
            c.drainEvents(signal, state.events)
            return
        case <-c.shutdown:
            c.drainEvents(signal, state.events)
            return
        }
    }
}

Buffered Channels

Each signal's worker has a buffered channel (default: 16 events).

events: make(chan *Event, c.bufferSize)

Backpressure

When the buffer fills, Emit blocks until space is available. This provides natural backpressure:

  • Slow listeners cause their signal's buffer to fill
  • Emitters to that signal slow down
  • Other signals are unaffected

This prevents unbounded memory growth while maintaining throughput for healthy signals.

Tuning Buffer Size

capitan.Configure(capitan.WithBufferSize(128))

Larger buffers absorb bursts but use more memory. Smaller buffers apply backpressure sooner.

Event Pooling

Events are pooled using sync.Pool to reduce allocations:

var eventPool = sync.Pool{
    New: func() any {
        return &Event{
            fields:   make(map[string]Field),
            severity: SeverityInfo,
        }
    },
}

Pool Lifecycle

  1. Emit acquires an event from the pool
  2. Event is populated with signal, timestamp, fields
  3. Event is queued to the worker
  4. Worker processes event, invokes listeners
  5. Event is returned to the pool

Events are reused across emissions. Don't hold references to events outside listener scope.
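A self-contained illustration of the pattern, using a hypothetical Event shape rather than capitan's: a listener that needs data beyond its own scope must copy the value out before the event is reset and returned to the pool.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical shape for illustration only.
type Event struct {
	fields map[string]string
}

var eventPool = sync.Pool{
	New: func() any { return &Event{fields: make(map[string]string)} },
}

// emitAndProcess runs one acquire → populate → process → release cycle.
// The listener copies the value it needs instead of keeping the *Event.
func emitAndProcess(key, value string) string {
	ev := eventPool.Get().(*Event)
	ev.fields[key] = value

	// Listener scope: copy the data out; the *Event must not escape,
	// because it will be reused by a later emission.
	copied := ev.fields[key]

	// Reset before returning to the pool so the next user sees a clean event.
	for k := range ev.fields {
		delete(ev.fields, k)
	}
	eventPool.Put(ev)
	return copied
}

func main() {
	fmt.Println(emitAndProcess("user", "alice"))
}
```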

Listener Invocation

When processing an event, the worker:

  1. Checks if listener list has changed (via version counter)
  2. Copies the listener slice only if version differs (under read lock)
  3. Invokes each listener with panic recovery
  4. Returns event to pool

Version-Based Caching

Each signal maintains a listener version counter that increments whenever listeners are added or removed. Workers cache the listener slice and only re-copy when the version changes:

// workerState caches listeners between events
type workerState struct {
    // ...
    cachedListeners []*Listener
    listenerVersion uint64
}

func (c *Capitan) processWorkerEvent(signal Signal, state *workerState, event *Event) {
    // Skip if context canceled
    if event.ctx.Err() != nil {
        eventPool.Put(event)
        return
    }

    // Copy listener slice only if version changed
    c.mu.RLock()
    if state.listenerVersion != c.listenerVersions[signal] {
        state.cachedListeners = make([]*Listener, len(c.registry[signal]))
        copy(state.cachedListeners, c.registry[signal])
        state.listenerVersion = c.listenerVersions[signal]
    }
    listeners := state.cachedListeners
    c.mu.RUnlock()

    // Invoke with panic recovery
    for _, listener := range listeners {
        func() {
            defer func() {
                if r := recover(); r != nil && c.panicHandler != nil {
                    c.panicHandler(signal, r)
                }
            }()
            listener.callback(event.ctx, event)
        }()
    }

    eventPool.Put(event)
}

This optimisation eliminates allocations in steady state—listeners typically change rarely compared to event frequency. The version counter is incremented by Hook, Close, and Observe operations.

Note: Sync mode (used primarily for testing) copies on every event for simplicity.

Panic Recovery

Listener panics are caught and recovered. If a panic handler is configured, it's called with the signal and recovered value. Other listeners still execute.
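The isolation property can be demonstrated standalone. This sketch mirrors the recovery wrapper shown earlier: each listener runs inside its own deferred recover, so one panicking listener cannot prevent the others from executing.

```go
package main

import "fmt"

// invokeAll calls each listener inside its own recover scope, so a
// panicking listener cannot take down the worker or skip its siblings.
func invokeAll(listeners []func(), onPanic func(any)) {
	for _, fn := range listeners {
		func() {
			defer func() {
				if r := recover(); r != nil && onPanic != nil {
					onPanic(r)
				}
			}()
			fn()
		}()
	}
}

func main() {
	var ran []string
	var caught any
	invokeAll([]func(){
		func() { ran = append(ran, "first") },
		func() { panic("boom") },
		func() { ran = append(ran, "third") },
	}, func(r any) { caught = r })

	// Both surviving listeners ran; the panic value reached the handler.
	fmt.Println(len(ran), caught)
}
```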

Context Handling

Events carry the context passed at emission:

capitan.Emit(ctx, signal, fields...)

Context is checked at two points:

  1. Before queueing - If context is canceled while waiting for buffer space, the event is dropped
  2. Before processing - If context was canceled while queued, the event is skipped

This prevents processing stale events from canceled requests.

Locking Strategy

Capitan uses a single sync.RWMutex for the registry:

Operation                                       Lock Type
Hook (register listener)                        Write
Emit (check/create worker)                      Read → Write
processEvent (version check, conditional copy)  Read
Close (unregister listener)                     Write
Stats (read metrics)                            Read

The hot path (Emit with existing worker) uses only a read lock. Write locks occur only during setup and teardown. Event processing holds the read lock briefly for version checking—the actual listener invocation happens outside the lock.

Observer Attachment

Observers maintain a list of underlying listeners—one per signal they observe.

When a new signal is first used:

  1. Capitan checks for active observers
  2. For each observer (respecting whitelists), creates a listener
  3. Attaches listener to the signal's registry

This allows observers registered early to receive events from signals created later.
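A sketch of that flow, with illustrative names rather than capitan's internals: when a signal is first used, each observer whose scope matches contributes one listener to the signal's registry.

```go
package main

import "fmt"

type signal string

// observer is illustrative: it watches all signals, or a whitelist.
type observer struct {
	name      string
	whitelist map[signal]bool // nil means "all signals"
}

func (o *observer) matches(s signal) bool {
	return o.whitelist == nil || o.whitelist[s]
}

// attachObservers builds the initial listener list for a newly used
// signal from the set of active observers.
func attachObservers(s signal, observers []*observer) []string {
	var listeners []string
	for _, o := range observers {
		if o.matches(s) {
			listeners = append(listeners, o.name)
		}
	}
	return listeners
}

func main() {
	observers := []*observer{
		{name: "metrics"}, // observes all signals
		{name: "audit", whitelist: map[signal]bool{"user.login": true}},
	}
	fmt.Println(attachObservers("user.login", observers))
	fmt.Println(attachObservers("cache.miss", observers))
}
```

Because attachment happens at first use, an observer registered before a signal exists still ends up wired to it.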

Shutdown Sequence

Shutdown initiates graceful termination:

  1. Close the shutdown channel (under write lock)
  2. Workers detect closure and drain their queues
  3. wg.Wait() blocks until all workers exit

func (c *Capitan) Shutdown() {
    c.shutdownOnce.Do(func() {
        c.mu.Lock()
        close(c.shutdown)
        c.mu.Unlock()
    })
    c.wg.Wait()
}

Events queued before shutdown are processed. Events emitted after shutdown may be dropped (workers won't accept new events).
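The drain guarantee reduces to close-channel signalling plus a WaitGroup. A standalone sketch of the same shape (illustrative, not capitan's code): events queued before shutdown are still processed before the worker exits.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorker drains any queued events after the shutdown channel closes.
func runWorker(events chan int, shutdown <-chan struct{}, wg *sync.WaitGroup, processed *[]int) {
	defer wg.Done()
	for {
		select {
		case e := <-events:
			*processed = append(*processed, e)
		case <-shutdown:
			// Drain whatever was queued before shutdown, then exit.
			for {
				select {
				case e := <-events:
					*processed = append(*processed, e)
				default:
					return
				}
			}
		}
	}
}

// drainDemo queues n events, then shuts down and waits for the worker.
func drainDemo(n int) []int {
	events := make(chan int, 16)
	shutdown := make(chan struct{})
	var wg sync.WaitGroup
	var processed []int

	for i := 1; i <= n; i++ {
		events <- i // queued before shutdown
	}

	wg.Add(1)
	go runWorker(events, shutdown, &wg, &processed)

	close(shutdown) // initiate graceful termination
	wg.Wait()       // block until the worker has drained and exited
	return processed
}

func main() {
	fmt.Println(drainDemo(3))
}
```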

Runtime Metrics

Stats() provides visibility into internal state:

stats := capitan.Stats()

stats.ActiveWorkers   // Number of running worker goroutines
stats.SignalCount     // Number of registered signals
stats.DroppedEvents   // Events dropped (no listeners)
stats.QueueDepths     // Current buffer usage per signal
stats.ListenerCounts  // Listeners registered per signal
stats.EmitCounts      // Total emissions per signal
stats.FieldSchemas    // Field keys from first emission per signal

Use these for monitoring and debugging in production.

Concurrency Guarantees

Guarantee             Scope
Ordered processing    Within a single signal
No ordering           Across different signals
Thread-safe           All public API methods
Panic isolation       Per listener invocation

Events emitted to the same signal process in order. Events to different signals may interleave arbitrarily.
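The per-signal ordering guarantee follows directly from having a single worker read one FIFO channel. A standalone check of that property:

```go
package main

import "fmt"

// consumeInOrder shows that one worker draining one buffered channel
// receives events in exactly the order they were sent.
func consumeInOrder(n int) []int {
	events := make(chan int, n)
	for i := 0; i < n; i++ {
		events <- i
	}
	close(events)

	var got []int
	for e := range events {
		got = append(got, e)
	}
	return got
}

func main() {
	fmt.Println(consumeInOrder(5))
}
```

No such guarantee exists across channels: two signals' workers are independent goroutines, so their events may interleave arbitrarily.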