zoobzio · December 12, 2025

Configuration

Capitan provides two levels of configuration:

  1. Instance options - Set once at creation (WithBufferSize, WithPanicHandler)
  2. Per-signal configuration - Dynamic, hot-reloadable settings per signal (ApplyConfig)

See API Reference for complete API documentation.

Configuring the Default Instance

If you need custom configuration, use Configure before any other capitan calls:

func main() {
    capitan.Configure(
        capitan.WithBufferSize(128),
        capitan.WithPanicHandler(logPanic),
    )

    // Now use the module-level API
    capitan.Hook(signal, handler)
    capitan.Emit(ctx, signal, fields...)
}

Configure is optional. If called, it must be called before the first Hook, Emit, Observe, or Shutdown. Once the default instance is created (lazily, on first use), configuration is locked.

Buffer Size

Each signal has a buffered channel for queuing events. The default size is 16.

capitan.Configure(capitan.WithBufferSize(128))

When to Adjust

Increase buffer size when:

  • Bursts of events are common and listeners process quickly
  • You want to absorb temporary spikes without backpressure
  • Memory is plentiful and low emit latency matters more than early backpressure

Decrease buffer size when:

  • You want earlier backpressure signals
  • Memory is constrained
  • Slow listeners should slow down emitters sooner

Backpressure Behavior

When a buffer fills, Emit blocks until space is available. This affects only that signal—other signals continue processing normally.

// Signal A's buffer is full
capitan.Emit(ctx, signalA, fields...)  // Blocks here

// Signal B is unaffected
capitan.Emit(ctx, signalB, fields...)  // Proceeds normally

Monitor Stats().QueueDepths to detect signals approaching capacity. See Architecture for details on per-signal isolation.
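The blocking behavior falls out of Go's buffered-channel semantics. The following self-contained sketch (not capitan's source) models one signal's queue as a buffered channel; the non-blocking probe shows the moment a plain send would stall the emitter:

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send. A plain `queue <- event` would
// instead block the caller until a worker receives — the default behavior
// this section describes.
func tryEnqueue(queue chan string, event string) bool {
	select {
	case queue <- event:
		return true
	default:
		return false
	}
}

func main() {
	// A buffered channel of capacity 2 stands in for one signal's event queue.
	queue := make(chan string, 2)
	fmt.Println(tryEnqueue(queue, "event-1")) // true: buffer has room
	fmt.Println(tryEnqueue(queue, "event-2")) // true: buffer now full
	// With the buffer full, a blocking send would park the emitter here.
	fmt.Println(tryEnqueue(queue, "event-3")) // false: would have blocked
}
```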

Panic Handler

By default, listener panics are recovered silently. Configure a handler to log or alert:

capitan.Configure(
    capitan.WithPanicHandler(func(sig capitan.Signal, recovered any) {
        log.Printf("PANIC in %s listener: %v", sig.Name(), recovered)

        // Stack trace
        debug.PrintStack()

        // Alert
        alerting.Send("listener-panic", map[string]any{
            "signal": sig.Name(),
            "panic":  recovered,
        })
    }),
)

The panic handler receives:

  • sig - The signal being processed when the panic occurred
  • recovered - The value passed to panic()

Other listeners for the same event still execute after a panic.

Runtime Metrics

Stats() returns a snapshot of internal state:

stats := capitan.Stats()

Available Metrics

Field           Type               Description
ActiveWorkers   int                Running worker goroutines
SignalCount     int                Registered signals
DroppedEvents   uint64             Events dropped (no listeners)
QueueDepths     map[Signal]int     Current buffer usage per signal
ListenerCounts  map[Signal]int     Listeners per signal
EmitCounts      map[Signal]uint64  Total emissions per signal
FieldSchemas    map[Signal][]Key   Field keys from first emission

Monitoring in Production

Expose metrics to your monitoring system:

func recordMetrics() {
    stats := capitan.Stats()

    // Gauge: active workers
    metrics.Gauge("capitan_workers_active", float64(stats.ActiveWorkers))

    // Gauge: dropped events
    metrics.Gauge("capitan_events_dropped_total", float64(stats.DroppedEvents))

    // Per-signal metrics
    for signal, depth := range stats.QueueDepths {
        metrics.Gauge("capitan_queue_depth",
            float64(depth),
            "signal", signal.Name())
    }

    for signal, count := range stats.EmitCounts {
        metrics.Counter("capitan_events_emitted_total",
            float64(count),
            "signal", signal.Name())
    }
}

Health Checks

Use stats for health checks:

func healthCheck() error {
    stats := capitan.Stats()

    // Check for queue buildup
    for signal, depth := range stats.QueueDepths {
        if depth > 100 {
            return fmt.Errorf("signal %s queue depth %d exceeds threshold", signal.Name(), depth)
        }
    }

    return nil
}

Debugging

Field schemas help debug unexpected events:

stats := capitan.Stats()
for signal, keys := range stats.FieldSchemas {
    fmt.Printf("%s fields:\n", signal.Name())
    for _, key := range keys {
        fmt.Printf("  - %s (%s)\n", key.Name(), key.Variant())
    }
}

Multiple Instances

Create isolated instances when you need separate configuration:

// High-throughput instance
fast := capitan.New(
    capitan.WithBufferSize(1024),
)

// Strict instance with panic logging
strict := capitan.New(
    capitan.WithBufferSize(16),
    capitan.WithPanicHandler(logAndAlert),
)

// Each instance has independent workers, buffers, and registries
fast.Hook(signal, fastHandler)
strict.Hook(signal, strictHandler)

// Shutdown each independently
fast.Shutdown()
strict.Shutdown()

Use cases for multiple instances:

  • Tenant isolation in multi-tenant systems
  • Separate configuration for different subsystems
  • Testing without affecting the default singleton

Sync Mode

For testing, sync mode processes events synchronously:

c := capitan.New(capitan.WithSyncMode())

In sync mode:

  • Emit calls listeners directly (no workers, no buffering)
  • Events process in the calling goroutine
  • Deterministic ordering for tests

Do not use sync mode in production—it loses isolation and backpressure benefits. See Testing for testing patterns.
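What makes sync mode deterministic is that listeners run inline, in registration order, in the emitter's goroutine. This toy dispatcher (a stand-in, not capitan's code) illustrates why test assertions become straightforward:

```go
package main

import "fmt"

// syncBus sketches synchronous dispatch: emit invokes every listener
// directly, in the order they were hooked, on the calling goroutine.
type syncBus struct {
	listeners []func(event string)
}

func (b *syncBus) hook(l func(event string)) {
	b.listeners = append(b.listeners, l)
}

func (b *syncBus) emit(event string) {
	for _, l := range b.listeners {
		l(event)
	}
}

func main() {
	var got []string
	b := &syncBus{}
	b.hook(func(e string) { got = append(got, "first:"+e) })
	b.hook(func(e string) { got = append(got, "second:"+e) })
	b.emit("ping")
	// By the time emit returns, both listeners have run — no waiting needed.
	fmt.Println(got)
}
```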

Per-Signal Configuration

Configure individual signals or groups of signals dynamically with ApplyConfig. Unlike instance options, per-signal config can be changed at runtime.

Basic Usage

cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "order.created": {BufferSize: 64},
        "payment.*":     {MinSeverity: capitan.SeverityWarn},
    },
}
c.ApplyConfig(cfg)

SignalConfig Options

Option        Type        Default           Description
BufferSize    int         instance default  Event queue size for this signal
Disabled      bool        false             Drop all events for this signal
MinSeverity   Severity    none              Filter events below this level
MaxListeners  int         unlimited         Cap listener registrations
DropPolicy    DropPolicy  block             Behavior when buffer full
RateLimit     float64     unlimited         Max events per second
BurstSize     int         1                 Burst allowance above rate limit

Disabled Signals

Disable noisy signals without code changes:

cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "debug.*": {Disabled: true},
    },
}

Disabled signals drop all events silently. Re-enable by applying config without Disabled: true.

Severity Filtering

Filter low-priority events in production:

cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "audit.*": {MinSeverity: capitan.SeverityInfo},  // Drop DEBUG
    },
}

Severity order: DEBUG < INFO < WARN < ERROR
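The ordering means a MinSeverity filter is just an integer comparison. A minimal sketch, using local stand-ins for capitan's severity constants (the real values may differ):

```go
package main

import "fmt"

// Severity stands in for capitan.Severity; iota gives the documented
// ordering DEBUG < INFO < WARN < ERROR.
type Severity int

const (
	SeverityDebug Severity = iota
	SeverityInfo
	SeverityWarn
	SeverityError
)

// passes reports whether an event at the given severity survives a
// MinSeverity filter.
func passes(min, event Severity) bool {
	return event >= min
}

func main() {
	fmt.Println(passes(SeverityInfo, SeverityDebug)) // false: DEBUG is below INFO
	fmt.Println(passes(SeverityInfo, SeverityWarn))  // true: WARN clears the bar
}
```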

Drop Policy

Choose between backpressure and lossy behavior:

cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        // Block emitters when buffer full (default)
        "critical.*": {DropPolicy: capitan.DropPolicyBlock},

        // Drop newest events when buffer full
        "metrics.*": {DropPolicy: capitan.DropPolicyDropNewest, BufferSize: 100},
    },
}

Use DropPolicyDropNewest for high-volume, loss-tolerant signals like metrics.

Rate Limiting

Protect downstream systems from event floods:

cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "notification.*": {
            RateLimit: 10,    // 10 events/second max
            BurstSize: 50,    // Allow bursts up to 50
        },
    },
}

Events exceeding the rate are dropped. Monitor Stats().DroppedEvents.
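One common reading of RateLimit plus BurstSize is a token bucket: the bucket holds up to BurstSize tokens, each event spends one, and tokens refill at RateLimit per second. This sketch shows those semantics under that assumption; capitan's internal limiter may be implemented differently:

```go
package main

import "fmt"

// bucket is a minimal token bucket: burst caps stored tokens, rate is the
// refill in tokens per second.
type bucket struct {
	tokens float64
	burst  float64
	rate   float64
}

// allow refills for the elapsed time, then spends one token if available.
// Events arriving with an empty bucket are dropped.
func (b *bucket) allow(elapsedSeconds float64) bool {
	b.tokens += b.rate * elapsedSeconds
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}

func main() {
	// Start full: burst of 3, refilling at 10 tokens/second.
	b := &bucket{tokens: 3, burst: 3, rate: 10}
	dropped := 0
	// Five events arrive at the same instant: the burst absorbs three.
	for i := 0; i < 5; i++ {
		if !b.allow(0) {
			dropped++
		}
	}
	fmt.Println("dropped:", dropped) // dropped: 2
}
```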

Glob Patterns

Configure signal families with glob patterns:

cfg := capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "order.*":         {BufferSize: 32},
        "order.payment.*": {BufferSize: 64},   // More specific
        "order.created":   {BufferSize: 128},  // Exact match
    },
}

Resolution rules (in order):

  1. Exact match wins
  2. Longest glob pattern wins (more specific)
  3. Alphabetical order for ties

Supported patterns: * (any chars), ? (single char), [abc] (char class)
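The precedence rules can be sketched as a small resolver. This uses the standard library's path.Match for globbing and is only an illustration of the documented rules, not capitan's actual matcher:

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// resolve picks the config key for a signal name using the documented
// precedence: exact match, then longest matching glob, then alphabetical
// order to break ties. Returns "" when nothing matches.
func resolve(signal string, keys []string) string {
	for _, k := range keys {
		if k == signal {
			return k // rule 1: exact match wins
		}
	}
	var matches []string
	for _, k := range keys {
		if ok, _ := path.Match(k, signal); ok {
			matches = append(matches, k)
		}
	}
	sort.Slice(matches, func(i, j int) bool {
		if len(matches[i]) != len(matches[j]) {
			return len(matches[i]) > len(matches[j]) // rule 2: longest glob
		}
		return matches[i] < matches[j] // rule 3: alphabetical tie-break
	})
	if len(matches) == 0 {
		return ""
	}
	return matches[0]
}

func main() {
	keys := []string{"order.*", "order.payment.*", "order.created"}
	fmt.Println(resolve("order.created", keys))          // exact match wins
	fmt.Println(resolve("order.payment.captured", keys)) // longest glob wins
	fmt.Println(resolve("order.shipped", keys))          // only order.* matches
}
```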

Config Replacement

Each ApplyConfig call replaces the entire configuration:

// Config A
c.ApplyConfig(capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "order.*": {BufferSize: 32},
    },
})

// Config B - replaces A entirely
c.ApplyConfig(capitan.Config{
    Signals: map[string]capitan.SignalConfig{
        "payment.*": {BufferSize: 64},
    },
})

// Now: order.* has default config, payment.* has BufferSize=64

Signals not in the new config revert to defaults. Only workers with actual config changes are rebuilt.
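An application that wants additive updates can keep its own canonical map and merge each change into it before calling ApplyConfig. A minimal sketch of that merge, where SignalConfig is a one-field stand-in for capitan.SignalConfig:

```go
package main

import "fmt"

// SignalConfig stands in for capitan.SignalConfig.
type SignalConfig struct {
	BufferSize int
}

// merge overlays update onto current without mutating either map; keys in
// update win. Applying the merged map preserves earlier per-signal settings.
func merge(current, update map[string]SignalConfig) map[string]SignalConfig {
	merged := make(map[string]SignalConfig, len(current)+len(update))
	for k, v := range current {
		merged[k] = v
	}
	for k, v := range update {
		merged[k] = v
	}
	return merged
}

func main() {
	current := map[string]SignalConfig{"order.*": {BufferSize: 32}}
	update := map[string]SignalConfig{"payment.*": {BufferSize: 64}}
	merged := merge(current, update)
	// Both signal families survive: order.* keeps 32, payment.* gets 64.
	fmt.Println(len(merged), merged["order.*"].BufferSize, merged["payment.*"].BufferSize)
}
```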

Hot-Reload with Flux

Flux is a reactive configuration library that watches files for changes and invokes callbacks when they update. Integrate it with capitan for file-based hot-reload:

import (
    "log"

    "github.com/zoobz-io/capitan"
    "github.com/zoobz-io/flux"
    "github.com/zoobz-io/flux/file"
)

func main() {
    c := capitan.New()

    // Watch config file, apply on change
    watcher := file.JSON[capitan.Config]("/etc/myapp/capitan.json")
    flux.New(watcher, func(cfg capitan.Config) {
        if err := c.ApplyConfig(cfg); err != nil {
            log.Printf("config error: %v", err)
        }
    })

    // ...
}

Example config file (capitan.json):

{
  "signals": {
    "order.*": {"bufferSize": 64, "minSeverity": "INFO"},
    "debug.*": {"disabled": true},
    "metrics.*": {"dropPolicy": "drop_newest", "rateLimit": 100}
  }
}

Changes apply immediately without restart.

Configuration Summary

Option                Default  Production Guidance
WithBufferSize(n)     16       Start with the default; increase if queue depths spike
WithPanicHandler(fn)  silent   Always configure in production for visibility
WithSyncMode()        off      Testing only
ApplyConfig(cfg)      none     Use for per-signal tuning and hot-reload