Outbox Pattern for Reliable Microservices Communication

The Problem: Dual-Write Inconsistency

In distributed systems, services often need to atomically update their database AND publish an event to a message broker. The naive approach of doing both operations sequentially creates a critical failure mode:

# Dangerous: Non-atomic dual write
def create_order(order_data):
    # Step 1: Write to database
    order = db.orders.create(order_data)
    
    # Step 2: Publish event
    event_bus.publish("order.created", order)
    
    return order

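What can go wrong:

  1. The database write commits but the publish fails (broker down, network timeout): the order exists, yet no consumer ever learns of it.
  2. The process crashes between the two steps, with the same result.
  3. Reversing the steps does not help: the event can be published and the database write can then fail, announcing an order that does not exist.

This is the dual-write problem: no transaction spans both the database and the broker, so any failure between the two steps leaves them inconsistent.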
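
The failure mode of the naive dual write can be reproduced with in-memory stand-ins. The sketch below is illustrative only: FakeBroker and create_order_dual_write are hypothetical names, and a plain list plays the role of the database.

```python
class FakeBroker:
    """In-memory stand-in for a message broker (hypothetical, for illustration)."""

    def __init__(self, fail=False):
        self.fail = fail
        self.published = []

    def publish(self, topic, payload):
        if self.fail:
            raise ConnectionError("broker unreachable")
        self.published.append((topic, payload))


def create_order_dual_write(db_rows, broker, order):
    """Naive dual write: the database write and the publish are independent."""
    db_rows.append(order)                    # step 1: takes effect immediately
    broker.publish("order.created", order)   # step 2: may fail or never run


db_rows = []
broker = FakeBroker(fail=True)
try:
    create_order_dual_write(db_rows, broker, {"id": 1})
except ConnectionError:
    pass

# The order exists, but no event was ever published for it.
print(len(db_rows), len(broker.published))  # prints "1 0"
```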

The Outbox Pattern Solution

The Outbox Pattern solves this by storing events in the same database transaction as the business data, then reliably publishing them asynchronously.

Architecture Overview

┌─────────────────────────────────────────┐
│         Order Service                   │
│                                         │
│  ┌──────────────────────────────────┐  │
│  │   Single Database Transaction    │  │
│  │                                  │  │
│  │  1. Insert Order                 │  │
│  │  2. Insert Outbox Event          │  │
│  │                                  │  │
│  └──────────────────────────────────┘  │
│                                         │
│  ┌──────────────────────────────────┐  │
│  │   Outbox Publisher (async)       │  │
│  │                                  │  │
│  │  - Poll outbox table             │  │
│  │  - Publish to message broker     │  │
│  │  - Mark as published             │  │
│  └──────────────────────────────────┘  │
└─────────────────────────────────────────┘
                 │ Events
         ┌──────────────┐
         │ Message Bus  │
         │ (Kafka/RMQ)  │
         └──────────────┘

Core Principles

  1. Single transaction: Business data and event record stored atomically
  2. Guaranteed delivery: Events in outbox will eventually be published
  3. At-least-once semantics: Consumers must handle duplicate events
  4. Ordering preservation: Events for an aggregate are published in the order they were written to the outbox
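
The single-transaction principle can be tried out locally. This is a minimal sketch that uses Python's built-in sqlite3 in place of the service's real database; the simplified table layout and the fail_before_commit flag are assumptions made for the demonstration.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id TEXT PRIMARY KEY, total_amount REAL NOT NULL);
    CREATE TABLE outbox_events (
        id TEXT PRIMARY KEY,
        aggregate_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
""")


def create_order(conn, amount, fail_before_commit=False):
    order_id = str(uuid.uuid4())
    # "with conn" commits on success and rolls back if an exception escapes.
    with conn:
        conn.execute("INSERT INTO orders VALUES (?, ?)", (order_id, amount))
        if fail_before_commit:
            raise RuntimeError("simulated crash between the two inserts")
        conn.execute(
            "INSERT INTO outbox_events (id, aggregate_id, event_type, payload) "
            "VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), order_id, "order.created",
             json.dumps({"order_id": order_id, "amount": amount})),
        )
    return order_id


def counts(conn):
    orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
    events = conn.execute("SELECT COUNT(*) FROM outbox_events").fetchone()[0]
    return orders, events


create_order(conn, 42.0)
try:
    create_order(conn, 10.0, fail_before_commit=True)
except RuntimeError:
    pass

print(counts(conn))  # prints "(1, 1)": the failed attempt left no order and no event
```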

Implementation in Go

Database Schema

CREATE TABLE orders (
    id UUID PRIMARY KEY,
    customer_id UUID NOT NULL,
    total_amount DECIMAL(10,2),
    status VARCHAR(50),
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE outbox_events (
    id UUID PRIMARY KEY,
    aggregate_type VARCHAR(100) NOT NULL,  -- e.g., "order"
    aggregate_id UUID NOT NULL,            -- e.g., order.id
    event_type VARCHAR(100) NOT NULL,      -- e.g., "order.created"
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW(),
    published_at TIMESTAMP NULL,
    published BOOLEAN DEFAULT FALSE
);

-- Partial index: covers only unpublished rows, so indexing the published flag itself is unnecessary
CREATE INDEX idx_outbox_unpublished
    ON outbox_events(created_at)
    WHERE published = FALSE;

Service Implementation

package order

import (
    "context"
    "database/sql"
    "encoding/json"
    "time"
    
    "github.com/google/uuid"
)

type OrderService struct {
    db *sql.DB
}

type Order struct {
    ID           uuid.UUID
    CustomerID   uuid.UUID
    TotalAmount  float64
    Status       string
    CreatedAt    time.Time
}

type OutboxEvent struct {
    ID            uuid.UUID
    AggregateType string
    AggregateID   uuid.UUID
    EventType     string
    Payload       json.RawMessage
    CreatedAt     time.Time
}

// CreateOrder atomically creates order and outbox event
func (s *OrderService) CreateOrder(ctx context.Context, customerID uuid.UUID, amount float64) (*Order, error) {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()
    
    // 1. Create the order
    order := &Order{
        ID:          uuid.New(),
        CustomerID:  customerID,
        TotalAmount: amount,
        Status:      "pending",
        CreatedAt:   time.Now(),
    }
    
    _, err = tx.ExecContext(ctx,
        `INSERT INTO orders (id, customer_id, total_amount, status, created_at)
         VALUES ($1, $2, $3, $4, $5)`,
        order.ID, order.CustomerID, order.TotalAmount, order.Status, order.CreatedAt,
    )
    if err != nil {
        return nil, err
    }
    
    // 2. Create outbox event in SAME transaction
    payload, err := json.Marshal(order)
    if err != nil {
        return nil, err
    }
    event := &OutboxEvent{
        ID:            uuid.New(),
        AggregateType: "order",
        AggregateID:   order.ID,
        EventType:     "order.created",
        Payload:       payload,
        CreatedAt:     time.Now(),
    }
    
    _, err = tx.ExecContext(ctx,
        `INSERT INTO outbox_events (id, aggregate_type, aggregate_id, event_type, payload, created_at)
         VALUES ($1, $2, $3, $4, $5, $6)`,
        event.ID, event.AggregateType, event.AggregateID, 
        event.EventType, event.Payload, event.CreatedAt,
    )
    if err != nil {
        return nil, err
    }
    
    // Commit atomically
    if err := tx.Commit(); err != nil {
        return nil, err
    }
    
    return order, nil
}

Outbox Publisher (Background Worker)

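package publisher

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
)

type MessageBroker interface {
    Publish(topic string, payload []byte) error
}

// OutboxEvent mirrors the outbox_events columns the publisher reads.
type OutboxEvent struct {
    ID            uuid.UUID
    AggregateType string
    AggregateID   uuid.UUID
    EventType     string
    Payload       json.RawMessage
}

type OutboxPublisher struct {
    db     *sql.DB
    broker MessageBroker
}

// Start polls the outbox once per second until ctx is cancelled.
func (p *OutboxPublisher) Start(ctx context.Context) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := p.publishPendingEvents(ctx); err != nil {
                log.Printf("Error publishing events: %v", err)
            }
        }
    }
}

func (p *OutboxPublisher) publishPendingEvents(ctx context.Context) error {
    // Row locks taken by FOR UPDATE SKIP LOCKED last only as long as the
    // enclosing transaction, so select, publish, and update share one.
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once Commit has succeeded

    // Fetch unpublished events (batched)
    rows, err := tx.QueryContext(ctx,
        `SELECT id, aggregate_type, aggregate_id, event_type, payload
         FROM outbox_events
         WHERE published = FALSE
         ORDER BY created_at
         LIMIT 100
         FOR UPDATE SKIP LOCKED`, // Other publisher instances skip these rows
    )
    if err != nil {
        return err
    }

    // Read the whole batch first: the transaction's connection cannot run
    // the UPDATE below while this result set is still open.
    var events []OutboxEvent
    for rows.Next() {
        var event OutboxEvent
        if err := rows.Scan(&event.ID, &event.AggregateType,
            &event.AggregateID, &event.EventType, &event.Payload); err != nil {
            rows.Close()
            return err
        }
        events = append(events, event)
    }
    rows.Close()
    if err := rows.Err(); err != nil {
        return err
    }

    for _, event := range events {
        // event_type already carries the aggregate prefix (e.g. "order.created"),
        // so it is used directly as the topic.
        if err := p.broker.Publish(event.EventType, event.Payload); err != nil {
            // Stop here so later events are not published ahead of this one;
            // the remaining rows are retried on the next tick.
            log.Printf("Failed to publish event %s: %v", event.ID, err)
            break
        }

        // Mark as published
        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events
             SET published = TRUE, published_at = NOW()
             WHERE id = $1`,
            event.ID,
        ); err != nil {
            return fmt.Errorf("mark event %s as published: %w", event.ID, err)
        }
    }

    // If the commit fails, already-published events are sent again on the
    // next tick: this is the at-least-once guarantee in practice.
    return tx.Commit()
}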
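
Why the publisher gives at-least-once rather than exactly-once delivery is easiest to see by simulating a crash between the publish and the "mark as published" update. The sketch below is a simplified model, with sqlite3 standing in for the database and a list standing in for the broker; the crash_before_mark flag is a hypothetical test hook.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox_events ("
    "id INTEGER PRIMARY KEY, payload TEXT, published INTEGER DEFAULT 0)"
)
conn.execute("INSERT INTO outbox_events (payload) VALUES ('order-1')")
conn.commit()

delivered = []  # stands in for the message broker


def poll_once(conn, crash_before_mark=False):
    rows = conn.execute(
        "SELECT id, payload FROM outbox_events WHERE published = 0 ORDER BY id"
    ).fetchall()
    for event_id, payload in rows:
        delivered.append(payload)      # the publish succeeded
        if crash_before_mark:
            return                     # the process dies before the UPDATE
        conn.execute(
            "UPDATE outbox_events SET published = 1 WHERE id = ?", (event_id,)
        )
        conn.commit()


poll_once(conn, crash_before_mark=True)  # published, but never marked
poll_once(conn)                          # after restart the same row is sent again
print(delivered)  # prints "['order-1', 'order-1']"
```

The event is never lost, but the consumer sees it twice, which is why duplicate handling on the consumer side is part of the pattern rather than an optional extra.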

Implementation in Python (SQLAlchemy)

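import uuid
from datetime import datetime

from sqlalchemy import Boolean, Column, DateTime, Numeric, String
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

# Order model matching the orders table from the schema above
class Order(Base):
    __tablename__ = 'orders'

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    customer_id = Column(UUID(as_uuid=True), nullable=False)
    total_amount = Column(Numeric(10, 2))
    status = Column(String(50))
    created_at = Column(DateTime, default=datetime.utcnow)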

class OutboxEvent(Base):
    __tablename__ = 'outbox_events'
    
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    aggregate_type = Column(String(100), nullable=False)
    aggregate_id = Column(UUID(as_uuid=True), nullable=False)
    event_type = Column(String(100), nullable=False)
    payload = Column(JSONB, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    published = Column(Boolean, default=False)
    published_at = Column(DateTime, nullable=True)

def create_order_with_event(session: Session, customer_id: uuid.UUID, amount: float):
    """Create order and outbox event in single transaction"""
    
    # Create order
    order = Order(
        id=uuid.uuid4(),
        customer_id=customer_id,
        total_amount=amount,
        status='pending'
    )
    session.add(order)
    
    # Create outbox event
    event = OutboxEvent(
        aggregate_type='order',
        aggregate_id=order.id,
        event_type='order.created',
        payload={
            'order_id': str(order.id),
            'customer_id': str(customer_id),
            'amount': amount
        }
    )
    session.add(event)
    
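    # Commit both rows atomically; on failure roll back so neither persists
    try:
        session.commit()
    except Exception:
        session.rollback()
        raise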
    
    return order

When to Use the Outbox Pattern

Good Fit

Poor Fit

Trade-offs

Advantages

Guaranteed delivery: Events will eventually be published
Atomicity: Business data and events always consistent
Simple to implement: Just add a table and background worker
Database-agnostic: Works with any transactional database
Resilient: Survives process crashes and network failures

Disadvantages

Latency: Polling delay (typically 1-5 seconds)
Database load: Additional writes and polling queries
Duplicate events: At-least-once delivery requires idempotent consumers
Outbox table growth: Needs cleanup/archival strategy
Complex ordering: Ordering across aggregates is challenging
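
Outbox table growth is usually handled by a periodic cleanup job. The following is a minimal sketch, assuming a retention window measured in seconds and a published_at column stored as a Unix timestamp; purge_published is a hypothetical helper and sqlite3 stands in for the production database.

```python
import sqlite3


def purge_published(conn, now, retention_seconds):
    """Delete published events older than the retention window; return rows removed."""
    cutoff = now - retention_seconds
    with conn:  # commit the delete, or roll back on error
        cur = conn.execute(
            "DELETE FROM outbox_events WHERE published = 1 AND published_at < ?",
            (cutoff,),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox_events ("
    "id INTEGER PRIMARY KEY, published INTEGER NOT NULL DEFAULT 0, published_at REAL)"
)
now = 1_000_000.0
day = 86_400
conn.executemany(
    "INSERT INTO outbox_events (published, published_at) VALUES (?, ?)",
    [(1, now - 10 * day),   # published long ago: eligible for removal
     (1, now - 1 * day),    # published recently: kept
     (0, None)],            # not yet published: must never be removed
)
conn.commit()

removed = purge_published(conn, now, retention_seconds=7 * day)
print(removed)  # prints "1"
```

Only rows that are both published and older than the cutoff are deleted, so an event that is still waiting to be sent is never dropped.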

Optimizations

1. Change Data Capture (CDC)

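Instead of polling, stream committed outbox inserts from the database's transaction log with a CDC tool (Debezium, AWS DMS). A partial Debezium connector config follows; connection settings are omitted, and because EventRouter expects its own default column names, the outbox_events columns above would also need to be mapped through its field options:
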
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
  }
}

Benefits: Near-zero latency, no polling overhead
Cost: Additional infrastructure complexity

2. Partitioning

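Partition the outbox table by time to manage growth. In PostgreSQL this requires the parent table to be declared with PARTITION BY RANGE (created_at) and its primary key to include created_at: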

CREATE TABLE outbox_events_2025_10 PARTITION OF outbox_events
    FOR VALUES FROM ('2025-10-01') TO ('2025-11-01');

3. Batch Publishing

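Publish multiple events in a single message broker transaction (a sketch; it assumes a broker client that exposes transactional publishing, as Kafka's transactional producer does):

events := fetchBatch(100)
if err := broker.BeginTransaction(); err != nil {
    return err
}
for _, event := range events {
    if err := broker.Publish(event.EventType, event.Payload); err != nil {
        broker.AbortTransaction() // discard the partial batch
        return err
    }
}
return broker.CommitTransaction()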

Real-World Considerations

Idempotent Consumers

Since the outbox provides at-least-once delivery, consumers must handle duplicates:

type EventHandler struct {
    processedEvents map[uuid.UUID]bool // Or use distributed cache
}

func (h *EventHandler) Handle(event OutboxEvent) {
    if h.processedEvents[event.ID] {
        return // Already processed
    }
    
    // Process event
    processOrder(event)
    
    // Mark as processed
    h.processedEvents[event.ID] = true
}
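
An in-memory set of processed IDs is lost on restart, so a durable variant is often preferred. One possible approach, sketched here with sqlite3 and hypothetical processed_events and inventory tables, records the event ID in the same transaction as the side effect, so a duplicate is rejected by the primary key.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
    CREATE TABLE inventory (sku TEXT PRIMARY KEY, reserved INTEGER NOT NULL);
    INSERT INTO inventory VALUES ('widget', 0);
""")


def handle(conn, event_id, sku, quantity):
    """Apply the event once; return False when it is a duplicate."""
    try:
        with conn:  # dedup record and side effect commit (or roll back) together
            conn.execute("INSERT INTO processed_events VALUES (?)", (event_id,))
            conn.execute(
                "UPDATE inventory SET reserved = reserved + ? WHERE sku = ?",
                (quantity, sku),
            )
    except sqlite3.IntegrityError:
        return False  # event_id already recorded: skip the side effect
    return True


first = handle(conn, "evt-1", "widget", 2)
second = handle(conn, "evt-1", "widget", 2)   # redelivery of the same event
reserved = conn.execute("SELECT reserved FROM inventory").fetchone()[0]
print(first, second, reserved)  # prints "True False 2"
```

Because the dedup row and the business update share a transaction, a crash between them cannot leave an event marked as processed without its effect applied, or the reverse.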

Ordering Guarantees

Events for the same aggregate stay ordered when they share a partition key; ordering across aggregates requires additional logic:

// Partition key ensures ordering per aggregate
func (p *OutboxPublisher) publishWithKey(event OutboxEvent) {
    partitionKey := event.AggregateID.String()
    p.broker.PublishWithKey(event.EventType, partitionKey, event.Payload)
}
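
The reason a partition key preserves per-aggregate order is that every event with the same key is routed to the same partition, and a partition is consumed in order. The sketch below illustrates the idea with a generic hash; partition_for is a hypothetical helper and is not the partitioner any particular broker actually uses.

```python
import hashlib


def partition_for(aggregate_id: str, num_partitions: int) -> int:
    """Map an aggregate ID to a stable partition index."""
    digest = hashlib.sha256(aggregate_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


events = [
    ("order-123", "order.created"),
    ("order-456", "order.created"),
    ("order-123", "order.paid"),
]
assigned = [(agg, evt, partition_for(agg, 12)) for agg, evt in events]

# Both events for order-123 land on the same partition, so they are consumed in order.
same_partition = assigned[0][2] == assigned[2][2]
print(same_partition)  # prints "True"
```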

Monitoring & Observability

Key metrics to track:

metrics.Gauge("outbox.unpublished_count", unpublishedCount)
metrics.Histogram("outbox.publish_latency", publishTime)
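
The backlog gauge can be derived from the outbox table itself. This is a minimal sketch, assuming created_at is stored as a Unix timestamp and using sqlite3 as a stand-in database; outbox_backlog is a hypothetical helper that returns the unpublished count together with the age of the oldest unpublished event, which is often the more useful alerting signal.

```python
import sqlite3


def outbox_backlog(conn, now):
    """Return (unpublished_count, age_in_seconds_of_oldest_unpublished_event)."""
    count, oldest = conn.execute(
        "SELECT COUNT(*), MIN(created_at) FROM outbox_events WHERE published = 0"
    ).fetchone()
    lag = 0.0 if oldest is None else now - oldest
    return count, lag


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox_events ("
    "id INTEGER PRIMARY KEY, published INTEGER NOT NULL, created_at REAL NOT NULL)"
)
conn.executemany(
    "INSERT INTO outbox_events (published, created_at) VALUES (?, ?)",
    [(0, 940.0), (0, 990.0), (1, 100.0)],
)
conn.commit()

print(outbox_backlog(conn, now=1000.0))  # prints "(2, 60.0)"
```

A steadily growing lag suggests the publisher is stalled or the broker is unavailable, even when the count alone still looks small.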

Conclusion

The Outbox Pattern is a widely used approach to reliable event publishing in microservices. It trades a small amount of latency and extra database load for consistency between business data and published events. CDC-based relays offer lower latency, but a polling-based outbox is simpler to implement and understand.

For principal engineers designing distributed systems, the outbox pattern should be the default choice when:

  1. Events must be reliably delivered
  2. Business data and events must stay consistent
  3. You can tolerate 1-5 second event latency

The pattern’s simplicity and reliability make it a foundational tool in the distributed systems toolkit.