CQRS with Event Sourcing: Separating Reads and Writes for Scalable Systems
What is CQRS?
Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read operations (queries) from write operations (commands). When combined with Event Sourcing, it creates a powerful foundation for building scalable, auditable, and eventually consistent systems.
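At the code level this split typically shows up as two narrow interfaces, one for commands and one for queries. The sketch below is a minimal Go illustration; the package and interface names are hypothetical, and the Account example later in the article shows a concrete version.
package cqrs

// Commands express intent to change state and return only an acknowledgment (or an error).
type CommandHandler interface {
    Handle(command interface{}) error
}

// Queries read from a model optimized for the question being asked and never modify state.
type QueryHandler interface {
    Query(query interface{}) (interface{}, error)
}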
Core Principles:
- Commands mutate state and return acknowledgment
- Queries return data without modifying state
- Different models for reading and writing
- Event Sourcing stores state changes as immutable events
When to Use CQRS + Event Sourcing
Ideal Use Cases
- Financial systems requiring complete audit trails
- Collaborative applications with complex conflict resolution
- High read/write asymmetry (10:1 or higher read-to-write ratio)
- Complex business logic where state transitions matter
- Temporal queries needing historical state reconstruction
- Scalability requirements with independent read/write scaling
When NOT to Use
- Simple CRUD applications with balanced read/write patterns
- Systems without complex business logic
- Applications where eventual consistency is unacceptable
- Small-scale projects where operational complexity outweighs benefits
- Teams unfamiliar with event-driven architecture (steep learning curve)
Architecture Overview
┌────────┐            ┌───────────┐        ┌─────────────────┐
│ Client │  Command   │ API Layer │        │ Command Handler │
│        ├───────────>│           ├───────>│                 │
└────────┘            └───────────┘        └────────┬────────┘
                                                    │
                                                    ▼
                                   ┌──────────────────────────────────┐
                                   │            Event Store           │
                                   │  (Append-only immutable events)  │
                                   └────────────────┬─────────────────┘
                                                    │
                                                    ▼
                                          ┌──────────────────┐
                                          │   Event Stream   │
                                          │     (Pub/Sub)    │
                                          └─────────┬────────┘
                                                    │
                             ┌──────────────────────┴──────────────────────┐
                             ▼                                             ▼
                     ┌────────────────┐                           ┌────────────────┐
                     │   Read Model   │                           │   Read Model   │
                     │  Projection #1 │                           │  Projection #2 │
                     └───────┬────────┘                           └────────┬───────┘
                             │                                             │
                             ▼                                             ▼
                     ┌────────────────┐                           ┌────────────────┐
                     │   Read Store   │                           │   Read Store   │
                     │   (Postgres)   │                           │     (Redis)    │
                     └───────┬────────┘                           └────────┬───────┘
                             │                                             │
                             └──────────────────────┬──────────────────────┘
                                                    │
                                                    ▼
                                           ┌─────────────────┐
                                           │  Query Handler  │
                                           └────────┬────────┘
                                                    │
                                                    ▼
                                           ┌─────────────────┐
                                           │      Client     │
                                           └─────────────────┘
Implementation: Go Example
Event Sourcing Foundation
package eventsourcing
import (
    "time"
)
// Event represents an immutable state change
type Event interface {
GetAggregateID() string
GetEventType() string
GetVersion() int64
GetTimestamp() time.Time
GetData() interface{}
}
// BaseEvent provides common event fields
type BaseEvent struct {
AggregateID string `json:"aggregate_id"`
EventType string `json:"event_type"`
Version int64 `json:"version"`
Timestamp time.Time `json:"timestamp"`
EventID string `json:"event_id"`
}
func (e BaseEvent) GetAggregateID() string { return e.AggregateID }
func (e BaseEvent) GetEventType() string { return e.EventType }
func (e BaseEvent) GetVersion() int64 { return e.Version }
func (e BaseEvent) GetTimestamp() time.Time { return e.Timestamp }
// Domain Events for Account Aggregate
type AccountCreatedEvent struct {
BaseEvent
AccountID string `json:"account_id"`
OwnerID string `json:"owner_id"`
InitialBalance float64 `json:"initial_balance"`
}
type MoneyDepositedEvent struct {
BaseEvent
Amount float64 `json:"amount"`
}
type MoneyWithdrawnEvent struct {
BaseEvent
Amount float64 `json:"amount"`
}
// EventStore interface for persistence
type EventStore interface {
SaveEvents(aggregateID string, events []Event, expectedVersion int64) error
LoadEvents(aggregateID string) ([]Event, error)
Subscribe(eventTypes []string, handler EventHandler) error
}
type EventHandler func(event Event) error
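For local development and tests, a simple in-memory implementation of the EventStore interface is enough to exercise the rest of the examples. The sketch below is not part of the production design: it assumes it lives in the same eventsourcing package (add "fmt" and "sync" to the imports), and it delivers events to subscribers synchronously, with none of the durability or ordering guarantees a real event store needs.
// InMemoryEventStore is a non-durable EventStore for examples and tests.
type InMemoryEventStore struct {
    mu       sync.Mutex
    streams  map[string][]Event
    handlers map[string][]EventHandler
}

func NewInMemoryEventStore() *InMemoryEventStore {
    return &InMemoryEventStore{
        streams:  make(map[string][]Event),
        handlers: make(map[string][]EventHandler),
    }
}

// SaveEvents appends events after an optimistic-concurrency check on expectedVersion.
func (s *InMemoryEventStore) SaveEvents(aggregateID string, events []Event, expectedVersion int64) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    current := int64(len(s.streams[aggregateID]))
    if current != expectedVersion {
        return fmt.Errorf("concurrency conflict: expected version %d, found %d", expectedVersion, current)
    }
    s.streams[aggregateID] = append(s.streams[aggregateID], events...)
    for _, e := range events {
        for _, h := range s.handlers[e.GetEventType()] {
            _ = h(e) // synchronous, best-effort delivery; a real store needs retries and ordering
        }
    }
    return nil
}

// LoadEvents returns the full event history for an aggregate.
func (s *InMemoryEventStore) LoadEvents(aggregateID string) ([]Event, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return append([]Event(nil), s.streams[aggregateID]...), nil
}

// Subscribe registers a handler for the given event types.
func (s *InMemoryEventStore) Subscribe(eventTypes []string, handler EventHandler) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    for _, t := range eventTypes {
        s.handlers[t] = append(s.handlers[t], handler)
    }
    return nil
}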
Command Handler with Aggregate
package eventsourcing

// In a real project the Account aggregate would live in its own package (e.g. account)
// and import the event types; it shares the eventsourcing package here so the example
// compiles as written.

import (
    "errors"
    "time"

    "github.com/google/uuid"
)
// Account aggregate
type Account struct {
id string
balance float64
version int64
changes []Event
}
// Command definitions
type CreateAccountCommand struct {
OwnerID string
InitialBalance float64
}
type DepositMoneyCommand struct {
AccountID string
Amount float64
}
type WithdrawMoneyCommand struct {
AccountID string
Amount float64
}
// Command handlers
func (a *Account) Handle(cmd interface{}) error {
switch c := cmd.(type) {
case CreateAccountCommand:
return a.handleCreateAccount(c)
case DepositMoneyCommand:
return a.handleDepositMoney(c)
case WithdrawMoneyCommand:
return a.handleWithdrawMoney(c)
default:
return errors.New("unknown command")
}
}
func (a *Account) handleCreateAccount(cmd CreateAccountCommand) error {
if a.id != "" {
return errors.New("account already exists")
}
    accountID := uuid.New().String() // one ID shared by the aggregate and the event payload
    event := AccountCreatedEvent{
        BaseEvent: BaseEvent{
            AggregateID: accountID,
            EventType:   "AccountCreated",
            Version:     1,
            Timestamp:   time.Now(),
            EventID:     uuid.New().String(),
        },
        AccountID:      accountID,
        OwnerID:        cmd.OwnerID,
        InitialBalance: cmd.InitialBalance,
    }
a.apply(event)
return nil
}
func (a *Account) handleDepositMoney(cmd DepositMoneyCommand) error {
if cmd.Amount <= 0 {
return errors.New("deposit amount must be positive")
}
event := MoneyDepositedEvent{
BaseEvent: BaseEvent{
AggregateID: a.id,
EventType: "MoneyDeposited",
Version: a.version + 1,
Timestamp: time.Now(),
EventID: uuid.New().String(),
},
Amount: cmd.Amount,
}
a.apply(event)
return nil
}
func (a *Account) handleWithdrawMoney(cmd WithdrawMoneyCommand) error {
if cmd.Amount <= 0 {
return errors.New("withdrawal amount must be positive")
}
if a.balance < cmd.Amount {
return errors.New("insufficient funds")
}
event := MoneyWithdrawnEvent{
BaseEvent: BaseEvent{
AggregateID: a.id,
EventType: "MoneyWithdrawn",
Version: a.version + 1,
Timestamp: time.Now(),
EventID: uuid.New().String(),
},
Amount: cmd.Amount,
}
a.apply(event)
return nil
}
// Apply events to update aggregate state
func (a *Account) apply(event Event) {
switch e := event.(type) {
case AccountCreatedEvent:
a.id = e.AccountID
a.balance = e.InitialBalance
a.version = e.Version
case MoneyDepositedEvent:
a.balance += e.Amount
a.version = e.Version
case MoneyWithdrawnEvent:
a.balance -= e.Amount
a.version = e.Version
}
a.changes = append(a.changes, event)
}
// GetUncommittedChanges returns events that haven't been persisted
func (a *Account) GetUncommittedChanges() []Event {
return a.changes
}
// MarkChangesAsCommitted clears the uncommitted changes
func (a *Account) MarkChangesAsCommitted() {
a.changes = []Event{}
}
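Two application-level helpers tie the aggregate to the EventStore interface: a command service that replays history, runs the command, and appends the new events using the pre-command version as the optimistic-concurrency check, and a temporal-query helper that rebuilds state as of a point in time. Both are sketches that assume they live in the same package as the aggregate (so they can call apply and read its unexported fields); AccountService and AccountAsOf are illustrative names, and account creation, where the ID comes from the event itself, is omitted for brevity.
// AccountService coordinates loading, command handling, and persistence for existing accounts.
type AccountService struct {
    store EventStore
}

func (s *AccountService) HandleCommand(aggregateID string, cmd interface{}) error {
    history, err := s.store.LoadEvents(aggregateID)
    if err != nil {
        return err
    }

    // Rebuild current state by replaying the stored events.
    account := &Account{}
    for _, e := range history {
        account.apply(e)
    }
    account.MarkChangesAsCommitted() // replayed events are history, not new changes

    expectedVersion := account.version // version before this command ran
    if err := account.Handle(cmd); err != nil {
        return err
    }
    return s.store.SaveEvents(aggregateID, account.GetUncommittedChanges(), expectedVersion)
}

// AccountAsOf reconstructs an account's state at a past moment by replaying only
// the events recorded up to that time (a temporal query).
func AccountAsOf(store EventStore, aggregateID string, asOf time.Time) (*Account, error) {
    history, err := store.LoadEvents(aggregateID)
    if err != nil {
        return nil, err
    }
    account := &Account{}
    for _, e := range history {
        if e.GetTimestamp().After(asOf) {
            break // events are appended in order, so everything after this point is newer
        }
        account.apply(e)
    }
    return account, nil
}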
Read Model Projection (Python Example)
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import asyncpg
@dataclass
class AccountReadModel:
account_id: str
owner_id: str
current_balance: float
total_deposits: float
total_withdrawals: float
transaction_count: int
last_transaction: datetime
created_at: datetime
class AccountProjection:
"""Projects events into optimized read model for queries"""
def __init__(self, db_pool: asyncpg.Pool):
self.db = db_pool
async def handle_event(self, event: Dict):
"""Handle incoming events from event stream"""
event_type = event['event_type']
handlers = {
'AccountCreated': self._handle_account_created,
'MoneyDeposited': self._handle_money_deposited,
'MoneyWithdrawn': self._handle_money_withdrawn,
}
handler = handlers.get(event_type)
if handler:
await handler(event)
async def _handle_account_created(self, event: Dict):
"""Create initial read model entry"""
async with self.db.acquire() as conn:
await conn.execute("""
INSERT INTO account_read_model
(account_id, owner_id, current_balance, total_deposits,
total_withdrawals, transaction_count, last_transaction, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
""",
event['account_id'],
event['owner_id'],
event['initial_balance'],
event['initial_balance'],
0.0,
1,
event['timestamp'],
event['timestamp']
)
async def _handle_money_deposited(self, event: Dict):
"""Update read model for deposit"""
async with self.db.acquire() as conn:
await conn.execute("""
UPDATE account_read_model
SET current_balance = current_balance + $1,
total_deposits = total_deposits + $1,
transaction_count = transaction_count + 1,
last_transaction = $2
WHERE account_id = $3
""",
event['amount'],
event['timestamp'],
event['aggregate_id']
)
async def _handle_money_withdrawn(self, event: Dict):
"""Update read model for withdrawal"""
async with self.db.acquire() as conn:
await conn.execute("""
UPDATE account_read_model
SET current_balance = current_balance - $1,
total_withdrawals = total_withdrawals + $1,
transaction_count = transaction_count + 1,
last_transaction = $2
WHERE account_id = $3
""",
event['amount'],
event['timestamp'],
event['aggregate_id']
)
    async def get_account(self, account_id: str) -> Optional[AccountReadModel]:
"""Query optimized read model"""
async with self.db.acquire() as conn:
row = await conn.fetchrow("""
SELECT * FROM account_read_model
WHERE account_id = $1
""", account_id)
if row:
return AccountReadModel(**dict(row))
return None
async def get_high_value_accounts(self, min_balance: float) -> List[AccountReadModel]:
"""Optimized query for specific use case"""
async with self.db.acquire() as conn:
rows = await conn.fetch("""
SELECT * FROM account_read_model
WHERE current_balance >= $1
ORDER BY current_balance DESC
LIMIT 100
""", min_balance)
return [AccountReadModel(**dict(row)) for row in rows]
ReactJS Frontend Integration
// React hooks for CQRS integration
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
interface Account {
accountId: string;
ownerId: string;
currentBalance: number;
totalDeposits: number;
totalWithdrawals: number;
transactionCount: number;
lastTransaction: string;
}
// Command API (writes)
const commandAPI = {
depositMoney: async (accountId: string, amount: number) => {
const response = await fetch('/api/commands/deposit', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ accountId, amount }),
});
if (!response.ok) throw new Error('Deposit failed');
return response.json(); // Returns command acknowledgment
},
withdrawMoney: async (accountId: string, amount: number) => {
const response = await fetch('/api/commands/withdraw', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ accountId, amount }),
});
if (!response.ok) throw new Error('Withdrawal failed');
return response.json();
},
};
// Query API (reads)
const queryAPI = {
getAccount: async (accountId: string): Promise<Account> => {
const response = await fetch(`/api/queries/accounts/${accountId}`);
if (!response.ok) throw new Error('Failed to fetch account');
return response.json();
},
};
// React component using CQRS pattern
export function AccountDashboard({ accountId }: { accountId: string }) {
const queryClient = useQueryClient();
// Query for read model
const { data: account, isLoading } = useQuery({
queryKey: ['account', accountId],
queryFn: () => queryAPI.getAccount(accountId),
refetchInterval: 2000, // Poll for eventual consistency updates
});
// Command mutations
const depositMutation = useMutation({
mutationFn: ({ amount }: { amount: number }) =>
commandAPI.depositMoney(accountId, amount),
onSuccess: () => {
      // Invalidate and refetch; the read model may briefly lag behind the write (eventual consistency)
queryClient.invalidateQueries({ queryKey: ['account', accountId] });
},
});
const withdrawMutation = useMutation({
mutationFn: ({ amount }: { amount: number }) =>
commandAPI.withdrawMoney(accountId, amount),
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: ['account', accountId] });
},
});
if (isLoading) return <div>Loading...</div>;
if (!account) return <div>Account not found</div>;
return (
<div className="account-dashboard">
<h2>Account Balance: ${account.currentBalance.toFixed(2)}</h2>
<p>Total Deposits: ${account.totalDeposits.toFixed(2)}</p>
<p>Total Withdrawals: ${account.totalWithdrawals.toFixed(2)}</p>
<p>Transaction Count: {account.transactionCount}</p>
<button
onClick={() => depositMutation.mutate({ amount: 100 })}
disabled={depositMutation.isPending}
>
Deposit $100
</button>
<button
onClick={() => withdrawMutation.mutate({ amount: 50 })}
disabled={withdrawMutation.isPending}
>
Withdraw $50
</button>
{depositMutation.isPending && <p>Processing deposit...</p>}
{withdrawMutation.isPending && <p>Processing withdrawal...</p>}
</div>
);
}
Trade-offs and Considerations
Advantages
✅ Independent scaling - Scale reads and writes separately based on load
✅ Complete audit trail - Every state change captured as immutable event
✅ Temporal queries - Reconstruct state at any point in time
✅ Multiple read models - Optimize different query patterns independently
✅ Event-driven integration - Easy to add new projections and subscribers
✅ Domain model focus - Commands encode business intent clearly
Disadvantages
❌ Operational complexity - More moving parts (event store, projections, message bus)
❌ Eventual consistency - Read models lag behind write model
❌ Debugging difficulty - Distributed system with async event processing
❌ Storage overhead - Event store grows indefinitely (requires snapshotting)
❌ Team learning curve - Requires shift in thinking from CRUD mentality
❌ Tooling maturity - Less mature ecosystem compared to traditional ORM approaches
Mitigating Challenges
For Eventual Consistency:
- Use command acknowledgments to show “processing” state in UI
- Implement optimistic updates with rollback on failure
- Set appropriate expectations with users about data freshness
- Use version numbers to detect conflicts
For Operational Complexity:
- Start with a single-node event store (PostgreSQL with an append-only table; sketched after this list)
- Add message bus only when needed (can start with polling)
- Use managed services (e.g., AWS EventBridge, Google Pub/Sub)
- Implement comprehensive monitoring and alerting
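As a concrete starting point for the first item above, a single PostgreSQL table with a composite primary key already gives you append-only storage plus optimistic concurrency. The sketch below uses Go's standard database/sql package; the package, table, and column names are illustrative and not taken from the earlier examples.
package eventstore

import (
    "database/sql"
    "fmt"
)

// Schema for a minimal append-only event table; the composite primary key
// rejects a second writer that tries to append the same (aggregate_id, version).
const schema = `
CREATE TABLE IF NOT EXISTS events (
    aggregate_id TEXT        NOT NULL,
    version      BIGINT      NOT NULL,
    event_type   TEXT        NOT NULL,
    payload      JSONB       NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (aggregate_id, version)
);`

// Append inserts a single event; a version conflict surfaces as a unique-violation error.
func Append(db *sql.DB, aggregateID string, version int64, eventType string, payload []byte) error {
    _, err := db.Exec(
        `INSERT INTO events (aggregate_id, version, event_type, payload) VALUES ($1, $2, $3, $4)`,
        aggregateID, version, eventType, payload,
    )
    if err != nil {
        return fmt.Errorf("append event: %w", err)
    }
    return nil
}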
For Storage Growth:
- Implement a snapshot strategy (save full state periodically; see the sketch after this list)
- Archive old events to cold storage
- Set retention policies based on compliance needs
- Use event store compaction for aggregates
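For the snapshot item above, the idea is to persist the aggregate's full state every N events so that loading becomes "latest snapshot plus the events appended after it" instead of a full replay. The sketch below assumes it sits alongside the Account aggregate from the Go example (so it can read its unexported fields); SnapshotStore and the interval are illustrative, not part of the article's code.
// Snapshot captures an aggregate's state at a specific version.
type Snapshot struct {
    AggregateID string  `json:"aggregate_id"`
    Version     int64   `json:"version"`
    Balance     float64 `json:"balance"`
}

// SnapshotStore persists and retrieves snapshots; Load returns nil when none exists yet.
type SnapshotStore interface {
    Save(s Snapshot) error
    Load(aggregateID string) (*Snapshot, error)
}

const snapshotInterval = 100 // take a snapshot every 100 events (tune per aggregate)

// maybeSnapshot saves the current state whenever the aggregate's version crosses the interval,
// so later loads replay only the events appended after the snapshot's version.
func maybeSnapshot(snapshots SnapshotStore, a *Account) error {
    if a.version == 0 || a.version%snapshotInterval != 0 {
        return nil
    }
    return snapshots.Save(Snapshot{AggregateID: a.id, Version: a.version, Balance: a.balance})
}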
Conclusion
CQRS with Event Sourcing is a powerful pattern for complex, scalable systems where auditability and flexible querying are critical. It’s particularly well-suited for financial systems, collaborative applications, and domains with complex business logic.
However, it introduces significant complexity. Only adopt CQRS when you have clear requirements that justify the trade-offs: high read/write asymmetry, need for audit trails, complex temporal queries, or independent scaling needs.
Start simple with separated command and query endpoints, then evolve to full event sourcing as requirements demand. The pattern is most successful when the entire team understands event-driven thinking and has operational maturity to handle distributed systems.