Transactional Outbox with Change Data Capture: Reliable Event Publishing at Scale

The Problem: Dual-Write Consistency

A fundamental challenge in distributed systems is publishing events reliably when updating a database. Consider an e-commerce order service that needs to:

  1. Save the order to the database
  2. Publish an “OrderCreated” event to a message broker

The naive approach creates a dual-write problem:

# Problematic: Dual-write anti-pattern
def create_order(order_data):
    # Write 1: Database
    order = db.save_order(order_data)
    
    # Write 2: Message broker
    event_bus.publish("OrderCreated", order)
    
    return order

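What can go wrong?

  1. The database write succeeds but the publish fails (broker unavailable, network timeout): the order exists, but no OrderCreated event is ever sent.
  2. The process crashes between the two writes: same outcome, and nothing records that an event is still owed.
  3. The publish succeeds but the order is not durably stored, for example because create_order runs inside a larger transaction that later rolls back: consumers react to an order that does not exist.

These failures violate the fundamental guarantee that events reflect actual database state.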

The Solution: Transactional Outbox + CDC

The Transactional Outbox pattern solves this by treating event publishing as a database concern, then using Change Data Capture (CDC) to reliably publish events asynchronously.

Architecture Components

1. Outbox Table: Stores unpublished events within your application database
2. Transactional Write: Application writes business data + outbox entry in a single transaction
3. CDC Connector: Tails the database transaction log and publishes outbox entries
4. Message Broker: Receives events from CDC connector for downstream consumption

How It Works

Step 1: Application commits business data and event in a single database transaction
Step 2: CDC tool (Debezium, AWS DMS, custom) reads database transaction log
Step 3: CDC publishes events to message broker (Kafka, RabbitMQ, etc.)
Step 4: Downstream services consume events from broker

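The key insight: the database transaction log is the single source of truth. PostgreSQL's logical decoding emits only committed transactions, so an outbox row written by a committed transaction will be published (at least once), and a row from a rolled-back transaction never will.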
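To make that guarantee concrete, here is a toy model in Go. It is not how Debezium works internally: `LogEntry` and `committedOutboxRows` are hypothetical names, and the function only mimics the effect of logical decoding, where a transaction's changes are released when its commit record is reached and discarded if it aborts.

```go
package main

import "fmt"

// LogEntry is a hypothetical, simplified log record: a data change or a
// commit/abort marker belonging to one transaction.
type LogEntry struct {
	TxID  int
	Kind  string // "insert", "commit", or "abort"
	Table string
	Row   string
}

// committedOutboxRows groups outbox inserts per transaction and releases them
// only when that transaction commits, so output follows commit order.
// Aborted transactions are dropped; transactions with no marker stay pending.
func committedOutboxRows(entries []LogEntry) []string {
	pending := map[int][]string{}
	var released []string
	for _, e := range entries {
		switch e.Kind {
		case "insert":
			if e.Table == "outbox_events" {
				pending[e.TxID] = append(pending[e.TxID], e.Row)
			}
		case "commit":
			released = append(released, pending[e.TxID]...)
			delete(pending, e.TxID)
		case "abort":
			delete(pending, e.TxID)
		}
	}
	return released
}

func main() {
	entries := []LogEntry{
		{TxID: 1, Kind: "insert", Table: "orders", Row: "order-1"},
		{TxID: 1, Kind: "insert", Table: "outbox_events", Row: "OrderCreated:order-1"},
		{TxID: 2, Kind: "insert", Table: "orders", Row: "order-2"},
		{TxID: 2, Kind: "insert", Table: "outbox_events", Row: "OrderCreated:order-2"},
		{TxID: 2, Kind: "abort"},
		{TxID: 1, Kind: "commit"},
	}
	fmt.Println(committedOutboxRows(entries)) // [OrderCreated:order-1]
}
```

The event for order-2 never surfaces because its transaction aborted, which is exactly the phantom-event case the dual write could not rule out.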

Implementation in Go

Here’s a practical implementation using PostgreSQL and Kafka:

// domain/order.go
package domain

import (
    "encoding/json"
    "time"

    "github.com/shopspring/decimal"
)

type Order struct {
    ID          string
    CustomerID  string
    TotalAmount decimal.Decimal
    Status      string
    CreatedAt   time.Time
}

type OutboxEvent struct {
    ID           string
    AggregateID  string    // Order ID
    EventType    string    // "OrderCreated"
    Payload      json.RawMessage
    CreatedAt    time.Time
    Published    bool
}

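// service/order_service.go
package service

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "time"

    "github.com/google/uuid"
)

// Order is the domain type above; write domain.Order and import your
// domain package if it lives in a separate package.
type OrderService struct {
    db *sql.DB
}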

func (s *OrderService) CreateOrder(ctx context.Context, order *Order) error {
    // Start transaction
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer tx.Rollback() // Rollback if not committed

    // 1. Insert order
    _, err = tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total_amount, status, created_at)
        VALUES ($1, $2, $3, $4, $5)
    `, order.ID, order.CustomerID, order.TotalAmount, order.Status, order.CreatedAt)
    if err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    // 2. Create event payload
    payload, err := json.Marshal(map[string]interface{}{
        "order_id":     order.ID,
        "customer_id":  order.CustomerID,
        "total_amount": order.TotalAmount,
        "status":       order.Status,
    })
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    // 3. Insert into outbox table (same transaction!)
    _, err = tx.ExecContext(ctx, `
        INSERT INTO outbox_events (id, aggregate_id, event_type, payload, created_at, published)
        VALUES ($1, $2, $3, $4, $5, $6)
    `, uuid.New().String(), order.ID, "OrderCreated", payload, time.Now(), false)
    if err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    // Commit transaction (atomic: both order and event, or neither)
    if err := tx.Commit(); err != nil {
        return fmt.Errorf("commit transaction: %w", err)
    }

    return nil
}

Database Schema:

CREATE TABLE orders (
    id VARCHAR(36) PRIMARY KEY,
    customer_id VARCHAR(36) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL
);

CREATE TABLE outbox_events (
    id VARCHAR(36) PRIMARY KEY,
    aggregate_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(50) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP NOT NULL,
    published BOOLEAN DEFAULT FALSE,
    published_at TIMESTAMP
);

CREATE INDEX idx_outbox_unpublished ON outbox_events(published, created_at)
    WHERE published = FALSE;

CDC Configuration with Debezium

Debezium connects to PostgreSQL’s write-ahead log (WAL) and streams changes to Kafka:

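{
  "name": "order-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orders_db",
    "database.server.name": "orders",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.route.by.field": "event_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events"
  }
}

This configuration:

  1. Connects to orders_db as the debezium user and decodes the WAL with PostgreSQL's built-in pgoutput plugin.
  2. Captures changes only from public.outbox_events; the orders table is never streamed.
  3. Applies the EventRouter transform, which turns each inserted outbox row into a Kafka record, with aggregate_id as the message key and the payload column as the record value. The key and routing columns are set explicitly because the router's defaults (aggregateid, aggregatetype) do not match this schema.
  4. Routes each record to a topic derived from event_type, e.g. OrderCreated.events.

With CDC the published column is not needed: the connector tracks its position through a replication slot. Note also that database.server.name is the Debezium 1.x name for the topic prefix; Debezium 2.x replaces it with topic.prefix.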
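When the outbox router uses the aggregate ID as the Kafka message key, all events for one order stay in order, because Kafka assigns a keyed record to a partition by hashing its key. The sketch below illustrates only that property: `partitionFor` is a hypothetical helper using FNV-1a from Go's standard library, not the hash Kafka clients use (the Java client's default partitioner uses murmur2).

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor deterministically maps a record key to one of n partitions.
// FNV-1a is used purely for illustration; the property that matters is that
// equal keys always map to the same partition.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	const partitions = 6
	// Every event for order-42 lands on the same partition, so a consumer of
	// that partition sees them in the order they were produced.
	for _, evt := range []string{"OrderCreated", "OrderPaid", "OrderShipped"} {
		fmt.Println(evt, "-> partition", partitionFor("order-42", partitions))
	}
}
```

Ordering holds only within a partition, so events for different orders may interleave; that is usually fine because consumers care about per-aggregate order.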

Alternative: Polling-Based Outbox (Simpler for Small Scale)

If you can’t use CDC (operational complexity, database limitations), implement polling:

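// publisher/outbox_publisher.go
package publisher

import (
    "context"
    "database/sql"
    "fmt"
    "log"
    "time"

    // Assumed client: confluent-kafka-go, whose Producer matches kafka.Producer below.
    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

type OutboxPublisher struct {
    db       *sql.DB
    kafka    *kafka.Producer
    interval time.Duration
}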

func (p *OutboxPublisher) Start(ctx context.Context) {
    ticker := time.NewTicker(p.interval) // e.g., 5 seconds
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := p.publishPendingEvents(ctx); err != nil {
                log.Printf("publish error: %v", err)
            }
        case <-ctx.Done():
            return
        }
    }
}

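func (p *OutboxPublisher) publishPendingEvents(ctx context.Context) error {
    // FOR UPDATE SKIP LOCKED only holds row locks until the transaction ends,
    // so the SELECT, the publishes and the UPDATEs must share one transaction.
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer tx.Rollback() // No-op once Commit has succeeded

    // Select unpublished events (with limit to avoid overwhelming broker)
    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE published = FALSE
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED  -- Concurrent publishers skip rows locked here
    `)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }

    // Read the whole batch before issuing UPDATEs: most PostgreSQL drivers cannot
    // run a new statement on the transaction's connection while rows are open.
    var events []OutboxEvent
    for rows.Next() {
        var event OutboxEvent
        if err := rows.Scan(&event.ID, &event.AggregateID, &event.EventType, &event.Payload); err != nil {
            rows.Close()
            return fmt.Errorf("scan event: %w", err)
        }
        events = append(events, event)
    }
    rows.Close()
    if err := rows.Err(); err != nil {
        return fmt.Errorf("iterate outbox: %w", err)
    }

    for i := range events {
        event := &events[i]
        // Publish to Kafka; publishToKafka (not shown) should wait for the delivery report
        if err := p.publishToKafka(event); err != nil {
            log.Printf("failed to publish event %s: %v", event.ID, err)
            // Left unpublished and retried on the next tick; later events for
            // the same aggregate can overtake it.
            continue
        }

        // Mark as published (inside the same transaction)
        if _, err := tx.ExecContext(ctx, `
            UPDATE outbox_events
            SET published = TRUE, published_at = $1
            WHERE id = $2
        `, time.Now(), event.ID); err != nil {
            return fmt.Errorf("mark event %s as published: %w", event.ID, err)
        }
    }

    // If Commit fails after events were sent, they are sent again on a later
    // tick: delivery is at-least-once, so consumers must deduplicate.
    if err := tx.Commit(); err != nil {
        return fmt.Errorf("commit transaction: %w", err)
    }
    return nil
}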

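Polling Trade-offs:

  1. Pros: no infrastructure beyond the database and broker you already run, and the whole flow is easy to debug.
  2. Cons: events wait up to one polling interval before publication, and the repeated queries add load to the primary database.
  3. Cons: a crash between publishing an event and marking it as published causes a duplicate, so delivery is at-least-once.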

Python Implementation (Django Example)

# models.py
import uuid

from django.db import models

class Order(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    customer_id = models.UUIDField()
    total_amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=20)
    created_at = models.DateTimeField(auto_now_add=True)

class OutboxEvent(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    aggregate_id = models.UUIDField(db_index=True)
    event_type = models.CharField(max_length=50)
    payload = models.JSONField()
    created_at = models.DateTimeField(auto_now_add=True)
    published = models.BooleanField(default=False, db_index=True)
    published_at = models.DateTimeField(null=True, blank=True)

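    class Meta:
        indexes = [
            # Partial index over unpublished rows; Django requires a name
            # for any index that uses `condition`.
            models.Index(fields=['published', 'created_at'],
                         name='idx_outbox_unpublished',
                         condition=models.Q(published=False)),
        ]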

# services.py
from django.db import transaction

from .models import Order, OutboxEvent


class OrderService:
    @transaction.atomic  # Critical: ensures atomicity
    def create_order(self, order_data):
        # Create order
        order = Order.objects.create(
            customer_id=order_data['customer_id'],
            total_amount=order_data['total_amount'],
            status='PENDING'
        )
        
        # Create outbox event (same transaction)
        OutboxEvent.objects.create(
            aggregate_id=order.id,
            event_type='OrderCreated',
            payload={
                'order_id': str(order.id),
                'customer_id': str(order.customer_id),
                'total_amount': str(order.total_amount),
                'status': order.status
            }
        )
        
        return order

When to Use This Pattern

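Use Transactional Outbox + CDC when:

  1. A state change and the event announcing it must never diverge (orders, payments, inventory).
  2. Your database exposes a change log that a CDC tool can read, such as PostgreSQL logical replication or the MySQL binlog.
  3. Event volume or latency requirements make polling the outbox table too slow or too expensive.

Consider alternatives when:

  1. Occasionally losing or duplicating an event is acceptable, so a direct publish is good enough.
  2. You cannot operate CDC infrastructure; a polling-based outbox covers low and moderate volumes.

Trade-offs and Considerations

Pros:

  1. Atomicity: the business change and its event commit together or not at all.
  2. The broker is off the request path, so a broker outage delays events instead of failing requests.
  3. Per-aggregate ordering is preserved when events are keyed by aggregate ID.

Cons:

  1. At-least-once delivery: duplicates are possible, so consumers must be idempotent.
  2. Eventual consistency: consumers see events only after the polling interval or CDC lag.
  3. The outbox table grows and needs a retention job.
  4. CDC adds moving parts (Kafka Connect, Debezium, replication slots), and an abandoned PostgreSQL replication slot forces the server to retain WAL indefinitely.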

Best Practices

  1. Add event versioning: Include schema version in event_type (e.g., OrderCreated.v2)
  2. Implement retention: Archive/delete old published events to control table size
  3. Monitor lag: Track time between event creation and publication
  4. Idempotent consumers: Use event ID for deduplication on consumer side
  5. Dead letter queue: Handle events that repeatedly fail to publish
  6. Partitioning: Use aggregate ID as partition key for ordered processing

Conclusion

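The Transactional Outbox pattern with CDC provides a robust, scalable solution to the dual-write problem in distributed systems. By treating events as first-class database entities and leveraging the transaction log, you guarantee that a state change and its event commit atomically and reach the broker at least once, without putting the broker on the request's critical path. The price is eventual consistency: consumers see events after a short delay and must tolerate duplicates.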

For teams building event-driven architectures, this pattern should be the default choice for reliable event publishing. Start with polling-based publishing for simplicity, then graduate to CDC as scale and reliability requirements increase.