zoobzio December 12, 2025 Edit this page

Core Concepts

Capitan has four primitives: signals, keys, fields, and listeners. Understanding these unlocks the full API.

Signals

A signal identifies an event type. Signals route events to listeners.

var orderCreated = capitan.NewSignal("order.created", "New order placed")

The first argument is the name—used for routing and logging. The second is a human-readable description.

Define signals as package-level variables. Each signal creates internal state (worker goroutines, registries). Dynamic signal creation leads to unbounded memory growth.

// Good: fixed set of signals
var (
    OrderCreated   = capitan.NewSignal("order.created", "New order placed")
    OrderConfirmed = capitan.NewSignal("order.confirmed", "Order confirmed")
    OrderCanceled  = capitan.NewSignal("order.canceled", "Order canceled")
)

// Bad: dynamic signal per user
signal := capitan.NewSignal(fmt.Sprintf("user.%s.action", userID), "...")

Use fields to carry variable data, not signal names.

Keys

A key defines a typed field name. Keys are bound to a specific type at compile time.

orderID := capitan.NewStringKey("order_id")
total := capitan.NewFloat64Key("total")
count := capitan.NewIntKey("count")
active := capitan.NewBoolKey("active")

Built-in Key Types

ConstructorTypeExample
NewStringKeystring"ORDER-123"
NewIntKeyint42
NewInt32Keyint3242
NewInt64Keyint6442
NewUintKeyuint42
NewUint32Keyuint3242
NewUint64Keyuint6442
NewFloat32Keyfloat323.14
NewFloat64Keyfloat643.14
NewBoolKeybooltrue
NewTimeKeytime.Timetime.Now()
NewDurationKeytime.Duration5 * time.Second
NewBytesKey[]byte[]byte{0x01, 0x02}
NewErrorKeyerrorerrors.New("failed")

Custom Types

Use NewKey[T] for structs or any custom type:

type Order struct {
    ID    string
    Total float64
    Items int
}

var orderKey = capitan.NewKey[Order]("order", "myapp.Order")

The second argument is the variant—a string that identifies the type. Use namespaced strings to avoid collisions (e.g., "myapp.Order" or "github.com/org/pkg.Type").

Fields

A field is a key-value pair. Create fields from keys:

orderID := capitan.NewStringKey("order_id")
total := capitan.NewFloat64Key("total")

// Create fields
idField := orderID.Field("ORDER-123")
totalField := total.Field(99.99)

Fields are passed to Emit:

capitan.Emit(ctx, orderCreated,
    orderID.Field("ORDER-123"),
    total.Field(99.99),
)

Extracting Values

Extract typed values from events using From:

capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    id, ok := orderID.From(e)
    if !ok {
        // Field not present or wrong type
        return
    }
    // id is a string
})

The From method returns (T, bool). The boolean is false if the field is missing or the type doesn't match.

Accessing All Fields

Get all fields from an event:

for _, field := range e.Fields() {
    fmt.Printf("%s: %v\n", field.Key().Name(), field.Value())
}

Or get a specific field by key:

field := e.Get(orderID)
if field != nil {
    // field.Value() returns any
}

Listeners

A listener handles events for a specific signal. Register with Hook:

listener := capitan.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
    // Handle event
})

Hook returns a *Listener that can be closed:

listener.Close()  // Stop receiving events

Multiple listeners can subscribe to the same signal. All receive every event.

One-Time Listeners

For events you only need to handle once (e.g., initialization, first occurrence), use HookOnce:

listener := capitan.HookOnce(signal, func(ctx context.Context, e *capitan.Event) {
    // Runs once, then auto-unregisters
    fmt.Println("First event received!")
})

The listener automatically closes after the callback fires. You can also close it early to prevent the callback from firing at all.

Listener Lifecycle

  • Listeners receive events asynchronously via the signal's worker goroutine
  • Listener panics are recovered automatically—one bad handler won't crash others
  • Closing a listener removes it from the registry immediately
  • When the last listener for a signal closes, the worker goroutine exits

Observers

An observer receives events from multiple signals. Use Observe for cross-cutting concerns:

// Observe all signals
observer := capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    log.Printf("[%s] %s", e.Signal().Name(), e.Signal().Description())
})

Observers attach to existing signals and any signals created after registration.

Whitelisting Signals

Observe specific signals only:

observer := capitan.Observe(handler, orderCreated, orderConfirmed, orderCanceled)

The observer receives only events from the listed signals.

Closing Observers

observer.Close()  // Removes all underlying listeners

Severity Levels

Events have a severity level for filtering and logging:

capitan.Debug(ctx, signal, fields...)  // Development, troubleshooting
capitan.Info(ctx, signal, fields...)   // Normal operations
capitan.Warn(ctx, signal, fields...)   // Warning conditions
capitan.Error(ctx, signal, fields...)  // Error conditions

Emit dispatches with SeverityInfo and is the most common choice for normal operations. Use the named severity functions when you need explicit control:

// These are equivalent
capitan.Emit(ctx, orderCreated, fields...)
capitan.Info(ctx, orderCreated, fields...)

Access severity in listeners:

capitan.Hook(signal, func(ctx context.Context, e *capitan.Event) {
    switch e.Severity() {
    case capitan.SeverityError:
        alertOps(e)
    case capitan.SeverityWarn:
        logWarning(e)
    default:
        logInfo(e)
    }
})

Event Properties

Events expose metadata:

e.Signal()      // Signal - the event type
e.Timestamp()   // time.Time - when the event was created
e.Severity()    // Severity - DEBUG, INFO, WARN, ERROR
e.Context()     // context.Context - passed at emission time
e.Fields()      // []Field - all fields
e.Get(key)      // Field - specific field by key

Configuration

Optionally configure the default instance before first use:

capitan.Configure(
    capitan.WithBufferSize(128),
    capitan.WithPanicHandler(func(sig capitan.Signal, recovered any) {
        log.Printf("Panic on %s: %v", sig.Name(), recovered)
    }),
)

Options

OptionDefaultDescription
WithBufferSize(n)16Event queue buffer per signal
WithPanicHandler(fn)silentCalled when a listener panics
WithSyncMode()offProcess events synchronously (testing)

Multiple Instances

Create isolated instances for testing or separation:

c := capitan.New(
    capitan.WithBufferSize(256),
)

c.Hook(signal, handler)
c.Emit(ctx, signal, fields...)
c.Shutdown()

Shutdown and Drain

Gracefully drain pending events before exit:

capitan.Shutdown()

Shutdown closes all worker goroutines, processing remaining queued events first. Safe to call multiple times.

Drain Without Shutdown

To wait for queued events without stopping workers, use Drain:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := cap.Drain(ctx); err != nil {
    // Context cancelled before drain completed
}

Drain blocks until all queued events are processed but leaves workers running. Useful in tests and when you need synchronization points without full shutdown.

Checking Shutdown State

if cap.IsShutdown() {
    // Instance has been shut down
}

Replay

Capitan supports replaying historical events from storage. This is useful for debugging, backfilling new listeners, or reproducing issues.

Creating Events for Replay

Use NewEvent to construct events from stored data:

e := capitan.NewEvent(orderCreated, capitan.SeverityInfo, storedTimestamp,
    orderID.Field("ORD-123"),
    total.Field(99.99),
)

Unlike internally emitted events, NewEvent events are not pooled—they can be held safely.

Replaying Events

Use Replay to re-emit a constructed event:

capitan.Replay(ctx, e)

Replay events:

  • Preserve their original timestamp and severity
  • Are marked as replays—check with e.IsReplay()
  • Process synchronously in the calling goroutine (bypass worker queues entirely)
  • Are not pooled—safe to hold references after replay completes
  • Invoke all registered listeners, including observers

Replaying during normal operation is safe. Replayed events invoke the same listeners as regular events but run synchronously rather than through the async worker queue. This means replay doesn't compete for buffer space and won't experience backpressure.

Detecting Replays

Listeners can check if an event is a replay to skip side effects:

capitan.Hook(orderConfirmed, func(ctx context.Context, e *capitan.Event) {
    // Skip external calls during replay
    if !e.IsReplay() {
        sendConfirmationEmail(e)
    }

    // Always update internal state
    recordOrderConfirmed(e)
})

See Persistence for a complete replay implementation.