zoobzio December 8, 2025

Persistence

Store events to a database for audit trails, debugging, and replay. An observer captures events; the replay API re-emits them on demand.

The Pattern

Emit(ctx, signal, fields...)
       │
       ▼
┌─────────────┐
│   Capitan   │───▶ Listeners
└──────┬──────┘
       │
 ┌─────▼─────┐
 │  Observer │
 │ (persist) │
 └─────┬─────┘
       │
 ┌─────▼─────┐
 │  Database │
 └─────┬─────┘
       │
 ┌─────▼─────┐
 │  Replay() │
 └───────────┘

Storage Format

Capture all event data for reconstruction:

type StoredEvent struct {
    ID        string         `json:"id"`
    Signal    string         `json:"signal"`
    Severity  string         `json:"severity"`
    Timestamp time.Time      `json:"timestamp"`
    Fields    map[string]any `json:"fields"`
}

func FromEvent(e *capitan.Event) StoredEvent {
    fields := make(map[string]any)
    for _, f := range e.Fields() {
        fields[f.Key().Name()] = f.Value()
    }
    return StoredEvent{
        ID:        generateID(),
        Signal:    e.Signal().Name(),
        Severity:  string(e.Severity()),
        Timestamp: e.Timestamp(),
        Fields:    fields,
    }
}
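One detail matters when these records are read back. If StoredEvent is serialized as JSON (an assumption, suggested by the struct tags), numeric field values return as float64 regardless of their original Go type. The sketch below demonstrates this with the standard library only. The sampleEvent and roundTrip helpers and the "INFO" severity value are illustrative and not part of capitan.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// StoredEvent mirrors the struct defined above.
type StoredEvent struct {
	ID        string         `json:"id"`
	Signal    string         `json:"signal"`
	Severity  string         `json:"severity"`
	Timestamp time.Time      `json:"timestamp"`
	Fields    map[string]any `json:"fields"`
}

// sampleEvent builds an illustrative record; all values are made up.
func sampleEvent() StoredEvent {
	return StoredEvent{
		ID:        "evt-1",
		Signal:    "order.created",
		Severity:  "INFO", // hypothetical severity string
		Timestamp: time.Date(2025, 12, 8, 0, 0, 0, 0, time.UTC),
		Fields:    map[string]any{"order_id": "A-100", "quantity": 3},
	}
}

// roundTrip encodes a record to JSON and decodes it again,
// as a JSON column or document store would.
func roundTrip(in StoredEvent) (StoredEvent, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return StoredEvent{}, err
	}
	var out StoredEvent
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	in := sampleEvent()
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T\n", in.Fields["quantity"])  // int
	fmt.Printf("%T\n", out.Fields["quantity"]) // float64
}
```

Any code that rebuilds typed fields for replay has to convert such values back to the type the field key expects.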

Persistence Observer

Attach an observer that writes events to storage:

func PersistenceObserver(store *EventStore) capitan.EventCallback {
    return func(ctx context.Context, e *capitan.Event) {
        if e.IsReplay() {
            return // Don't re-persist replayed events
        }
        store.Save(ctx, FromEvent(e))
    }
}

// Attach at startup
capitan.Observe(PersistenceObserver(store))
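The observer above relies on an EventStore whose Save method is not shown. As a rough sketch only, assuming a SQL backend with an events table (the table name and timestamp column match the Prune example later on this page; the remaining columns, the $n placeholder style, and the insertArgs helper are assumptions), Save might look like this:

```go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"time"
)

// StoredEvent mirrors the struct from Storage Format.
type StoredEvent struct {
	ID        string
	Signal    string
	Severity  string
	Timestamp time.Time
	Fields    map[string]any
}

// EventStore wraps a database handle; the layout is an assumption.
type EventStore struct {
	db *sql.DB
}

// Hypothetical schema: one row per event, fields stored as a JSON document.
const insertEvent = `INSERT INTO events (id, signal, severity, timestamp, fields)
VALUES ($1, $2, $3, $4, $5)`

// insertArgs flattens an event into positional query arguments.
func insertArgs(ev StoredEvent) ([]any, error) {
	fields, err := json.Marshal(ev.Fields)
	if err != nil {
		return nil, fmt.Errorf("encode fields for event %s: %w", ev.ID, err)
	}
	return []any{ev.ID, ev.Signal, ev.Severity, ev.Timestamp.UTC(), string(fields)}, nil
}

// Save writes one event and returns any encoding or database error.
func (s *EventStore) Save(ctx context.Context, ev StoredEvent) error {
	args, err := insertArgs(ev)
	if err != nil {
		return err
	}
	_, err = s.db.ExecContext(ctx, insertEvent, args...)
	return err
}

// sampleEvent builds an illustrative record; all values are made up.
func sampleEvent() StoredEvent {
	return StoredEvent{
		ID:        "evt-1",
		Signal:    "order.created",
		Severity:  "INFO",
		Timestamp: time.Date(2025, 12, 8, 0, 0, 0, 0, time.UTC),
		Fields:    map[string]any{"order_id": "A-100", "quantity": 3},
	}
}

func main() {
	args, err := insertArgs(sampleEvent())
	if err != nil {
		panic(err)
	}
	fmt.Println(args[4]) // the JSON document bound to the fields column
}
```

Because this Save returns an error, an observer that calls it may want to log failures instead of discarding them. Observers have no caller to hand the error back to.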

Selective Persistence

Persist only specific signals if storage is constrained:

capitan.Observe(PersistenceObserver(store),
    OrderCreated,
    OrderConfirmed,
    PaymentFailed, // Persist failures for debugging
)

Replay

Reconstruct and re-emit stored events:

func Replay(ctx context.Context, stored StoredEvent, signals map[string]capitan.Signal) {
    signal, ok := signals[stored.Signal]
    if !ok {
        return // Unknown signal
    }

    fields := decodeFields(stored.Fields)
    severity := parseSeverity(stored.Severity)

    e := capitan.NewEvent(signal, severity, stored.Timestamp, fields...)
    capitan.Replay(ctx, e)
}
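Replay calls two helpers, decodeFields and parseSeverity, that are left to the application. decodeFields depends on how your field keys are declared, so it is not sketched here. For parseSeverity, one possible shape follows. It uses a local stand-in Severity type and hypothetical level names so the example compiles on its own. In real code, substitute the library's severity type and whatever strings your stored events contain.

```go
package main

import (
	"fmt"
	"strings"
)

// Severity is a local stand-in for the library's severity type.
type Severity string

// Hypothetical level names; match these to the values in your store.
const (
	SeverityDebug Severity = "DEBUG"
	SeverityInfo  Severity = "INFO"
	SeverityWarn  Severity = "WARN"
	SeverityError Severity = "ERROR"
)

// parseSeverity maps a stored string back to a Severity.
// Matching ignores case and surrounding whitespace; unrecognized
// input falls back to SeverityInfo rather than failing the replay.
func parseSeverity(s string) Severity {
	switch sev := Severity(strings.ToUpper(strings.TrimSpace(s))); sev {
	case SeverityDebug, SeverityInfo, SeverityWarn, SeverityError:
		return sev
	default:
		return SeverityInfo
	}
}

func main() {
	for _, raw := range []string{"ERROR", " warn ", "unknown"} {
		fmt.Printf("%q -> %s\n", raw, parseSeverity(raw))
	}
}
```

Falling back to a default keeps a replay running when old records carry unexpected values. Returning an error instead is the stricter choice if silent downgrades are unacceptable.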

Signal Registry

Map signal names to values for replay:

var signals = map[string]capitan.Signal{
    "order.created":   OrderCreated,
    "order.confirmed": OrderConfirmed,
    "payment.failed":  PaymentFailed,
}

Handling Replays in Listeners

Check IsReplay() to skip side effects:

capitan.Hook(OrderConfirmed, func(ctx context.Context, e *capitan.Event) {
    // Skip external calls during replay
    if !e.IsReplay() {
        sendConfirmationEmail(e)
    }

    // Always update internal state
    recordOrderConfirmed(e)
})

Considerations

Idempotency

Listeners should handle replay safely. Use IsReplay() to guard side effects like emails, payments, or external API calls.

Replay Ordering

Events replay in the order your query returns them. Sort by timestamp, oldest first (for example, ORDER BY timestamp ASC), to preserve the original sequence.
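When ordering cannot be pushed into the query, the same result can be had in memory before replaying. The sketch below uses a trimmed-down StoredEvent and a hypothetical sortForReplay helper. It uses a stable sort so that events sharing a timestamp keep the order in which the store returned them.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// StoredEvent is trimmed to the fields this example needs.
type StoredEvent struct {
	ID        string
	Signal    string
	Timestamp time.Time
}

// sortForReplay orders events oldest-first. The sort is stable, so
// events with identical timestamps keep their incoming order.
func sortForReplay(events []StoredEvent) {
	sort.SliceStable(events, func(i, j int) bool {
		return events[i].Timestamp.Before(events[j].Timestamp)
	})
}

// sampleEvents returns illustrative records in a shuffled order.
func sampleEvents() []StoredEvent {
	t0 := time.Date(2025, 12, 8, 9, 0, 0, 0, time.UTC)
	return []StoredEvent{
		{ID: "c", Signal: "order.confirmed", Timestamp: t0.Add(2 * time.Second)},
		{ID: "a", Signal: "order.created", Timestamp: t0},
		{ID: "b", Signal: "payment.failed", Timestamp: t0},
	}
}

func main() {
	events := sampleEvents()
	sortForReplay(events)
	for _, ev := range events {
		fmt.Println(ev.ID, ev.Signal)
	}
}
```

Timestamps alone cannot order events that were emitted within the same clock tick. If strict ordering matters, storing a monotonically increasing sequence number alongside each event is a common remedy.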

Storage Growth

Events accumulate. Implement retention policies:

func (s *EventStore) Prune(ctx context.Context, olderThan time.Duration) error {
    cutoff := time.Now().Add(-olderThan)
    _, err := s.db.ExecContext(ctx, `DELETE FROM events WHERE timestamp < $1`, cutoff)
    return err
}
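Prune only helps if something calls it regularly. One way to do that, sketched here with the standard library only, is a ticker loop that runs until its context is canceled. The Pruner interface, RunRetention, and the countingPruner test double are hypothetical names introduced for illustration; the EventStore above would satisfy Pruner as written.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Pruner is a hypothetical interface matching the Prune method above.
type Pruner interface {
	Prune(ctx context.Context, olderThan time.Duration) error
}

// RunRetention calls Prune once per tick until ctx is canceled.
// A failed prune is passed to onErr (if set) and retried on the next tick.
func RunRetention(ctx context.Context, p Pruner, every, retention time.Duration, onErr func(error)) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		// Re-check: a tick and a cancellation can be ready at the same time.
		if ctx.Err() != nil {
			return
		}
		if err := p.Prune(ctx, retention); err != nil && onErr != nil {
			onErr(err)
		}
	}
}

// countingPruner is a test double that cancels the loop after limit calls.
type countingPruner struct {
	calls  int
	limit  int
	cancel context.CancelFunc
}

func (c *countingPruner) Prune(ctx context.Context, olderThan time.Duration) error {
	c.calls++
	if c.calls >= c.limit {
		c.cancel()
	}
	return nil
}

// runDemo drives the loop with the test double and reports the call count.
func runDemo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	p := &countingPruner{limit: 3, cancel: cancel}
	RunRetention(ctx, p, time.Millisecond, 30*24*time.Hour, nil)
	return p.calls
}

func main() {
	fmt.Println("prune calls:", runDemo())
}
```

In a real service the loop would typically run in its own goroutine with an interval of hours, not milliseconds, and a retention window chosen to match audit requirements.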

Replay vs Normal Processing

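| Aspect      | Normal                   | Replay                   |
|-------------|--------------------------|--------------------------|
| Processing  | Async (worker queue)     | Sync (calling goroutine) |
| Pooling     | Events pooled and reused | Events not pooled        |
| IsReplay()  | false                    | true                     |
| Persistence | Stored by observer       | Skipped (already stored) |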

Use Cases

  • Audit trails — Complete record of system activity
  • Debugging — Replay events to reproduce issues
  • Backfilling — Populate new listeners with historical data
  • Testing — Replay production events in test environments

See Also