Errors
Capitan provides patterns for handling errors in event-driven systems: error fields, severity levels, and failure workflows.
Error Fields
Use ErrorKey to attach errors to events:
var errKey = capitan.NewErrorKey("error")
func processPayment(ctx context.Context, orderID string) {
err := chargeCustomer(orderID)
if err != nil {
capitan.Error(ctx, paymentFailed,
orderIDKey.Field(orderID),
errKey.Field(err),
)
return
}
capitan.Emit(ctx, paymentProcessed, orderIDKey.Field(orderID))
}
Extract errors in listeners:
capitan.Hook(paymentFailed, func(ctx context.Context, e *capitan.Event) {
err, ok := errKey.From(e)
if ok {
log.Printf("Payment failed: %v", err)
}
})
Severity Levels
Use severity to distinguish error conditions:
// Normal operation
capitan.Emit(ctx, orderCreated, fields...) // INFO (default)
capitan.Info(ctx, orderCreated, fields...) // Explicit INFO
// Warning conditions
capitan.Warn(ctx, lowStock, fields...) // WARN
// Error conditions
capitan.Error(ctx, paymentFailed, fields...) // ERROR
// Debug information
capitan.Debug(ctx, queryExecuted, fields...) // DEBUG
Routing by Severity
Filter events by severity in observers:
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
switch e.Severity() {
case capitan.SeverityError:
alertOps(e)
logError(e)
case capitan.SeverityWarn:
logWarning(e)
case capitan.SeverityInfo:
logInfo(e)
case capitan.SeverityDebug:
if debugEnabled {
logDebug(e)
}
}
})
Error-Only Observer
Observe only error events:
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
if e.Severity() != capitan.SeverityError {
return
}
// Alert on all errors
alerting.Send(alerting.Alert{
Signal: e.Signal().Name(),
Message: e.Signal().Description(),
Fields: extractFields(e),
})
})
Failure Signals
Define explicit failure signals for error workflows:
var (
paymentProcessed = capitan.NewSignal("payment.processed", "Payment successful")
paymentFailed = capitan.NewSignal("payment.failed", "Payment failed")
inventoryReserved = capitan.NewSignal("inventory.reserved", "Stock reserved")
inventoryReserveFailed = capitan.NewSignal("inventory.reserve_failed", "Stock reservation failed")
)
var (
orderIDKey = capitan.NewStringKey("order_id")
errKey = capitan.NewErrorKey("error")
reasonKey = capitan.NewStringKey("reason")
)
Error Workflows
Handle failures as workflow branches:
// Success path
capitan.Hook(paymentProcessed, func(ctx context.Context, e *capitan.Event) {
orderID, _ := orderIDKey.From(e)
confirmOrder(orderID)
capitan.Emit(ctx, orderConfirmed, orderIDKey.Field(orderID))
})
// Failure path
capitan.Hook(paymentFailed, func(ctx context.Context, e *capitan.Event) {
orderID, _ := orderIDKey.From(e)
err, _ := errKey.From(e)
// Release reserved inventory
releaseStock(orderID)
capitan.Emit(ctx, inventoryReleased, orderIDKey.Field(orderID))
// Cancel order
cancelOrder(orderID, err.Error())
capitan.Emit(ctx, orderCanceled,
orderIDKey.Field(orderID),
reasonKey.Field(err.Error()),
)
// Notify customer
notifyCustomer(orderID, "payment_failed")
})
Retry Patterns
Implement retries with failure events:
var (
processJob = capitan.NewSignal("job.process", "Process job")
jobCompleted = capitan.NewSignal("job.completed", "Job completed")
jobFailed = capitan.NewSignal("job.failed", "Job failed")
jobRetryExceeded = capitan.NewSignal("job.retry_exceeded", "Job exceeded retry limit")
)
var (
jobIDKey = capitan.NewStringKey("job_id")
attemptKey = capitan.NewIntKey("attempt")
maxRetries = 3
)
capitan.Hook(processJob, func(ctx context.Context, e *capitan.Event) {
jobID, _ := jobIDKey.From(e)
attempt, _ := attemptKey.From(e)
if attempt == 0 {
attempt = 1
}
err := doWork(jobID)
if err != nil {
if attempt >= maxRetries {
capitan.Error(ctx, jobRetryExceeded,
jobIDKey.Field(jobID),
attemptKey.Field(attempt),
errKey.Field(err),
)
return
}
// Schedule retry
capitan.Warn(ctx, jobFailed,
jobIDKey.Field(jobID),
attemptKey.Field(attempt),
errKey.Field(err),
)
return
}
capitan.Emit(ctx, jobCompleted, jobIDKey.Field(jobID))
})
// Retry handler
capitan.Hook(jobFailed, func(ctx context.Context, e *capitan.Event) {
jobID, _ := jobIDKey.From(e)
attempt, _ := attemptKey.From(e)
// Exponential backoff
delay := time.Duration(attempt*attempt) * time.Second
time.AfterFunc(delay, func() {
capitan.Emit(context.Background(), processJob,
jobIDKey.Field(jobID),
attemptKey.Field(attempt+1),
)
})
})
// Dead letter handler
capitan.Hook(jobRetryExceeded, func(ctx context.Context, e *capitan.Event) {
jobID, _ := jobIDKey.From(e)
err, _ := errKey.From(e)
log.Printf("Job %s failed permanently: %v", jobID, err)
moveToDeadLetter(jobID)
alertOps(jobID, err)
})
Error Aggregation
Aggregate errors for batch reporting:
type ErrorAggregator struct {
errors []ErrorRecord
mu sync.Mutex
}
type ErrorRecord struct {
Signal string
Error error
Timestamp time.Time
Fields map[string]any
}
func (a *ErrorAggregator) Handler() capitan.EventCallback {
return func(ctx context.Context, e *capitan.Event) {
if e.Severity() != capitan.SeverityError {
return
}
err, _ := errKey.From(e)
a.mu.Lock()
a.errors = append(a.errors, ErrorRecord{
Signal: e.Signal().Name(),
Error: err,
Timestamp: e.Timestamp(),
Fields: extractFields(e),
})
a.mu.Unlock()
}
}
func (a *ErrorAggregator) Flush() []ErrorRecord {
a.mu.Lock()
defer a.mu.Unlock()
result := a.errors
a.errors = nil
return result
}
// Usage
aggregator := &ErrorAggregator{}
capitan.Observe(aggregator.Handler())
// Periodic flush
go func() {
for range time.Tick(time.Minute) {
errors := aggregator.Flush()
if len(errors) > 0 {
sendErrorReport(errors)
}
}
}()
Panic vs Error Events
Panics and error events serve different purposes:
| Panic | Error Event |
|---|---|
| Unexpected, unrecoverable | Expected, handled |
| Bug in listener code | Business logic failure |
| Logged via panic handler | Part of normal flow |
| Listener terminates | Listener completes |
Use panics for programming errors. Use error events for domain failures. See Configuration for panic handler setup and Testing for testing error paths:
capitan.Hook(processOrder, func(ctx context.Context, e *capitan.Event) {
order, ok := orderKey.From(e)
if !ok {
// Programming error - this shouldn't happen
panic("order field missing from processOrder event")
}
err := validateOrder(order)
if err != nil {
// Domain error - expected failure case
capitan.Error(ctx, orderValidationFailed,
orderIDKey.Field(order.ID),
errKey.Field(err),
)
return
}
// Continue processing...
})
Error Context
Include context for debugging:
var (
errKey = capitan.NewErrorKey("error")
componentKey = capitan.NewStringKey("component")
operationKey = capitan.NewStringKey("operation")
inputKey = capitan.NewStringKey("input")
)
func processWithContext(ctx context.Context, input string) {
result, err := riskyOperation(input)
if err != nil {
capitan.Error(ctx, operationFailed,
errKey.Field(err),
componentKey.Field("payment-service"),
operationKey.Field("charge"),
inputKey.Field(input),
)
return
}
// ...
}
Observers can then log rich error context:
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
if e.Severity() != capitan.SeverityError {
return
}
log.Printf("ERROR [%s] %s: %v (component=%s, operation=%s)",
e.Signal().Name(),
e.Signal().Description(),
errKey.ExtractFromFields(e.Fields()),
componentKey.ExtractFromFields(e.Fields()),
operationKey.ExtractFromFields(e.Fields()),
)
})