Event Sourcing with CQRS Pattern
Overview
Event Sourcing (ES) combined with Command Query Responsibility Segregation (CQRS) is an architectural pattern that stores the state of a system as a sequence of events rather than the current state itself. CQRS separates read and write operations into different models, enabling independent optimization of each.
This pattern is particularly powerful for systems requiring strong audit trails, temporal queries, complex business logic, or high scalability needs.
Core Concepts
Event Sourcing
Instead of storing current state in a database, you store a log of all state-changing events:
Traditional: User { id: 1, name: "Alice", email: "alice@example.com", balance: 100 }
Event Sourcing:
1. UserCreated { userId: 1, name: "Alice", email: "alice@example.com" }
2. BalanceDeposited { userId: 1, amount: 100 }
3. EmailChanged { userId: 1, newEmail: "alice@newdomain.com" }
Current state is derived by replaying events from the beginning.
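The replay step can be sketched as a simple fold over the event list (a minimal Python illustration; the dict shapes mirror the example events above):

```python
def replay(events):
    """Rebuild a user's current state by applying events in order."""
    state = {}
    for event in events:
        kind = event["type"]
        if kind == "UserCreated":
            state = {"id": event["userId"], "name": event["name"],
                     "email": event["email"], "balance": 0}
        elif kind == "BalanceDeposited":
            state["balance"] += event["amount"]
        elif kind == "EmailChanged":
            state["email"] = event["newEmail"]
    return state

events = [
    {"type": "UserCreated", "userId": 1, "name": "Alice", "email": "alice@example.com"},
    {"type": "BalanceDeposited", "userId": 1, "amount": 100},
    {"type": "EmailChanged", "userId": 1, "newEmail": "alice@newdomain.com"},
]
state = replay(events)
# state now reflects all three events: balance 100, updated email
```

Note that the fold is order-dependent: replaying the same events in a different order would generally produce a different state, which is why event stores preserve per-aggregate ordering.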
CQRS
Separate your read model (queries) from your write model (commands):
- Command Model: Validates business rules and produces events
- Query Model: Optimized read-only projections of event data
- Event Store: Source of truth containing all events
When to Use This Pattern
Good Fit
- Financial systems: Every transaction must be auditable
- Collaborative applications: Need to show history of changes and who made them
- Complex domain logic: Business rules that benefit from event-based thinking
- Temporal queries: “What was the state at time T?” or “How did we get here?”
- High-scale reads: Read and write loads are very different and need independent scaling
Poor Fit
- Simple CRUD applications: Overhead not justified
- Eventual consistency is unacceptable: CQRS introduces eventual consistency between write and read models
- Small teams without ES experience: Significant learning curve and complexity
- Reporting is simple: Standard database indexes suffice
Implementation Example (Go)
1. Define Events
package events

import (
	"time"

	"github.com/google/uuid"
)

// BaseEvent contains common fields for all events
type BaseEvent struct {
	EventID     uuid.UUID `json:"event_id"`
	AggregateID uuid.UUID `json:"aggregate_id"`
	EventType   string    `json:"event_type"`
	OccurredAt  time.Time `json:"occurred_at"`
	Version     int       `json:"version"`
}

// AccountCreated event
type AccountCreated struct {
	BaseEvent
	OwnerName string `json:"owner_name"`
	Currency  string `json:"currency"`
}

// MoneyDeposited event
type MoneyDeposited struct {
	BaseEvent
	Amount      float64 `json:"amount"`
	Description string  `json:"description"`
}

// MoneyWithdrawn event
type MoneyWithdrawn struct {
	BaseEvent
	Amount      float64 `json:"amount"`
	Description string  `json:"description"`
}
2. Implement Aggregate
package domain

import (
	"errors"
	"time"

	"github.com/google/uuid"

	"yourapp/events"
)

// Account is our aggregate root
type Account struct {
	id        uuid.UUID
	ownerName string
	balance   float64
	currency  string
	version   int
	// uncommitted holds full event values (e.g. events.MoneyDeposited),
	// not just their BaseEvent headers, so payloads survive persistence.
	uncommitted []any
}

// NewAccount creates a new account
func NewAccount(ownerName, currency string) (*Account, error) {
	if ownerName == "" {
		return nil, errors.New("owner name required")
	}
	account := &Account{}
	event := events.AccountCreated{
		BaseEvent: events.BaseEvent{
			EventID:     uuid.New(),
			AggregateID: uuid.New(),
			EventType:   "AccountCreated",
			OccurredAt:  time.Now(),
			Version:     1,
		},
		OwnerName: ownerName,
		Currency:  currency,
	}
	account.apply(event)
	account.uncommitted = append(account.uncommitted, event)
	return account, nil
}

// Deposit adds money to the account
func (a *Account) Deposit(amount float64, description string) error {
	if amount <= 0 {
		return errors.New("deposit amount must be positive")
	}
	event := events.MoneyDeposited{
		BaseEvent: events.BaseEvent{
			EventID:     uuid.New(),
			AggregateID: a.id,
			EventType:   "MoneyDeposited",
			OccurredAt:  time.Now(),
			Version:     a.version + 1,
		},
		Amount:      amount,
		Description: description,
	}
	a.apply(event)
	a.uncommitted = append(a.uncommitted, event)
	return nil
}

// Withdraw removes money from the account
func (a *Account) Withdraw(amount float64, description string) error {
	if amount <= 0 {
		return errors.New("withdrawal amount must be positive")
	}
	if a.balance < amount {
		return errors.New("insufficient funds")
	}
	event := events.MoneyWithdrawn{
		BaseEvent: events.BaseEvent{
			EventID:     uuid.New(),
			AggregateID: a.id,
			EventType:   "MoneyWithdrawn",
			OccurredAt:  time.Now(),
			Version:     a.version + 1,
		},
		Amount:      amount,
		Description: description,
	}
	a.apply(event)
	a.uncommitted = append(a.uncommitted, event)
	return nil
}

// apply updates internal state based on events
func (a *Account) apply(event any) {
	switch e := event.(type) {
	case events.AccountCreated:
		a.id = e.AggregateID
		a.ownerName = e.OwnerName
		a.currency = e.Currency
		a.balance = 0
		a.version = e.Version
	case events.MoneyDeposited:
		a.balance += e.Amount
		a.version = e.Version
	case events.MoneyWithdrawn:
		a.balance -= e.Amount
		a.version = e.Version
	}
}

// GetUncommittedEvents returns events not yet persisted
func (a *Account) GetUncommittedEvents() []any {
	return a.uncommitted
}

// MarkEventsCommitted clears uncommitted events
func (a *Account) MarkEventsCommitted() {
	a.uncommitted = nil
}
3. Event Store Interface
package infrastructure

import (
	"context"

	"github.com/google/uuid"
)

// EventStore defines the interface for event persistence.
// Events are passed as full typed values (e.g. events.MoneyDeposited),
// typically serialized to JSON alongside their BaseEvent metadata.
type EventStore interface {
	// SaveEvents appends events to the store, failing if the stream
	// has moved past expectedVersion (optimistic concurrency)
	SaveEvents(ctx context.Context, aggregateID uuid.UUID, evts []any, expectedVersion int) error

	// GetEvents retrieves all events for an aggregate
	GetEvents(ctx context.Context, aggregateID uuid.UUID) ([]any, error)

	// GetEventsSince retrieves events after a specific version
	GetEventsSince(ctx context.Context, aggregateID uuid.UUID, version int) ([]any, error)
}
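To make the expectedVersion parameter concrete, here is a minimal in-memory store with the same semantics, sketched in Python for brevity: a save is rejected when the stream has grown past the version the caller loaded, which is a simple form of optimistic concurrency control. All names here are illustrative, not part of the Go interface above.

```python
import threading

class ConcurrencyError(Exception):
    """Raised when another writer has appended events since the caller's load."""
    pass

class InMemoryEventStore:
    """Minimal event store keyed by aggregate ID, with optimistic locking."""
    def __init__(self):
        self._streams = {}           # aggregate_id -> ordered list of event dicts
        self._lock = threading.Lock()

    def save_events(self, aggregate_id, events, expected_version):
        with self._lock:
            stream = self._streams.setdefault(aggregate_id, [])
            if len(stream) != expected_version:
                # A concurrent writer won; the caller must reload and retry.
                raise ConcurrencyError(
                    f"expected version {expected_version}, stream is at {len(stream)}")
            stream.extend(events)

    def get_events(self, aggregate_id):
        return list(self._streams.get(aggregate_id, []))

    def get_events_since(self, aggregate_id, version):
        return self._streams.get(aggregate_id, [])[version:]

store = InMemoryEventStore()
store.save_events("acc-1", [{"type": "AccountCreated"}], expected_version=0)
store.save_events("acc-1", [{"type": "MoneyDeposited"}], expected_version=1)
# A second save with expected_version=0 would now raise ConcurrencyError.
```

A production store would additionally persist events durably and record a per-event version column with a uniqueness constraint, so the version check holds across processes, not just threads.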
4. Read Model Projection (Python)
# projection.py
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime


@dataclass
class AccountSummary:
    """Read model for account summary"""
    account_id: str
    owner_name: str
    current_balance: float
    currency: str
    last_transaction_date: datetime
    transaction_count: int


class AccountProjection:
    """Maintains read model from events"""

    def __init__(self):
        self.accounts: Dict[str, AccountSummary] = {}

    def handle_event(self, event: Dict):
        """Process an event and update read model"""
        event_type = event['event_type']
        if event_type == 'AccountCreated':
            self._handle_account_created(event)
        elif event_type == 'MoneyDeposited':
            self._handle_money_deposited(event)
        elif event_type == 'MoneyWithdrawn':
            self._handle_money_withdrawn(event)

    def _handle_account_created(self, event: Dict):
        self.accounts[event['aggregate_id']] = AccountSummary(
            account_id=event['aggregate_id'],
            owner_name=event['owner_name'],
            current_balance=0.0,
            currency=event['currency'],
            last_transaction_date=datetime.fromisoformat(event['occurred_at']),
            transaction_count=0,
        )

    def _handle_money_deposited(self, event: Dict):
        account = self.accounts[event['aggregate_id']]
        account.current_balance += event['amount']
        account.last_transaction_date = datetime.fromisoformat(event['occurred_at'])
        account.transaction_count += 1

    def _handle_money_withdrawn(self, event: Dict):
        account = self.accounts[event['aggregate_id']]
        account.current_balance -= event['amount']
        account.last_transaction_date = datetime.fromisoformat(event['occurred_at'])
        account.transaction_count += 1

    def get_account_summary(self, account_id: str) -> Optional[AccountSummary]:
        """Query method for read model; returns None for unknown accounts"""
        return self.accounts.get(account_id)

    def get_accounts_by_balance_range(self, min_balance: float, max_balance: float) -> List[AccountSummary]:
        """Complex query optimized in read model"""
        return [
            acc for acc in self.accounts.values()
            if min_balance <= acc.current_balance <= max_balance
        ]
5. React Query Component
// AccountDashboard.tsx
import React from 'react';
import { useQuery } from '@tanstack/react-query';
import { AccountSummary } from './types';

interface AccountDashboardProps {
  accountId: string;
}

const AccountDashboard: React.FC<AccountDashboardProps> = ({ accountId }) => {
  // Query read model (eventually consistent)
  const { data: account, isLoading } = useQuery<AccountSummary>({
    queryKey: ['account', accountId],
    queryFn: async () => {
      const response = await fetch(`/api/query/accounts/${accountId}`);
      return response.json();
    },
    // Poll for updates since read model is eventually consistent
    refetchInterval: 2000,
  });

  // Command handler
  const handleDeposit = async (amount: number) => {
    await fetch(`/api/command/accounts/${accountId}/deposit`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ amount, description: 'User deposit' }),
    });
    // Note: UI update will happen via polling/websocket when projection catches up
  };

  // Guard against both the loading state and a failed/empty fetch
  if (isLoading || !account) return <div>Loading...</div>;

  return (
    <div className="account-dashboard">
      <h2>{account.ownerName}'s Account</h2>
      <div className="balance">
        Balance: {account.currentBalance} {account.currency}
      </div>
      <div className="stats">
        <span>Transactions: {account.transactionCount}</span>
        <span>
          Last Activity: {new Date(account.lastTransactionDate).toLocaleDateString()}
        </span>
      </div>
      <button onClick={() => handleDeposit(100)}>Deposit $100</button>
    </div>
  );
};

export default AccountDashboard;
Key Design Decisions
Event Store Technology
- PostgreSQL with JSONB: Good for moderate scale, easy to query
- EventStoreDB: Purpose-built for event sourcing, excellent projections
- Apache Kafka: High throughput, natural fit for event streams
- AWS DynamoDB: Serverless, pay-per-use, good for variable loads
Projection Strategy
- Synchronous: Update read model in same transaction (limited scalability)
- Asynchronous: Use message queue to update read model (eventual consistency)
- On-demand: Rebuild projection from events when queried (slow but always fresh)
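For the asynchronous strategy, the projector typically tracks a checkpoint (the position of the last event it processed) so it can resume after a restart without reprocessing the whole log. A minimal sketch, with a plain Python list standing in for a real message broker or event store subscription (all names illustrative):

```python
class CheckpointedProjector:
    """Consumes an ordered event log, remembering how far it has read."""
    def __init__(self):
        self.checkpoint = 0          # index of the next event to process
        self.totals = {}             # aggregate_id -> running deposit total

    def catch_up(self, event_log):
        """Process any events appended since the last run."""
        for event in event_log[self.checkpoint:]:
            if event["event_type"] == "MoneyDeposited":
                aid = event["aggregate_id"]
                self.totals[aid] = self.totals.get(aid, 0.0) + event["amount"]
            self.checkpoint += 1

log = [
    {"event_type": "MoneyDeposited", "aggregate_id": "acc-1", "amount": 50.0},
]
projector = CheckpointedProjector()
projector.catch_up(log)              # processes event 0
log.append({"event_type": "MoneyDeposited", "aggregate_id": "acc-1", "amount": 25.0})
projector.catch_up(log)              # resumes at checkpoint 1; event 0 is not reprocessed
```

In a real deployment the checkpoint is persisted atomically with the read-model update, so a crash between the two cannot silently skip or double-apply an event.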
Snapshots
For aggregates with many events, create periodic snapshots to avoid replaying thousands of events:
type Snapshot struct {
	AggregateID uuid.UUID
	Version     int
	State       []byte // Serialized aggregate state
	CreatedAt   time.Time
}
Replay strategy: Load snapshot, then replay events after snapshot version.
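The load path under snapshotting can be sketched as follows (a pure-Python illustration; the snapshot stores the folded state plus the last event version it covers):

```python
def load_balance(snapshot, events):
    """Restore state from a snapshot, then replay only the newer events."""
    balance = snapshot["state"]["balance"]
    version = snapshot["version"]
    for event in events:
        if event["version"] <= version:
            continue                 # already folded into the snapshot
        if event["event_type"] == "MoneyDeposited":
            balance += event["amount"]
        elif event["event_type"] == "MoneyWithdrawn":
            balance -= event["amount"]
        version = event["version"]
    return balance, version

# A snapshot taken at version 1000, followed by two newer events:
snapshot = {"version": 1000, "state": {"balance": 420.0}}
tail = [
    {"version": 1001, "event_type": "MoneyDeposited", "amount": 30.0},
    {"version": 1002, "event_type": "MoneyWithdrawn", "amount": 50.0},
]
balance, version = load_balance(snapshot, tail)
# balance is 400.0 at version 1002: two events replayed instead of ~1000
```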
Trade-offs
Advantages
✅ Complete audit trail: Every state change is recorded with metadata
✅ Temporal queries: Can reconstruct state at any point in time
✅ Event replay: Can rebuild read models or create new projections from events
✅ Independent scaling: Scale reads and writes independently
✅ Business insight: Events represent business facts, valuable for analytics
✅ Debugging: Can replay events to reproduce bugs
Disadvantages
❌ Complexity: Significantly more complex than traditional CRUD
❌ Eventual consistency: Read model lags behind write model
❌ Event schema evolution: Changing event structure requires careful migration
❌ Learning curve: Team needs to understand ES/CQRS deeply
❌ Operational overhead: More moving parts to monitor and maintain
❌ Storage: Events accumulate indefinitely (mitigated by snapshotting)
Best Practices
- Immutable events: Never modify or delete events; append corrections if needed
- Event versioning: Include version in event schema for evolution
- Idempotent handlers: Projection handlers should be safe to replay
- Bounded contexts: Use ES/CQRS within bounded contexts, not everywhere
- Monitoring: Track projection lag, event throughput, and replay times
- Testing: Test aggregates in Given-When-Then style: given a history of events, when a command is handled, then assert the expected new events
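The idempotent-handlers practice can be made concrete: deduplicating on event_id makes a projection safe under at-least-once delivery, where the same event may arrive more than once (a minimal sketch, not tied to any particular broker):

```python
class IdempotentProjection:
    """Read model that is safe to receive the same event more than once."""
    def __init__(self):
        self.balance = 0.0
        self._seen = set()           # event IDs already applied

    def handle(self, event):
        if event["event_id"] in self._seen:
            return                   # duplicate delivery: ignore
        self._seen.add(event["event_id"])
        if event["event_type"] == "MoneyDeposited":
            self.balance += event["amount"]

proj = IdempotentProjection()
deposit = {"event_id": "evt-1", "event_type": "MoneyDeposited", "amount": 75.0}
proj.handle(deposit)
proj.handle(deposit)                 # redelivered: no double-count
```

For long-lived projections the seen-set is usually bounded, e.g. by persisting the last applied version per stream instead of every event ID.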
Real-World Applications
- Banking systems: Transaction history and account management
- E-commerce: Order processing and inventory management
- Healthcare: Patient medical history and treatment tracking
- Collaboration tools: Document editing history and user actions
- IoT systems: Device state changes and telemetry
Conclusion
Event Sourcing with CQRS is a powerful pattern for systems requiring strong auditability, complex domain logic, or independent read/write scaling. However, it introduces significant complexity and eventual consistency challenges. Use it when the benefits clearly outweigh the costs, and ensure your team has the expertise to implement it correctly.
Start with traditional architecture and migrate to ES/CQRS when you have clear requirements that justify the complexity.