Event Sourcing Pattern for Distributed Systems
What is Event Sourcing?
Event Sourcing is an architectural pattern where state changes are stored as a sequence of events rather than persisting only the current state. Instead of updating records in place, the system appends immutable events to an event store. The current state is derived by replaying events from the beginning or from a snapshot.
Think of it like a bank account statement: rather than just storing your current balance, the bank records every deposit and withdrawal. Your balance is the sum of all transactions. If you need to understand why your balance is what it is, you can replay the history.
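To make the analogy concrete, here is a minimal Python sketch that derives a balance by folding over transaction events. The `Deposited`/`Withdrew` event classes and the integer-cents amounts are illustrative assumptions, not part of any library.

```python
from dataclasses import dataclass
from functools import reduce
from typing import List, Union


@dataclass(frozen=True)
class Deposited:
    amount_cents: int


@dataclass(frozen=True)
class Withdrew:
    amount_cents: int


Transaction = Union[Deposited, Withdrew]


def apply(balance: int, event: Transaction) -> int:
    """Return the balance after one transaction; the history itself is never changed."""
    if isinstance(event, Deposited):
        return balance + event.amount_cents
    if isinstance(event, Withdrew):
        return balance - event.amount_cents
    raise TypeError(f"unknown event: {event!r}")


def balance_of(history: List[Transaction]) -> int:
    # Current state = left fold over every event, starting from an empty account.
    return reduce(apply, history, 0)


history = [Deposited(10_000), Withdrew(2_500), Deposited(1_200)]
print(balance_of(history))      # 8700
print(balance_of(history[:2]))  # 7500: the balance after the first two transactions
```

Replaying a prefix of the list answers "why is the balance what it is" without any extra bookkeeping.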
Core Concepts
Events as Source of Truth
In event sourcing, events are the primary source of truth:
- Immutable: Once written, events never change
- Append-only: New events are always added, never modified
- Complete history: Every state change is recorded
- Auditable: Full audit trail comes for free
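One way to enforce these properties in application code is a frozen dataclass for events plus a log type that only exposes append and read operations. The `EventLog` and `RecordedEvent` classes below are hypothetical, an in-memory sketch rather than a production store.

```python
from dataclasses import FrozenInstanceError, dataclass
from datetime import datetime, timezone
from types import MappingProxyType
from typing import Any, Iterator, List, Mapping, Tuple


@dataclass(frozen=True)
class RecordedEvent:
    sequence: int            # position in the log, assigned on append
    event_type: str
    occurred_at: datetime
    data: Mapping[str, Any]  # read-only view of the payload


class EventLog:
    """Hypothetical append-only log: it offers no update or delete operations."""

    def __init__(self) -> None:
        self._events: List[RecordedEvent] = []

    def append(self, event_type: str, data: Mapping[str, Any]) -> RecordedEvent:
        event = RecordedEvent(
            sequence=len(self._events) + 1,
            event_type=event_type,
            occurred_at=datetime.now(timezone.utc),
            # Copy the payload, then expose it read-only so it cannot change later.
            data=MappingProxyType(dict(data)),
        )
        self._events.append(event)
        return event

    def read(self) -> Tuple[RecordedEvent, ...]:
        # A tuple snapshot: callers cannot reorder, insert, or remove history.
        return tuple(self._events)

    def __iter__(self) -> Iterator[RecordedEvent]:
        return iter(self.read())


log = EventLog()
first = log.append("order.created", {"customer_id": "c-1", "amount": 42.0})
try:
    first.event_type = "order.deleted"  # attempt to rewrite history
except FrozenInstanceError:
    print("events are immutable")
```

`MappingProxyType` is shallow, so nested payloads would need deep freezing; a real event store enforces immutability at the storage layer as well.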
Event Store
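The event store is the database that holds the event log. It can be a purpose-built product or a general-purpose database used in an append-only way:
- Optimized for append operations
- Provides ordered retrieval of events, per aggregate and globally
- Supports querying by aggregate ID or event type
- Rejects conflicting writes with an expected-version check (optimistic concurrency)
- Examples: EventStoreDB, PostgreSQL with an append-only events table, Apache Kafka (well suited to distributing events, but it has no built-in per-aggregate expected-version check, so using it as the primary store needs extra care)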
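The essential contract of an event store (ordered per-aggregate streams plus an optimistic concurrency check on append) fits in a few lines. The sketch below is a hypothetical in-memory stand-in that assumes the `expected_version` convention used by the `SaveEvents`/`save_events` interfaces later in this article; it is not the API of EventStoreDB or any other product.

```python
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

StreamEvent = Tuple[str, Dict[str, Any]]        # (event_type, data)
GlobalEvent = Tuple[str, str, Dict[str, Any]]   # (aggregate_id, event_type, data)


class ConcurrencyError(Exception):
    """Raised when another writer appended to the stream first."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[StreamEvent]] = defaultdict(list)
        self._all: List[GlobalEvent] = []  # global order, used to feed projections

    def append(self, aggregate_id: str, events: List[StreamEvent], expected_version: int) -> int:
        stream = self._streams[aggregate_id]
        # Optimistic concurrency: the caller states how many events it has seen.
        if len(stream) != expected_version:
            raise ConcurrencyError(
                f"{aggregate_id}: expected version {expected_version}, found {len(stream)}")
        for event_type, data in events:
            stream.append((event_type, data))
            self._all.append((aggregate_id, event_type, data))
        return len(stream)  # the stream's new version

    def read_stream(self, aggregate_id: str) -> List[StreamEvent]:
        return list(self._streams.get(aggregate_id, []))

    def read_all(self, event_type: Optional[str] = None) -> List[GlobalEvent]:
        return [e for e in self._all if event_type is None or e[1] == event_type]


store = InMemoryEventStore()
store.append("order-1", [("order.created", {"amount": 42.0})], expected_version=0)
try:
    # A second writer that also loaded version 0 is rejected instead of overwriting.
    store.append("order-1", [("order.created", {"amount": 99.0})], expected_version=0)
except ConcurrencyError as err:
    print(err)  # order-1: expected version 0, found 1
```

This version is single-threaded; a database-backed store has to make the check and the append atomic, for example with a unique constraint on (aggregate ID, version).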
Event Replay and Projections
Current state is built by replaying events:
- Aggregates: Domain objects rebuilt from event history
- Projections: Read models optimized for specific queries
- Snapshots: Periodic state captures to avoid replaying all events
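Replay, projections, and snapshots all reduce to the same fold over events. The sketch below, using a hypothetical `OrderState` and plain tuples for events, rebuilds an aggregate from a snapshot and then applies only the events recorded after it.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, List, Optional, Tuple

Event = Tuple[int, str, Dict[str, Any]]  # (version, event_type, data)


@dataclass(frozen=True)
class OrderState:
    version: int = 0
    status: str = "pending"
    amount: float = 0.0


def apply(state: OrderState, event: Event) -> OrderState:
    version, event_type, data = event
    if event_type == "order.created":
        state = replace(state, status="created", amount=data["amount"])
    elif event_type == "order.shipped":
        state = replace(state, status="shipped")
    return replace(state, version=version)


def rehydrate(events: List[Event], snapshot: Optional[OrderState] = None) -> OrderState:
    state = snapshot if snapshot is not None else OrderState()
    for event in events:
        # Skip everything the snapshot already covers; replay only the tail.
        if event[0] > state.version:
            state = apply(state, event)
    return state


events = [(1, "order.created", {"amount": 42.0}), (2, "order.shipped", {})]
snapshot = rehydrate(events[:1])  # pretend this state was saved at version 1
print(rehydrate(events, snapshot))  # OrderState(version=2, status='shipped', amount=42.0)
```

Because the snapshot is just a cached result of the same fold, rebuilding with or without it must give the same state; that equality is a useful test for any snapshot implementation.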
When to Use Event Sourcing
Event Sourcing excels in these scenarios:
1. Domain-Driven Design with Rich Business Logic
When business rules require understanding historical context and state transitions matter as much as current state.
2. Audit and Compliance Requirements
Financial services, healthcare, and regulated industries need complete audit trails of all changes.
3. Temporal Queries
“What was the state at time X?” or “How did we get to this state?” questions are common.
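Answering “what was the state at time X?” means replaying only the events that occurred at or before X. A minimal sketch, assuming each event carries a timezone-aware timestamp and that the event names shown (including `order.amount_changed`) are illustrative:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

Event = Tuple[datetime, str, Dict[str, Any]]  # (occurred_at, event_type, data)


def apply(state: Dict[str, Any], event: Event) -> Dict[str, Any]:
    _, event_type, data = event
    if event_type == "order.created":
        return {"status": "created", "amount": data["amount"]}
    if event_type == "order.amount_changed":
        return {**state, "amount": data["amount"]}
    if event_type == "order.shipped":
        return {**state, "status": "shipped"}
    return state


def state_as_of(history: List[Event], as_of: datetime) -> Dict[str, Any]:
    state: Dict[str, Any] = {}
    for event in history:
        if event[0] > as_of:
            break  # assumes append order matches timestamp order
        state = apply(state, event)
    return state


def ts(day: int) -> datetime:
    return datetime(2024, 1, day, tzinfo=timezone.utc)


history = [
    (ts(1), "order.created", {"amount": 40.0}),
    (ts(3), "order.amount_changed", {"amount": 55.0}),
    (ts(5), "order.shipped", {}),
]
print(state_as_of(history, ts(4)))  # {'status': 'created', 'amount': 55.0}
```

The early `break` relies on timestamps never decreasing in append order; with several writers and skewed clocks, filter the whole history instead.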
4. Complex Event Processing
Systems that need to react to patterns of events over time, like fraud detection or recommendation engines.
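As an illustration of reacting to patterns over time, the sketch below flags an account after three `payment.failed` events within a ten-minute sliding window. The event name, threshold, and window are made up for the example; real fraud detection is far more involved.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta
from typing import Deque, Dict, List, Tuple

Event = Tuple[datetime, str, str]  # (occurred_at, account_id, event_type)


class FailedPaymentDetector:
    def __init__(self, threshold: int = 3, window: timedelta = timedelta(minutes=10)) -> None:
        self.threshold = threshold
        self.window = window
        self._recent: Dict[str, Deque[datetime]] = defaultdict(deque)

    def handle(self, event: Event) -> bool:
        """Return True when this event completes the suspicious pattern."""
        occurred_at, account_id, event_type = event
        if event_type != "payment.failed":
            return False
        recent = self._recent[account_id]
        recent.append(occurred_at)
        # Drop failures that have slid out of the window.
        while occurred_at - recent[0] > self.window:
            recent.popleft()
        return len(recent) >= self.threshold


detector = FailedPaymentDetector()
start = datetime(2024, 1, 1, 12, 0)
stream: List[Event] = [
    (start, "acct-1", "payment.failed"),
    (start + timedelta(minutes=4), "acct-1", "payment.failed"),
    (start + timedelta(minutes=7), "acct-2", "payment.failed"),
    (start + timedelta(minutes=9), "acct-1", "payment.failed"),
]
alerts = [e[1] for e in stream if detector.handle(e)]
print(alerts)  # ['acct-1']
```

The detector assumes events for each account arrive in timestamp order; out-of-order delivery would need buffering or a watermark.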
5. Microservices Communication
Events provide a natural integration pattern between services: one service publishes what happened, and others react asynchronously, accepting eventual consistency between them.
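The integration style can be sketched with an in-process publish/subscribe bus: an order service publishes `order.created`, and a shipping service keeps its own copy of the data it needs. In a real deployment a broker sits in between and delivery is asynchronous; the `EventBus` and `ShippingService` classes here are hypothetical synchronous stand-ins.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class EventBus:
    def __init__(self) -> None:
        self._subscribers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event_type: str, payload: Dict[str, Any]) -> None:
        # The publisher does not know or care who is listening.
        for handler in self._subscribers.get(event_type, []):
            handler(payload)


class ShippingService:
    """Keeps a local, eventually consistent view of orders awaiting shipment."""

    def __init__(self, bus: EventBus) -> None:
        self.pending: Dict[str, str] = {}
        bus.subscribe("order.created", self.on_order_created)

    def on_order_created(self, payload: Dict[str, Any]) -> None:
        self.pending[payload["order_id"]] = payload["customer_id"]


bus = EventBus()
shipping = ShippingService(bus)
bus.publish("order.created", {"order_id": "o-1", "customer_id": "c-9"})
print(shipping.pending)  # {'o-1': 'c-9'}
```

The shipping service never queries the order service's database; it only depends on the shape of the published event, which is what keeps the services loosely coupled.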
Implementation Examples
Go Implementation
```go
package eventsourcing

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"time"
)

// Event represents a domain event
type Event interface {
	AggregateID() string
	EventType() string
	OccurredAt() time.Time
	EventData() interface{}
}

// BaseEvent provides common event fields
type BaseEvent struct {
	ID          string    `json:"id"`
	AggregateId string    `json:"aggregate_id"` // cannot be AggregateID: that name is taken by the method below
	Type        string    `json:"type"`
	Timestamp   time.Time `json:"timestamp"`
	Version     int       `json:"version"` // position of this event in the aggregate's stream
}

func (e BaseEvent) AggregateID() string   { return e.AggregateId }
func (e BaseEvent) EventType() string     { return e.Type }
func (e BaseEvent) OccurredAt() time.Time { return e.Timestamp }

// Example: Order domain events
type OrderCreatedEvent struct {
	BaseEvent
	CustomerID string  `json:"customer_id"`
	Amount     float64 `json:"amount"`
}

func (e OrderCreatedEvent) EventData() interface{} { return e }

type OrderShippedEvent struct {
	BaseEvent
	TrackingNumber string `json:"tracking_number"`
}

func (e OrderShippedEvent) EventData() interface{} { return e }

// EventStore persists per-aggregate event streams. SaveEvents must fail if the
// stream's current version differs from expectedVersion (optimistic concurrency).
type EventStore interface {
	SaveEvents(ctx context.Context, aggregateID string, events []Event, expectedVersion int) error
	GetEvents(ctx context.Context, aggregateID string) ([]Event, error)
}

// Order aggregate
type Order struct {
	ID         string
	CustomerID string
	Amount     float64
	Status     string
	Version    int
	changes    []Event // uncommitted changes
}

// Apply mutates the aggregate from an event. It must not validate or fail,
// because it is also used to replay history.
func (o *Order) Apply(event Event) {
	switch e := event.(type) {
	case OrderCreatedEvent:
		o.ID = e.AggregateID()
		o.CustomerID = e.CustomerID
		o.Amount = e.Amount
		o.Status = "created"
	case OrderShippedEvent:
		o.Status = "shipped"
	}
	o.Version++
}

// Create is a command handler: it records a new OrderCreatedEvent.
func (o *Order) Create(customerID string, amount float64) {
	event := OrderCreatedEvent{
		BaseEvent: BaseEvent{
			ID:          generateID(),
			AggregateId: generateID(),
			Type:        "order.created",
			Timestamp:   time.Now().UTC(),
			Version:     o.Version + 1,
		},
		CustomerID: customerID,
		Amount:     amount,
	}
	o.Apply(event)
	o.changes = append(o.changes, event)
}

// generateID returns a random 128-bit hex ID (illustrative; any unique ID scheme works).
func generateID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// ErrOrderNotFound is returned by Load when the order's stream has no events.
var ErrOrderNotFound = errors.New("order not found")

// OrderRepository loads and saves aggregates through the event store.
type OrderRepository struct {
	eventStore EventStore
}

func (r *OrderRepository) Load(ctx context.Context, orderID string) (*Order, error) {
	events, err := r.eventStore.GetEvents(ctx, orderID)
	if err != nil {
		return nil, err
	}
	if len(events) == 0 {
		return nil, ErrOrderNotFound
	}
	order := &Order{}
	for _, event := range events {
		order.Apply(event) // replaying history records no new changes
	}
	return order, nil
}

func (r *OrderRepository) Save(ctx context.Context, order *Order) error {
	// Version already counts the uncommitted changes, so the version the store
	// should currently hold is Version minus the number of new events.
	expectedVersion := order.Version - len(order.changes)
	if err := r.eventStore.SaveEvents(ctx, order.ID, order.changes, expectedVersion); err != nil {
		return err
	}
	order.changes = nil // committed
	return nil
}
```
Python Implementation with Projections
```python
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Protocol


@dataclass
class Event:
    """Base event class"""
    aggregate_id: str
    event_type: str
    timestamp: datetime
    version: int  # position of this event in its aggregate's stream
    data: Dict[str, Any]


class EventStore(Protocol):
    """Event store interface"""

    def save_events(self, aggregate_id: str, events: List[Event], expected_version: int) -> None:
        ...

    def get_events(self, aggregate_id: str) -> List[Event]:
        ...

    def get_all_events(self, event_type: Optional[str] = None) -> List[Event]:
        ...


class Projection(ABC):
    """Base class for read model projections"""

    @abstractmethod
    def handle(self, event: Event) -> None:
        """Process an event and update the projection"""

    @abstractmethod
    def reset(self) -> None:
        """Reset projection state"""


class OrderSummaryProjection(Projection):
    """Projection for order summary view"""

    def __init__(self) -> None:
        self.orders: Dict[str, Dict[str, Any]] = {}

    def handle(self, event: Event) -> None:
        if event.event_type == "order.created":
            self.orders[event.aggregate_id] = {
                "id": event.aggregate_id,
                "customer_id": event.data["customer_id"],
                "amount": event.data["amount"],
                "status": "created",
                "created_at": event.timestamp,
            }
        elif event.event_type == "order.shipped":
            if event.aggregate_id in self.orders:
                self.orders[event.aggregate_id]["status"] = "shipped"
                self.orders[event.aggregate_id]["shipped_at"] = event.timestamp

    def reset(self) -> None:
        self.orders.clear()

    def get_order(self, order_id: str) -> Optional[Dict[str, Any]]:
        return self.orders.get(order_id)

    def get_customer_orders(self, customer_id: str) -> List[Dict[str, Any]]:
        return [o for o in self.orders.values() if o["customer_id"] == customer_id]


class ProjectionManager:
    """Manages projection rebuilding and updates"""

    def __init__(self, event_store: EventStore) -> None:
        self.event_store = event_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection) -> None:
        self.projections.append(projection)

    def rebuild_all(self) -> None:
        """Rebuild all projections from scratch"""
        for projection in self.projections:
            projection.reset()
        for event in self.event_store.get_all_events():
            for projection in self.projections:
                projection.handle(event)

    def handle_event(self, event: Event) -> None:
        """Handle a new event in all projections"""
        for projection in self.projections:
            projection.handle(event)


# Example: Order aggregate
@dataclass
class Order:
    id: str = ""
    customer_id: str = ""
    amount: float = 0.0
    status: str = "pending"
    version: int = 0
    changes: List[Event] = field(default_factory=list)  # uncommitted events

    def apply(self, event: Event) -> None:
        if event.event_type == "order.created":
            self.id = event.aggregate_id
            self.customer_id = event.data["customer_id"]
            self.amount = event.data["amount"]
            self.status = "created"
        elif event.event_type == "order.shipped":
            self.status = "shipped"
        self.version += 1

    def create(self, customer_id: str, amount: float) -> None:
        if self.status != "pending":
            raise ValueError("Order has already been created")
        event = Event(
            aggregate_id=str(uuid.uuid4()),  # a new order needs a fresh ID; self.id is still empty
            event_type="order.created",
            timestamp=datetime.now(timezone.utc),
            version=self.version + 1,
            data={"customer_id": customer_id, "amount": amount},
        )
        self.apply(event)
        self.changes.append(event)

    def ship(self, tracking_number: str) -> None:
        if self.status != "created":
            raise ValueError("Order must be in created state to ship")
        event = Event(
            aggregate_id=self.id,
            event_type="order.shipped",
            timestamp=datetime.now(timezone.utc),
            version=self.version + 1,
            data={"tracking_number": tracking_number},
        )
        self.apply(event)
        self.changes.append(event)
```
ReactJS State Management with Event Sourcing
```jsx
// Event-sourced state management for React
import { useMemo, useReducer } from 'react';

// Event types
const EventTypes = {
  ORDER_CREATED: 'order.created',
  ITEM_ADDED: 'order.item_added',
  ITEM_REMOVED: 'order.item_removed',
  ORDER_SUBMITTED: 'order.submitted',
};

// Event creators
const createEvent = (type, data) => ({
  id: crypto.randomUUID(), // Web Crypto API; browsers require a secure context
  type,
  timestamp: Date.now(),
  data,
});

// Placeholder: send the event to your backend here (endpoint and transport are up to you)
const persistEvent = async (event) => {};

// State before any events; items starts as an array so ITEM_ADDED works immediately
const initialOrderState = { id: null, items: [], status: 'draft' };

// Reducer applies events to state
const orderReducer = (state, event) => {
  switch (event.type) {
    case EventTypes.ORDER_CREATED:
      return {
        ...state,
        id: event.data.orderId,
        items: [],
        status: 'draft',
      };
    case EventTypes.ITEM_ADDED:
      return {
        ...state,
        items: [...state.items, event.data.item],
      };
    case EventTypes.ITEM_REMOVED:
      return {
        ...state,
        items: state.items.filter(item => item.id !== event.data.itemId),
      };
    case EventTypes.ORDER_SUBMITTED:
      return {
        ...state,
        status: 'submitted',
        submittedAt: event.timestamp,
      };
    default:
      return state;
  }
};

// Custom hook for event-sourced state: the event list is the only stored state,
// and the current state is derived from it by replaying every event
const useEventSourcedState = (reducer, initialState, initialEvents = []) => {
  const [events, appendEvent] = useReducer((events, newEvent) => [...events, newEvent], initialEvents);
  const state = useMemo(() => events.reduce(reducer, initialState), [events, reducer, initialState]);

  const dispatchEvent = (event) => {
    appendEvent(event);
    // Persist event to backend
    persistEvent(event).catch(console.error);
  };

  // Time travel: rebuild state as of the event at `index` (inclusive). Replaying by
  // position avoids ambiguity when several events share a millisecond timestamp.
  const replayTo = (index) => events.slice(0, index + 1).reduce(reducer, initialState);

  return [state, dispatchEvent, { events, replayTo }];
};

// Usage in component; `history` holds previously persisted events (typically starting with ORDER_CREATED)
const OrderComponent = ({ history = [] }) => {
  const [order, dispatchEvent, { events, replayTo }] =
    useEventSourcedState(orderReducer, initialOrderState, history);

  const addItem = (item) => {
    const event = createEvent(EventTypes.ITEM_ADDED, { item });
    dispatchEvent(event);
  };

  const removeItem = (itemId) => {
    const event = createEvent(EventTypes.ITEM_REMOVED, { itemId });
    dispatchEvent(event);
  };

  const submitOrder = () => {
    const event = createEvent(EventTypes.ORDER_SUBMITTED, {});
    dispatchEvent(event);
  };

  // Debug: Time travel to the state right after a given event
  const showStateAt = (index) => {
    console.log(`State after event #${index + 1}:`, replayTo(index));
  };

  return (
    <div>
      <h2>Order {order.id}</h2>
      <div>Status: {order.status}</div>
      <ul>
        {order.items.map(item => (
          <li key={item.id}>
            {item.name} - ${item.price}
            <button onClick={() => removeItem(item.id)}>Remove</button>
          </li>
        ))}
      </ul>
      <button onClick={submitOrder}>Submit Order</button>
      {/* Event history for debugging */}
      <details>
        <summary>Event History ({events.length} events)</summary>
        <ul>
          {events.map((event, index) => (
            <li key={event.id}>
              {event.type} at {new Date(event.timestamp).toISOString()}
              <button onClick={() => showStateAt(index)}>
                Show state here
              </button>
            </li>
          ))}
        </ul>
      </details>
    </div>
  );
};
```
Trade-offs and Considerations
Advantages
- Complete Audit Trail: Every change is recorded with full context
- Temporal Queries: Can reconstruct state at any point in time
- Event-Driven Integration: Natural fit for microservices and event-driven architectures
- Debugging: Event history makes troubleshooting easier
- Domain Insights: Events capture business intent, not just data changes
- Flexibility: Can create new projections from historical events
Challenges
- Complexity: Steeper learning curve than CRUD
- Event Schema Evolution: Handling event structure changes over time
- Performance: Replaying long event streams can be slow (mitigated with snapshots)
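- Storage: The log grows without bound; storage is relatively cheap, but retention, archiving, and data-erasure requirements need a plan because events are immutable
- Eventual Consistency: Projections may lag behind the event store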
- Queries: Complex queries may require multiple projections
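Projection lag can be made measurable by having each projection record a checkpoint: the global position of the last event it processed. A minimal sketch, assuming the store assigns every event a monotonically increasing global position (the `StoredEvent` and `CheckpointedProjection` names are illustrative):

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class StoredEvent:
    position: int  # global, monotonically increasing position in the store
    event_type: str
    data: Dict[str, Any]


class CheckpointedProjection:
    def __init__(self) -> None:
        self.checkpoint = 0
        self.order_count = 0

    def catch_up(self, log: List[StoredEvent], max_batch: int = 100) -> None:
        """Process up to max_batch events past the checkpoint."""
        batch = [e for e in log if e.position > self.checkpoint][:max_batch]
        for event in batch:
            if event.event_type == "order.created":
                self.order_count += 1
            # Advance the checkpoint only after the event has been applied.
            self.checkpoint = event.position

    def lag(self, log: List[StoredEvent]) -> int:
        head = log[-1].position if log else 0
        return head - self.checkpoint


log = [StoredEvent(i, "order.created", {}) for i in range(1, 251)]
projection = CheckpointedProjection()
projection.catch_up(log)
print(projection.lag(log))  # 150: queries answered now would miss 150 orders
```

Lag here is measured in positions, which is a natural metric to export for the monitoring practice mentioned below.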
Mitigation Strategies
Snapshots: Periodically save aggregate state to avoid replaying all events
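```go
// Snapshot captures an aggregate's state as of a given stream version.
type Snapshot struct {
	AggregateID string
	Version     int    // version of the last event folded into State
	State       []byte // serialized aggregate state
	CreatedAt   time.Time
}
```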
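A common policy is to take a snapshot every N events and serialize the aggregate into the `State` bytes. The Python sketch below mirrors the fields of the `Snapshot` struct above; the interval of 100 and the JSON encoding are assumptions, not requirements.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional

SNAPSHOT_INTERVAL = 100  # assumed value; trades replay time against snapshot writes


@dataclass(frozen=True)
class Snapshot:
    aggregate_id: str
    version: int  # stream version the snapshot reflects
    state: bytes  # serialized aggregate state (JSON here)
    created_at: datetime


def maybe_snapshot(aggregate_id: str, previous_version: int, new_version: int,
                   state: Dict[str, Any]) -> Optional[Snapshot]:
    """Return a snapshot if this save crossed a multiple of SNAPSHOT_INTERVAL."""
    # One save can append several events (e.g. 99 -> 101), so compare interval
    # buckets instead of testing new_version % SNAPSHOT_INTERVAL == 0.
    if new_version // SNAPSHOT_INTERVAL == previous_version // SNAPSHOT_INTERVAL:
        return None
    return Snapshot(
        aggregate_id=aggregate_id,
        version=new_version,
        state=json.dumps(state, sort_keys=True).encode("utf-8"),
        created_at=datetime.now(timezone.utc),
    )


def restore(snapshot: Snapshot) -> Dict[str, Any]:
    return json.loads(snapshot.state.decode("utf-8"))


snap = maybe_snapshot("order-1", 99, 101, {"status": "shipped", "amount": 42.0})
print(snap.version, restore(snap))  # 101 {'amount': 42.0, 'status': 'shipped'}
```

On load, the repository would restore the latest snapshot and replay only events with a version greater than `snapshot.version`.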
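Event Versioning: Use upcasters to transform old events into the current schema when they are read
```python
from dataclasses import replace


def upcast_event(event: Event) -> Event:
    """Upgrade a v1 order.created event to the v2 schema as it is read."""
    # event.version is the aggregate's stream position, not the schema version,
    # so track the schema separately; events written before it existed are v1.
    schema_version = event.data.get("schema_version", 1)
    if event.event_type == "order.created" and schema_version == 1:
        # Add the new required field on a copy; stored events stay untouched.
        new_data = {**event.data, "payment_method": "legacy", "schema_version": 2}
        return replace(event, data=new_data)
    return event
```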
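When a schema changes more than once, upcasters are commonly chained so each one only knows how to go from version n to n+1. A sketch assuming events are plain dicts with a `type` key and an optional `schema_version` key (absent meaning 1); the v2 to v3 rename of `amount` to `total` is invented for illustration.

```python
from typing import Any, Callable, Dict, Tuple

EventDict = Dict[str, Any]
Upcaster = Callable[[EventDict], EventDict]

# (event_type, from_schema_version) -> function producing the next version
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("order.created", 1): lambda e: {**e, "payment_method": "legacy", "schema_version": 2},
    # Hypothetical v3 change: "amount" renamed to "total".
    ("order.created", 2): lambda e: {
        **{k: v for k, v in e.items() if k != "amount"},
        "total": e["amount"],
        "schema_version": 3,
    },
}


def upcast(event: EventDict) -> EventDict:
    """Apply upcasters until none matches; each returns a new dict, never mutating input."""
    while True:
        step = UPCASTERS.get((event["type"], event.get("schema_version", 1)))
        if step is None:
            return event
        event = step(event)  # every step must bump schema_version or this loops forever


old = {"type": "order.created", "customer_id": "c-1", "amount": 42.0}
new = upcast(old)
print(new["schema_version"], new["total"], "amount" in new)  # 3 42.0 False
```

Keeping each step small means adding a v4 later only requires registering one more function.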
CQRS Pattern: Separate the write (command) model from the read (query) models so each side can be optimized and scaled independently
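A compact sketch of the split: the command side checks invariants against the event history and appends events, while the query side answers from a denormalized read model updated from those events. All names are hypothetical, and the synchronous read-model update stands in for what is usually an asynchronous subscription.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Event = Tuple[str, str, Dict[str, Any]]  # (order_id, event_type, data)

_STATUS_AFTER = {"order.created": "created", "order.shipped": "shipped"}


class OrdersByCustomer:
    """Read side: one denormalized view shaped for one query."""

    def __init__(self) -> None:
        self.totals: Dict[str, float] = defaultdict(float)

    def handle(self, event: Event) -> None:
        _, event_type, data = event
        if event_type == "order.created":
            self.totals[data["customer_id"]] += data["amount"]

    def total_for(self, customer_id: str) -> float:
        return self.totals.get(customer_id, 0.0)


class OrderCommands:
    """Write side: checks invariants against history and appends events."""

    def __init__(self, log: List[Event], read_model: OrdersByCustomer) -> None:
        self.log = log
        self.read_model = read_model

    def _status(self, order_id: str) -> str:
        # Scans the whole log for brevity; a real store reads one aggregate's stream.
        status = "none"
        for oid, event_type, _ in self.log:
            if oid == order_id:
                status = _STATUS_AFTER.get(event_type, status)
        return status

    def create(self, order_id: str, customer_id: str, amount: float) -> None:
        if self._status(order_id) != "none":
            raise ValueError(f"order {order_id} already exists")
        self._emit((order_id, "order.created", {"customer_id": customer_id, "amount": amount}))

    def ship(self, order_id: str) -> None:
        if self._status(order_id) != "created":
            raise ValueError(f"order {order_id} cannot be shipped")
        self._emit((order_id, "order.shipped", {}))

    def _emit(self, event: Event) -> None:
        self.log.append(event)
        self.read_model.handle(event)  # usually asynchronous in practice


log: List[Event] = []
reads = OrdersByCustomer()
commands = OrderCommands(log, reads)
commands.create("o-1", "c-1", 30.0)
commands.create("o-2", "c-1", 12.5)
commands.ship("o-1")
print(reads.total_for("c-1"))  # 42.5
```

The read model can be thrown away and rebuilt from `log` at any time, which is what frees it to be shaped purely around its query.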
Best Practices for Principal Engineers
- Start Small: Begin with a single bounded context, not the entire system
- Define Event Granularity: Balance between too many fine-grained events and too few coarse events
- Immutable Events: Never modify events once persisted; create new compensating events instead
- Event Naming: Use past tense (OrderCreated, not CreateOrder) to reflect something that happened
- Idempotency: Ensure event handlers can process the same event multiple times safely
- Monitoring: Track event processing lag, projection health, and event store performance
- Testing: Write tests as given/when/then: given past events, when a command runs, then assert on the new events it emits
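Idempotency is often implemented by recording the IDs of events a handler has already processed and skipping duplicates, which makes at-least-once delivery safe. A minimal sketch with a hypothetical `PaymentReceived` event; in production the processed-ID record must be stored in the same transaction as the projection update, which this in-memory version does not attempt.

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass(frozen=True)
class PaymentReceived:
    event_id: str
    account_id: str
    amount_cents: int


class BalanceProjection:
    def __init__(self) -> None:
        self.balances: Dict[str, int] = {}
        self._processed: Set[str] = set()

    def handle(self, event: PaymentReceived) -> None:
        # A redelivered event carries the same ID, so applying it twice is a no-op.
        if event.event_id in self._processed:
            return
        self.balances[event.account_id] = (
            self.balances.get(event.account_id, 0) + event.amount_cents)
        self._processed.add(event.event_id)


projection = BalanceProjection()
event = PaymentReceived("evt-1", "acct-1", 500)
projection.handle(event)
projection.handle(event)  # duplicate delivery from the broker
print(projection.balances)  # {'acct-1': 500}
```

Without the ID check, the duplicate would have doubled the balance to 1000.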
Conclusion
Event Sourcing is a powerful pattern for systems where history, auditability, and complex domain logic matter. While it introduces complexity, the benefits of complete audit trails, temporal queries, and natural event-driven integration make it valuable for many enterprise systems. Principal Engineers should consider Event Sourcing for domains with rich business logic, compliance requirements, or where understanding “how we got here” is as important as “where we are.”
Start with CQRS and event-driven architecture before committing fully to Event Sourcing. Once you understand the domain deeply, Event Sourcing provides a robust foundation for building scalable, auditable, and maintainable systems.