Event Sourcing with Golang: Building Auditable, Time-Travel Systems

Event Sourcing with Golang: Building Auditable, Time-Travel Systems

Event Sourcing is a powerful architectural pattern where you store application state as a sequence of events rather than just the current state. Instead of updating records in-place, you append immutable events that describe what happened. The current state is derived by replaying these events.

Core Concept

Traditional systems store the current state of entities:

User { id: 123, name: "Alice", email: "alice@example.com", balance: 1000 }

Event-sourced systems store the history:

1. UserCreated { id: 123, name: "Alice", email: "alice.old@example.com" }
2. DepositMade { id: 123, amount: 1000 }
3. EmailUpdated { id: 123, newEmail: "alice@example.com" }

When to Use Event Sourcing

Ideal Use Cases

When to Avoid

Implementation in Go

1. Define Events

package events

import (
    "time"
    "github.com/google/uuid"
)

// Event is the base interface all events must implement.
// BaseEvent provides an implementation that concrete events obtain by
// embedding it.
type Event interface {
    // EventID returns the identifier of this individual event.
    EventID() uuid.UUID
    // EventType returns the event's type name, e.g. "AccountCreated".
    EventType() string
    // AggregateID returns the identifier of the aggregate the event belongs to.
    AggregateID() uuid.UUID
    // Timestamp returns the time recorded on the event.
    Timestamp() time.Time
    // Version returns the version number carried by the event.
    Version() int
}

// BaseEvent provides the common fields shared by every event. Concrete
// events embed it and thereby pick up its accessor methods.
type BaseEvent struct {
    ID           uuid.UUID `json:"id"`           // identifier of this event
    Type         string    `json:"type"`         // event type name, e.g. "AccountCreated"
    AggrID       uuid.UUID `json:"aggregate_id"` // aggregate the event belongs to
    OccurredAt   time.Time `json:"occurred_at"`  // time stamped on the event
    EventVersion int       `json:"version"`      // version number carried by the event
}

// NewBaseEvent builds the BaseEvent for a new event of type eventType on the
// aggregate identified by aggregateID, carrying the given version.
// It assigns a freshly generated event ID and stamps the event with the
// current time in UTC.
func NewBaseEvent(aggregateID uuid.UUID, eventType string, version int) BaseEvent {
    return BaseEvent{
        ID:           uuid.New(),
        Type:         eventType,
        AggrID:       aggregateID,
        OccurredAt:   time.Now().UTC(),
        EventVersion: version,
    }
}

// EventID reports the event's own identifier (the ID field).
func (b BaseEvent) EventID() uuid.UUID {
    return b.ID
}

// EventType reports the event's type name (the Type field).
func (b BaseEvent) EventType() string {
    return b.Type
}

// AggregateID reports the identifier of the owning aggregate (the AggrID field).
func (b BaseEvent) AggregateID() uuid.UUID {
    return b.AggrID
}

// Timestamp reports the time recorded on the event (the OccurredAt field).
func (b BaseEvent) Timestamp() time.Time {
    return b.OccurredAt
}

// Version reports the version number carried by the event (the EventVersion field).
func (b BaseEvent) Version() int {
    return b.EventVersion
}

// Domain-specific events
// AccountCreated is the event emitted when a new account is created.
type AccountCreated struct {
    BaseEvent

    Owner    string `json:"owner"`    // owner given at account creation
    Currency string `json:"currency"` // currency given at account creation
}

// MoneyDeposited is the event emitted when money is deposited into an account.
type MoneyDeposited struct {
    BaseEvent

    Amount float64 `json:"amount"` // amount added to the balance
    Reason string  `json:"reason"` // free-text reason supplied by the caller
}

// MoneyWithdrawn is the event emitted when money is withdrawn from an account.
type MoneyWithdrawn struct {
    BaseEvent

    Amount float64 `json:"amount"` // amount subtracted from the balance
    Reason string  `json:"reason"` // free-text reason supplied by the caller
}

2. Event Store Interface

package eventstore

import (
    "context"
    "github.com/google/uuid"
)

// EventStore is the interface for persisting and retrieving events.
type EventStore interface {
    // Save appends events to the store with optimistic locking:
    // expectedVersion is the version the caller believes the aggregate
    // is currently at.
    Save(ctx context.Context, events []Event, expectedVersion int) error

    // Load retrieves all events for the aggregate identified by aggregateID.
    Load(ctx context.Context, aggregateID uuid.UUID) ([]Event, error)

    // LoadFromVersion retrieves the aggregate's events starting from the
    // given version.
    // NOTE(review): whether version itself is included is not specified
    // here — confirm against the implementation.
    LoadFromVersion(ctx context.Context, aggregateID uuid.UUID, version int) ([]Event, error)

    // Subscribe creates a subscription that passes new events to handler.
    Subscribe(ctx context.Context, handler EventHandler) error
}

// EventHandler is the callback type accepted by EventStore.Subscribe. It
// receives a single event and reports failure through its error result.
type EventHandler func(Event) error

3. PostgreSQL Event Store Implementation

package postgres

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    
    "github.com/google/uuid"
    _ "github.com/lib/pq"
)

// PostgresEventStore stores and loads events through a database/sql handle,
// reading from and writing to a table named "events".
type PostgresEventStore struct {
    db *sql.DB // handle opened by NewPostgresEventStore
}

// NewPostgresEventStore opens a database handle for connStr using the
// "postgres" driver and wraps it in a PostgresEventStore.
// It returns the error from sql.Open unchanged. No query or ping is issued
// here, so connectivity problems may only surface on first use.
func NewPostgresEventStore(connStr string) (*PostgresEventStore, error) {
    handle, openErr := sql.Open("postgres", connStr)
    if openErr != nil {
        return nil, openErr
    }

    store := &PostgresEventStore{db: handle}
    return store, nil
}

// Save appends events to the "events" table inside a single transaction,
// guarded by an optimistic concurrency check.
//
// The aggregate is taken from the first event in the batch. expectedVersion
// must equal the highest version currently stored for that aggregate (0 when
// nothing is stored); otherwise a "concurrency conflict" error is returned and
// nothing is written. Events are stored with consecutive versions
// expectedVersion+1, expectedVersion+2, ... in slice order; the value of
// event.Version() is not used for the version column.
//
// An empty batch is a no-op: Save returns nil without touching the database
// and without performing the version check.
//
// Errors from the database and from JSON encoding are returned unchanged.
func (s *PostgresEventStore) Save(ctx context.Context, events []Event, expectedVersion int) error {
    // Nothing to append; this also protects the events[0] access below,
    // which would otherwise panic on an empty slice.
    if len(events) == 0 {
        return nil
    }

    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    // Undo partial work on every early return. After a successful Commit the
    // deferred Rollback is a no-op that reports sql.ErrTxDone, which is ignored.
    defer tx.Rollback()

    // Optimistic concurrency check.
    // NOTE(review): the read-then-insert sequence only rejects concurrent
    // writers if the table enforces uniqueness on (aggregate_id, version) or
    // the transaction runs at a sufficiently strict isolation level — the
    // schema is not shown here; confirm.
    var currentVersion int
    err = tx.QueryRowContext(ctx,
        `SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = $1`,
        events[0].AggregateID(),
    ).Scan(&currentVersion)
    if err != nil && err != sql.ErrNoRows {
        return err
    }

    if currentVersion != expectedVersion {
        return fmt.Errorf("concurrency conflict: expected version %d, got %d",
            expectedVersion, currentVersion)
    }

    // Insert events using one prepared statement for the whole batch.
    stmt, err := tx.PrepareContext(ctx, `
        INSERT INTO events (id, aggregate_id, event_type, event_data, version, occurred_at)
        VALUES ($1, $2, $3, $4, $5, $6)
    `)
    if err != nil {
        return err
    }
    defer stmt.Close()

    for i, event := range events {
        data, err := json.Marshal(event)
        if err != nil {
            return err
        }

        if _, err := stmt.ExecContext(ctx,
            event.EventID(),
            event.AggregateID(),
            event.EventType(),
            data,
            expectedVersion+i+1, // versions are assigned by position in the batch
            event.Timestamp(),
        ); err != nil {
            return err
        }
    }

    return tx.Commit()
}

// Load returns every stored event of the aggregate identified by aggregateID,
// ordered by ascending version. Each row's payload is decoded through
// deserializeEvent according to its event_type column.
//
// A query, scan or decode failure yields a nil slice and the error. An
// iteration error reported by rows.Err() is returned together with the events
// decoded up to that point.
func (s *PostgresEventStore) Load(ctx context.Context, aggregateID uuid.UUID) ([]Event, error) {
    const query = `
        SELECT event_type, event_data 
        FROM events 
        WHERE aggregate_id = $1 
        ORDER BY version ASC
    `

    rows, queryErr := s.db.QueryContext(ctx, query, aggregateID)
    if queryErr != nil {
        return nil, queryErr
    }
    defer rows.Close()

    var history []Event
    for rows.Next() {
        var (
            kind    string
            payload []byte
        )
        if scanErr := rows.Scan(&kind, &payload); scanErr != nil {
            return nil, scanErr
        }

        decoded, decodeErr := s.deserializeEvent(kind, payload)
        if decodeErr != nil {
            return nil, decodeErr
        }
        history = append(history, decoded)
    }

    return history, rows.Err()
}

// deserializeEvent decodes the JSON payload in data into the concrete event
// type named by eventType and returns it as a pointer to that type.
// It returns an "unknown event type" error for names it does not recognise,
// and the json.Unmarshal error when the payload cannot be decoded.
func (s *PostgresEventStore) deserializeEvent(eventType string, data []byte) (Event, error) {
    // Dispatch table: event type name -> constructor of an empty event.
    constructors := map[string]func() Event{
        "AccountCreated": func() Event { return new(AccountCreated) },
        "MoneyDeposited": func() Event { return new(MoneyDeposited) },
        "MoneyWithdrawn": func() Event { return new(MoneyWithdrawn) },
    }

    construct, known := constructors[eventType]
    if !known {
        return nil, fmt.Errorf("unknown event type: %s", eventType)
    }

    decoded := construct()
    if unmarshalErr := json.Unmarshal(data, decoded); unmarshalErr != nil {
        return nil, unmarshalErr
    }
    return decoded, nil
}

4. Aggregate Root with Event Sourcing

package domain

import (
    "fmt"
    "github.com/google/uuid"
)

// Account is the event-sourced aggregate. Its fields are only modified by
// applying events; commands record the events they produce in changes.
type Account struct {
    id       uuid.UUID // aggregate identifier
    owner    string    // taken from the AccountCreated event
    balance  float64   // deposits added, withdrawals subtracted
    currency string    // taken from the AccountCreated event
    version  int       // version of the most recently applied event

    // changes holds the uncommitted events: those produced by commands and
    // not yet cleared by MarkChangesAsCommitted.
    changes []Event
}

// NewAccount creates an account with a freshly generated ID for the given
// owner and currency. It builds an AccountCreated event at version 1, applies
// it to the new aggregate and records it as an uncommitted change.
func NewAccount(owner, currency string) *Account {
    account := new(Account)
    account.id = uuid.New()

    var created AccountCreated
    created.BaseEvent = NewBaseEvent(account.id, "AccountCreated", 1)
    created.Owner = owner
    created.Currency = currency

    account.apply(created)
    account.changes = append(account.changes, created)
    return account
}

// Deposit adds amount to the account by emitting a MoneyDeposited event at
// the next version, applying it and recording it as an uncommitted change.
// It returns an error, and changes nothing, when amount is zero or negative.
func (a *Account) Deposit(amount float64, reason string) error {
    if amount <= 0 {
        return fmt.Errorf("deposit amount must be positive")
    }

    nextVersion := a.version + 1
    deposited := MoneyDeposited{Amount: amount, Reason: reason}
    deposited.BaseEvent = NewBaseEvent(a.id, "MoneyDeposited", nextVersion)

    a.apply(deposited)
    a.changes = append(a.changes, deposited)
    return nil
}

// Withdraw removes amount from the account by emitting a MoneyWithdrawn event
// at the next version, applying it and recording it as an uncommitted change.
// It returns an error, and changes nothing, when amount is zero or negative
// or when the current balance is smaller than amount.
func (a *Account) Withdraw(amount float64, reason string) error {
    // Guards are checked in order: sign first, then available funds.
    switch {
    case amount <= 0:
        return fmt.Errorf("withdrawal amount must be positive")
    case a.balance < amount:
        return fmt.Errorf("insufficient funds")
    }

    nextVersion := a.version + 1
    withdrawn := MoneyWithdrawn{Amount: amount, Reason: reason}
    withdrawn.BaseEvent = NewBaseEvent(a.id, "MoneyWithdrawn", nextVersion)

    a.apply(withdrawn)
    a.changes = append(a.changes, withdrawn)
    return nil
}

// apply folds a single event into the aggregate's state and then sets the
// aggregate version to the event's version.
//
// Events are accepted both by value (as created by NewAccount, Deposit and
// Withdraw) and by pointer (as returned by the event store's
// deserializeEvent), so that an aggregate rebuilt from stored history ends up
// with the same state as one mutated in memory. Event types not listed here
// leave the state untouched and only advance the version.
func (a *Account) apply(event Event) {
    switch e := event.(type) {
    // Pointer forms: dereference and re-dispatch to the value case below.
    case *AccountCreated:
        a.apply(*e)
        return
    case *MoneyDeposited:
        a.apply(*e)
        return
    case *MoneyWithdrawn:
        a.apply(*e)
        return

    case AccountCreated:
        a.id = e.AggregateID()
        a.owner = e.Owner
        a.currency = e.Currency
        a.balance = 0
    case MoneyDeposited:
        a.balance += e.Amount
    case MoneyWithdrawn:
        a.balance -= e.Amount
    }
    a.version = event.Version()
}

// LoadAccountFromHistory reconstructs an Account by applying events, in the
// order given, to a zero-value Account. The events are not recorded as
// uncommitted changes.
func LoadAccountFromHistory(events []Event) *Account {
    rebuilt := new(Account)
    for i := range events {
        rebuilt.apply(events[i])
    }
    return rebuilt
}

// GetUncommittedChanges returns the events recorded by commands since the
// aggregate was created or since the last MarkChangesAsCommitted call.
// The returned slice is the aggregate's own slice, not a copy.
func (a *Account) GetUncommittedChanges() []Event {
    pending := a.changes
    return pending
}

// MarkChangesAsCommitted discards the recorded uncommitted events by
// resetting the changes slice to nil.
func (a *Account) MarkChangesAsCommitted() {
    var none []Event
    a.changes = none
}

Key Patterns and Trade-offs

Benefits

  1. Complete Audit Trail - Every state change is recorded with context
  2. Time Travel - Reconstruct system state at any point in history
  3. Event Replay - Rebuild projections, fix bugs by replaying with corrected logic
  4. Decoupling - Multiple read models can subscribe to same events
  5. Debugging - Production issues can be reproduced by replaying events

Challenges

  1. Storage Growth - Events accumulate indefinitely (use snapshots)
  2. Event Schema Evolution - Need versioning strategy for event formats
  3. Eventual Consistency - Read models lag behind event stream
  4. Query Complexity - Requires CQRS for efficient queries
  5. Learning Curve - Team needs to understand event-driven thinking

Best Practices

1. Use Snapshots for Performance

// Save a snapshot whenever the version is a multiple of 100, capturing the
// account's ID, balance, owner and the version the snapshot was taken at.
// Note that the condition also holds for version 0.
// NOTE(review): the result of snapshotStore.Save is not checked here —
// confirm whether it returns an error that should be handled.
if version % 100 == 0 {
    snapshot := AccountSnapshot{
        AggregateID: account.id,
        Balance:     account.balance,
        Owner:       account.owner,
        Version:     version,
    }
    snapshotStore.Save(snapshot)
}

2. Handle Event Schema Evolution

// MoneyDepositedV2 is the second schema version of MoneyDeposited. It keeps
// the Amount and Reason fields and adds Currency.
type MoneyDepositedV2 struct {
    BaseEvent
    Amount   float64 `json:"amount"`
    Reason   string  `json:"reason"`
    Currency string  `json:"currency"` // New in V2; not present on MoneyDeposited
}

// upconvertEvent converts an old-format MoneyDeposited event into a
// MoneyDepositedV2 when loading. BaseEvent, Amount and Reason are carried
// over unchanged; Currency, which old events do not have, is set to "USD".
func upconvertEvent(old MoneyDeposited) MoneyDepositedV2 {
    var upgraded MoneyDepositedV2
    upgraded.BaseEvent = old.BaseEvent
    upgraded.Amount = old.Amount
    upgraded.Reason = old.Reason
    upgraded.Currency = "USD" // Default for old events
    return upgraded
}

3. Combine with CQRS

Event sourcing pairs naturally with Command Query Responsibility Segregation (CQRS). Use event sourcing for writes, and maintain separate, optimized read models for queries.

Conclusion

Event Sourcing in Go provides a robust foundation for systems requiring auditability, temporal queries, and complex business logic. While it adds complexity, the benefits of complete history, debuggability, and flexibility make it invaluable for financial systems, collaborative platforms, and analytics-heavy applications.

Start small—apply event sourcing to a bounded context within your system, not everywhere. Use an established event store such as EventStoreDB, or build on top of PostgreSQL/Kafka for simpler use cases. As your team gains experience, expand to more aggregates.

The investment in event sourcing pays dividends when you need to understand “how did we get here?” rather than just “where are we now?”