Transactional Outbox with Change Data Capture: Reliable Event Publishing at Scale
The Problem: Dual-Write Consistency
A fundamental challenge in distributed systems is publishing events reliably when updating a database. Consider an e-commerce order service that needs to:
- Save the order to the database
- Publish an “OrderCreated” event to a message broker
The naive approach creates a dual-write problem:
# Problematic: Dual-write anti-pattern
def create_order(order_data):
    # Write 1: Database
    order = db.save_order(order_data)

    # Write 2: Message broker
    event_bus.publish("OrderCreated", order)

    return order
What can go wrong?
- Database commits, but message broker publish fails → downstream services never notified
- Message broker publish succeeds, but database transaction rolls back → invalid event published
- Partial failures create inconsistent system state
- Retry logic can cause duplicate events
These failures violate the fundamental guarantee that events reflect actual database state.
The Solution: Transactional Outbox + CDC
The Transactional Outbox pattern solves this by treating event publishing as a database concern, then using Change Data Capture (CDC) to reliably publish events asynchronously.
Architecture Components
1. Outbox Table: Stores unpublished events within your application database
2. Transactional Write: Application writes business data + outbox entry in single transaction
3. CDC Connector: Monitors database transaction log and publishes outbox entries
4. Message Broker: Receives events from CDC connector for downstream consumption
How It Works
Step 1: Application commits business data and event in single database transaction
Step 2: CDC tool (Debezium, AWS DMS, custom) reads database transaction log
Step 3: CDC publishes events to message broker (Kafka, RabbitMQ, etc.)
Step 4: Downstream services consume events from broker
The key insight: the database transaction log is the single source of truth. If an event is in the log, it was committed and will eventually be published.
Implementation in Go
Here’s a practical implementation using PostgreSQL and Kafka:
// domain/order.go
type Order struct {
    ID          string
    CustomerID  string
    TotalAmount decimal.Decimal
    Status      string
    CreatedAt   time.Time
}

type OutboxEvent struct {
    ID          string
    AggregateID string // Order ID
    EventType   string // "OrderCreated"
    Payload     json.RawMessage
    CreatedAt   time.Time
    Published   bool
}
// service/order_service.go
type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order *Order) error {
    // Start transaction
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer tx.Rollback() // Rollback if not committed

    // 1. Insert order
    _, err = tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total_amount, status, created_at)
        VALUES ($1, $2, $3, $4, $5)
    `, order.ID, order.CustomerID, order.TotalAmount, order.Status, order.CreatedAt)
    if err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    // 2. Create event payload
    payload, err := json.Marshal(map[string]interface{}{
        "order_id":     order.ID,
        "customer_id":  order.CustomerID,
        "total_amount": order.TotalAmount,
        "status":       order.Status,
    })
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    // 3. Insert into outbox table (same transaction!)
    _, err = tx.ExecContext(ctx, `
        INSERT INTO outbox_events (id, aggregate_id, event_type, payload, created_at, published)
        VALUES ($1, $2, $3, $4, $5, $6)
    `, uuid.New().String(), order.ID, "OrderCreated", payload, time.Now(), false)
    if err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    // Commit transaction (atomic: both order and event, or neither)
    if err := tx.Commit(); err != nil {
        return fmt.Errorf("commit transaction: %w", err)
    }
    return nil
}
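As a quick usage sketch (not part of the original code; the connection string and field values are placeholders), a caller might wire this up roughly like so:

// Illustrative caller; DSN and values are placeholders.
db, err := sql.Open("postgres", "postgres://app:secret@localhost/orders_db?sslmode=disable")
if err != nil {
    log.Fatal(err)
}
svc := &OrderService{db: db}

err = svc.CreateOrder(context.Background(), &Order{
    ID:          uuid.New().String(),
    CustomerID:  "c-123",
    TotalAmount: decimal.NewFromFloat(42.50),
    Status:      "PENDING",
    CreatedAt:   time.Now(),
})
if err != nil {
    log.Fatalf("create order: %v", err)
}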
Database Schema:
CREATE TABLE orders (
    id           VARCHAR(36) PRIMARY KEY,
    customer_id  VARCHAR(36) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL,
    status       VARCHAR(20) NOT NULL,
    created_at   TIMESTAMP NOT NULL
);

CREATE TABLE outbox_events (
    id           VARCHAR(36) PRIMARY KEY,
    aggregate_id VARCHAR(36) NOT NULL,
    event_type   VARCHAR(50) NOT NULL,
    payload      JSONB NOT NULL,
    created_at   TIMESTAMP NOT NULL,
    published    BOOLEAN DEFAULT FALSE,
    published_at TIMESTAMP
);

CREATE INDEX idx_outbox_unpublished ON outbox_events(published, created_at)
    WHERE published = FALSE;
CDC Configuration with Debezium
Debezium connects to PostgreSQL’s write-ahead log (WAL) and streams changes to Kafka:
{
  "name": "order-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orders_db",
    "database.server.name": "orders",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.route.by.field": "event_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events"
  }
}
This configuration:
- Monitors the outbox_events table
- Routes events to Kafka topics based on event_type
- Extracts the event payload automatically
- Guarantees at-least-once delivery
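With that routing rule, an OrderCreated row ends up on a topic named OrderCreated.events. As a rough sketch of the downstream side (not part of the original setup), a Go consumer using the confluent-kafka-go client might look like this; the broker address and consumer group are placeholders:

// consumer/order_consumer.go (illustrative sketch)
package main

import (
    "log"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",   // placeholder broker
        "group.id":          "order-consumers",  // placeholder consumer group
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // Topic name follows the ${routedByValue}.events replacement rule above.
    if err := consumer.SubscribeTopics([]string{"OrderCreated.events"}, nil); err != nil {
        log.Fatal(err)
    }

    for {
        msg, err := consumer.ReadMessage(-1) // block until a message arrives
        if err != nil {
            log.Printf("consume error: %v", err)
            continue
        }
        // Key carries the routed event key; Value carries the outbox payload.
        log.Printf("key=%s payload=%s", string(msg.Key), string(msg.Value))
    }
}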
Alternative: Polling-Based Outbox (Simpler for Small Scale)
If you can’t use CDC (operational complexity, database limitations), implement polling:
// publisher/outbox_publisher.go
type OutboxPublisher struct {
    db       *sql.DB
    kafka    *kafka.Producer
    interval time.Duration
}

func (p *OutboxPublisher) Start(ctx context.Context) {
    ticker := time.NewTicker(p.interval) // e.g., 5 seconds
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if err := p.publishPendingEvents(ctx); err != nil {
                log.Printf("publish error: %v", err)
            }
        case <-ctx.Done():
            return
        }
    }
}

func (p *OutboxPublisher) publishPendingEvents(ctx context.Context) error {
    // Run inside a transaction so FOR UPDATE SKIP LOCKED actually holds the row
    // locks until commit, preventing concurrent publishers from duplicating work.
    tx, err := p.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer tx.Rollback()

    // Select unpublished events (with a limit to avoid overwhelming the broker)
    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE published = FALSE
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    `)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }

    var events []OutboxEvent
    for rows.Next() {
        var event OutboxEvent
        if err := rows.Scan(&event.ID, &event.AggregateID, &event.EventType, &event.Payload); err != nil {
            rows.Close()
            return fmt.Errorf("scan event: %w", err)
        }
        events = append(events, event)
    }
    rows.Close()
    if err := rows.Err(); err != nil {
        return fmt.Errorf("iterate outbox: %w", err)
    }

    for _, event := range events {
        // Publish to Kafka
        if err := p.publishToKafka(&event); err != nil {
            log.Printf("failed to publish event %s: %v", event.ID, err)
            continue // Try next event (or implement retry logic)
        }

        // Mark as published (same transaction that holds the row lock)
        if _, err := tx.ExecContext(ctx, `
            UPDATE outbox_events
            SET published = TRUE, published_at = $1
            WHERE id = $2
        `, time.Now(), event.ID); err != nil {
            return fmt.Errorf("mark event %s as published: %w", event.ID, err)
        }
    }

    if err := tx.Commit(); err != nil {
        return fmt.Errorf("commit transaction: %w", err)
    }
    return nil
}
Polling Trade-offs:
- ✅ Simpler operational model (no CDC infrastructure)
- ✅ Works with any database
- ❌ Higher latency (bounded by polling interval)
- ❌ Less efficient (constant querying)
- ❌ Requires careful handling of concurrency (multiple publishers)
Python Implementation (Django Example)
# models.py
from django.db import models, transaction
import uuid


class Order(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    customer_id = models.UUIDField()
    total_amount = models.DecimalField(max_digits=10, decimal_places=2)
    status = models.CharField(max_length=20)
    created_at = models.DateTimeField(auto_now_add=True)


class OutboxEvent(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    aggregate_id = models.UUIDField(db_index=True)
    event_type = models.CharField(max_length=50)
    payload = models.JSONField()
    created_at = models.DateTimeField(auto_now_add=True)
    published = models.BooleanField(default=False, db_index=True)
    published_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        indexes = [
            models.Index(fields=['published', 'created_at'],
                         condition=models.Q(published=False),
                         name='outbox_unpublished_idx')  # partial indexes must be named
        ]


# services.py
class OrderService:
    @transaction.atomic  # Critical: ensures atomicity
    def create_order(self, order_data):
        # Create order
        order = Order.objects.create(
            customer_id=order_data['customer_id'],
            total_amount=order_data['total_amount'],
            status='PENDING'
        )

        # Create outbox event (same transaction)
        OutboxEvent.objects.create(
            aggregate_id=order.id,
            event_type='OrderCreated',
            payload={
                'order_id': str(order.id),
                'customer_id': str(order.customer_id),
                'total_amount': str(order.total_amount),
                'status': order.status
            }
        )

        return order
When to Use This Pattern
Use Transactional Outbox + CDC when:
- ✅ You need guaranteed event publishing (no lost events)
- ✅ Event order matters (CDC preserves transaction log order)
- ✅ You’re building event-sourced systems or CQRS architectures
- ✅ Downstream services depend on eventual consistency guarantees
- ✅ You have operational capability to run CDC infrastructure
Consider alternatives when:
- ❌ Simple request/response is sufficient (not everything needs events)
- ❌ Your database doesn’t support CDC (though polling fallback works)
- ❌ Team lacks CDC operational expertise (start with polling, migrate to CDC later)
- ❌ Event volume is extremely high (may need specialized event store instead)
Trade-offs and Considerations
Pros:
- Strong consistency guarantee: Events and data always match
- No message loss: Database is single source of truth
- Audit trail: Outbox table provides event history
- Decouples publishing: Application doesn’t block on message broker
Cons:
- Operational complexity: CDC requires infrastructure (Debezium, Kafka Connect)
- Storage overhead: Outbox table grows (need retention policy)
- Eventual consistency: Events published asynchronously (small delay)
- Duplicate handling: CDC provides at-least-once delivery (consumers must be idempotent)
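One common way to satisfy that last point is consumer-side deduplication keyed on the event ID. The sketch below is illustrative only: it assumes a processed_events table (not part of the schema above) with event_id as its primary key.

// Hypothetical consumer-side deduplication against PostgreSQL.
// (imports assumed: context, database/sql)
func handleOrderCreated(ctx context.Context, db *sql.DB, eventID string, payload []byte) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Record the event ID first; a duplicate delivery inserts zero rows.
    res, err := tx.ExecContext(ctx,
        `INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT DO NOTHING`, eventID)
    if err != nil {
        return err
    }
    if n, _ := res.RowsAffected(); n == 0 {
        return nil // already processed: safe to ack and skip
    }

    // ... apply the business side effects of the event here, in the same transaction ...

    return tx.Commit()
}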
Best Practices
- Add event versioning: Include a schema version in event_type (e.g., OrderCreated.v2)
- Implement retention: Archive/delete old published events to control table size
- Monitor lag: Track time between event creation and publication
- Idempotent consumers: Use event ID for deduplication on consumer side
- Dead letter queue: Handle events that repeatedly fail to publish
- Partitioning: Use aggregate ID as partition key for ordered processing
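A retention job can be as simple as a periodic delete of old published rows. This sketch uses the schema above; the 7-day window is an arbitrary example, not a recommendation from the original text.

// Hypothetical retention sweep: deletes published outbox rows older than a cutoff.
// (imports assumed: context, database/sql, fmt, log)
func pruneOutbox(ctx context.Context, db *sql.DB) error {
    res, err := db.ExecContext(ctx, `
        DELETE FROM outbox_events
        WHERE published = TRUE
          AND published_at < NOW() - INTERVAL '7 days'
    `)
    if err != nil {
        return fmt.Errorf("prune outbox: %w", err)
    }
    if n, err := res.RowsAffected(); err == nil {
        log.Printf("pruned %d published outbox events", n)
    }
    return nil
}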
Conclusion
The Transactional Outbox pattern with CDC provides a robust, scalable solution to the dual-write problem in distributed systems. By treating events as first-class database entities and leveraging the transaction log, you gain strong consistency guarantees without sacrificing availability or performance.
For teams building event-driven architectures, this pattern should be the default choice for reliable event publishing. Start with polling-based publishing for simplicity, then graduate to CDC as scale and reliability requirements increase.