Event Sourcing Pattern for Distributed Systems

What is Event Sourcing?

Event Sourcing is an architectural pattern where state changes are stored as a sequence of events rather than persisting only the current state. Instead of updating records in place, the system appends immutable events to an event store. The current state is derived by replaying events from the beginning or from a snapshot.

Think of it like a bank account statement: rather than just storing your current balance, the bank records every deposit and withdrawal. Your balance is the sum of all transactions. If you need to understand why your balance is what it is, you can replay the history.

Core Concepts

Events as Source of Truth

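In event sourcing, events are the primary source of truth. Each event is an immutable record of something that already happened in the domain, such as an order being created or shipped. Tables or documents that hold current state are derived from the events and can be discarded and rebuilt from them.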
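As a minimal Python sketch (the names `DomainEvent` and `make_event` are hypothetical, not from any library), an event can be modeled as a frozen dataclass with a read-only payload, so an attempt to rewrite history fails instead of silently changing the record:

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from types import MappingProxyType
from typing import Any, Mapping


@dataclass(frozen=True)
class DomainEvent:
    """An immutable fact: something that already happened to one aggregate."""

    aggregate_id: str
    event_type: str          # past tense, e.g. "order.created"
    version: int             # position in the aggregate's stream: 1, 2, 3, ...
    data: Mapping[str, Any]  # read-only payload
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    occurred_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def make_event(aggregate_id: str, event_type: str, version: int, **data: Any) -> DomainEvent:
    # Copy the payload into a read-only view so it cannot be changed through the event.
    return DomainEvent(aggregate_id, event_type, version, MappingProxyType(dict(data)))


created = make_event("order-1", "order.created", 1, customer_id="c-42", amount=99.5)
try:
    created.version = 2  # "correcting" history by mutation is rejected
except FrozenInstanceError:
    print("events are immutable; append a compensating event instead")
```

Corrections are then expressed as new events appended to the stream, never as edits to old ones.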

Event Store

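The event store is a specialized database for storing events. It is append-only: each aggregate has its own ordered stream of events, and writers typically use optimistic concurrency, so an append is rejected if the stream has moved past the version the writer last read.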
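The sketch below is a hypothetical in-memory store, assuming that an aggregate's version equals the number of events in its stream (0 for a new aggregate). Its method names mirror the Python `EventStore` protocol used later in this article; a production store would persist to durable storage and enforce the version check atomically in the database.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass(frozen=True)
class StoredEvent:
    aggregate_id: str
    event_type: str
    version: int
    data: Dict[str, Any]


class ConcurrencyError(Exception):
    """Raised when another writer appended to the stream first."""


class InMemoryEventStore:
    """Append-only store: one ordered stream per aggregate plus a global log."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[StoredEvent]] = {}
        self._log: List[StoredEvent] = []  # all events in commit order

    def save_events(self, aggregate_id: str, events: List[StoredEvent], expected_version: int) -> None:
        stream = self._streams.setdefault(aggregate_id, [])
        current_version = len(stream)
        # Optimistic concurrency: the writer must have seen the latest version.
        if current_version != expected_version:
            raise ConcurrencyError(
                f"{aggregate_id}: expected version {expected_version}, found {current_version}"
            )
        for offset, event in enumerate(events, start=1):
            if event.version != expected_version + offset:
                raise ValueError(f"event version {event.version} is out of sequence")
        stream.extend(events)  # append only; existing events are never rewritten
        self._log.extend(events)

    def get_events(self, aggregate_id: str) -> List[StoredEvent]:
        return list(self._streams.get(aggregate_id, []))

    def get_all_events(self, event_type: Optional[str] = None) -> List[StoredEvent]:
        return [e for e in self._log if event_type is None or e.event_type == event_type]
```

Two writers that both loaded version 1 can each try to append version 2; only the first succeeds, and the second gets a `ConcurrencyError` and must reload and retry.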

Event Replay and Projections

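Current state is built by replaying events: starting from an empty state (or a snapshot), each event is applied in order. The same mechanism builds projections, read-optimized views derived from the event log that can be rebuilt at any time by replaying it.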
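Replay is a left fold over the event history. A minimal sketch using the bank-account analogy from above, assuming a hypothetical event shape of `(event_type, amount)` tuples:

```python
from functools import reduce
from typing import List, Tuple

# Hypothetical minimal event shape for this sketch: (event_type, amount).
AccountEvent = Tuple[str, int]


def apply(balance: int, event: AccountEvent) -> int:
    """Pure transition: current state + one event -> next state."""
    kind, amount = event
    if kind == "deposited":
        return balance + amount
    if kind == "withdrawn":
        return balance - amount
    return balance  # ignore event types this model does not care about


def replay(events: List[AccountEvent], initial: int = 0) -> int:
    # A left fold over the history. To resume from a snapshot, pass the
    # snapshot's state as `initial` and only the events recorded after it.
    return reduce(apply, events, initial)


history = [("deposited", 100), ("withdrawn", 30), ("deposited", 50)]
print(replay(history))                  # 120
print(replay(history[2:], initial=70))  # 120, resuming from a snapshot taken after two events
```

Keeping `apply` pure (no I/O, no clock reads) is what makes replay deterministic: the same events always produce the same state.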

When to Use Event Sourcing

Event Sourcing excels in these scenarios:

1. Domain-Driven Design with Rich Business Logic

When business rules require understanding historical context and state transitions matter as much as current state.

2. Audit and Compliance Requirements

Financial services, healthcare, and regulated industries need complete audit trails of all changes.

3. Temporal Queries

“What was the state at time X?” or “How did we get to this state?” questions are common.

4. Complex Event Processing

Systems that need to react to patterns of events over time, like fraud detection or recommendation engines.

5. Microservices Communication

Events provide a natural integration pattern between services, enabling eventual consistency.
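For the temporal queries in scenario 3, a minimal sketch (hypothetical `OrderEvent` type) replays only the events that occurred up to the requested time. It assumes the stream's order agrees with the timestamps; in a distributed system clocks can drift, so production code often relies on stream order or version numbers rather than wall-clock time.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass(frozen=True)
class OrderEvent:
    event_type: str
    occurred_at: datetime


def status_as_of(events: List[OrderEvent], as_of: datetime) -> Optional[str]:
    """Answer "what was the order's status at time X?" by replaying a prefix."""
    status: Optional[str] = None  # the order did not exist yet
    for event in events:  # assumed: stream order matches timestamp order
        if event.occurred_at > as_of:
            break  # everything from here on happened after `as_of`
        if event.event_type == "order.created":
            status = "created"
        elif event.event_type == "order.shipped":
            status = "shipped"
    return status


history = [
    OrderEvent("order.created", datetime(2024, 1, 1, 9, 0)),
    OrderEvent("order.shipped", datetime(2024, 1, 3, 9, 0)),
]
print(status_as_of(history, datetime(2024, 1, 2)))  # created
```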

Implementation Examples

Go Implementation

package eventsourcing

import (
    "context"
    "time"
)

// Event represents a domain event
type Event interface {
    AggregateID() string
    EventType() string
    OccurredAt() time.Time
    EventData() interface{}
}

// BaseEvent provides common event fields
type BaseEvent struct {
    ID          string    `json:"id"`
    AggregateId string    `json:"aggregate_id"`
    Type        string    `json:"type"`
    Timestamp   time.Time `json:"timestamp"`
    Version     int       `json:"version"`
}

func (e BaseEvent) AggregateID() string  { return e.AggregateId }
func (e BaseEvent) EventType() string    { return e.Type }
func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }

// Example: Order domain events
type OrderCreatedEvent struct {
    BaseEvent
    CustomerID string  `json:"customer_id"`
    Amount     float64 `json:"amount"`
}

func (e OrderCreatedEvent) EventData() interface{} { return e }

type OrderShippedEvent struct {
    BaseEvent
    TrackingNumber string `json:"tracking_number"`
}

func (e OrderShippedEvent) EventData() interface{} { return e }

// EventStore interface
type EventStore interface {
    SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
    GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
}

// Order aggregate
type Order struct {
    ID         string
    CustomerID string
    Amount     float64
    Status     string
    Version    int
    
    changes []Event // Uncommitted changes
}

// Apply event to aggregate
func (o *Order) Apply(event Event) {
    switch e := event.(type) {
    case OrderCreatedEvent:
        o.ID = e.AggregateID()
        o.CustomerID = e.CustomerID
        o.Amount = e.Amount
        o.Status = "created"
        
    case OrderShippedEvent:
        o.Status = "shipped"
    }
    
    o.Version++
}

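// Create records a new order (command handler). The caller supplies the
// order ID, for example a UUID generated where the command is received.
func (o *Order) Create(orderID, customerID string, amount float64) {
    event := OrderCreatedEvent{
        BaseEvent: BaseEvent{
            AggregateId: orderID,
            Type:        "order.created",
            Timestamp:   time.Now(),
            Version:     o.Version + 1, // position of this event in the stream
        },
        CustomerID: customerID,
        Amount:     amount,
    }
    
    o.Apply(event)
    o.changes = append(o.changes, event)
}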

// Repository pattern for loading/saving aggregates
type OrderRepository struct {
    eventStore EventStore
}

func (r *OrderRepository) Load(ctx context.Context, orderID string) (*Order, error) {
    events, err := r.eventStore.GetEvents(ctx, orderID)
    if err != nil {
        return nil, err
    }
    
    order := &Order{}
    for _, event := range events {
        order.Apply(event)
    }
    
    return order, nil
}

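// Save appends the uncommitted events. The expected version is the version
// the aggregate had when it was loaded, before those changes were applied.
func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
    if len(order.changes) == 0 {
        return nil
    }
    expectedVersion := order.Version - len(order.changes)
    if err := r.eventStore.SaveEvents(ctx, order.ID, order.changes, expectedVersion); err != nil {
        return err
    }
    order.changes = nil // changes are now committed
    return nil
}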

Python Implementation with Projections

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional, Protocol
from abc import ABC, abstractmethod

@dataclass
class Event:
    """Base event class; version is the event's position in its aggregate stream"""
    aggregate_id: str
    event_type: str
    timestamp: datetime
    version: int
    data: Dict[str, Any]

class EventStore(Protocol):
    """Event store interface"""
    def save_events(self, aggregate_id: str, events: List[Event], expected_version: int) -> None:
        ...
    
    def get_events(self, aggregate_id: str) -> List[Event]:
        ...
    
    def get_all_events(self, event_type: Optional[str] = None) -> List[Event]:
        """Return events from all aggregates, optionally filtered by type"""
        ...

class Projection(ABC):
    """Base class for read model projections"""
    
    @abstractmethod
    def handle(self, event: Event) -> None:
        """Process an event and update the projection"""
        pass
    
    @abstractmethod
    def reset(self) -> None:
        """Reset projection state"""
        pass

class OrderSummaryProjection(Projection):
    """Projection for order summary view"""
    
    def __init__(self):
        self.orders: Dict[str, Dict[str, Any]] = {}
    
    def handle(self, event: Event) -> None:
        if event.event_type == "order.created":
            self.orders[event.aggregate_id] = {
                "id": event.aggregate_id,
                "customer_id": event.data["customer_id"],
                "amount": event.data["amount"],
                "status": "created",
                "created_at": event.timestamp,
            }
        
        elif event.event_type == "order.shipped":
            if event.aggregate_id in self.orders:
                self.orders[event.aggregate_id]["status"] = "shipped"
                self.orders[event.aggregate_id]["shipped_at"] = event.timestamp
    
    def reset(self) -> None:
        self.orders.clear()
    
    def get_order(self, order_id: str) -> Dict[str, Any]:
        return self.orders.get(order_id)
    
    def get_customer_orders(self, customer_id: str) -> List[Dict[str, Any]]:
        return [o for o in self.orders.values() if o["customer_id"] == customer_id]

class ProjectionManager:
    """Manages projection rebuilding and updates"""
    
    def __init__(self, event_store: EventStore):
        self.event_store = event_store
        self.projections: List[Projection] = []
    
    def register(self, projection: Projection) -> None:
        self.projections.append(projection)
    
    def rebuild_all(self) -> None:
        """Rebuild all projections from scratch"""
        for projection in self.projections:
            projection.reset()
        
        events = self.event_store.get_all_events()
        for event in events:
            for projection in self.projections:
                projection.handle(event)
    
    def handle_event(self, event: Event) -> None:
        """Handle a new event in all projections"""
        for projection in self.projections:
            projection.handle(event)

# Example: Order aggregate
@dataclass
class Order:
    id: str = ""
    customer_id: str = ""
    amount: float = 0.0
    status: str = "pending"
    version: int = 0
    changes: List[Event] = field(default_factory=list)
    
    def apply(self, event: Event) -> None:
        if event.event_type == "order.created":
            self.id = event.aggregate_id
            self.customer_id = event.data["customer_id"]
            self.amount = event.data["amount"]
            self.status = "created"
        
        elif event.event_type == "order.shipped":
            self.status = "shipped"
        
        self.version += 1
    
    def create(self, order_id: str, customer_id: str, amount: float) -> None:
        if self.version != 0:
            raise ValueError("Order already exists")
        
        event = Event(
            aggregate_id=order_id,
            event_type="order.created",
            timestamp=datetime.now(),
            version=self.version + 1,
            data={"customer_id": customer_id, "amount": amount}
        )
        
        self.apply(event)
        self.changes.append(event)
    
    def ship(self, tracking_number: str) -> None:
        if self.status != "created":
            raise ValueError("Order must be in created state to ship")
        
        event = Event(
            aggregate_id=self.id,
            event_type="order.shipped",
            timestamp=datetime.now(),
            version=self.version + 1,
            data={"tracking_number": tracking_number}
        )
        
        self.apply(event)
        self.changes.append(event)

ReactJS State Management with Event Sourcing

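// Event-sourced state management for React
import { useReducer } from 'react';

// Placeholder helpers (hypothetical; adapt to your app):
// generateId uses the browser's crypto.randomUUID(); persistEvent stands in
// for whatever call sends the event to your backend event store.
const generateId = () => crypto.randomUUID();
const persistEvent = async (event) => {
  // e.g. POST the event to an event store API
};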

// Event types
const EventTypes = {
  ORDER_CREATED: 'order.created',
  ITEM_ADDED: 'order.item_added',
  ITEM_REMOVED: 'order.item_removed',
  ORDER_SUBMITTED: 'order.submitted',
};

// Event creators
const createEvent = (type, data) => ({
  id: generateId(),
  type,
  timestamp: Date.now(),
  data,
});

// Reducer applies events to state
const orderReducer = (state, event) => {
  switch (event.type) {
    case EventTypes.ORDER_CREATED:
      return {
        ...state,
        id: event.data.orderId,
        items: [],
        status: 'draft',
      };
    
    case EventTypes.ITEM_ADDED:
      return {
        ...state,
        // Default to [] in case no ORDER_CREATED event has been applied yet
        items: [...(state.items ?? []), event.data.item],
      };
    
    case EventTypes.ITEM_REMOVED:
      return {
        ...state,
        items: (state.items ?? []).filter(item => item.id !== event.data.itemId),
      };
    
    case EventTypes.ORDER_SUBMITTED:
      return {
        ...state,
        status: 'submitted',
        submittedAt: event.timestamp,
      };
    
    default:
      return state;
  }
};

// Custom hook for event-sourced state
const useEventSourcedState = (reducer, initialEvents = []) => {
  const [state, dispatch] = useReducer(reducer, initialEvents.reduce(reducer, {}));
  const [events, setEvents] = useReducer((events, newEvent) => [...events, newEvent], initialEvents);
  
  const dispatchEvent = (event) => {
    setEvents(event);
    dispatch(event);
    
    // Persist event to backend
    persistEvent(event);
  };
  
  // Time travel: rebuild state at any point
  const replayTo = (targetTimestamp) => {
    const relevantEvents = events.filter(e => e.timestamp <= targetTimestamp);
    return relevantEvents.reduce(reducer, {});
  };
  
  return [state, dispatchEvent, { events, replayTo }];
};

// Usage in component
const OrderComponent = () => {
  const [order, dispatchEvent, { events, replayTo }] = useEventSourcedState(orderReducer);
  
  const addItem = (item) => {
    const event = createEvent(EventTypes.ITEM_ADDED, { item });
    dispatchEvent(event);
  };
  
  const removeItem = (itemId) => {
    const event = createEvent(EventTypes.ITEM_REMOVED, { itemId });
    dispatchEvent(event);
  };
  
  const submitOrder = () => {
    const event = createEvent(EventTypes.ORDER_SUBMITTED, {});
    dispatchEvent(event);
  };
  
  // Debug: Time travel to previous state
  const showStateAt = (timestamp) => {
    const historicalState = replayTo(timestamp);
    console.log('State at', new Date(timestamp), ':', historicalState);
  };
  
  return (
    <div>
      <h2>Order {order.id}</h2>
      <div>Status: {order.status}</div>
      <ul>
        {order.items?.map(item => (
          <li key={item.id}>
            {item.name} - ${item.price}
            <button onClick={() => removeItem(item.id)}>Remove</button>
          </li>
        ))}
      </ul>
      <button onClick={submitOrder}>Submit Order</button>
      
      {/* Event history for debugging */}
      <details>
        <summary>Event History ({events.length} events)</summary>
        <ul>
          {events.map(event => (
            <li key={event.id}>
              {event.type} at {new Date(event.timestamp).toISOString()}
              <button onClick={() => showStateAt(event.timestamp)}>
                Show state here
              </button>
            </li>
          ))}
        </ul>
      </details>
    </div>
  );
};

Trade-offs and Considerations

Advantages

  1. Complete Audit Trail: Every change is recorded with full context
  2. Temporal Queries: Can reconstruct state at any point in time
  3. Event-Driven Integration: Natural fit for microservices and event-driven architectures
  4. Debugging: Event history makes troubleshooting easier
  5. Domain Insights: Events capture business intent, not just data changes
  6. Flexibility: Can create new projections from historical events

Challenges

  1. Complexity: Steeper learning curve than CRUD
  2. Event Schema Evolution: Handling event structure changes over time
  3. Performance: Replaying long event streams can be slow (mitigated with snapshots)
  4. Storage: Events accumulate over time (but storage is cheap)
  5. Eventual Consistency: Projections may lag behind event store
  6. Queries: Complex queries may require multiple projections

Mitigation Strategies

Snapshots: Periodically save aggregate state (for example, every N events); loading then restores the latest snapshot and replays only the events recorded after it

type Snapshot struct {
    AggregateID string
    Version     int
    State       []byte
    CreatedAt   time.Time
}
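A minimal Python counterpart of the snapshot idea, assuming hypothetical dictionary-shaped events with a `version` field and JSON-serialized state (mirroring the `State []byte` field above). Loading restores the snapshot and skips every event whose version it already covers:

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

State = Dict[str, Any]
EventRecord = Dict[str, Any]  # hypothetical shape: {"version", "type", "data"}


@dataclass(frozen=True)
class Snapshot:
    aggregate_id: str
    version: int  # version of the last event already folded into `state`
    state: bytes  # serialized aggregate state
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def apply(state: State, event: EventRecord) -> State:
    if event["type"] == "order.created":
        return {"status": "created", "amount": event["data"]["amount"]}
    if event["type"] == "order.shipped":
        return {**state, "status": "shipped"}
    return state


def load(events: List[EventRecord], snapshot: Optional[Snapshot]) -> Tuple[State, int]:
    """Restore the latest snapshot (if any), then replay only newer events."""
    if snapshot is not None:
        state, version = json.loads(snapshot.state), snapshot.version
    else:
        state, version = {}, 0
    for event in events:
        if event["version"] <= version:
            continue  # already reflected in the snapshot
        state = apply(state, event)
        version = event["version"]
    return state, version


def take_snapshot(aggregate_id: str, state: State, version: int) -> Snapshot:
    return Snapshot(aggregate_id, version, json.dumps(state).encode("utf-8"))
```

In a real store the query would fetch only events with a version greater than the snapshot's instead of filtering in memory; how often to snapshot is a tuning choice.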

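Event Versioning: Use upcasters to transform old events into the current schema when they are read, leaving the stored events unchanged

from dataclasses import replace

def upcast_event(event: Event) -> Event:
    # The schema version lives in the payload; event.version is the event's
    # position in the aggregate stream and must not be changed here.
    schema_version = event.data.get("schema_version", 1)
    if event.event_type == "order.created" and schema_version == 1:
        # v2 added a required field; give old events a default value
        data = {**event.data, "payment_method": "legacy", "schema_version": 2}
        return replace(event, data=data)  # new object; stored event untouched
    return event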
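When a schema changes more than once, upcasters are commonly chained, each one lifting a payload by a single version. A sketch assuming a hypothetical registry keyed by `(event_type, schema_version)`; the v3 `currency` field is invented purely for illustration:

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class Event:
    event_type: str
    version: int          # stream position; upcasters never change it
    data: Dict[str, Any]  # payload, carrying a "schema_version" key


Upcaster = Callable[[Dict[str, Any]], Dict[str, Any]]

# Hypothetical registry: (event_type, from_schema_version) -> payload transform.
# Each step must bump "schema_version", or upcast() below would loop forever.
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("order.created", 1): lambda d: {**d, "payment_method": "legacy", "schema_version": 2},
    ("order.created", 2): lambda d: {**d, "currency": "USD", "schema_version": 3},
}


def upcast(event: Event) -> Event:
    """Apply one-step upcasters until the payload reaches the latest schema."""
    data = event.data
    while True:
        step = UPCASTERS.get((event.event_type, data.get("schema_version", 1)))
        if step is None:
            # Return a copy; the stored event itself is never modified.
            return replace(event, data=data)
        data = step(data)
```

Small single-step upcasters keep each schema change reviewable on its own, and projections only ever see the latest shape.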

CQRS Pattern: Separate the write model (commands that append events) from the read models (projections tuned for specific queries) so each side can be optimized and scaled independently
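A minimal CQRS sketch, with hypothetical names throughout: the command handler validates and appends events, while a separate read model consumes them to answer one query cheaply. Here the read model is updated synchronously for simplicity; in practice the update is usually asynchronous, which is where the eventual consistency listed under Challenges comes from.

```python
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    customer_id: str
    amount: int


class OrderCommandHandler:
    """Write side: validates commands and appends events; answers no queries."""

    def __init__(self, log: List[OrderPlaced]) -> None:
        self._log = log
        self._placed: Set[str] = set()

    def place_order(self, order_id: str, customer_id: str, amount: int) -> OrderPlaced:
        if amount <= 0:
            raise ValueError("amount must be positive")
        if order_id in self._placed:
            raise ValueError(f"order {order_id} already placed")
        event = OrderPlaced(order_id, customer_id, amount)
        self._log.append(event)
        self._placed.add(order_id)
        return event


class CustomerTotalsView:
    """Read side: a denormalized model shaped for a single query."""

    def __init__(self) -> None:
        self._totals: Dict[str, int] = {}

    def handle(self, event: OrderPlaced) -> None:
        self._totals[event.customer_id] = self._totals.get(event.customer_id, 0) + event.amount

    def total_for(self, customer_id: str) -> int:
        return self._totals.get(customer_id, 0)


log: List[OrderPlaced] = []
commands = OrderCommandHandler(log)
view = CustomerTotalsView()
for event in (commands.place_order("o-1", "c-1", 30), commands.place_order("o-2", "c-1", 20)):
    view.handle(event)  # in a real system this update is usually asynchronous
print(view.total_for("c-1"))  # 50
```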

Best Practices for Principal Engineers

  1. Start Small: Begin with a single bounded context, not the entire system
  2. Define Event Granularity: Balance between too many fine-grained events and too few coarse events
  3. Immutable Events: Never modify events once persisted; create new compensating events instead
  4. Event Naming: Use past tense (OrderCreated, not CreateOrder) to reflect something that happened
  5. Idempotency: Ensure event handlers can process the same event multiple times safely
  6. Monitoring: Track event processing lag, projection health, and event store performance
  7. Testing: Events make testing straightforward. Given a set of past events, execute a command and assert on the new events it produces
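For practice 5, one common approach (sketched here with hypothetical names) is to record the last stream version applied per aggregate and skip anything at or below it, so events redelivered under at-least-once delivery are harmless. The sketch assumes events for each aggregate arrive in order; in a real system the version marker should be stored in the same transaction as the read-model update.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class Event:
    aggregate_id: str
    version: int  # position in the aggregate's stream
    event_type: str
    amount: int = 0


class IdempotentRevenueProjection:
    """Tracks the last applied version per aggregate so redeliveries are no-ops."""

    def __init__(self) -> None:
        self.revenue = 0
        self.last_version: Dict[str, int] = {}

    def handle(self, event: Event) -> bool:
        """Return True if the event was applied, False if it was skipped."""
        if event.version <= self.last_version.get(event.aggregate_id, 0):
            return False  # duplicate or already-applied event
        if event.event_type == "order.created":
            self.revenue += event.amount
        self.last_version[event.aggregate_id] = event.version
        return True
```

Handling the same event twice leaves `revenue` unchanged, which is exactly the property practice 5 asks for.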

Conclusion

Event Sourcing is a powerful pattern for systems where history, auditability, and complex domain logic matter. While it introduces complexity, the benefits of complete audit trails, temporal queries, and natural event-driven integration make it valuable for many enterprise systems. Principal Engineers should consider Event Sourcing for domains with rich business logic, compliance requirements, or where understanding “how we got here” is as important as “where we are.”

Start with CQRS and event-driven architecture before committing fully to Event Sourcing. Once you understand the domain deeply, Event Sourcing provides a robust foundation for building scalable, auditable, and maintainable systems.