zoobzio December 8, 2025 Edit this page

Errors

Capitan provides patterns for handling errors in event-driven systems: error fields, severity levels, and failure workflows.

Error Fields

Use ErrorKey to attach errors to events:

// errKey is the key used to attach an error to an event (errKey.Field) and to
// read it back in listeners (errKey.From).
var errKey = capitan.NewErrorKey("error")

// processPayment charges the customer for orderID and reports the outcome as
// an event: paymentFailed (emitted through capitan.Error, carrying the order
// ID and the error) when the charge fails, paymentProcessed otherwise.
func processPayment(ctx context.Context, orderID string) {
    if err := chargeCustomer(orderID); err != nil {
        capitan.Error(ctx, paymentFailed, orderIDKey.Field(orderID), errKey.Field(err))
        return
    }

    capitan.Emit(ctx, paymentProcessed, orderIDKey.Field(orderID))
}

Extract errors in listeners:

capitan.Hook(paymentFailed, func(ctx context.Context, e *capitan.Event) {
    // Nothing is logged when the error cannot be read from the event.
    if err, ok := errKey.From(e); ok {
        log.Printf("Payment failed: %v", err)
    }
})

Severity Levels

Use severity to distinguish error conditions:

// Normal operation: Emit uses the default severity (INFO); Info states it explicitly.
capitan.Emit(ctx, orderCreated, fields...)      // INFO (default)
capitan.Info(ctx, orderCreated, fields...)      // Explicit INFO

// Warning conditions
capitan.Warn(ctx, lowStock, fields...)          // WARN

// Error conditions
capitan.Error(ctx, paymentFailed, fields...)    // ERROR

// Debug information
capitan.Debug(ctx, queryExecuted, fields...)    // DEBUG

Routing by Severity

Filter events by severity in observers:

capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    // Route each event to the sink that matches its severity. Debug events
    // are dropped unless debugEnabled is set; unknown severities are ignored.
    sev := e.Severity()
    switch {
    case sev == capitan.SeverityError:
        alertOps(e)
        logError(e)
    case sev == capitan.SeverityWarn:
        logWarning(e)
    case sev == capitan.SeverityInfo:
        logInfo(e)
    case sev == capitan.SeverityDebug && debugEnabled:
        logDebug(e)
    }
})

Error-Only Observer

Observe only error events:

capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    // Skip every event that is not at error severity.
    if e.Severity() != capitan.SeverityError {
        return
    }

    // Alert on all errors, describing the event by its signal and fields.
    sig := e.Signal()
    alert := alerting.Alert{
        Signal:  sig.Name(),
        Message: sig.Description(),
        Fields:  extractFields(e),
    }
    alerting.Send(alert)
})

Failure Signals

Define explicit failure signals for error workflows:

// Paired success/failure signals, one pair per workflow step.
// NOTE(review): NewSignal's arguments look like (name, description) — confirm
// against the capitan API.
var (
    // Payment outcome.
    paymentProcessed = capitan.NewSignal("payment.processed", "Payment successful")
    paymentFailed    = capitan.NewSignal("payment.failed", "Payment failed")

    // Inventory reservation outcome.
    inventoryReserved      = capitan.NewSignal("inventory.reserved", "Stock reserved")
    inventoryReserveFailed = capitan.NewSignal("inventory.reserve_failed", "Stock reservation failed")
)

// Field keys attached to, and read back from, the workflow events.
var (
    orderIDKey = capitan.NewStringKey("order_id")
    errKey     = capitan.NewErrorKey("error")
    reasonKey  = capitan.NewStringKey("reason")
)

Error Workflows

Handle failures as workflow branches:

// Success path: confirm the order, then announce it with orderConfirmed.
capitan.Hook(paymentProcessed, func(ctx context.Context, e *capitan.Event) {
    // The ok result is discarded: a missing order_id field is not handled here.
    orderID, _ := orderIDKey.From(e)
    confirmOrder(orderID)
    capitan.Emit(ctx, orderConfirmed, orderIDKey.Field(orderID))
})

// Failure path: compensate for a failed payment by releasing the reserved
// stock, canceling the order, and notifying the customer.
capitan.Hook(paymentFailed, func(ctx context.Context, e *capitan.Event) {
    orderID, _ := orderIDKey.From(e)

    // The error field may be absent or nil (for example when paymentFailed is
    // emitted without errKey). Calling Error() on a nil error panics, so fall
    // back to a generic reason instead of dereferencing it blindly.
    reason := "payment failed"
    if err, ok := errKey.From(e); ok && err != nil {
        reason = err.Error()
    }

    // Release reserved inventory
    releaseStock(orderID)
    capitan.Emit(ctx, inventoryReleased, orderIDKey.Field(orderID))

    // Cancel order
    cancelOrder(orderID, reason)
    capitan.Emit(ctx, orderCanceled,
        orderIDKey.Field(orderID),
        reasonKey.Field(reason),
    )

    // Notify customer
    notifyCustomer(orderID, "payment_failed")
})

Retry Patterns

Implement retries with failure events:

// Signals that drive the job retry workflow.
var (
    // processJob requests one attempt at running a job.
    processJob       = capitan.NewSignal("job.process", "Process job")
    // jobCompleted is emitted when an attempt succeeds.
    jobCompleted     = capitan.NewSignal("job.completed", "Job completed")
    // jobFailed is emitted when an attempt fails and a retry is still allowed.
    jobFailed        = capitan.NewSignal("job.failed", "Job failed")
    // jobRetryExceeded is emitted when an attempt fails and no retries remain.
    jobRetryExceeded = capitan.NewSignal("job.retry_exceeded", "Job exceeded retry limit")
)

// Field keys carried on job events.
var (
    jobIDKey   = capitan.NewStringKey("job_id")
    attemptKey = capitan.NewIntKey("attempt")
)

// maxRetries is the highest attempt number: a job that fails on this attempt
// is not retried again. It is a constant so the limit cannot be reassigned at
// run time.
const maxRetries = 3

capitan.Hook(processJob, func(ctx context.Context, e *capitan.Event) {
    jobID, _ := jobIDKey.From(e)
    attempt, _ := attemptKey.From(e)
    if attempt == 0 {
        // No attempt recorded on the event: this is the first try.
        attempt = 1
    }

    // Run the job once and emit exactly one outcome event.
    err := doWork(jobID)
    switch {
    case err == nil:
        capitan.Emit(ctx, jobCompleted, jobIDKey.Field(jobID))

    case attempt >= maxRetries:
        // Out of retries: report a permanent failure.
        capitan.Error(ctx, jobRetryExceeded,
            jobIDKey.Field(jobID),
            attemptKey.Field(attempt),
            errKey.Field(err),
        )

    default:
        // Schedule retry
        capitan.Warn(ctx, jobFailed,
            jobIDKey.Field(jobID),
            attemptKey.Field(attempt),
            errKey.Field(err),
        )
    }
})

// Retry handler: re-emits processJob for a failed job after a delay that
// doubles with every attempt.
capitan.Hook(jobFailed, func(ctx context.Context, e *capitan.Event) {
    jobID, _ := jobIDKey.From(e)
    attempt, _ := attemptKey.From(e)
    if attempt < 1 {
        // Treat a missing or invalid attempt as the first one. This also
        // keeps the shift count below non-negative.
        attempt = 1
    }

    // Exponential backoff: 1s after the first failure, then 2s, 4s, ...
    // jobFailed is only emitted while attempt < maxRetries, so the shift
    // stays small.
    delay := time.Second << uint(attempt-1)
    time.AfterFunc(delay, func() {
        // The re-emit uses a fresh background context rather than ctx.
        // NOTE(review): presumably because ctx may already be done when the
        // timer fires — confirm this is the intended propagation.
        capitan.Emit(context.Background(), processJob,
            jobIDKey.Field(jobID),
            attemptKey.Field(attempt+1),
        )
    })
})

// Dead letter handler: runs when a job has failed on its last allowed attempt.
capitan.Hook(jobRetryExceeded, func(ctx context.Context, e *capitan.Event) {
    jobID, _ := jobIDKey.From(e)
    err, _ := errKey.From(e)

    log.Printf("Job %s failed permanently: %v", jobID, err)
    moveToDeadLetter(jobID)
    // Pass the event itself, the same way alertOps is called from the
    // severity-routing observer; Go has no overloading, so the helper can
    // only have one signature. The job ID and error travel on the event.
    alertOps(e)
})

Error Aggregation

Aggregate errors for batch reporting:

// ErrorRecord is one captured error event.
type ErrorRecord struct {
    Signal    string         // name of the signal the event was emitted on
    Error     error          // error read from the event; may be nil
    Timestamp time.Time      // event timestamp
    Fields    map[string]any // event fields as produced by extractFields
}

// ErrorAggregator collects ErrorRecords for batch reporting. The zero value
// is ready to use. It contains a mutex and must not be copied after first use.
type ErrorAggregator struct {
    mu     sync.Mutex // guards errors
    errors []ErrorRecord
}

// Handler returns an event callback that appends an ErrorRecord to the
// aggregator for every event whose severity is capitan.SeverityError.
// Events at any other severity are ignored.
func (a *ErrorAggregator) Handler() capitan.EventCallback {
    return func(ctx context.Context, e *capitan.Event) {
        if e.Severity() != capitan.SeverityError {
            return
        }

        // The ok result is discarded, so an error event without the error
        // field is still recorded with whatever value From returned.
        err, _ := errKey.From(e)
        record := ErrorRecord{
            Signal:    e.Signal().Name(),
            Error:     err,
            Timestamp: e.Timestamp(),
            Fields:    extractFields(e),
        }

        a.mu.Lock()
        defer a.mu.Unlock()
        a.errors = append(a.errors, record)
    }
}

// Flush returns every record collected so far and resets the aggregator to
// empty. The aggregator drops its reference to the returned slice, so later
// events go into a new one.
func (a *ErrorAggregator) Flush() []ErrorRecord {
    a.mu.Lock()
    drained := a.errors
    a.errors = nil
    a.mu.Unlock()

    return drained
}

// Usage: register the aggregator as an observer.
aggregator := &ErrorAggregator{}
capitan.Observe(aggregator.Handler())

// Periodic flush: once a minute, report whatever has been collected.
go func() {
    ticker := time.NewTicker(time.Minute)
    for range ticker.C {
        if batch := aggregator.Flush(); len(batch) > 0 {
            sendErrorReport(batch)
        }
    }
}()

Panic vs Error Events

Panics and error events serve different purposes:

| Panic | Error Event |
|---|---|
| Unexpected, unrecoverable | Expected, handled |
| Bug in listener code | Business logic failure |
| Logged via panic handler | Part of normal flow |
| Listener terminates | Listener completes |

Use panics for programming errors and error events for domain failures. See Configuration for panic handler setup and Testing for testing error paths. The example below shows both cases in one listener:

capitan.Hook(processOrder, func(ctx context.Context, e *capitan.Event) {
    // Programming error - this shouldn't happen
    order, ok := orderKey.From(e)
    if !ok {
        panic("order field missing from processOrder event")
    }

    // Domain error - expected failure case
    if err := validateOrder(order); err != nil {
        capitan.Error(ctx, orderValidationFailed, orderIDKey.Field(order.ID), errKey.Field(err))
        return
    }

    // Continue processing...
})

Error Context

Include context for debugging:

// Field keys used to attach debugging context to error events.
var (
    errKey       = capitan.NewErrorKey("error")
    componentKey = capitan.NewStringKey("component")
    operationKey = capitan.NewStringKey("operation")
    inputKey     = capitan.NewStringKey("input")
)

// processWithContext runs riskyOperation on input. On failure it emits
// operationFailed at error severity, carrying the error together with the
// component, operation, and input that produced it, and returns.
func processWithContext(ctx context.Context, input string) {
    result, err := riskyOperation(input)
    if err != nil {
        capitan.Error(ctx, operationFailed,
            errKey.Field(err),
            componentKey.Field("payment-service"),
            operationKey.Field("charge"),
            inputKey.Field(input),
        )
        return
    }

    // ... continue processing with result.
    _ = result // Go rejects unused locals; remove this line once result is used.
}

Observers can then log rich error context:

capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    // Only error events are logged here.
    if e.Severity() != capitan.SeverityError {
        return
    }

    // Read the signal and field list once, then pull each value out by key.
    sig, fields := e.Signal(), e.Fields()
    log.Printf("ERROR [%s] %s: %v (component=%s, operation=%s)",
        sig.Name(),
        sig.Description(),
        errKey.ExtractFromFields(fields),
        componentKey.ExtractFromFields(fields),
        operationKey.ExtractFromFields(fields),
    )
})