Testing
Capitan provides utilities for testing event-driven code. The testing package offers helpers for capturing events, counting emissions, and recording panics. See Testing Package Reference for complete API documentation.
Sync Mode
By default, Capitan processes events asynchronously. For deterministic tests, use sync mode:
// TestOrderWorkflow shows the basic sync-mode pattern: the instance is created
// with WithSyncMode, so the hook has already run by the time Emit returns and
// the result can be asserted immediately.
func TestOrderWorkflow(t *testing.T) {
	c := capitan.New(capitan.WithSyncMode())
	defer c.Shutdown()

	// got holds the order ID seen by the hook; it stays empty if the hook never
	// fires or the event carries no order ID.
	var got string
	record := func(ctx context.Context, e *capitan.Event) {
		id, ok := orderID.From(e)
		if !ok {
			return
		}
		got = id
	}
	c.Hook(orderCreated, record)

	c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

	// No waiting needed - event processed synchronously
	if got != "ORDER-123" {
		t.Errorf("expected ORDER-123, got %s", got)
	}
}
In sync mode, Emit calls listeners directly instead of queueing to workers. This eliminates timing dependencies.
Event Capture
The capitantesting.EventCapture helper captures events for assertions.
Import the testing package:
import capitantesting "github.com/zoobz-io/capitan/testing"
Use it in tests:
import (
"testing"
"github.com/zoobz-io/capitan"
capitantesting "github.com/zoobz-io/capitan/testing"
)
// TestEmitsOrderCreated verifies that createOrder results in exactly one
// captured orderCreated event, using EventCapture to record what is delivered
// to the hook.
func TestEmitsOrderCreated(t *testing.T) {
	c := capitan.New(capitan.WithSyncMode())
	defer c.Shutdown()

	capture := capitantesting.NewEventCapture()
	c.Hook(orderCreated, capture.Handler())

	// Exercise code that emits events
	// NOTE(review): createOrder is not shown here; the capture is hooked on c
	// only, so createOrder must emit on c for this test to pass - confirm.
	createOrder("ORDER-123", 99.99)

	// Assert captured events
	got := capture.Events()
	if n := len(got); n != 1 {
		t.Fatalf("expected 1 event, got %d", n)
	}
	if sig := got[0].Signal; sig != orderCreated {
		t.Errorf("wrong signal: %v", sig)
	}
}
Extracting Field Values
Use ExtractFromFields to get typed values from captured events:
// Read typed values back out of the first captured event's fields and compare
// each against the value that was emitted.
events := capture.Events()
fields := events[0].Fields
if got := orderID.ExtractFromFields(fields); got != "ORDER-123" {
	t.Errorf("expected ORDER-123, got %s", got)
}
if got := totalKey.ExtractFromFields(fields); got != 99.99 {
	t.Errorf("expected 99.99, got %f", got)
}
Waiting for Events (Async Mode)
When testing without sync mode, use WaitForCount:
// TestAsyncEmission exercises an instance in its default async mode. The test
// blocks on WaitForCount (up to one second) before reading the captured
// events, and fails if the wait reports a timeout.
func TestAsyncEmission(t *testing.T) {
	c := capitan.New() // async mode
	defer c.Shutdown()

	capture := capitantesting.NewEventCapture()
	c.Hook(orderCreated, capture.Handler())

	c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

	// Wait for event to be processed
	if !capture.WaitForCount(1, time.Second) {
		t.Fatal("timeout waiting for event")
	}

	// Use the captured events so the example compiles as written; Go rejects
	// a local variable that is declared but never used.
	events := capture.Events()
	if len(events) != 1 {
		t.Fatalf("expected 1 event, got %d", len(events))
	}
	// ... further assertions on events[0]
}
Using Drain for Synchronization
Alternatively, use Drain to wait for all queued events to process:
// TestAsyncWithDrain exercises an async-mode instance and uses Drain, bounded
// by a one-second context timeout, as the synchronization point before
// asserting on what the hook recorded.
func TestAsyncWithDrain(t *testing.T) {
	c := capitan.New() // async mode
	defer c.Shutdown()

	// received holds the order ID seen by the hook; it stays empty if the hook
	// never fires or the event carries no order ID.
	var received string
	c.Hook(orderCreated, func(ctx context.Context, e *capitan.Event) {
		if id, ok := orderID.From(e); ok {
			received = id
		}
	})

	c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

	// Wait for all queued events to process
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := c.Drain(ctx); err != nil {
		// Report the actual error rather than assuming it was a timeout.
		t.Fatalf("drain failed: %v", err)
	}

	if received != "ORDER-123" {
		t.Errorf("expected ORDER-123, got %s", received)
	}
}
Unlike sync mode, Drain preserves async semantics while providing a synchronization point. Unlike Shutdown, it leaves workers running so you can continue emitting events.
Event Counter
For tests that only need to verify event counts, use EventCounter:
// TestEmitsCorrectCount emits 100 orderCreated events on a sync-mode instance
// and checks that the EventCounter hooked on that signal saw all of them.
func TestEmitsCorrectCount(t *testing.T) {
	c := capitan.New(capitan.WithSyncMode())
	defer c.Shutdown()

	counter := capitantesting.NewEventCounter()
	c.Hook(orderCreated, counter.Handler())

	ctx := context.Background()
	for n := 0; n < 100; n++ {
		id := fmt.Sprintf("ORDER-%d", n)
		c.Emit(ctx, orderCreated, orderID.Field(id))
	}

	if got := counter.Count(); got != 100 {
		t.Errorf("expected 100 events, got %d", got)
	}
}
Panic Recording
Test that panics are recovered and handled:
// TestPanicRecovery installs a PanicRecorder as the instance's panic handler,
// emits to a hook that always panics, and checks that exactly one panic with
// the expected value was recorded.
func TestPanicRecovery(t *testing.T) {
	recorder := capitantesting.NewPanicRecorder()
	c := capitan.New(capitan.WithSyncMode(), capitan.WithPanicHandler(recorder.Handler()))
	defer c.Shutdown()

	panicking := func(ctx context.Context, e *capitan.Event) {
		panic("intentional panic")
	}
	c.Hook(orderCreated, panicking)

	// Should not crash
	c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

	recorded := recorder.Panics()
	if n := len(recorded); n != 1 {
		t.Fatalf("expected 1 panic, got %d", n)
	}
	if v := recorded[0].Recovered; v != "intentional panic" {
		t.Errorf("wrong panic value: %v", v)
	}
}
Stats Waiter
For async tests that need to wait on internal state, such as worker creation or empty queues, use StatsWaiter:
// TestWorkerCreation uses a StatsWaiter on an async-mode instance to wait, with
// a one-second limit each, first for a worker to appear and then for the
// queues to empty.
func TestWorkerCreation(t *testing.T) {
	c := capitan.New()
	defer c.Shutdown()

	waiter := capitantesting.NewStatsWaiter(c)

	// The hook body is irrelevant here; only worker and queue state is checked.
	noop := func(ctx context.Context, e *capitan.Event) {}
	c.Hook(orderCreated, noop)
	c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

	// Cases are evaluated in order, so the queue wait only runs once the
	// worker wait has succeeded.
	switch {
	case !waiter.WaitForWorkers(1, time.Second):
		// Wait for worker to be created
		t.Fatal("timeout waiting for worker")
	case !waiter.WaitForEmptyQueues(time.Second):
		// Wait for queue to drain
		t.Fatal("timeout waiting for queue to drain")
	}
}
Isolated Instances
Always create isolated instances for tests to avoid cross-test contamination:
// TestFeatureA builds its own sync-mode instance and shuts it down when the
// test returns.
func TestFeatureA(t *testing.T) {
	instance := capitan.New(capitan.WithSyncMode())
	defer instance.Shutdown()

	// Test with isolated instance
}
// TestFeatureB builds a second, independent sync-mode instance and shuts it
// down when the test returns.
func TestFeatureB(t *testing.T) {
	instance := capitan.New(capitan.WithSyncMode())
	defer instance.Shutdown()

	// Separate instance, no shared state
}
Avoid using the default singleton (capitan.Emit, capitan.Hook) in tests—the singleton is shared by every test in the process and is not reset between tests.
See Configuration for more on sync mode and instance creation.
Testing Observers
Capture events from observers the same way:
// TestObserverReceivesAllEvents registers a single capture via Observe, emits
// two different signals, and checks that both were captured in emission order.
func TestObserverReceivesAllEvents(t *testing.T) {
	c := capitan.New(capitan.WithSyncMode())
	defer c.Shutdown()

	capture := capitantesting.NewEventCapture()
	c.Observe(capture.Handler())

	ctx := context.Background()
	c.Emit(ctx, orderCreated, orderID.Field("ORDER-1"))
	c.Emit(ctx, orderShipped, orderID.Field("ORDER-1"))

	seen := capture.Events()
	if n := len(seen); n != 2 {
		t.Fatalf("expected 2 events, got %d", n)
	}

	first, second := seen[0], seen[1]
	if first.Signal != orderCreated {
		t.Error("first event should be orderCreated")
	}
	if second.Signal != orderShipped {
		t.Error("second event should be orderShipped")
	}
}
Testing Workflows
For workflow tests, capture at multiple points:
// TestOrderWorkflow wires a two-step workflow on a sync-mode instance: a hook
// on orderCreated emits inventoryReserved for the same order ID. A separate
// capture on each signal confirms that both steps fired exactly once.
//
// NOTE(review): this shares its name with the sync-mode example earlier on this
// page; rename one of them if both are pasted into the same package.
func TestOrderWorkflow(t *testing.T) {
	c := capitan.New(capitan.WithSyncMode())
	defer c.Shutdown()

	orderCapture := capitantesting.NewEventCapture()
	inventoryCapture := capitantesting.NewEventCapture()
	c.Hook(orderCreated, orderCapture.Handler())
	c.Hook(inventoryReserved, inventoryCapture.Handler())

	// Wire workflow: order.created → inventory.reserved
	reserve := func(ctx context.Context, e *capitan.Event) {
		// Events without an order ID are ignored.
		if id, ok := orderID.From(e); ok {
			c.Emit(ctx, inventoryReserved, orderID.Field(id))
		}
	}
	c.Hook(orderCreated, reserve)

	// Trigger workflow
	c.Emit(context.Background(), orderCreated, orderID.Field("ORDER-123"))

	// Verify both events fired
	if n := orderCapture.Count(); n != 1 {
		t.Error("order.created not captured")
	}
	if n := inventoryCapture.Count(); n != 1 {
		t.Error("inventory.reserved not captured")
	}
}
See Also
- Testing Package Reference - Complete API documentation for all testing utilities
- Configuration - Sync mode and instance creation options
- Best Practices - Testing guidelines and recommendations