Configuration
Capitan provides two levels of configuration:
- Instance options - Set once at creation (`WithBufferSize`, `WithPanicHandler`)
- Per-signal configuration - Dynamic, hot-reloadable settings per signal (`ApplyConfig`)
See API Reference for complete API documentation.
Configuring the Default Instance
If you need custom configuration, use Configure before any other capitan calls:
```go
func main() {
	capitan.Configure(
		capitan.WithBufferSize(128),
		capitan.WithPanicHandler(logPanic),
	)

	// Now use the module-level API
	capitan.Hook(signal, handler)
	capitan.Emit(ctx, signal, fields...)
}
```
Configure is optional. If called, it must be called before the first Hook, Emit, Observe, or Shutdown. Once the default instance is created (lazily, on first use), configuration is locked.
Buffer Size
Each signal has a buffered channel for queuing events. The default size is 16.
```go
capitan.Configure(capitan.WithBufferSize(128))
```
When to Adjust
Increase buffer size when:
- Bursts of events are common and listeners process quickly
- You want to absorb temporary spikes without backpressure
- Memory is plentiful and low emit latency matters more than strict backpressure
Decrease buffer size when:
- You want earlier backpressure signals
- Memory is constrained
- Slow listeners should slow down emitters sooner
Backpressure Behavior
When a buffer fills, Emit blocks until space is available. This affects only that signal—other signals continue processing normally.
```go
// Signal A's buffer is full
capitan.Emit(ctx, signalA, fields...) // Blocks here

// Signal B is unaffected
capitan.Emit(ctx, signalB, fields...) // Proceeds normally
```
Monitor Stats().QueueDepths to detect signals approaching capacity. See Architecture for details on per-signal isolation.
Panic Handler
By default, listener panics are recovered silently. Configure a handler to log or alert:
```go
capitan.Configure(
	capitan.WithPanicHandler(func(sig capitan.Signal, recovered any) {
		log.Printf("PANIC in %s listener: %v", sig.Name(), recovered)

		// Stack trace
		debug.PrintStack()

		// Alert
		alerting.Send("listener-panic", map[string]any{
			"signal": sig.Name(),
			"panic":  recovered,
		})
	}),
)
```
The panic handler receives:
- `sig` - The signal being processed when the panic occurred
- `recovered` - The value passed to `panic()`
Other listeners for the same event still execute after a panic.
Runtime Metrics
Stats() returns a snapshot of internal state:
```go
stats := capitan.Stats()
```
Available Metrics
| Field | Type | Description |
|---|---|---|
| `ActiveWorkers` | `int` | Running worker goroutines |
| `SignalCount` | `int` | Registered signals |
| `DroppedEvents` | `uint64` | Events dropped (no listeners) |
| `QueueDepths` | `map[Signal]int` | Current buffer usage per signal |
| `ListenerCounts` | `map[Signal]int` | Listeners per signal |
| `EmitCounts` | `map[Signal]uint64` | Total emissions per signal |
| `FieldSchemas` | `map[Signal][]Key` | Field keys from first emission |
Monitoring in Production
Expose metrics to your monitoring system:
```go
func recordMetrics() {
	stats := capitan.Stats()

	// Gauge: active workers
	metrics.Gauge("capitan_workers_active", float64(stats.ActiveWorkers))

	// Gauge: dropped events
	metrics.Gauge("capitan_events_dropped_total", float64(stats.DroppedEvents))

	// Per-signal metrics
	for signal, depth := range stats.QueueDepths {
		metrics.Gauge("capitan_queue_depth",
			float64(depth),
			"signal", signal.Name())
	}
	for signal, count := range stats.EmitCounts {
		metrics.Counter("capitan_events_emitted_total",
			float64(count),
			"signal", signal.Name())
	}
}
```
Health Checks
Use stats for health checks:
```go
func healthCheck() error {
	stats := capitan.Stats()

	// Check for queue buildup
	for signal, depth := range stats.QueueDepths {
		if depth > 100 {
			return fmt.Errorf("signal %s queue depth %d exceeds threshold", signal.Name(), depth)
		}
	}
	return nil
}
```
Debugging
Field schemas help debug unexpected events:
```go
stats := capitan.Stats()
for signal, keys := range stats.FieldSchemas {
	fmt.Printf("%s fields:\n", signal.Name())
	for _, key := range keys {
		fmt.Printf("  - %s (%s)\n", key.Name(), key.Variant())
	}
}
```
Multiple Instances
Create isolated instances when you need separate configuration:
```go
// High-throughput instance
fast := capitan.New(
	capitan.WithBufferSize(1024),
)

// Strict instance with panic logging
strict := capitan.New(
	capitan.WithBufferSize(16),
	capitan.WithPanicHandler(logAndAlert),
)

// Each instance has independent workers, buffers, and registries
fast.Hook(signal, fastHandler)
strict.Hook(signal, strictHandler)

// Shutdown each independently
fast.Shutdown()
strict.Shutdown()
```
Use cases for multiple instances:
- Tenant isolation in multi-tenant systems
- Separate configuration for different subsystems
- Testing without affecting the default singleton
Sync Mode
For testing, sync mode processes events synchronously:
```go
c := capitan.New(capitan.WithSyncMode())
```
In sync mode:
- `Emit` calls listeners directly (no workers, no buffering)
- Events process in the calling goroutine
- Deterministic ordering for tests
Do not use sync mode in production—it loses isolation and backpressure benefits. See Testing for testing patterns.
Per-Signal Configuration
Configure individual signals or groups of signals dynamically with ApplyConfig. Unlike instance options, per-signal config can be changed at runtime.
Basic Usage
```go
cfg := capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"order.created": {BufferSize: 64},
		"payment.*":     {MinSeverity: capitan.SeverityWarn},
	},
}
cap.ApplyConfig(cfg)
```
SignalConfig Options
| Option | Type | Default | Description |
|---|---|---|---|
| `BufferSize` | `int` | instance default | Event queue size for this signal |
| `Disabled` | `bool` | `false` | Drop all events for this signal |
| `MinSeverity` | `Severity` | none | Filter events below this level |
| `MaxListeners` | `int` | unlimited | Cap listener registrations |
| `DropPolicy` | `DropPolicy` | block | Behavior when buffer full |
| `RateLimit` | `float64` | unlimited | Max events per second |
| `BurstSize` | `int` | 1 | Burst allowance above rate limit |
Disabled Signals
Disable noisy signals without code changes:
```go
cfg := capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"debug.*": {Disabled: true},
	},
}
```
Disabled signals drop all events silently. Re-enable by applying config without Disabled: true.
Severity Filtering
Filter low-priority events in production:
```go
cfg := capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"audit.*": {MinSeverity: capitan.SeverityInfo}, // Drop DEBUG
	},
}
```
Severity order: DEBUG < INFO < WARN < ERROR
Drop Policy
Choose between backpressure and lossy behavior:
```go
cfg := capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		// Block emitters when buffer full (default)
		"critical.*": {DropPolicy: capitan.DropPolicyBlock},

		// Drop newest events when buffer full
		"metrics.*": {DropPolicy: capitan.DropPolicyDropNewest, BufferSize: 100},
	},
}
```
Use DropPolicyDropNewest for high-volume, loss-tolerant signals like metrics.
Rate Limiting
Protect downstream systems from event floods:
```go
cfg := capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"notification.*": {
			RateLimit: 10, // 10 events/second max
			BurstSize: 50, // Allow bursts up to 50
		},
	},
}
```
Events exceeding the rate are dropped. Monitor Stats().DroppedEvents.
Glob Patterns
Configure signal families with glob patterns:
```go
cfg := capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"order.*":         {BufferSize: 32},
		"order.payment.*": {BufferSize: 64},  // More specific
		"order.created":   {BufferSize: 128}, // Exact match
	},
}
```
Resolution rules (in order):
- Exact match wins
- Longest glob pattern wins (more specific)
- Alphabetical order for ties
Supported patterns: `*` (any chars), `?` (single char), `[abc]` (char class)
Config Replacement
Each ApplyConfig call replaces the entire configuration:
```go
// Config A
cap.ApplyConfig(capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"order.*": {BufferSize: 32},
	},
})

// Config B - replaces A entirely
cap.ApplyConfig(capitan.Config{
	Signals: map[string]capitan.SignalConfig{
		"payment.*": {BufferSize: 64},
	},
})

// Now: order.* has default config, payment.* has BufferSize=64
```
Signals not in the new config revert to defaults. Only workers with actual config changes are rebuilt.
Hot-Reload with Flux
Flux is a reactive configuration library that watches files for changes and invokes callbacks when they update. Integrate it with capitan for file-based hot-reload:
```go
import (
	"log"

	"github.com/zoobz-io/capitan"
	"github.com/zoobz-io/flux"
	"github.com/zoobz-io/flux/file"
)

func main() {
	cap := capitan.New()

	// Watch config file, apply on change
	watcher := file.JSON[capitan.Config]("/etc/myapp/capitan.json")
	flux.New(watcher, func(cfg capitan.Config) {
		if err := cap.ApplyConfig(cfg); err != nil {
			log.Printf("config error: %v", err)
		}
	})

	// ...
}
```
Example config file (capitan.json):
```json
{
  "signals": {
    "order.*": {"bufferSize": 64, "minSeverity": "INFO"},
    "debug.*": {"disabled": true},
    "metrics.*": {"dropPolicy": "drop_newest", "rateLimit": 100}
  }
}
```
Changes apply immediately without restart.
Configuration Summary
| Option | Default | Production Guidance |
|---|---|---|
| `WithBufferSize(n)` | 16 | Start with the default; increase if queue depths spike |
| `WithPanicHandler(fn)` | silent | Always configure in production for visibility |
| `WithSyncMode()` | off | Testing only |
| `ApplyConfig(cfg)` | none | Use for per-signal tuning and hot-reload |