zoobzio December 12, 2025 Edit this page

Testing

Capitan provides utilities for testing event-driven code. The testing package offers helpers for capturing events, counting emissions, and recording panics. See Testing Package Reference for complete API documentation.

Sync Mode

By default, capitan processes events asynchronously. For deterministic tests, use sync mode:

// TestOrderWorkflow checks that a hooked listener sees the order ID carried by
// an orderCreated event. The instance is created in sync mode, so the result
// can be inspected right after Emit.
func TestOrderWorkflow(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    var got string
    c.Hook(orderCreated, func(_ context.Context, ev *capitan.Event) {
        id, ok := orderID.From(ev)
        if !ok {
            return // no order ID on this event; leave got untouched
        }
        got = id
    })

    ctx := context.Background()
    c.Emit(ctx, orderCreated, orderID.Field("ORDER-123"))

    // No waiting needed - event processed synchronously
    if want := "ORDER-123"; got != want {
        t.Errorf("expected ORDER-123, got %s", got)
    }
}

In sync mode, Emit calls listeners directly instead of queueing to workers. This eliminates timing dependencies.

Event Capture

The capitantesting.EventCapture helper captures events for assertions.

Import the testing package:

import capitantesting "github.com/zoobz-io/capitan/testing"

Use it in tests:

import (
    "testing"

    "github.com/zoobz-io/capitan"
    capitantesting "github.com/zoobz-io/capitan/testing"
)

// TestEmitsOrderCreated checks that exercising createOrder yields exactly one
// captured event and that the event's signal is orderCreated.
func TestEmitsOrderCreated(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    // Record every orderCreated event delivered on c.
    capture := capitantesting.NewEventCapture()
    c.Hook(orderCreated, capture.Handler())

    // Exercise code that emits events
    // NOTE(review): createOrder is not shown here and is not passed c. The
    // capture is hooked on c only, so createOrder presumably has to emit
    // through this same instance for the event to be recorded — confirm how
    // it is wired.
    createOrder("ORDER-123", 99.99)

    // Assert captured events
    events := capture.Events()
    // Fatalf (not Errorf) so the index below is never reached on a mismatch.
    if len(events) != 1 {
        t.Fatalf("expected 1 event, got %d", len(events))
    }

    if events[0].Signal != orderCreated {
        t.Errorf("wrong signal: %v", events[0].Signal)
    }
}

Extracting Field Values

Use ExtractFromFields to get typed values from captured events:

// Read typed values for orderID and totalKey from the first captured event.
// NOTE(review): events[0] is indexed directly — this assumes at least one
// event was captured (e.g. after a length check on events).
events := capture.Events()
id := orderID.ExtractFromFields(events[0].Fields)
total := totalKey.ExtractFromFields(events[0].Fields)

if id != "ORDER-123" {
    t.Errorf("expected ORDER-123, got %s", id)
}
if total != 99.99 {
    t.Errorf("expected 99.99, got %f", total)
}

Waiting for Events (Async Mode)

When testing without sync mode, use WaitForCount:

// TestAsyncEmission emits one event on an async-mode instance and uses
// WaitForCount to block until the capture has recorded it (or the timeout
// expires) before asserting on the captured events.
func TestAsyncEmission(t *testing.T) {
    c := capitan.New() // async mode
    defer c.Shutdown()

    capture := capitantesting.NewEventCapture()
    c.Hook(orderCreated, capture.Handler())

    c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

    // Wait for event to be processed
    if !capture.WaitForCount(1, time.Second) {
        t.Fatal("timeout waiting for event")
    }

    // Use the captured slice so the example compiles as written; an unused
    // local is a compile error in Go.
    events := capture.Events()
    if len(events) != 1 {
        t.Fatalf("expected 1 event, got %d", len(events))
    }
    // ... further assertions on events[0]
}

Using Drain for Synchronization

Alternatively, use Drain to wait for all queued events to process:

// TestAsyncWithDrain emits one event on an async-mode instance and calls Drain
// with a one-second timeout as the synchronization point before checking what
// the listener received.
func TestAsyncWithDrain(t *testing.T) {
    c := capitan.New() // async mode
    defer c.Shutdown()

    var received string
    c.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
        if id, ok := orderID.From(e); ok {
            received = id
        }
    })

    c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

    // Wait for all queued events to process
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    if err := c.Drain(ctx); err != nil {
        // Report the actual error instead of assuming every failure is a timeout.
        t.Fatalf("drain failed: %v", err)
    }

    // NOTE(review): received is written by the listener and read here; this
    // presumably relies on Drain ordering the listener's write before its
    // return — confirm with `go test -race`.
    if received != "ORDER-123" {
        t.Errorf("expected ORDER-123, got %s", received)
    }
}

Unlike sync mode, Drain preserves async semantics while providing a synchronization point. Unlike Shutdown, it leaves workers running so you can continue emitting events.

Event Counter

For tests that only need to verify event counts, use EventCounter:

// TestEmitsCorrectCount emits 100 orderCreated events on a sync-mode instance
// and checks that the counter hooked to that signal reports all of them.
func TestEmitsCorrectCount(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    counter := capitantesting.NewEventCounter()
    c.Hook(orderCreated, counter.Handler())

    ctx := context.Background()
    for n := 0; n < 100; n++ {
        id := fmt.Sprintf("ORDER-%d", n)
        c.Emit(ctx, orderCreated, orderID.Field(id))
    }

    if got := counter.Count(); got != 100 {
        t.Errorf("expected 100 events, got %d", got)
    }
}

Panic Recording

Use PanicRecorder as the panic handler to test that a panicking listener is recovered and recorded:

// TestPanicRecovery installs a PanicRecorder as the instance's panic handler,
// triggers a listener that panics, and checks that exactly one panic with the
// expected value was recorded.
func TestPanicRecovery(t *testing.T) {
    recorder := capitantesting.NewPanicRecorder()
    c := capitan.New(
        capitan.WithSyncMode(),
        capitan.WithPanicHandler(recorder.Handler()),
    )
    defer c.Shutdown()

    // Listener that always panics.
    c.Hook(orderCreated, func(_ context.Context, _ *capitan.Event) {
        panic("intentional panic")
    })

    // Should not crash
    ctx := context.Background()
    c.Emit(ctx, orderCreated, orderID.Field("ORDER-123"))

    recorded := recorder.Panics()
    if n := len(recorded); n != 1 {
        t.Fatalf("expected 1 panic, got %d", n)
    }

    if got := recorded[0].Recovered; got != "intentional panic" {
        t.Errorf("wrong panic value: %v", got)
    }
}

Stats Waiter

For async tests that need to wait for specific conditions, such as a worker being created or the queues emptying, use StatsWaiter:

// TestWorkerCreation emits one event on an async-mode instance and uses a
// StatsWaiter to wait first for a worker to exist and then for the queues to
// empty, failing if either wait times out.
func TestWorkerCreation(t *testing.T) {
    c := capitan.New()
    defer c.Shutdown()

    waiter := capitantesting.NewStatsWaiter(c)

    // A do-nothing listener is enough; the test only cares about worker stats.
    noop := func(context.Context, *capitan.Event) {}
    c.Hook(orderCreated, noop)
    c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

    const timeout = time.Second

    // Wait for worker to be created
    workerSeen := waiter.WaitForWorkers(1, timeout)
    if !workerSeen {
        t.Fatal("timeout waiting for worker")
    }

    // Wait for queue to drain
    drained := waiter.WaitForEmptyQueues(timeout)
    if !drained {
        t.Fatal("timeout waiting for queue to drain")
    }
}

Isolated Instances

Always create isolated instances for tests to avoid cross-test contamination:

// TestFeatureA is a skeleton showing the per-test setup: it creates its own
// sync-mode instance and shuts it down when the test returns.
func TestFeatureA(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()
    // Test with isolated instance
}

// TestFeatureB repeats the same setup with its own instance rather than
// reusing one created by another test.
func TestFeatureB(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()
    // Separate instance, no shared state
}

Avoid using the default singleton (capitan.Emit, capitan.Hook) in tests—its state is not reset between tests, so hooks registered by one test can affect another.

See Configuration for more on sync mode and instance creation.

Testing Observers

Capture events from observers the same way:

// TestObserverReceivesAllEvents registers a capture via Observe, emits two
// events with different signals, and checks that both were captured in
// emission order.
func TestObserverReceivesAllEvents(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    capture := capitantesting.NewEventCapture()
    c.Observe(capture.Handler())

    ctx := context.Background()
    c.Emit(ctx, orderCreated, orderID.Field("ORDER-1"))
    c.Emit(ctx, orderShipped, orderID.Field("ORDER-1"))

    seen := capture.Events()
    if n := len(seen); n != 2 {
        t.Fatalf("expected 2 events, got %d", n)
    }

    // Length was verified above, so both indexes are in range.
    firstSignal, secondSignal := seen[0].Signal, seen[1].Signal
    if firstSignal != orderCreated {
        t.Errorf("first event should be orderCreated")
    }
    if secondSignal != orderShipped {
        t.Errorf("second event should be orderShipped")
    }
}

Testing Workflows

For workflow tests, capture at multiple points:

// TestOrderWorkflow wires a two-step workflow (orderCreated triggers
// inventoryReserved), fires the first step, and checks that a capture on each
// signal recorded exactly one event.
func TestOrderWorkflow(t *testing.T) {
    c := capitan.New(capitan.WithSyncMode())
    defer c.Shutdown()

    // One capture per signal so each step can be asserted separately.
    orderCapture := capitantesting.NewEventCapture()
    c.Hook(orderCreated, orderCapture.Handler())

    inventoryCapture := capitantesting.NewEventCapture()
    c.Hook(inventoryReserved, inventoryCapture.Handler())

    // Wire workflow: order.created → inventory.reserved
    c.Hook(orderCreated, func(ctx context.Context, ev *capitan.Event) {
        if id, ok := orderID.From(ev); ok {
            c.Emit(ctx, inventoryReserved, orderID.Field(id))
        }
    })

    // Trigger workflow
    root := context.Background()
    c.Emit(root, orderCreated, orderID.Field("ORDER-123"))

    // Verify both events fired
    if got := orderCapture.Count(); got != 1 {
        t.Error("order.created not captured")
    }
    if got := inventoryCapture.Count(); got != 1 {
        t.Error("inventory.reserved not captured")
    }
}

See Also