CQRS with Event Sourcing: Separating Reads and Writes for Scalable Systems

What is CQRS?

Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read operations (queries) from write operations (commands). Combined with Event Sourcing, which persists every state change as an immutable event, it provides a foundation for scalable, auditable systems whose read models are eventually consistent with the write side.
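
To make the separation concrete, here is a minimal in-memory sketch. The names `Deposit`, `WriteSide`, and `ReadSide` are hypothetical and chosen only for illustration; a real system would put a durable event store and a message bus between the two sides.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Deposit:
    """Command: expresses an intent to change state."""
    account_id: str
    amount: float


@dataclass
class WriteSide:
    """Validates commands and records the resulting events. Never answers queries."""
    events: list = field(default_factory=list)

    def handle(self, cmd: Deposit) -> dict:
        if cmd.amount <= 0:
            raise ValueError("deposit amount must be positive")
        event = {"type": "MoneyDeposited", "account_id": cmd.account_id, "amount": cmd.amount}
        self.events.append(event)
        return event


@dataclass
class ReadSide:
    """Keeps a query-optimized view built from events. Never accepts commands."""
    balances: dict = field(default_factory=dict)

    def project(self, event: dict) -> None:
        if event["type"] == "MoneyDeposited":
            acct = event["account_id"]
            self.balances[acct] = self.balances.get(acct, 0.0) + event["amount"]

    def get_balance(self, account_id: str) -> float:
        return self.balances.get(account_id, 0.0)


write, read = WriteSide(), ReadSide()
# In this sketch the event is handed over directly; normally it travels via the event stream.
read.project(write.handle(Deposit("acc-1", 100.0)))
```

The write side never exposes balances and the read side never validates business rules, which is the property that lets each be scaled and optimized on its own.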

Core Principles:

When to Use CQRS + Event Sourcing

Ideal Use Cases

When NOT to Use

Architecture Overview

┌─────────────┐         ┌──────────────┐         ┌─────────────┐
│   Client    │         │   API Layer  │         │  Command    │
│             ├────────>│              ├────────>│  Handler    │
│             │ Command │              │         │             │
└─────────────┘         └──────────────┘         └──────┬──────┘
                                                        │
                                                        ▼
                        ┌────────────────────────────────────────┐
                        │         Event Store                    │
                        │  (Append-only immutable events)        │
                        └──────────┬─────────────────────────────┘
                                   │
                                   ▼
                        ┌────────────────────┐
                        │   Event Stream     │
                        │   (Pub/Sub)        │
                        └──────────┬─────────┘
                                   │
                        ┌──────────┴──────────┐
                        ▼                     ▼
                ┌──────────────┐      ┌──────────────┐
                │  Read Model  │      │  Read Model  │
                │  Projection  │      │  Projection  │
                │      #1      │      │      #2      │
                └──────┬───────┘      └──────┬───────┘
                       │                     │
                       ▼                     ▼
                ┌──────────────┐      ┌──────────────┐
                │  Read Store  │      │  Read Store  │
                │  (Postgres)  │      │  (Redis)     │
                └──────────────┘      └──────────────┘
                       │                     │
                       └──────────┬──────────┘
                                  │
                                  ▼
                        ┌──────────────────┐
                        │   Query Handler  │
                        └────────┬─────────┘
                                 │
                                 ▼
                        ┌──────────────────┐
                        │    Client        │
                        └──────────────────┘

Implementation: Go Example

Event Sourcing Foundation

package eventsourcing

// Only "time" is used in this file; Go rejects unused imports at compile time.
import "time"

// Event represents an immutable state change
type Event interface {
    GetAggregateID() string
    GetEventType() string
    GetVersion() int64
    GetTimestamp() time.Time
    GetData() interface{}
}

// BaseEvent provides common event fields
type BaseEvent struct {
    AggregateID string      `json:"aggregate_id"`
    EventType   string      `json:"event_type"`
    Version     int64       `json:"version"`
    Timestamp   time.Time   `json:"timestamp"`
    EventID     string      `json:"event_id"`
}

func (e BaseEvent) GetAggregateID() string { return e.AggregateID }
func (e BaseEvent) GetEventType() string   { return e.EventType }
func (e BaseEvent) GetVersion() int64      { return e.Version }
func (e BaseEvent) GetTimestamp() time.Time { return e.Timestamp }

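// Domain events for the Account aggregate.
// float64 keeps the example short; real monetary code should use integer
// minor units (e.g. cents) or a decimal type to avoid rounding errors.
type AccountCreatedEvent struct {
    BaseEvent
    AccountID      string  `json:"account_id"`
    OwnerID        string  `json:"owner_id"`
    InitialBalance float64 `json:"initial_balance"`
}

// GetData returns the event payload, completing the Event interface.
func (e AccountCreatedEvent) GetData() interface{} { return e }

type MoneyDepositedEvent struct {
    BaseEvent
    Amount float64 `json:"amount"`
}

// GetData returns the event payload, completing the Event interface.
func (e MoneyDepositedEvent) GetData() interface{} { return e }

type MoneyWithdrawnEvent struct {
    BaseEvent
    Amount float64 `json:"amount"`
}

// GetData returns the event payload, completing the Event interface.
func (e MoneyWithdrawnEvent) GetData() interface{} { return e }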

// EventStore interface for persistence
type EventStore interface {
    SaveEvents(aggregateID string, events []Event, expectedVersion int64) error
    LoadEvents(aggregateID string) ([]Event, error)
    Subscribe(eventTypes []string, handler EventHandler) error
}

type EventHandler func(event Event) error
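
The `expectedVersion` parameter of `SaveEvents` is what enables optimistic concurrency: a write is accepted only if the stream is still at the version the caller loaded. The following is a minimal in-memory sketch of that contract, not a production store. The class name `InMemoryEventStore`, the `ConcurrencyError` exception, and the convention that a stream's version equals its event count are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, List


class ConcurrencyError(Exception):
    """Raised when the stream has moved past the version the caller expected."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: Dict[str, List[dict]] = defaultdict(list)
        self._subscribers: Dict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def save_events(self, aggregate_id: str, events: List[dict], expected_version: int) -> None:
        stream = self._streams[aggregate_id]
        current_version = len(stream)  # assumption: version == number of stored events
        if current_version != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, stream is at {current_version}"
            )
        stream.extend(events)  # append-only: existing events are never modified
        for event in events:
            for handler in self._subscribers[event["event_type"]]:
                handler(event)

    def load_events(self, aggregate_id: str) -> List[dict]:
        # Return a copy so callers cannot mutate the stored stream.
        return list(self._streams.get(aggregate_id, []))

    def subscribe(self, event_types: List[str], handler: Callable[[dict], None]) -> None:
        for event_type in event_types:
            self._subscribers[event_type].append(handler)


store = InMemoryEventStore()
seen: List[dict] = []
store.subscribe(["MoneyDeposited"], seen.append)
store.save_events("acc-1", [{"event_type": "AccountCreated"}], expected_version=0)
store.save_events("acc-1", [{"event_type": "MoneyDeposited", "amount": 50}], expected_version=1)
```

A second writer that loaded the stream at version 1 and tries to save with `expected_version=1` after the deposit above would get a `ConcurrencyError` and would need to reload and retry.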

Command Handler with Aggregate

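package account

import (
    "errors"
    "time"

    "github.com/google/uuid"

    // Hypothetical module path: point this at wherever the
    // eventsourcing package above lives in your repository.
    es "example.com/bank/eventsourcing"
)

// Aliases so the aggregate can use the shared event types unqualified.
type (
    Event               = es.Event
    BaseEvent           = es.BaseEvent
    AccountCreatedEvent = es.AccountCreatedEvent
    MoneyDepositedEvent = es.MoneyDepositedEvent
    MoneyWithdrawnEvent = es.MoneyWithdrawnEvent
)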

// Account aggregate
type Account struct {
    id         string
    balance    float64
    version    int64
    changes    []Event
}

// Command definitions
type CreateAccountCommand struct {
    OwnerID        string
    InitialBalance float64
}

type DepositMoneyCommand struct {
    AccountID string
    Amount    float64
}

type WithdrawMoneyCommand struct {
    AccountID string
    Amount    float64
}

// Command handlers
func (a *Account) Handle(cmd interface{}) error {
    switch c := cmd.(type) {
    case CreateAccountCommand:
        return a.handleCreateAccount(c)
    case DepositMoneyCommand:
        return a.handleDepositMoney(c)
    case WithdrawMoneyCommand:
        return a.handleWithdrawMoney(c)
    default:
        return errors.New("unknown command")
    }
}

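func (a *Account) handleCreateAccount(cmd CreateAccountCommand) error {
    if a.id != "" {
        return errors.New("account already exists")
    }
    if cmd.InitialBalance < 0 {
        return errors.New("initial balance cannot be negative")
    }

    // Generate the ID once. The aggregate ID and the account ID must match,
    // otherwise the creation event is stored under a different stream than
    // the deposits and withdrawals that follow it.
    accountID := uuid.New().String()

    event := AccountCreatedEvent{
        BaseEvent: BaseEvent{
            AggregateID: accountID,
            EventType:   "AccountCreated",
            Version:     1,
            Timestamp:   time.Now(),
            EventID:     uuid.New().String(),
        },
        AccountID:      accountID,
        OwnerID:        cmd.OwnerID,
        InitialBalance: cmd.InitialBalance,
    }

    a.apply(event)
    return nil
}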

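func (a *Account) handleDepositMoney(cmd DepositMoneyCommand) error {
    // Reject commands against an aggregate that was never created.
    if a.id == "" {
        return errors.New("account does not exist")
    }
    if cmd.Amount <= 0 {
        return errors.New("deposit amount must be positive")
    }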
    
    event := MoneyDepositedEvent{
        BaseEvent: BaseEvent{
            AggregateID: a.id,
            EventType:   "MoneyDeposited",
            Version:     a.version + 1,
            Timestamp:   time.Now(),
            EventID:     uuid.New().String(),
        },
        Amount: cmd.Amount,
    }
    
    a.apply(event)
    return nil
}

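func (a *Account) handleWithdrawMoney(cmd WithdrawMoneyCommand) error {
    // Reject commands against an aggregate that was never created.
    if a.id == "" {
        return errors.New("account does not exist")
    }
    if cmd.Amount <= 0 {
        return errors.New("withdrawal amount must be positive")
    }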
    if a.balance < cmd.Amount {
        return errors.New("insufficient funds")
    }
    
    event := MoneyWithdrawnEvent{
        BaseEvent: BaseEvent{
            AggregateID: a.id,
            EventType:   "MoneyWithdrawn",
            Version:     a.version + 1,
            Timestamp:   time.Now(),
            EventID:     uuid.New().String(),
        },
        Amount: cmd.Amount,
    }
    
    a.apply(event)
    return nil
}

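// apply updates aggregate state from an event and records it as uncommitted.
// When replaying stored history, state should be updated without recording,
// otherwise already-persisted events would be saved a second time.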
func (a *Account) apply(event Event) {
    switch e := event.(type) {
    case AccountCreatedEvent:
        a.id = e.AccountID
        a.balance = e.InitialBalance
        a.version = e.Version
    case MoneyDepositedEvent:
        a.balance += e.Amount
        a.version = e.Version
    case MoneyWithdrawnEvent:
        a.balance -= e.Amount
        a.version = e.Version
    }
    
    a.changes = append(a.changes, event)
}

// GetUncommittedChanges returns events that haven't been persisted
func (a *Account) GetUncommittedChanges() []Event {
    return a.changes
}

// MarkChangesAsCommitted clears the uncommitted changes
func (a *Account) MarkChangesAsCommitted() {
    a.changes = []Event{}
}
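
The aggregate above records new events but does not show how it is rebuilt from the event store before a command is handled. One common approach, sketched here in Python under the assumption that events arrive as dictionaries with the same field names as the JSON tags above, is to split the state transition from the recording step. The method names `_mutate`, `_record`, and `from_history` are hypothetical.

```python
from typing import List


class Account:
    def __init__(self) -> None:
        self.id = ""
        self.balance = 0.0
        self.version = 0
        self.changes: List[dict] = []  # new events not yet persisted

    def _mutate(self, event: dict) -> None:
        """Pure state transition, shared by replay and by new events."""
        kind = event["event_type"]
        if kind == "AccountCreated":
            self.id = event["account_id"]
            self.balance = event["initial_balance"]
        elif kind == "MoneyDeposited":
            self.balance += event["amount"]
        elif kind == "MoneyWithdrawn":
            self.balance -= event["amount"]
        self.version = event["version"]

    def _record(self, event: dict) -> None:
        """Apply a new event and remember it for the next save."""
        self._mutate(event)
        self.changes.append(event)

    @classmethod
    def from_history(cls, events: List[dict]) -> "Account":
        """Rebuild state by replaying stored events; nothing is marked uncommitted."""
        account = cls()
        for event in events:
            account._mutate(event)
        return account

    def deposit(self, amount: float) -> None:
        if amount <= 0:
            raise ValueError("deposit amount must be positive")
        self._record({"event_type": "MoneyDeposited", "amount": amount,
                      "version": self.version + 1})


history = [
    {"event_type": "AccountCreated", "account_id": "acc-1", "initial_balance": 100.0, "version": 1},
    {"event_type": "MoneyDeposited", "amount": 50.0, "version": 2},
    {"event_type": "MoneyWithdrawn", "amount": 30.0, "version": 3},
]
account = Account.from_history(history)
account.deposit(10.0)
```

After the replay the aggregate sits at version 3 with no uncommitted changes; the deposit then produces exactly one new event at version 4, which is what would be passed to the event store with an expected version of 3.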

Read Model Projection (Python Example)

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List

import asyncpg

@dataclass
class AccountReadModel:
    account_id: str
    owner_id: str
    current_balance: float
    total_deposits: float
    total_withdrawals: float
    transaction_count: int
    last_transaction: datetime
    created_at: datetime

class AccountProjection:
    """Projects events into optimized read model for queries"""
    
    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool
        
    async def handle_event(self, event: Dict):
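        """Handle incoming events from the event stream.

        The handlers pass event['timestamp'] straight to asyncpg, which expects
        datetime objects for timestamp columns, so ISO-8601 strings must be
        parsed into datetime before the event reaches this method.
        """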
        event_type = event['event_type']
        
        handlers = {
            'AccountCreated': self._handle_account_created,
            'MoneyDeposited': self._handle_money_deposited,
            'MoneyWithdrawn': self._handle_money_withdrawn,
        }
        
        handler = handlers.get(event_type)
        if handler:
            await handler(event)
    
    async def _handle_account_created(self, event: Dict):
        """Create initial read model entry"""
        async with self.db.acquire() as conn:
            await conn.execute("""
                INSERT INTO account_read_model 
                (account_id, owner_id, current_balance, total_deposits, 
                 total_withdrawals, transaction_count, last_transaction, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            """, 
                event['account_id'],
                event['owner_id'],
                event['initial_balance'],
                event['initial_balance'],
                0.0,
                1,
                event['timestamp'],
                event['timestamp']
            )
    
    async def _handle_money_deposited(self, event: Dict):
        """Update read model for deposit"""
        async with self.db.acquire() as conn:
            await conn.execute("""
                UPDATE account_read_model 
                SET current_balance = current_balance + $1,
                    total_deposits = total_deposits + $1,
                    transaction_count = transaction_count + 1,
                    last_transaction = $2
                WHERE account_id = $3
            """, 
                event['amount'],
                event['timestamp'],
                event['aggregate_id']
            )
    
    async def _handle_money_withdrawn(self, event: Dict):
        """Update read model for withdrawal"""
        async with self.db.acquire() as conn:
            await conn.execute("""
                UPDATE account_read_model 
                SET current_balance = current_balance - $1,
                    total_withdrawals = total_withdrawals + $1,
                    transaction_count = transaction_count + 1,
                    last_transaction = $2
                WHERE account_id = $3
            """, 
                event['amount'],
                event['timestamp'],
                event['aggregate_id']
            )
    
    async def get_account(self, account_id: str) -> AccountReadModel | None:
        """Query the read model; returns None for an unknown account.

        The `X | None` annotation requires Python 3.10 or later.
        """
        async with self.db.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT * FROM account_read_model 
                WHERE account_id = $1
            """, account_id)
            
            if row:
                return AccountReadModel(**dict(row))
            return None
    
    async def get_high_value_accounts(self, min_balance: float) -> List[AccountReadModel]:
        """Optimized query for specific use case"""
        async with self.db.acquire() as conn:
            rows = await conn.fetch("""
                SELECT * FROM account_read_model 
                WHERE current_balance >= $1
                ORDER BY current_balance DESC
                LIMIT 100
            """, min_balance)
            
            return [AccountReadModel(**dict(row)) for row in rows]
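
The update handlers above add amounts incrementally, so an event that is delivered twice (which at-least-once message delivery allows) would be counted twice. One way to guard against this is to remember the last applied version per account and skip anything at or below it. The sketch below uses an in-memory dictionary instead of Postgres so it can run on its own; the class name and the `last_version` field are assumptions and do not exist in the table used above.

```python
from typing import Dict


class IdempotentBalanceProjection:
    def __init__(self) -> None:
        # account_id -> {"balance": float, "last_version": int}
        self.rows: Dict[str, dict] = {}

    def handle_event(self, event: dict) -> bool:
        """Apply the event once; return False if it was already applied."""
        account_id = event["aggregate_id"]
        row = self.rows.setdefault(account_id, {"balance": 0.0, "last_version": 0})
        if event["version"] <= row["last_version"]:
            return False  # duplicate or stale delivery: ignore it
        if event["event_type"] == "AccountCreated":
            row["balance"] = event["initial_balance"]
        elif event["event_type"] == "MoneyDeposited":
            row["balance"] += event["amount"]
        elif event["event_type"] == "MoneyWithdrawn":
            row["balance"] -= event["amount"]
        row["last_version"] = event["version"]
        return True


projection = IdempotentBalanceProjection()
created = {"aggregate_id": "acc-1", "event_type": "AccountCreated",
           "initial_balance": 100.0, "version": 1}
deposit = {"aggregate_id": "acc-1", "event_type": "MoneyDeposited",
           "amount": 25.0, "version": 2}
for event in (created, deposit, deposit):  # the deposit is delivered twice
    projection.handle_event(event)
```

In a SQL-backed projection the same guard could plausibly be expressed as a version column plus an extra condition in the `WHERE` clause of each `UPDATE`, so that the check and the write happen in one statement.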

ReactJS Frontend Integration

// React hooks for CQRS integration
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';

interface Account {
  accountId: string;
  ownerId: string;
  currentBalance: number;
  totalDeposits: number;
  totalWithdrawals: number;
  transactionCount: number;
  lastTransaction: string;
}

// Command API (writes)
const commandAPI = {
  depositMoney: async (accountId: string, amount: number) => {
    const response = await fetch('/api/commands/deposit', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ accountId, amount }),
    });
    
    if (!response.ok) throw new Error('Deposit failed');
    return response.json(); // Returns command acknowledgment
  },
  
  withdrawMoney: async (accountId: string, amount: number) => {
    const response = await fetch('/api/commands/withdraw', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ accountId, amount }),
    });
    
    if (!response.ok) throw new Error('Withdrawal failed');
    return response.json();
  },
};

// Query API (reads)
const queryAPI = {
  getAccount: async (accountId: string): Promise<Account> => {
    const response = await fetch(`/api/queries/accounts/${accountId}`);
    if (!response.ok) throw new Error('Failed to fetch account');
    return response.json();
  },
};

// React component using CQRS pattern
export function AccountDashboard({ accountId }: { accountId: string }) {
  const queryClient = useQueryClient();
  
  // Query for read model
  const { data: account, isLoading } = useQuery({
    queryKey: ['account', accountId],
    queryFn: () => queryAPI.getAccount(accountId),
    refetchInterval: 2000, // Poll for eventual consistency updates
  });
  
  // Command mutations
  const depositMutation = useMutation({
    mutationFn: ({ amount }: { amount: number }) => 
      commandAPI.depositMoney(accountId, amount),
    onSuccess: () => {
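      // Invalidate the cached query so it refetches; the projection may still
      // lag behind the command, and the polling interval covers that gap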
      queryClient.invalidateQueries({ queryKey: ['account', accountId] });
    },
  });
  
  const withdrawMutation = useMutation({
    mutationFn: ({ amount }: { amount: number }) => 
      commandAPI.withdrawMoney(accountId, amount),
    onSuccess: () => {
      queryClient.invalidateQueries({ queryKey: ['account', accountId] });
    },
  });
  
  if (isLoading) return <div>Loading...</div>;
  if (!account) return <div>Account not found</div>;
  
  return (
    <div className="account-dashboard">
      <h2>Account Balance: ${account.currentBalance.toFixed(2)}</h2>
      <p>Total Deposits: ${account.totalDeposits.toFixed(2)}</p>
      <p>Total Withdrawals: ${account.totalWithdrawals.toFixed(2)}</p>
      <p>Transaction Count: {account.transactionCount}</p>
      
      <button 
        onClick={() => depositMutation.mutate({ amount: 100 })}
        disabled={depositMutation.isPending}
      >
        Deposit $100
      </button>
      
      <button 
        onClick={() => withdrawMutation.mutate({ amount: 50 })}
        disabled={withdrawMutation.isPending}
      >
        Withdraw $50
      </button>
      
      {depositMutation.isPending && <p>Processing deposit...</p>}
      {withdrawMutation.isPending && <p>Processing withdrawal...</p>}
    </div>
  );
}
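
Fixed-interval polling works, but a client that has just issued a command can also wait specifically for the read model to catch up. The sketch below shows that idea in Python with `asyncio`. It assumes the command acknowledgment carries the new aggregate version and that the read model exposes the last version it has applied; neither is stated in the example above, and `wait_for_version` is a hypothetical helper.

```python
import asyncio
from typing import Awaitable, Callable


async def wait_for_version(
    fetch_version: Callable[[], Awaitable[int]],
    target_version: int,
    interval: float = 0.05,
    timeout: float = 2.0,
) -> bool:
    """Poll until the read model reports at least target_version, or give up."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        if await fetch_version() >= target_version:
            return True
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(interval)


async def demo() -> bool:
    read_model = {"version": 1}

    async def fetch_version() -> int:
        return read_model["version"]

    async def projector() -> None:
        await asyncio.sleep(0.1)  # simulated projection lag
        read_model["version"] = 2

    task = asyncio.create_task(projector())
    caught_up = await wait_for_version(fetch_version, target_version=2)
    await task
    return caught_up


result = asyncio.run(demo())
```

Returning `False` on timeout lets the caller decide whether to show stale data with a notice or to keep waiting.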

Trade-offs and Considerations

Advantages

✅ Independent scaling - Scale reads and writes separately based on load
✅ Complete audit trail - Every state change captured as immutable event
✅ Temporal queries - Reconstruct state at any point in time
✅ Multiple read models - Optimize different query patterns independently
✅ Event-driven integration - Easy to add new projections and subscribers
✅ Domain model focus - Commands encode business intent clearly
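
The temporal query advantage follows directly from keeping the full event history: state at a past moment is the result of replaying only the events up to that moment. A minimal sketch, assuming events are dictionaries in stored order with timezone-aware `timestamp` values (the function name `balance_at` and the sample data are illustrative):

```python
from datetime import datetime, timezone
from typing import List


def balance_at(events: List[dict], as_of: datetime) -> float:
    """Replay events with timestamp <= as_of and return the resulting balance."""
    balance = 0.0
    for event in events:  # assumes stored order matches timestamp order
        if event["timestamp"] > as_of:
            break
        if event["event_type"] == "AccountCreated":
            balance = event["initial_balance"]
        elif event["event_type"] == "MoneyDeposited":
            balance += event["amount"]
        elif event["event_type"] == "MoneyWithdrawn":
            balance -= event["amount"]
    return balance


def ts(day: int) -> datetime:
    """Helper for the sample data: a UTC timestamp in January 2024."""
    return datetime(2024, 1, day, tzinfo=timezone.utc)


events = [
    {"event_type": "AccountCreated", "initial_balance": 100.0, "timestamp": ts(1)},
    {"event_type": "MoneyDeposited", "amount": 50.0, "timestamp": ts(5)},
    {"event_type": "MoneyWithdrawn", "amount": 30.0, "timestamp": ts(10)},
]

mid_month = balance_at(events, ts(7))     # after the deposit, before the withdrawal
end_of_month = balance_at(events, ts(31))  # all three events applied
```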

Disadvantages

❌ Operational complexity - More moving parts (event store, projections, message bus)
❌ Eventual consistency - Read models lag behind write model
❌ Debugging difficulty - Distributed system with async event processing
❌ Storage overhead - Event store grows indefinitely (long streams need snapshotting to keep replay fast)
❌ Team learning curve - Requires shift in thinking from CRUD mentality
❌ Tooling maturity - Less mature ecosystem compared to traditional ORM approaches
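
Snapshotting, mentioned above for long event streams, stores the aggregate state at a known version so that loading only needs to replay the events that came after it. The following is a minimal sketch under the assumption that each event carries a monotonically increasing `version`; the `Snapshot` class and `load_balance` function are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Snapshot:
    balance: float
    version: int  # version of the last event folded into the snapshot


def apply_event(balance: float, event: dict) -> float:
    """Return the balance after applying a single event."""
    if event["event_type"] == "AccountCreated":
        return event["initial_balance"]
    if event["event_type"] == "MoneyDeposited":
        return balance + event["amount"]
    if event["event_type"] == "MoneyWithdrawn":
        return balance - event["amount"]
    return balance


def load_balance(events: List[dict], snapshot: Optional[Snapshot] = None) -> Snapshot:
    """Start from the snapshot (if any) and replay only newer events."""
    balance = snapshot.balance if snapshot else 0.0
    version = snapshot.version if snapshot else 0
    for event in events:
        if event["version"] <= version:
            continue  # already reflected in the snapshot
        balance = apply_event(balance, event)
        version = event["version"]
    return Snapshot(balance=balance, version=version)


events = [
    {"event_type": "AccountCreated", "initial_balance": 100.0, "version": 1},
    {"event_type": "MoneyDeposited", "amount": 50.0, "version": 2},
    {"event_type": "MoneyWithdrawn", "amount": 30.0, "version": 3},
    {"event_type": "MoneyDeposited", "amount": 5.0, "version": 4},
]
snapshot = load_balance(events[:2])       # snapshot taken after version 2
current = load_balance(events, snapshot)  # replays only versions 3 and 4
```

In a real store the filter would usually be pushed into the query, loading only events with a version greater than the snapshot's, rather than skipping them in memory. Note that snapshots shorten replay time; they do not by themselves shrink the event store.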

Mitigating Challenges

For Eventual Consistency:

For Operational Complexity:

For Storage Growth:

Conclusion

CQRS with Event Sourcing is a powerful pattern for complex, scalable systems where auditability and flexible querying are critical. It’s particularly well-suited for financial systems, collaborative applications, and domains with complex business logic.

However, it introduces significant complexity. Only adopt CQRS when you have clear requirements that justify the trade-offs: high read/write asymmetry, need for audit trails, complex temporal queries, or independent scaling needs.

Start simple with separated command and query endpoints, then evolve to full event sourcing as requirements demand. The pattern is most successful when the entire team understands event-driven thinking and has operational maturity to handle distributed systems.