CQRS Pattern: Separating Reads and Writes for Scalable Systems

CQRS Pattern: Command Query Responsibility Segregation

What is CQRS?

Command Query Responsibility Segregation (CQRS) is an architectural pattern that separates read operations (queries) from write operations (commands) using different models. Unlike traditional CRUD architectures where a single model handles both reads and writes, CQRS recognizes that these operations have fundamentally different characteristics and optimizes them independently.

Core Principle: The model you use to update information should be different from the model you use to read information.

The Fundamental Insight

In traditional architectures, we use the same data model for:

This creates compromises:

CQRS solves this by acknowledging that reads and writes are different workloads deserving different optimizations.
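
To make the split concrete, here is a minimal sketch of one order represented by two models. The names `Order`, `OrderSummary`, and `project` are illustrative assumptions, not part of any framework: the write model carries the rules, and the read model is a flat shape derived from it.

```python
from dataclasses import dataclass, field
from decimal import Decimal


@dataclass
class Order:
    """Write model: holds raw line items and enforces business rules."""
    order_id: str
    customer_id: str
    items: list = field(default_factory=list)  # (product_id, quantity, unit_price)

    def add_item(self, product_id: str, quantity: int, unit_price: Decimal) -> None:
        if quantity <= 0:
            raise ValueError("quantity must be positive")
        self.items.append((product_id, quantity, unit_price))


@dataclass(frozen=True)
class OrderSummary:
    """Read model: flat, precomputed, shaped for display."""
    order_id: str
    customer_name: str
    item_count: int
    total: Decimal


def project(order: Order, customer_name: str) -> OrderSummary:
    """Derive the read model from the write model."""
    total = sum((qty * price for _, qty, price in order.items), Decimal("0"))
    return OrderSummary(order.order_id, customer_name, len(order.items), total)


order = Order("o-1", "c-1")
order.add_item("p-1", 2, Decimal("10.00"))
order.add_item("p-2", 1, Decimal("5.50"))
summary = project(order, "Ada")
```

The write model never exposes a total, and the read model never validates anything. Each can change shape without forcing a change in the other.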

When to Use CQRS

Ideal Scenarios

1. Read-Heavy Systems with Complex Queries

2. Different Scalability Requirements

3. Domain Complexity

4. Collaborative Environments

When NOT to Use CQRS

Architecture Components

Command Side (Write Model)

Responsibilities:

Characteristics:

Query Side (Read Model)

Responsibilities:

Characteristics:

Synchronization Mechanism

The command and query sides must stay synchronized:
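
One common way to do this is to publish an event on every successful write and let a subscriber update the read store. The sketch below assumes a synchronous in-memory bus (`InMemoryEventBus`, a hypothetical stand-in for a real broker) and two plain dictionaries in place of databases.

```python
from collections import defaultdict
from typing import Callable


class InMemoryEventBus:
    """Synchronous stand-in for a real message broker (assumption)."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable[[dict], None]) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event: dict) -> None:
        for handler in self._handlers[event["type"]]:
            handler(event)


write_store: dict[str, dict] = {}  # command side
read_store: dict[str, dict] = {}   # query side


def handle_create_order(bus: InMemoryEventBus, order_id: str,
                        customer_id: str, items: list) -> None:
    """Command handler: persist to the write store, then announce the change."""
    write_store[order_id] = {"customer_id": customer_id, "items": items}
    bus.publish({
        "type": "OrderCreated",
        "order_id": order_id,
        "customer_id": customer_id,
        "item_count": len(items),
    })


def on_order_created(event: dict) -> None:
    """Projection: build the denormalized read-side row from the event."""
    read_store[event["order_id"]] = {
        "customer_id": event["customer_id"],
        "item_count": event["item_count"],
        "status": "PENDING",
    }


bus = InMemoryEventBus()
bus.subscribe("OrderCreated", on_order_created)
handle_create_order(bus, "o-1", "c-1", ["p-1", "p-2"])
```

Because this bus calls handlers inline, the read store is updated before `handle_create_order` returns. With a real broker the update arrives later, which is where eventual consistency comes from.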

Implementation Example: E-Commerce Order System

Go Implementation

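// Command Side - Domain Model
// OrderRepository, EventBus, OrderCreatedEvent, generateID, and calculateTotal
// are assumed to be defined elsewhere in this package.
package command

import (
    "errors"
    "fmt"
    "time"

    "github.com/shopspring/decimal" // assumed source of decimal.Decimal
)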

type Order struct {
    ID          string
    CustomerID  string
    Items       []OrderItem
    TotalAmount decimal.Decimal
    Status      OrderStatus
    Version     int // for optimistic locking
}

type OrderItem struct {
    ProductID string
    Quantity  int
    Price     decimal.Decimal
}

type OrderStatus int

const (
    OrderPending OrderStatus = iota
    OrderConfirmed
    OrderShipped
    OrderDelivered
)

// Command definitions
type CreateOrderCommand struct {
    CustomerID string
    Items      []OrderItem
}

type ConfirmOrderCommand struct {
    OrderID string
}

// Command Handler
type OrderCommandHandler struct {
    repo      OrderRepository
    eventBus  EventBus
}

func (h *OrderCommandHandler) HandleCreateOrder(cmd CreateOrderCommand) error {
    // Business logic validation
    if len(cmd.Items) == 0 {
        return errors.New("order must contain items")
    }
    
    // Create domain entity
    order := &Order{
        ID:         generateID(),
        CustomerID: cmd.CustomerID,
        Items:      cmd.Items,
        Status:     OrderPending,
        Version:    1,
    }
    
    // Calculate total
    order.TotalAmount = calculateTotal(cmd.Items)
    
    // Persist
    if err := h.repo.Save(order); err != nil {
        return fmt.Errorf("failed to save order: %w", err)
    }
    
    // Emit event
    event := OrderCreatedEvent{
        OrderID:    order.ID,
        CustomerID: order.CustomerID,
        Items:      order.Items,
        Total:      order.TotalAmount,
        Timestamp:  time.Now(),
    }
    
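    // Note: Save and Publish are two separate steps. If Publish fails after
    // Save succeeds, the read model never learns about this order.
    h.eventBus.Publish(event)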
    
    return nil
}

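// Query Side - Read Model
package query

import (
    "database/sql"
    "time"

    "github.com/shopspring/decimal" // assumed source of decimal.Decimal
)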

// Optimized for reads - denormalized
type OrderReadModel struct {
    OrderID         string
    CustomerID      string
    CustomerName    string  // denormalized from customer table
    CustomerEmail   string  // denormalized
    ItemCount       int     // pre-calculated
    TotalAmount     decimal.Decimal
    Status          string
    CreatedAt       time.Time
    LastUpdatedAt   time.Time
}

// Query Handler
type OrderQueryHandler struct {
    db *sql.DB
}

func (h *OrderQueryHandler) GetOrdersByCustomer(customerID string) ([]OrderReadModel, error) {
    // Single query, no joins needed - data already denormalized
    query := `
        SELECT order_id, customer_id, customer_name, customer_email,
               item_count, total_amount, status, created_at, last_updated_at
        FROM order_read_model
        WHERE customer_id = $1
        ORDER BY created_at DESC
    `
    
    rows, err := h.db.Query(query, customerID)
    if err != nil {
        return nil, err
    }
    defer rows.Close()
    
    var orders []OrderReadModel
    for rows.Next() {
        var order OrderReadModel
        err := rows.Scan(
            &order.OrderID, &order.CustomerID, &order.CustomerName,
            &order.CustomerEmail, &order.ItemCount, &order.TotalAmount,
            &order.Status, &order.CreatedAt, &order.LastUpdatedAt,
        )
        if err != nil {
            return nil, err
        }
        orders = append(orders, order)
    }
    
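    // Surface any error that ended the iteration early
    if err := rows.Err(); err != nil {
        return nil, err
    }

    return orders, nil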
}

// Event Handler - Syncs Command to Query Side
type OrderEventHandler struct {
    db *sql.DB
}

func (h *OrderEventHandler) OnOrderCreated(event OrderCreatedEvent) error {
    // Fetch customer details
    customer, err := h.getCustomer(event.CustomerID)
    if err != nil {
        return err
    }
    
    // Update read model with denormalized data
    query := `
        INSERT INTO order_read_model 
        (order_id, customer_id, customer_name, customer_email, 
         item_count, total_amount, status, created_at, last_updated_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
    `
    
    _, err = h.db.Exec(
        query,
        event.OrderID,
        event.CustomerID,
        customer.Name,
        customer.Email,
        len(event.Items),
        event.Total,
        "PENDING",
        event.Timestamp,
        event.Timestamp,
    )
    
    return err
}

Python Implementation (with FastAPI)

# Command Side
from dataclasses import dataclass
from typing import List
from decimal import Decimal
from datetime import datetime
import uuid

@dataclass
class CreateOrderCommand:
    customer_id: str
    items: List[dict]

class OrderCommandHandler:
    def __init__(self, repository, event_bus):
        self.repository = repository
        self.event_bus = event_bus
    
    async def handle_create_order(self, command: CreateOrderCommand):
        # Validation
        if not command.items:
            raise ValueError("Order must contain items")
        
        # Create order
        order_id = str(uuid.uuid4())
        # Convert via str so float prices do not carry binary rounding error
        total = sum(Decimal(str(item['price'])) * item['quantity']
                   for item in command.items)
        
        order = {
            'id': order_id,
            'customer_id': command.customer_id,
            'items': command.items,
            'total': total,
            'status': 'PENDING',
            'created_at': datetime.utcnow()
        }
        
        # Persist
        await self.repository.save(order)
        
        # Emit event
        event = {
            'type': 'OrderCreated',
            'order_id': order_id,
            'customer_id': command.customer_id,
            'items': command.items,
            'total': str(total),  # string keeps the exact decimal value
            'timestamp': datetime.utcnow().isoformat()
        }
        
        await self.event_bus.publish('orders', event)
        
        return order_id

# Query Side
from fastapi import FastAPI, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

class OrderQueryHandler:
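    # get_session is a hypothetical dependency that yields an AsyncSession;
    # without it FastAPI cannot resolve `db` when building the handler.
    def __init__(self, db: AsyncSession = Depends(get_session)):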
        self.db = db
    
    async def get_orders_by_customer(self, customer_id: str):
        # Single query, denormalized data
        query = select(OrderReadModel).where(
            OrderReadModel.customer_id == customer_id
        ).order_by(OrderReadModel.created_at.desc())
        
        result = await self.db.execute(query)
        orders = result.scalars().all()
        
        return [
            {
                'order_id': o.order_id,
                'customer_name': o.customer_name,
                'item_count': o.item_count,
                'total': float(o.total_amount),
                'status': o.status,
                'created_at': o.created_at.isoformat()
            }
            for o in orders
        ]

@app.get("/orders/customer/{customer_id}")
async def get_customer_orders(
    customer_id: str,
    query_handler: OrderQueryHandler = Depends()
):
    return await query_handler.get_orders_by_customer(customer_id)

# Event Handler for synchronization
class OrderReadModelUpdater:
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def on_order_created(self, event: dict):
        # Fetch customer for denormalization
        customer = await self.get_customer(event['customer_id'])
        
        # Create read model entry
        read_model = OrderReadModel(
            order_id=event['order_id'],
            customer_id=event['customer_id'],
            customer_name=customer['name'],
            customer_email=customer['email'],
            item_count=len(event['items']),
            # str() first: Decimal(float) would keep binary rounding artifacts
            total_amount=Decimal(str(event['total'])),
            status='PENDING',
            created_at=datetime.fromisoformat(event['timestamp'])
        )
        
        self.db.add(read_model)
        await self.db.commit()
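
The Go `Order` struct carries a `Version` field for optimistic locking, but neither implementation shows the check itself. A minimal sketch of how a repository might enforce it, assuming a hypothetical `InMemoryOrderRepository` whose `save` takes the version the caller last read (starting from 0 for a new order, unlike the Go example, which starts at 1):

```python
class ConcurrencyError(Exception):
    """Raised when the stored version differs from the one the caller read."""


class InMemoryOrderRepository:
    def __init__(self) -> None:
        self._rows: dict[str, dict] = {}

    def get(self, order_id: str) -> dict:
        # Return a copy so callers cannot mutate stored state directly
        return dict(self._rows[order_id])

    def save(self, order: dict, expected_version: int) -> None:
        current = self._rows.get(order["id"])
        current_version = current["version"] if current else 0
        if current_version != expected_version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {current_version}"
            )
        # Accept the write and bump the version
        self._rows[order["id"]] = {**order, "version": expected_version + 1}


repo = InMemoryOrderRepository()
repo.save({"id": "o-1", "status": "PENDING"}, expected_version=0)

# Two command handlers load the same order concurrently
first = repo.get("o-1")
second = repo.get("o-1")

repo.save({**first, "status": "CONFIRMED"}, expected_version=first["version"])
try:
    repo.save({**second, "status": "SHIPPED"}, expected_version=second["version"])
    conflict = False
except ConcurrencyError:
    conflict = True
```

The second writer is rejected instead of silently overwriting the first. In a SQL store the same check is usually an `UPDATE ... WHERE version = ?` followed by a row-count test.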

ReactJS Query Integration

// Read-side and write-side API hooks
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';

interface OrderReadModel {
  orderId: string;
  customerName: string;
  itemCount: number;
  total: number;
  status: string;
  createdAt: string;
}

interface OrderItemInput {
  productId: string;
  quantity: number;
  price: number;
}

// Query hook - reads from query side
export function useCustomerOrders(customerId: string) {
  return useQuery<OrderReadModel[]>({
    queryKey: ['orders', 'customer', customerId],
    queryFn: async () => {
      const response = await fetch(
        `/api/query/orders/customer/${customerId}`
      );
      // fetch resolves on HTTP errors, so fail explicitly
      if (!response.ok) {
        throw new Error(`Query failed: ${response.status}`);
      }
      return response.json();
    },
    // Polling for eventual consistency
    refetchInterval: 5000,
  });
}

// Command hook - writes to command side
export function useCreateOrder() {
  const queryClient = useQueryClient();

  return useMutation({
    mutationFn: async (command: { customerId: string; items: OrderItemInput[] }) => {
      const response = await fetch('/api/command/orders', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(command),
      });
      if (!response.ok) {
        throw new Error(`Command failed: ${response.status}`);
      }
      return response.json();
    },
    // Invalidate queries after command
    onSuccess: (data, variables) => {
      // The read model may not have caught up yet; the polling above covers the gap
      queryClient.invalidateQueries({
        queryKey: ['orders', 'customer', variables.customerId]
      });
    },
  });
}

// Component usage
function CustomerOrders({ customerId }: { customerId: string }) {
  const { data: orders, isLoading } = useCustomerOrders(customerId);
  const createOrder = useCreateOrder();

  if (isLoading) return <div>Loading...</div>;

  return (
    <div>
      <button onClick={() => {
        createOrder.mutate({
          customerId,
          items: [{ productId: '123', quantity: 1, price: 99.99 }]
        });
      }}>
        Create New Order
      </button>
      
      {orders?.map(order => (
        <div key={order.orderId}>
          <h3>Order #{order.orderId}</h3>
          <p>{order.itemCount} items - ${order.total}</p>
          <p>Status: {order.status}</p>
        </div>
      ))}
    </div>
  );
}

Trade-offs and Considerations

Benefits

✅ Independent Scaling: Scale reads and writes separately based on actual load

✅ Optimized Performance: Each side optimized for its specific workload

✅ Flexibility: Use different databases for reads vs. writes (polyglot persistence)

✅ Simplified Queries: Denormalized read models eliminate complex joins

✅ Clear Separation: Commands express intent, queries express views

Challenges

❌ Eventual Consistency: Read model may lag behind write model (seconds to minutes)

❌ Increased Complexity: More moving parts, more code to maintain

❌ Data Duplication: Same data stored in multiple models

❌ Event Management: Need reliable event delivery mechanism

❌ Learning Curve: Team must understand distributed systems concepts
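
For the event delivery challenge, one widely used approach is a transactional outbox: the event is written to the same database, in the same transaction, as the order, and a separate relay publishes it afterwards. The sketch below uses SQLite for brevity. The table layout and the `create_order` and `relay_outbox` functions are assumptions for illustration, not a prescribed schema.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id TEXT PRIMARY KEY, customer_id TEXT, status TEXT);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
""")


def create_order(order_id: str, customer_id: str) -> None:
    event = {"type": "OrderCreated", "order_id": order_id, "customer_id": customer_id}
    with conn:  # one transaction: both rows commit or neither does
        conn.execute(
            "INSERT INTO orders VALUES (?, ?, 'PENDING')", (order_id, customer_id)
        )
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (json.dumps(event),))


def relay_outbox(publish) -> int:
    """Publish pending events in insertion order; return how many were sent."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        publish(json.loads(payload))
        with conn:  # mark as sent only after publish returned
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


delivered = []
create_order("o-1", "c-1")
sent = relay_outbox(delivered.append)
```

If the relay crashes between publishing and marking the row, the event is sent again on the next run. Delivery is therefore at-least-once, so consumers still need to tolerate duplicates.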

Best Practices

  1. Start Simple: Don’t apply CQRS everywhere. Identify high-value areas first.

  2. Handle Eventual Consistency in UI:

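    // Show optimistic update while waiting for event.
    // Assumes queryClient comes from useQueryClient(); mutationFn is omitted for brevity.
    const createOrder = useMutation({
      onMutate: async (newOrder) => {
        // Cancel in-flight queries so they cannot overwrite the optimistic value
        await queryClient.cancelQueries({ queryKey: ['orders'] });
    
        // Snapshot current state
        const previous = queryClient.getQueryData(['orders']);
    
        // Optimistically update (the cache may be empty on first load)
        queryClient.setQueryData(['orders'], (old = []) => [...old, newOrder]);
    
        return { previous };
      },
      onError: (err, newOrder, context) => {
        // Rollback on error
        queryClient.setQueryData(['orders'], context?.previous);
      },
      onSettled: () => {
        // Refetch so the cache converges on the real read model
        queryClient.invalidateQueries({ queryKey: ['orders'] });
      },
    });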
    
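  3. Consider Event Sourcing with CQRS: The two pair well, because the event store can serve as the write model and read models can be rebuilt by replaying events. CQRS does not require Event Sourcing, though.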

  4. Monitor Synchronization Lag: Track time between command and query model update.

  5. Idempotent Event Handlers: Events may be delivered multiple times; a handler must leave the read model in the same state whether it sees an event once or several times.
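
Practices 4 and 5 can share one handler. The sketch below assumes each event carries a unique `event_id` and an ISO 8601 `timestamp`; the `event_id` field is an assumption, since the events in the examples above do not include one. `OrderProjector` is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import Optional


class OrderProjector:
    def __init__(self) -> None:
        self.read_model: dict[str, dict] = {}
        self.processed_event_ids: set[str] = set()
        self.last_lag_seconds: Optional[float] = None

    def on_order_created(self, event: dict, now: datetime) -> bool:
        """Apply the event once; return False for a duplicate delivery."""
        if event["event_id"] in self.processed_event_ids:
            return False  # already applied, nothing to do

        self.read_model[event["order_id"]] = {
            "status": "PENDING",
            "item_count": event["item_count"],
        }
        self.processed_event_ids.add(event["event_id"])

        # Synchronization lag: time from event emission to read-model update
        emitted_at = datetime.fromisoformat(event["timestamp"])
        self.last_lag_seconds = (now - emitted_at).total_seconds()
        return True


projector = OrderProjector()
event = {
    "event_id": "e-1",
    "order_id": "o-1",
    "item_count": 2,
    "timestamp": "2024-01-01T12:00:00+00:00",
}
now = datetime(2024, 1, 1, 12, 0, 3, tzinfo=timezone.utc)

first = projector.on_order_created(event, now)
second = projector.on_order_created(event, now)  # redelivery of the same event
```

In a real system the set of processed IDs would live in the read database and be written in the same transaction as the read-model row, so a crash cannot separate the two.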

Conclusion

CQRS is a powerful pattern for systems with different read and write characteristics. For principal engineers, the point is not to apply CQRS everywhere but to recognize when the added complexity is justified by the benefits. Start with high-value use cases (analytics, dashboards, collaborative editing) and expand as your team gains experience.

The key insight: Reads and writes are different workloads that deserve different optimizations. CQRS formalizes this separation, enabling you to scale, optimize, and evolve each side independently.