Event-Driven Architecture with the Transactional Outbox Pattern

The Problem: Dual Writes in Distributed Systems

When building microservices, a common challenge is maintaining data consistency across service boundaries. Consider this scenario: you need to update your database AND publish an event to a message broker (Kafka, RabbitMQ, etc.). The naive approach looks like this:

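def create_order(order_data):
    # Write 1: the database (db is a placeholder for your data-access layer)
    order = db.orders.insert(order_data)

    # Write 2: the message broker (event_broker is a placeholder client)
    # A crash or broker error here leaves the order saved but never announced
    event_broker.publish("order.created", order)

    return order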

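This code has a critical flaw: it performs a dual write, two separate operations against two separate systems with no shared transaction. If the database write succeeds but the publish fails (or the process crashes between the two calls), downstream services never hear about the order. If you reverse the order and publish first, a failed database write leaves consumers reacting to an order that does not exist. Either way, your database and your event stream disagree.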
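
The failure is easy to reproduce. The sketch below uses Python's built-in sqlite3 module as the database and a hypothetical StubBroker class in place of a real broker client; it commits the order and then fails the publish.

```python
import sqlite3


class StubBroker:
    """Hypothetical in-memory stand-in for a Kafka or RabbitMQ client."""

    def __init__(self, available=True):
        self.available = available
        self.published = []

    def publish(self, topic, message):
        if not self.available:
            raise ConnectionError("broker unavailable")
        self.published.append((topic, message))


def create_order_dual_write(conn, broker, order_id):
    # Write 1: the database transaction commits on its own
    with conn:
        conn.execute("INSERT INTO orders (id) VALUES (?)", (order_id,))
    # Write 2: a separate call to a separate system, which can fail independently
    broker.publish("order.created", {"id": order_id})


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY)")
broker = StubBroker(available=False)

try:
    create_order_dual_write(conn, broker, "order-1")
except ConnectionError:
    pass  # the caller sees an error, but the order is already committed

orders = [row[0] for row in conn.execute("SELECT id FROM orders")]
print(orders, broker.published)  # ['order-1'] []
```

The order row survives while the broker received nothing, which is exactly the inconsistency described above.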

The Solution: Transactional Outbox Pattern

The Transactional Outbox pattern solves this by making the event part of your database transaction. Instead of publishing directly to the message broker, you write the event to an “outbox” table within the same transaction as your business data, so either both rows commit or neither does. A separate process then reads from the outbox and publishes to the message broker, retrying until the publish succeeds.
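
A minimal sketch of the write side, using Python's built-in sqlite3 module as a stand-in for PostgreSQL. The outbox columns mirror the ones used in the Go example later in this article; the exact schema and the fail_after_insert flag are illustrative assumptions. Forcing a failure between the two inserts shows that the rollback removes the order row as well, so no order can exist without its event.

```python
import json
import sqlite3
import uuid

SCHEMA = """
CREATE TABLE orders (
    id          TEXT PRIMARY KEY,
    customer_id TEXT NOT NULL,
    total       INTEGER NOT NULL
);
CREATE TABLE outbox (
    id           TEXT PRIMARY KEY,
    aggregate_id TEXT NOT NULL,
    event_type   TEXT NOT NULL,
    payload      TEXT NOT NULL,
    created_at   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    published_at TEXT  -- NULL until the outbox processor has published the event
);
"""


def create_order(conn, order, fail_after_insert=False):
    # `with conn` wraps both inserts in one transaction:
    # commit if the block finishes, roll back if anything raises
    with conn:
        conn.execute(
            "INSERT INTO orders (id, customer_id, total) VALUES (?, ?, ?)",
            (order["id"], order["customer_id"], order["total"]),
        )
        if fail_after_insert:
            raise RuntimeError("simulated crash between the two writes")
        conn.execute(
            "INSERT INTO outbox (id, aggregate_id, event_type, payload) VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), order["id"], "order.created", json.dumps(order)),
        )


def count(conn, table):
    # table is always one of our own fixed names here, never user input
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

create_order(conn, {"id": "o-1", "customer_id": "c-1", "total": 4200})
try:
    create_order(conn, {"id": "o-2", "customer_id": "c-1", "total": 99}, fail_after_insert=True)
except RuntimeError:
    pass

print(count(conn, "orders"), count(conn, "outbox"))  # 1 1: o-2 left neither an order nor an event
```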

Architecture Overview

┌─────────────────┐
│   Service API   │
└────────┬────────┘
         │
         ▼
┌─────────────────────────────────┐
│      Database Transaction       │
│  ┌───────────┐  ┌────────────┐  │
│  │ Business  │  │  Outbox    │  │
│  │   Data    │  │   Table    │  │
│  └───────────┘  └────────────┘  │
└────────┬────────────────────────┘
         │
         ▼
┌─────────────────┐
│ Outbox Processor│ (polling or CDC)
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  Message Broker │
│  (Kafka, etc.)  │
└─────────────────┘

Key Benefits

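  1. Atomic Operations: Database writes and event creation happen in a single transaction
  2. Durable Events: Events are persisted in the database before any publish is attempted
  3. No Message Loss: If publishing fails, the event stays in the outbox and the processor retries it, so delivery is at-least-once (an event may be published more than once)
  4. Ordering: Events can be published in the order they were created, as long as one processor at a time handles a given aggregate's events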

Implementation Example in Go

Here’s a practical implementation using Go with PostgreSQL:

package outbox

import (
    "context"
    "database/sql"
    "encoding/json"
    "log"
    "time"

    "github.com/google/uuid"
)

// Event is one row of the outbox table.
type Event struct {
    ID          string
    AggregateID string
    EventType   string
    Payload     json.RawMessage
    CreatedAt   time.Time
}

// Order is a minimal business entity for this example.
type Order struct {
    ID         string `json:"id"`
    CustomerID string `json:"customer_id"`
    Total      int64  `json:"total"` // amount in cents
}

// MessageBroker is a minimal abstraction over Kafka, RabbitMQ, etc.
type MessageBroker interface {
    Publish(topic string, payload []byte) error
}

// OrderService owns the orders table and its outbox.
type OrderService struct {
    db *sql.DB
}

// CreateOrder writes the order and its "order.created" event in one transaction.
func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    payload, err := json.Marshal(order)
    if err != nil {
        return err
    }

    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op once Commit has succeeded

    // 1. Write business data
    _, err = tx.ExecContext(ctx,
        "INSERT INTO orders (id, customer_id, total) VALUES ($1, $2, $3)",
        order.ID, order.CustomerID, order.Total)
    if err != nil {
        return err
    }

    // 2. Write event to outbox in the SAME transaction
    event := Event{
        ID:          uuid.New().String(),
        AggregateID: order.ID,
        EventType:   "order.created",
        Payload:     payload,
        CreatedAt:   time.Now(),
    }

    _, err = tx.ExecContext(ctx,
        `INSERT INTO outbox (id, aggregate_id, event_type, payload, created_at)
         VALUES ($1, $2, $3, $4, $5)`,
        event.ID, event.AggregateID, event.EventType, event.Payload, event.CreatedAt)
    if err != nil {
        return err
    }

    // 3. Commit atomically: both rows become visible together, or neither does
    return tx.Commit()
}

// OutboxProcessor runs separately to publish events
type OutboxProcessor struct {
    db       *sql.DB
    broker   MessageBroker
    interval time.Duration
}

func (p *OutboxProcessor) Start(ctx context.Context) {
    ticker := time.NewTicker(p.interval)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := p.processOutbox(ctx); err != nil {
                // Unpublished events stay in the outbox and are retried on the next tick
                log.Printf("outbox: processing failed: %v", err)
            }
        case <-ctx.Done():
            return
        }
    }
}

func (p *OutboxProcessor) processOutbox(ctx context.Context) error {
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Lock a batch of unpublished events. SKIP LOCKED lets several processors
    // share the table without blocking on the same rows (at the cost of strict ordering)
    rows, err := tx.QueryContext(ctx,
        `SELECT id, aggregate_id, event_type, payload, created_at
         FROM outbox
         WHERE published_at IS NULL
         ORDER BY created_at
         LIMIT 100
         FOR UPDATE SKIP LOCKED`)
    if err != nil {
        return err
    }
    defer rows.Close()

    var events []Event
    for rows.Next() {
        var e Event
        if err := rows.Scan(&e.ID, &e.AggregateID, &e.EventType, &e.Payload, &e.CreatedAt); err != nil {
            return err
        }
        events = append(events, e)
    }
    // rows.Next returned false, so the result set is closed and the
    // transaction is free for the UPDATEs below; surface any iteration error
    if err := rows.Err(); err != nil {
        return err
    }

    // Publish each event, then mark it as published
    for _, event := range events {
        if err := p.broker.Publish(event.EventType, event.Payload); err != nil {
            // Rolling back undoes published_at for the whole batch, so events already
            // sent in this batch are sent again on retry: delivery is at-least-once
            return err
        }

        _, err = tx.ExecContext(ctx,
            "UPDATE outbox SET published_at = $1 WHERE id = $2",
            time.Now(), event.ID)
        if err != nil {
            return err
        }
    }

    return tx.Commit()
}
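
The Go processor commits published_at for the whole batch only at the end, so a mid-batch broker failure rolls back the marks for events that were already sent. The Python simulation below reproduces that behavior using sqlite3 (which has no FOR UPDATE SKIP LOCKED, so locking is not modeled) and a hypothetical FlakyBroker that fails exactly once. After the retry, the first event has been delivered twice.

```python
import sqlite3


class FlakyBroker:
    """Hypothetical broker stand-in that fails only on its Nth publish call."""

    def __init__(self, fail_on_call):
        self.fail_on_call = fail_on_call
        self.calls = 0
        self.delivered = []

    def publish(self, topic, payload):
        self.calls += 1
        if self.calls == self.fail_on_call:
            raise ConnectionError("broker timeout")
        self.delivered.append(payload)


def process_outbox(conn, broker, batch_size=100):
    # Mirrors the Go processOutbox: publish, mark, and commit once per batch
    with conn:  # any exception rolls back every published_at update in this batch
        rows = conn.execute(
            "SELECT id, event_type, payload FROM outbox "
            "WHERE published_at IS NULL ORDER BY seq LIMIT ?",
            (batch_size,),
        ).fetchall()
        for event_id, event_type, payload in rows:
            broker.publish(event_type, payload)
            conn.execute(
                "UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE id = ?",
                (event_id,),
            )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (seq INTEGER PRIMARY KEY, id TEXT UNIQUE, "
    "event_type TEXT, payload TEXT, published_at TEXT)"
)
with conn:
    for n in (1, 2, 3):
        conn.execute(
            "INSERT INTO outbox (id, event_type, payload) VALUES (?, 'order.created', ?)",
            (f"evt-{n}", f"order-{n}"),
        )

broker = FlakyBroker(fail_on_call=2)
try:
    process_outbox(conn, broker)  # evt-1 is sent, evt-2 fails, the batch rolls back
except ConnectionError:
    pass
process_outbox(conn, broker)      # retry: all three are sent, evt-1 for the second time

print(broker.delivered)  # ['order-1', 'order-1', 'order-2', 'order-3']
```

This duplicate is why consumers must be idempotent. Committing after each event would shrink, but not close, the duplicate window, at the cost of one transaction per event.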

Implementation Considerations

1. Outbox Processing Strategy

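Polling Approach (shown above):

  1. Simplicity: Needs only the database and a background loop
  2. Latency: A new event can wait up to one polling interval before it is published
  3. Load: Every poll queries the outbox table, even when nothing is pending

Change Data Capture (CDC) Approach:

  1. Mechanism: A CDC tool such as Debezium reads the database's transaction log (the write-ahead log in PostgreSQL) and publishes outbox rows as they are committed
  2. Latency: Lower, with no polling queries against the outbox
  3. Cost: One more piece of infrastructure to deploy, monitor, and upgrade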
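
One common refinement of the polling approach, not part of the Go example above, is to poll again immediately whenever a batch comes back full and to sleep only once the outbox is drained, so a backlog clears without waiting one interval per batch. A sketch, where process_batch is a hypothetical callback that publishes up to batch_size events and returns how many it handled:

```python
import time


def run_poller(process_batch, batch_size, interval, should_stop, sleep=time.sleep):
    """Adaptive polling loop for an outbox processor (illustrative sketch)."""
    while not should_stop():
        handled = process_batch(batch_size)
        if handled < batch_size:
            # Outbox drained: wait before the next poll. A newly written event
            # therefore waits roughly one interval at most before it is published.
            sleep(interval)
        # A full batch suggests more rows are waiting, so poll again immediately.


# Usage with fakes: a backlog of 250 events and a loop that stops after 4 polls
backlog = [250]
sleeps = []
polls = [0]


def fake_batch(limit):
    taken = min(limit, backlog[0])
    backlog[0] -= taken
    return taken


def stop_after_four_polls():
    polls[0] += 1
    return polls[0] > 4


run_poller(fake_batch, batch_size=100, interval=0.5,
           should_stop=stop_after_four_polls, sleep=sleeps.append)
print(backlog[0], sleeps)  # 0 [0.5, 0.5]
```

The two full batches run back to back; the loop only sleeps after the partial batch and the empty poll that follows it.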

2. Event Cleanup

The outbox table grows over time. Implement a cleanup strategy:

func (p *OutboxProcessor) cleanupPublished(ctx context.Context) error {
    // Delete events published more than 7 days ago
    _, err := p.db.ExecContext(ctx,
        `DELETE FROM outbox
         WHERE published_at IS NOT NULL
         AND published_at < NOW() - INTERVAL '7 days'`)
    return err
}
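
On a busy table, one large DELETE can hold locks and generate a lot of write-ahead log at once. A sketch of an alternative that deletes in small batches, each in its own short transaction, again using sqlite3 as a stand-in: SQLite's datetime('now', '-7 days') plays the role of NOW() - INTERVAL '7 days', and the retention_days and batch_size parameters are illustrative. PostgreSQL's DELETE has no LIMIT clause, which is why the batch is selected in a subquery.

```python
import sqlite3


def cleanup_published(conn, retention_days=7, batch_size=1000):
    """Delete published outbox rows older than the retention window, batch by batch.

    Each batch runs in its own short transaction. Returns the number of rows deleted.
    """
    total = 0
    while True:
        with conn:  # commit after every batch
            cur = conn.execute(
                """DELETE FROM outbox WHERE id IN (
                       SELECT id FROM outbox
                       WHERE published_at IS NOT NULL
                         AND published_at < datetime('now', ?)
                       LIMIT ?)""",
                (f"-{retention_days} days", batch_size),
            )
        total += cur.rowcount
        if cur.rowcount < batch_size:  # partial batch: nothing older is left
            return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE outbox (id INTEGER PRIMARY KEY, published_at TEXT)")
with conn:
    # 5 rows published 10 days ago, 2 published yesterday, 1 still pending
    conn.executemany(
        "INSERT INTO outbox (published_at) VALUES (datetime('now', ?))",
        [("-10 days",)] * 5 + [("-1 days",)] * 2,
    )
    conn.execute("INSERT INTO outbox (published_at) VALUES (NULL)")

deleted = cleanup_published(conn, batch_size=2)
print(deleted)  # 5
```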

3. Idempotency

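Because the outbox processor can publish the same event more than once (for example, when a failed batch is retried), consumers must handle duplicates gracefully. Give every event a unique ID and make handlers idempotent: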

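def handle_order_created(event):
    key = f"processed_event:{event.id}"

    # Check if already processed (cache is a placeholder key-value store client).
    # exists() followed by set() is not atomic: two consumers that receive the
    # same event at the same moment can both get past this check.
    if cache.exists(key):
        return

    # Process event; this must itself be safe to repeat, because a crash before
    # cache.set() below means the event is handled again on redelivery
    process_order(event.payload)

    # Mark as processed; the 24-hour TTL bounds how long duplicates are detected
    cache.set(key, "1", ttl=86400)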
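
The cache-based check above can still apply an event twice if two consumers race, or if the process crashes after process_order but before cache.set. A sturdier variant records the event ID in the consumer's own database, in the same transaction as the side effect, so the two commit together. A sketch with sqlite3, where processed_events and order_projections are hypothetical consumer-side tables. In PostgreSQL you would typically use INSERT ... ON CONFLICT DO NOTHING and check the affected row count rather than catching the error, because a failed statement aborts the surrounding transaction there.

```python
import sqlite3


def handle_order_created(conn, event):
    """Apply an event once, recording its ID atomically with the side effect.

    Returns True if the event was applied, False if it was a duplicate.
    """
    with conn:  # dedup record and side effect commit (or roll back) together
        try:
            # The primary key on event_id rejects an ID that was already recorded
            conn.execute(
                "INSERT INTO processed_events (event_id) VALUES (?)", (event["id"],)
            )
        except sqlite3.IntegrityError:
            return False  # duplicate delivery: skip the side effect
        conn.execute(
            "INSERT INTO order_projections (order_id, total) VALUES (?, ?)",
            (event["payload"]["order_id"], event["payload"]["total"]),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE processed_events (event_id TEXT PRIMARY KEY);
    CREATE TABLE order_projections (order_id TEXT PRIMARY KEY, total INTEGER NOT NULL);
    """
)

event = {"id": "evt-1", "payload": {"order_id": "o-1", "total": 4200}}
first = handle_order_created(conn, event)   # applied
second = handle_order_created(conn, event)  # redelivery of the same event is ignored
print(first, second)  # True False
```

If the side effect raises, the processed_events row is rolled back with it, so a later redelivery is applied normally instead of being skipped.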

4. Ordering Guarantees

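If strict ordering matters, ensure:

  1. Per-aggregate scope: Events for the same aggregate_id stay in order; ordering across all aggregates is much harder to guarantee
  2. One processor per aggregate: With several workers using FOR UPDATE SKIP LOCKED, one worker can publish a later event while another still holds an earlier one
  3. Aggregate ID as the message key: Kafka, for example, preserves order only within a partition, and messages with the same key go to the same partition as long as the partition count does not change
  4. Deterministic ties: Order by a monotonically increasing sequence column, since created_at values can collide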
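
To illustrate why the aggregate ID makes a good message key, here is a small sketch that routes outbox events to partitions with a stable hash. zlib.crc32 is used only for illustration; Kafka's default partitioner hashes keys with murmur2, but the property shown is the same: one key maps to one partition, and within a partition events keep their outbox order.

```python
import zlib
from collections import defaultdict


def partition_for(aggregate_id, num_partitions):
    # Stable hash: the same aggregate ID always maps to the same partition.
    # (Python's built-in hash() of a str is randomized per process, so it is avoided.)
    return zlib.crc32(aggregate_id.encode("utf-8")) % num_partitions


def route(events, num_partitions):
    """Append outbox events, already sorted by sequence, to per-partition logs."""
    partitions = defaultdict(list)
    for seq, aggregate_id, event_type in events:
        partitions[partition_for(aggregate_id, num_partitions)].append(
            (seq, aggregate_id, event_type)
        )
    return partitions


# Events in outbox sequence order, interleaved across two aggregates
events = [
    (1, "order-1", "order.created"),
    (2, "order-2", "order.created"),
    (3, "order-1", "order.paid"),
    (4, "order-1", "order.shipped"),
    (5, "order-2", "order.cancelled"),
]

partitions = route(events, num_partitions=3)

# A consumer reading order-1's partition sees its events in the original order
order_1_log = partitions[partition_for("order-1", 3)]
order_1_types = [etype for _, agg, etype in order_1_log if agg == "order-1"]
print(order_1_types)  # ['order.created', 'order.paid', 'order.shipped']
```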

When to Use This Pattern

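Good Fit:

  1. Shared state changes: A service must change its own database and notify other services, and the two must never disagree
  2. Tolerant consumers: Consumers can handle at-least-once delivery and a short delay between the commit and the event

Alternatives:

  1. CDC on business tables: Capture changes from the business tables directly, when the row changes themselves can serve as the events
  2. Event sourcing: The stored event log is the system of record
  3. Distributed transactions: Two-phase commit spanning the database and the broker, where both support it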

Trade-offs

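Advantages:

  1. Consistency: The business change and its event commit or roll back together
  2. Resilience: Writes keep succeeding while the broker is down; pending events are published once it recovers

Disadvantages:

  1. Complexity: An outbox table and a processor to build, deploy, and monitor
  2. Duplicates: Delivery is at-least-once, so every consumer must be idempotent
  3. Latency: Events reach the broker after a polling interval or CDC lag, not at commit time
  4. Maintenance: The outbox table must be cleaned up regularly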

Python Example with SQLAlchemy

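import json
import uuid
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


def utcnow():
    return datetime.now(timezone.utc)


class Order(Base):
    # Minimal order model for this example
    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)  # assigned by the database on flush
    customer_id = Column(String, nullable=False)
    total = Column(Integer, nullable=False)  # amount in cents


class OutboxEvent(Base):
    __tablename__ = 'outbox'

    id = Column(String, primary_key=True)
    aggregate_id = Column(String, nullable=False)
    event_type = Column(String, nullable=False)
    payload = Column(Text, nullable=False)
    created_at = Column(DateTime(timezone=True), default=utcnow, nullable=False)
    published_at = Column(DateTime(timezone=True), nullable=True)  # NULL until published


def create_order_with_event(session, order_data):
    # Both operations in the same transaction
    try:
        order = Order(**order_data)
        session.add(order)
        # Flush sends the INSERT inside the open transaction so order.id is
        # populated; nothing is committed yet
        session.flush()

        session.add(OutboxEvent(
            id=str(uuid.uuid4()),
            aggregate_id=str(order.id),
            event_type="order.created",
            payload=json.dumps(order_data),
        ))
        session.commit()  # Atomic commit: the order and its event are saved together
    except Exception:
        session.rollback()  # Neither row is kept
        raise
    return order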

Conclusion

The Transactional Outbox pattern is a robust solution for maintaining consistency in event-driven architectures. While it adds complexity with the outbox table and processor, the trade-off is worthwhile for systems where data consistency is critical. For principal engineers designing distributed systems, this pattern should be in your standard toolkit alongside sagas, circuit breakers, and other resilience patterns.

Start simple with polling-based outbox processing, then migrate to CDC if you need lower latency. Focus on getting idempotency and monitoring right from the start—these are where most real-world implementations struggle.