Persistence
Store events in a database for audit trails, debugging, and replay. An observer captures events as they are emitted; the replay API re-emits them on demand.
The Pattern
Emit(ctx, signal, fields...)
       │
       ▼
┌─────────────┐
│   Capitan   │───▶ Listeners
└──────┬──────┘
       │
 ┌─────▼─────┐
 │ Observer  │
 │ (persist) │
 └─────┬─────┘
       │
 ┌─────▼─────┐
 │ Database  │
 └─────┬─────┘
       │
 ┌─────▼─────┐
 │ Replay()  │
 └───────────┘
Storage Format
Capture all event data for reconstruction:
// StoredEvent is the serializable form of an event.
type StoredEvent struct {
    ID        string         `json:"id"`
    Signal    string         `json:"signal"`
    Severity  string         `json:"severity"`
    Timestamp time.Time      `json:"timestamp"`
    Fields    map[string]any `json:"fields"`
}

// FromEvent copies everything needed to rebuild the event later.
func FromEvent(e *capitan.Event) StoredEvent {
    fields := make(map[string]any)
    for _, f := range e.Fields() {
        fields[f.Key().Name()] = f.Value()
    }
    return StoredEvent{
        ID:        generateID(), // helper not shown; must return a unique ID
        Signal:    e.Signal().Name(),
        Severity:  string(e.Severity()),
        Timestamp: e.Timestamp(),
        Fields:    fields,
    }
}
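The `Fields` map survives a JSON round trip with one caveat worth planning for: `encoding/json` decodes every number held in an `any` as `float64`. The sketch below uses a local copy of `StoredEvent` so it compiles on its own. `generateID` and `roundTrip` are hypothetical helpers (this guide does not define `generateID`; random hex is only one option), and the signal and severity strings are placeholder values.

```go
package main

import (
    "crypto/rand"
    "encoding/hex"
    "encoding/json"
    "fmt"
    "time"
)

// StoredEvent mirrors the struct above so the sketch compiles on its own.
type StoredEvent struct {
    ID        string         `json:"id"`
    Signal    string         `json:"signal"`
    Severity  string         `json:"severity"`
    Timestamp time.Time      `json:"timestamp"`
    Fields    map[string]any `json:"fields"`
}

// generateID is a hypothetical helper: 16 random bytes, hex-encoded.
func generateID() string {
    b := make([]byte, 16)
    if _, err := rand.Read(b); err != nil {
        panic(err) // no sensible fallback if the system RNG fails
    }
    return hex.EncodeToString(b)
}

// roundTrip encodes an event to JSON and decodes it back,
// as a JSON column or document store would.
func roundTrip(e StoredEvent) (StoredEvent, error) {
    var out StoredEvent
    data, err := json.Marshal(e)
    if err != nil {
        return out, err
    }
    err = json.Unmarshal(data, &out)
    return out, err
}

func main() {
    in := StoredEvent{
        ID:        generateID(),
        Signal:    "order.created",
        Severity:  "INFO", // placeholder value
        Timestamp: time.Now().UTC(),
        Fields:    map[string]any{"order_id": "A-1", "quantity": 3},
    }
    out, err := roundTrip(in)
    if err != nil {
        panic(err)
    }
    // quantity went in as int and comes back as float64.
    fmt.Printf("%T -> %T\n", in.Fields["quantity"], out.Fields["quantity"]) // prints "int -> float64"
}
```

Any code that rebuilds typed fields from a stored map therefore has to convert numbers back to the types the original field keys expect.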
Persistence Observer
Attach an observer that writes events to storage:
func PersistenceObserver(store *EventStore) capitan.EventCallback {
    return func(ctx context.Context, e *capitan.Event) {
        if e.IsReplay() {
            return // Don't re-persist replayed events
        }
        store.Save(ctx, FromEvent(e))
    }
}

// Attach at startup
capitan.Observe(PersistenceObserver(store))
Selective Persistence
Persist only specific signals if storage is constrained:
capitan.Observe(PersistenceObserver(store),
    OrderCreated,
    OrderConfirmed,
    PaymentFailed, // Persist failures for debugging
)
Replay
Reconstruct and re-emit stored events:
func Replay(ctx context.Context, stored StoredEvent, signals map[string]capitan.Signal) {
    signal, ok := signals[stored.Signal]
    if !ok {
        return // Unknown signal: not in the registry, so skip it
    }
    // decodeFields and parseSeverity are application helpers (not shown)
    // that turn the stored map and string back into typed values.
    fields := decodeFields(stored.Fields)
    severity := parseSeverity(stored.Severity)
    e := capitan.NewEvent(signal, severity, stored.Timestamp, fields...)
    capitan.Replay(ctx, e)
}
Signal Registry
Map signal names to values for replay:
var signals = map[string]capitan.Signal{
    "order.created":   OrderCreated,
    "order.confirmed": OrderConfirmed,
    "payment.failed":  PaymentFailed,
}
Handling Replays in Listeners
Check IsReplay() to skip side effects:
capitan.Hook(OrderConfirmed, func(ctx context.Context, e *capitan.Event) {
    // Skip external calls during replay
    if !e.IsReplay() {
        sendConfirmationEmail(e)
    }
    // Always update internal state
    recordOrderConfirmed(e)
})
Considerations
Idempotency
Listeners should handle replay safely. Use IsReplay() to guard side effects like emails, payments, or external API calls.
Replay Ordering
Events replay in the order your query returns them. Order the query by timestamp, with a deterministic tie-breaker for events that share a timestamp, to preserve the original sequence.
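Ordering in the database query is usually the simplest route. When results are merged from several sources and have to be ordered in memory, a stable sort by timestamp is one option. The sketch below uses a trimmed, local `StoredEvent`; `sortForReplay` and `at` are hypothetical helper names.

```go
package main

import (
    "fmt"
    "sort"
    "time"
)

// StoredEvent is trimmed to the fields that matter for ordering.
type StoredEvent struct {
    ID        string
    Signal    string
    Timestamp time.Time
}

// at builds a UTC timestamp from Unix seconds; it only keeps the demo short.
func at(sec int64) time.Time { return time.Unix(sec, 0).UTC() }

// sortForReplay orders events oldest first. The sort is stable, so events
// with equal timestamps keep the order they arrived in.
func sortForReplay(events []StoredEvent) {
    sort.SliceStable(events, func(i, j int) bool {
        return events[i].Timestamp.Before(events[j].Timestamp)
    })
}

func main() {
    events := []StoredEvent{
        {ID: "3", Signal: "payment.failed", Timestamp: at(20)},
        {ID: "1", Signal: "order.created", Timestamp: at(10)},
        {ID: "2", Signal: "order.confirmed", Timestamp: at(10)},
    }
    sortForReplay(events)
    for _, e := range events {
        fmt.Println(e.ID, e.Signal)
    }
    // Prints:
    // 1 order.created
    // 2 order.confirmed
    // 3 payment.failed
}
```

A stable sort only preserves the incoming order of ties. If that order is itself arbitrary, store a sequence number alongside the timestamp and sort on both.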
Storage Growth
Events accumulate. Implement retention policies:
func (s *EventStore) Prune(ctx context.Context, olderThan time.Duration) error {
    cutoff := time.Now().Add(-olderThan)
    _, err := s.db.ExecContext(ctx, `DELETE FROM events WHERE timestamp < $1`, cutoff)
    return err
}
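A retention policy only helps if `Prune` runs regularly. One way to schedule it, sketched here with the standard library only, is a ticker loop that stops when its context is cancelled. `runPruner` and `countPrunes` are hypothetical names, and the `prune` callback stands in for a closure around `EventStore.Prune`.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// runPruner calls prune once per interval until ctx is cancelled.
// It blocks, so a service would normally start it in its own goroutine.
func runPruner(ctx context.Context, every time.Duration, prune func(context.Context) error) {
    ticker := time.NewTicker(every)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := prune(ctx); err != nil {
                // Report and keep going; the next tick tries again.
                fmt.Println("prune failed:", err)
            }
        }
    }
}

// countPrunes runs the pruner for a fixed window and reports how many
// times prune fired. It exists only to make the demo observable.
func countPrunes(window, every time.Duration) int {
    ctx, cancel := context.WithTimeout(context.Background(), window)
    defer cancel()

    calls := 0
    runPruner(ctx, every, func(context.Context) error {
        calls++ // a real closure would call store.Prune(ctx, retention)
        return nil
    })
    return calls
}

func main() {
    n := countPrunes(100*time.Millisecond, 20*time.Millisecond)
    fmt.Println("prune ran", n, "times")
}
```

In a service, the interval would more likely be hours than milliseconds, and the context would be the one cancelled at shutdown.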
Replay vs Normal Processing
| Aspect | Normal | Replay |
|---|---|---|
| Processing | Async (worker queue) | Sync (calling goroutine) |
| Pooling | Events pooled and reused | Events not pooled |
| IsReplay() | false | true |
| Persistence | Stored by observer | Skipped (already stored) |
Use Cases
- Audit trails — Complete record of system activity
- Debugging — Replay events to reproduce issues
- Backfilling — Populate new listeners with historical data
- Testing — Replay production events in test environments
See Also
- Concepts: Replay — API details for NewEvent, Replay, IsReplay
- Testing — Testing patterns for replay scenarios