Event Sourcing with CQRS Pattern

Event Sourcing with CQRS Pattern

Overview

Event Sourcing (ES) combined with Command Query Responsibility Segregation (CQRS) is an architectural pattern that stores the state of a system as a sequence of events rather than the current state itself. CQRS separates read and write operations into different models, enabling independent optimization of each.

This pattern is particularly powerful for systems requiring strong audit trails, temporal queries, complex business logic, or high scalability needs.

Core Concepts

Event Sourcing

Instead of storing current state in a database, you store a log of all state-changing events:

Traditional: User { id: 1, name: "Alice", email: "alice@example.com", balance: 100 }

Event Sourcing:
1. UserCreated { userId: 1, name: "Alice", email: "alice@example.com" }
2. BalanceDeposited { userId: 1, amount: 100 }
3. EmailChanged { userId: 1, newEmail: "alice@newdomain.com" }

Current state is derived by replaying events from the beginning.

CQRS

Separate your read model (queries) from your write model (commands):

When to Use This Pattern

Good Fit

Poor Fit

Implementation Example (Go)

1. Define Events

package events

import (
    "time"
    "github.com/google/uuid"
)

// BaseEvent contains common fields for all events
type BaseEvent struct {
    EventID     uuid.UUID `json:"event_id"`
    AggregateID uuid.UUID `json:"aggregate_id"`
    EventType   string    `json:"event_type"`
    OccurredAt  time.Time `json:"occurred_at"`
    Version     int       `json:"version"`
}

// AccountCreated event
type AccountCreated struct {
    BaseEvent
    OwnerName string  `json:"owner_name"`
    Currency  string  `json:"currency"`
}

// MoneyDeposited event
type MoneyDeposited struct {
    BaseEvent
    Amount      float64 `json:"amount"`
    Description string  `json:"description"`
}

// MoneyWithdrawn event
type MoneyWithdrawn struct {
    BaseEvent
    Amount      float64 `json:"amount"`
    Description string  `json:"description"`
}

2. Implement Aggregate

package domain

import (
    "errors"
    "github.com/google/uuid"
    "yourapp/events"
)

// Account is our aggregate root
type Account struct {
    id            uuid.UUID
    ownerName     string
    balance       float64
    currency      string
    version       int
    uncommitted   []events.BaseEvent
}

// NewAccount creates a new account
func NewAccount(ownerName, currency string) (*Account, error) {
    if ownerName == "" {
        return nil, errors.New("owner name required")
    }
    
    account := &Account{}
    event := events.AccountCreated{
        BaseEvent: events.BaseEvent{
            EventID:     uuid.New(),
            AggregateID: uuid.New(),
            EventType:   "AccountCreated",
            OccurredAt:  time.Now(),
            Version:     1,
        },
        OwnerName: ownerName,
        Currency:  currency,
    }
    
    account.apply(event)
    account.uncommitted = append(account.uncommitted, event.BaseEvent)
    return account, nil
}

// Deposit adds money to the account
func (a *Account) Deposit(amount float64, description string) error {
    if amount <= 0 {
        return errors.New("deposit amount must be positive")
    }
    
    event := events.MoneyDeposited{
        BaseEvent: events.BaseEvent{
            EventID:     uuid.New(),
            AggregateID: a.id,
            EventType:   "MoneyDeposited",
            OccurredAt:  time.Now(),
            Version:     a.version + 1,
        },
        Amount:      amount,
        Description: description,
    }
    
    a.apply(event)
    a.uncommitted = append(a.uncommitted, event.BaseEvent)
    return nil
}

// Withdraw removes money from the account
func (a *Account) Withdraw(amount float64, description string) error {
    if amount <= 0 {
        return errors.New("withdrawal amount must be positive")
    }
    if a.balance < amount {
        return errors.New("insufficient funds")
    }
    
    event := events.MoneyWithdrawn{
        BaseEvent: events.BaseEvent{
            EventID:     uuid.New(),
            AggregateID: a.id,
            EventType:   "MoneyWithdrawn",
            OccurredAt:  time.Now(),
            Version:     a.version + 1,
        },
        Amount:      amount,
        Description: description,
    }
    
    a.apply(event)
    a.uncommitted = append(a.uncommitted, event.BaseEvent)
    return nil
}

// apply updates internal state based on events
func (a *Account) apply(event interface{}) {
    switch e := event.(type) {
    case events.AccountCreated:
        a.id = e.AggregateID
        a.ownerName = e.OwnerName
        a.currency = e.Currency
        a.balance = 0
        a.version = e.Version
    case events.MoneyDeposited:
        a.balance += e.Amount
        a.version = e.Version
    case events.MoneyWithdrawn:
        a.balance -= e.Amount
        a.version = e.Version
    }
}

// GetUncommittedEvents returns events not yet persisted
func (a *Account) GetUncommittedEvents() []events.BaseEvent {
    return a.uncommitted
}

// MarkEventsCommitted clears uncommitted events
func (a *Account) MarkEventsCommitted() {
    a.uncommitted = nil
}

3. Event Store Interface

package infrastructure

import (
    "context"
    "github.com/google/uuid"
    "yourapp/events"
)

// EventStore defines the interface for event persistence
type EventStore interface {
    // SaveEvents appends events to the store
    SaveEvents(ctx context.Context, aggregateID uuid.UUID, events []events.BaseEvent, expectedVersion int) error
    
    // GetEvents retrieves all events for an aggregate
    GetEvents(ctx context.Context, aggregateID uuid.UUID) ([]events.BaseEvent, error)
    
    // GetEventsSince retrieves events after a specific version
    GetEventsSince(ctx context.Context, aggregateID uuid.UUID, version int) ([]events.BaseEvent, error)
}

4. Read Model Projection (Python)

# projection.py
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class AccountSummary:
    """Read model for account summary"""
    account_id: str
    owner_name: str
    current_balance: float
    currency: str
    last_transaction_date: datetime
    transaction_count: int

class AccountProjection:
    """Maintains read model from events"""
    
    def __init__(self):
        self.accounts: Dict[str, AccountSummary] = {}
    
    def handle_event(self, event: Dict):
        """Process an event and update read model"""
        event_type = event['event_type']
        aggregate_id = event['aggregate_id']
        
        if event_type == 'AccountCreated':
            self._handle_account_created(event)
        elif event_type == 'MoneyDeposited':
            self._handle_money_deposited(event)
        elif event_type == 'MoneyWithdrawn':
            self._handle_money_withdrawn(event)
    
    def _handle_account_created(self, event: Dict):
        self.accounts[event['aggregate_id']] = AccountSummary(
            account_id=event['aggregate_id'],
            owner_name=event['owner_name'],
            current_balance=0.0,
            currency=event['currency'],
            last_transaction_date=datetime.fromisoformat(event['occurred_at']),
            transaction_count=0
        )
    
    def _handle_money_deposited(self, event: Dict):
        account = self.accounts[event['aggregate_id']]
        account.current_balance += event['amount']
        account.last_transaction_date = datetime.fromisoformat(event['occurred_at'])
        account.transaction_count += 1
    
    def _handle_money_withdrawn(self, event: Dict):
        account = self.accounts[event['aggregate_id']]
        account.current_balance -= event['amount']
        account.last_transaction_date = datetime.fromisoformat(event['occurred_at'])
        account.transaction_count += 1
    
    def get_account_summary(self, account_id: str) -> AccountSummary:
        """Query method for read model"""
        return self.accounts.get(account_id)
    
    def get_accounts_by_balance_range(self, min_balance: float, max_balance: float) -> List[AccountSummary]:
        """Complex query optimized in read model"""
        return [
            acc for acc in self.accounts.values()
            if min_balance <= acc.current_balance <= max_balance
        ]

5. React Query Component

// AccountDashboard.tsx
import React from 'react';
import { useQuery } from '@tanstack/react-query';
import { AccountSummary } from './types';

interface AccountDashboardProps {
  accountId: string;
}

const AccountDashboard: React.FC<AccountDashboardProps> = ({ accountId }) => {
  // Query read model (eventually consistent)
  const { data: account, isLoading } = useQuery<AccountSummary>({
    queryKey: ['account', accountId],
    queryFn: async () => {
      const response = await fetch(`/api/query/accounts/${accountId}`);
      return response.json();
    },
    // Poll for updates since read model is eventually consistent
    refetchInterval: 2000,
  });

  // Command handler
  const handleDeposit = async (amount: number) => {
    await fetch(`/api/command/accounts/${accountId}/deposit`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ amount, description: 'User deposit' }),
    });
    // Note: UI update will happen via polling/websocket when projection catches up
  };

  if (isLoading) return <div>Loading...</div>;

  return (
    <div className="account-dashboard">
      <h2>{account.ownerName}'s Account</h2>
      <div className="balance">
        Balance: {account.currentBalance} {account.currency}
      </div>
      <div className="stats">
        <span>Transactions: {account.transactionCount}</span>
        <span>Last Activity: {new Date(account.lastTransactionDate).toLocaleDateString()}</span>
      </div>
      <button onClick={() => handleDeposit(100)}>
        Deposit $100
      </button>
    </div>
  );
};

Key Design Decisions

Event Store Technology

Projection Strategy

Snapshots

For aggregates with many events, create periodic snapshots to avoid replaying thousands of events:

type Snapshot struct {
    AggregateID uuid.UUID
    Version     int
    State       []byte // Serialized aggregate state
    CreatedAt   time.Time
}

Replay strategy: Load snapshot, then replay events after snapshot version.

Trade-offs

Advantages

Complete audit trail: Every state change is recorded with metadata
Temporal queries: Can reconstruct state at any point in time
Event replay: Can rebuild read models or create new projections from events
Independent scaling: Scale reads and writes independently
Business insight: Events represent business facts, valuable for analytics
Debugging: Can replay events to reproduce bugs

Disadvantages

Complexity: Significantly more complex than traditional CRUD
Eventual consistency: Read model lags behind write model
Event schema evolution: Changing event structure requires careful migration
Learning curve: Team needs to understand ES/CQRS deeply
Operational overhead: More moving parts to monitor and maintain
Storage: Events accumulate indefinitely (mitigated by snapshotting)

Best Practices

  1. Immutable events: Never modify or delete events; append corrections if needed
  2. Event versioning: Include version in event schema for evolution
  3. Idempotent handlers: Projection handlers should be safe to replay
  4. Bounded contexts: Use ES/CQRS within bounded contexts, not everywhere
  5. Monitoring: Track projection lag, event throughput, and replay times
  6. Testing: Test aggregates by given events, when command, then expect events

Real-World Applications

Conclusion

Event Sourcing with CQRS is a powerful pattern for systems requiring strong auditability, complex domain logic, or independent read/write scaling. However, it introduces significant complexity and eventual consistency challenges. Use it when the benefits clearly outweigh the costs, and ensure your team has the expertise to implement it correctly.

Start with traditional architecture and migrate to ES/CQRS when you have clear requirements that justify the complexity.