Transactional Inbox Pattern for Idempotent Message Processing
The Problem: Duplicate Message Handling
In distributed systems using message queues (Kafka, RabbitMQ, SQS), messages can be delivered more than once due to network failures, consumer crashes, or acknowledgment timeouts. Without proper handling, duplicate processing causes:
- Financial transactions processed multiple times
- Duplicate database records
- Inconsistent aggregate states
- Violated business invariants
The at-least-once delivery guarantee common in message brokers requires consumers to be idempotent: processing the same message multiple times must produce the same result as processing it once.
The Solution: Transactional Inbox Pattern
The Transactional Inbox pattern ensures idempotent message processing by:
- Recording incoming messages in an “inbox” table within the same transaction as business logic
- Checking the inbox before processing to detect duplicates
- Atomically updating both business data and inbox status
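This leverages database ACID properties to provide exactly-once processing semantics at the application level, even with at-least-once message delivery. The guarantee covers only effects written in that same database transaction; external calls made by the handler (emails, HTTP requests to other services) fall outside it.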
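The three steps above can be sketched in a few lines. This is a minimal illustration rather than a production implementation: it uses an in-memory SQLite database, and the `account` table, the `handle_once` function, and the deposit logic are all assumptions made up for the example.

```python
import sqlite3

def handle_once(conn: sqlite3.Connection, message_id: str, amount: int) -> bool:
    """Apply a balance change at most once per message_id.

    Returns True if the change was applied, False for a duplicate delivery.
    """
    with conn:  # one transaction: commits on success, rolls back on exception
        cur = conn.execute(
            "INSERT OR IGNORE INTO inbox (message_id) VALUES (?)", (message_id,)
        )
        if cur.rowcount == 0:
            return False  # inbox row already exists: duplicate, skip the work
        # Business write in the same transaction as the inbox insert
        conn.execute(
            "UPDATE account SET balance = balance + ? WHERE id = 1", (amount,)
        )
        return True

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inbox (message_id TEXT PRIMARY KEY)")
conn.execute(
    "CREATE TABLE account ("
    "id INTEGER PRIMARY KEY, balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.execute("INSERT INTO account VALUES (1, 0)")
conn.commit()

handle_once(conn, "msg-1", 100)
handle_once(conn, "msg-1", 100)  # redelivery of the same message, ignored
balance = conn.execute("SELECT balance FROM account WHERE id = 1").fetchone()[0]
```

If the business write fails (here, a change that would violate the `CHECK` constraint), the inbox insert is rolled back with it, so a later redelivery is treated as a first attempt rather than as a duplicate.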
When to Use This Pattern
Use the Transactional Inbox when:
- Processing messages from external systems or message brokers
- Financial transactions or critical operations require exactly-once semantics
- Distributed microservices communicate via events
- Message processing has side effects that cannot be safely retried
- Downstream systems don’t support idempotency
Don’t use when:
- Operations are naturally idempotent (e.g., PUT requests with full object replacement)
- Performance requirements prohibit database writes for every message
- Message volume exceeds millions per second (consider streaming alternatives)
- Business logic tolerates duplicate processing
Architecture and Implementation
Core Components
- Inbox Table: Records all received messages with unique identifiers
- Message Handler: Business logic that processes messages
- Transaction Coordinator: Ensures atomic inbox updates and business operations
- Cleanup Process: Removes old processed messages to prevent unbounded growth
Implementation in Go
package inbox

import (
	"context"
	"database/sql"
	"errors"
	"time"
)

// InboxMessage represents a received message
type InboxMessage struct {
	MessageID   string     `db:"message_id"`
	Payload     []byte     `db:"payload"`
	ReceivedAt  time.Time  `db:"received_at"`
	ProcessedAt *time.Time `db:"processed_at"`
	Status      string     `db:"status"` // pending, processed, failed
}

// MessageProcessor handles business logic. It receives the open transaction
// so that its writes commit or roll back together with the inbox row.
type MessageProcessor interface {
	Process(ctx context.Context, tx *sql.Tx, payload []byte) error
}

// InboxService manages idempotent message processing
type InboxService struct {
	db        *sql.DB
	processor MessageProcessor
}

// ProcessMessage implements the transactional inbox pattern
func (s *InboxService) ProcessMessage(ctx context.Context, messageID string, payload []byte) error {
	// Start transaction
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once the transaction has been committed

	// Idempotency check. FOR UPDATE locks an existing row so that two
	// consumers cannot retry the same message at the same time.
	var status string
	err = tx.QueryRowContext(ctx,
		"SELECT status FROM inbox WHERE message_id = $1 FOR UPDATE",
		messageID,
	).Scan(&status)
	switch {
	case err == nil && status == "processed":
		return nil // Idempotent - already done
	case err == nil:
		// pending or failed: an earlier attempt did not complete, so retry
	case errors.Is(err, sql.ErrNoRows):
		// First delivery: insert message into inbox. The primary key rejects
		// a concurrent insert of the same message ID.
		_, err = tx.ExecContext(ctx,
			`INSERT INTO inbox (message_id, payload, received_at, status)
			 VALUES ($1, $2, $3, 'pending')`,
			messageID, payload, time.Now(),
		)
		if err != nil {
			return err
		}
	default:
		return err
	}

	// Process business logic within the same transaction
	if err := s.processor.Process(ctx, tx, payload); err != nil {
		// Undo partial business writes first. An UPDATE issued on tx here
		// would be rolled back too, so the failure is recorded separately.
		tx.Rollback()
		return errors.Join(err, s.markFailed(ctx, messageID, payload))
	}

	// Mark as processed
	_, err = tx.ExecContext(ctx,
		"UPDATE inbox SET status = 'processed', processed_at = $1 WHERE message_id = $2",
		time.Now(), messageID,
	)
	if err != nil {
		return err
	}

	// Commit transaction - both business logic and inbox update succeed together
	return tx.Commit()
}

// markFailed records a failed attempt outside the rolled-back transaction so
// that the failure and its retry count are visible to later attempts.
func (s *InboxService) markFailed(ctx context.Context, messageID string, payload []byte) error {
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO inbox (message_id, payload, received_at, status, retry_count)
		 VALUES ($1, $2, $3, 'failed', 1)
		 ON CONFLICT (message_id) DO UPDATE
		 SET status = 'failed', retry_count = inbox.retry_count + 1
		 WHERE inbox.status <> 'processed'`,
		messageID, payload, time.Now(),
	)
	return err
}

// Cleanup removes old processed messages
func (s *InboxService) Cleanup(ctx context.Context, retentionDays int) error {
	_, err := s.db.ExecContext(ctx,
		"DELETE FROM inbox WHERE status = 'processed' AND processed_at < $1",
		time.Now().AddDate(0, 0, -retentionDays),
	)
	return err
}
Database Schema
CREATE TABLE inbox (
    message_id   VARCHAR(255) PRIMARY KEY,
    payload      JSONB NOT NULL,
    received_at  TIMESTAMP NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMP,
    status       VARCHAR(20) NOT NULL DEFAULT 'pending',
    retry_count  INTEGER DEFAULT 0,
    created_at   TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Index for cleanup queries
CREATE INDEX idx_inbox_processed ON inbox(processed_at)
    WHERE status = 'processed';

-- Index for retry processing
CREATE INDEX idx_inbox_status ON inbox(status)
    WHERE status IN ('pending', 'failed');
Python Implementation with SQLAlchemy
from datetime import datetime, timedelta
from sqlalchemy import Column, String, DateTime, Integer, JSON, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
import logging

Base = declarative_base()


class InboxMessage(Base):
    __tablename__ = 'inbox'

    message_id = Column(String(255), primary_key=True)
    payload = Column(JSON, nullable=False)
    received_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)
    status = Column(String(20), default='pending')
    retry_count = Column(Integer, default=0)


class InboxService:
    def __init__(self, db_url: str, processor):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        self.processor = processor
        self.logger = logging.getLogger(__name__)

    def process_message(self, message_id: str, payload: dict) -> bool:
        """Process message idempotently using transactional inbox pattern."""
        with self.Session() as session:
            try:
                # Check for existing message; the row lock keeps two
                # consumers from retrying the same message concurrently
                inbox_msg = session.query(InboxMessage).filter_by(
                    message_id=message_id
                ).with_for_update().first()

                if inbox_msg is not None and inbox_msg.status == 'processed':
                    self.logger.info(f"Message {message_id} already processed")
                    return True  # Idempotent

                if inbox_msg is None:
                    # Insert new message
                    inbox_msg = InboxMessage(
                        message_id=message_id,
                        payload=payload,
                        status='pending'
                    )
                    session.add(inbox_msg)
                    session.flush()  # Emit the INSERT so a duplicate key fails before processing
                # An existing pending or failed row falls through and is retried

                # Process business logic within the same transaction: the
                # processor must do its writes through this session
                self.processor.process(session, payload)

                # Mark as processed
                inbox_msg.status = 'processed'
                inbox_msg.processed_at = datetime.utcnow()
                session.commit()
                return True
            except Exception as e:
                session.rollback()
                self.logger.error(f"Failed to process message {message_id}: {e}")
                self._mark_failed(session, message_id, payload)
                raise

    def _mark_failed(self, session, message_id: str, payload: dict) -> None:
        """Record the failure in a new transaction.

        The rollback discarded any inbox row added during the failed attempt,
        so the row is looked up again and created if it does not exist.
        """
        try:
            failed = session.get(InboxMessage, message_id)
            if failed is None:
                failed = InboxMessage(message_id=message_id, payload=payload)
                session.add(failed)
            if failed.status != 'processed':
                failed.status = 'failed'
                failed.retry_count = (failed.retry_count or 0) + 1
            session.commit()
        except Exception:
            session.rollback()
            self.logger.exception(f"Could not mark message {message_id} as failed")

    def cleanup(self, retention_days: int = 30):
        """Remove old processed messages."""
        cutoff = datetime.utcnow() - timedelta(days=retention_days)
        with self.Session() as session:
            deleted = session.query(InboxMessage).filter(
                InboxMessage.status == 'processed',
                InboxMessage.processed_at < cutoff
            ).delete()
            session.commit()
            self.logger.info(f"Cleaned up {deleted} processed messages")
Integration with Message Brokers
Kafka Consumer Example (Go)
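import (
	"context"
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// consumeKafkaMessages assumes the consumer was created with
// enable.auto.commit=false, so offsets are committed only after processing.
func consumeKafkaMessages(consumer *kafka.Consumer, inboxService *InboxService) {
	for {
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			log.Printf("Consumer error: %v", err)
			continue
		}
		// Use topic, partition, and offset as the unique message ID
		messageID := fmt.Sprintf("%s-%d-%d", *msg.TopicPartition.Topic,
			msg.TopicPartition.Partition, msg.TopicPartition.Offset)
		err = inboxService.ProcessMessage(context.Background(), messageID, msg.Value)
		if err != nil {
			log.Printf("Failed to process message %s: %v", messageID, err)
			// Skipping the commit is not enough: the consumer would simply
			// move on to the next offset. Seek back so this message is read
			// again. Production code would also add a backoff here.
			if seekErr := consumer.Seek(msg.TopicPartition, 0); seekErr != nil {
				log.Printf("Failed to seek back to %s: %v", messageID, seekErr)
			}
			continue
		}
		// Commit offset after successful processing
		if _, err := consumer.CommitMessage(msg); err != nil {
			log.Printf("Failed to commit offset for %s: %v", messageID, err)
		}
	}
}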
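The choice of message ID decides which duplicates the inbox can catch. The sketch below shows one possible policy, not a standard one: prefer an ID supplied by the producer, and fall back to broker coordinates. The function name `derive_message_id` and the `message-id` header name are hypothetical.

```python
from typing import Mapping, Optional

def derive_message_id(
    topic: str,
    partition: int,
    offset: int,
    headers: Optional[Mapping[str, bytes]] = None,
) -> str:
    """Return a stable deduplication key for a consumed record."""
    # "message-id" is a hypothetical header name chosen for this sketch.
    if headers and headers.get("message-id"):
        return headers["message-id"].decode("utf-8")
    # Fallback: broker coordinates. These are unique per record, so they catch
    # consumer-side redelivery, but a message the producer sends twice gets
    # two different offsets and therefore two different IDs.
    return f"{topic}-{partition}-{offset}"

offset_based = derive_message_id("orders", 3, 42)
header_based = derive_message_id("orders", 3, 42, {"message-id": b"order-981"})
```

An offset-based ID is enough when duplicates come only from redelivery to the consumer. A producer-supplied ID is needed when the producer itself may retry a send.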
Trade-offs and Considerations
Advantages
- Guaranteed Idempotency: Duplicate messages never cause duplicate side effects in the database that holds the inbox
- Atomic Processing: Business logic and deduplication happen atomically
- Audit Trail: Inbox table provides a message processing history for as long as rows are retained
- Retry Support: Failed messages can be retried without risk of duplication
Disadvantages
- Database Write Overhead: Every message requires database write, limiting throughput
- Storage Growth: Inbox table grows linearly with message volume (requires cleanup)
- Transaction Scope: Business logic must execute within database transaction
- Single Database Dependency: Pattern assumes single database (doesn’t span microservices)
Performance Optimization
- Batch Processing: Process multiple messages in single transaction
- Partitioning: Shard inbox table by message ID hash for write scalability
- Async Cleanup: Run cleanup as background job during off-peak hours
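- Read Replicas: Use replicas only as a fast path for messages already marked processed; because of replication lag, a miss on a replica must be rechecked on the primary, which also takes all writes
- TTL/Expiry: Use database expiry mechanisms (dropping time-based Postgres partitions, DynamoDB TTL) for automatic cleanup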
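Batch processing, the first item in the list above, could look roughly like the following. It is a sketch under assumptions: SQLite stands in for the real database, and `process_batch`, the `applied` table, and the `record` handler are names invented for the example.

```python
import sqlite3
from typing import Callable, Iterable, Tuple

def process_batch(
    conn: sqlite3.Connection,
    messages: Iterable[Tuple[str, str]],
    handler: Callable[[sqlite3.Connection, str], None],
) -> int:
    """Process (message_id, payload) pairs in one transaction.

    Returns the number of messages that were new and therefore handled.
    """
    processed = 0
    with conn:  # single commit for the batch; an exception rolls back all of it
        for message_id, payload in messages:
            cur = conn.execute(
                "INSERT OR IGNORE INTO inbox (message_id, payload) VALUES (?, ?)",
                (message_id, payload),
            )
            if cur.rowcount == 0:
                continue  # duplicate, either within this batch or an earlier one
            handler(conn, payload)
            processed += 1
    return processed

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inbox (message_id TEXT PRIMARY KEY, payload TEXT NOT NULL)")
conn.execute("CREATE TABLE applied (payload TEXT NOT NULL)")

def record(conn: sqlite3.Connection, payload: str) -> None:
    # Stand-in for business logic: writes through the same connection
    conn.execute("INSERT INTO applied (payload) VALUES (?)", (payload,))

first = process_batch(conn, [("m1", "a"), ("m2", "b"), ("m1", "a")], record)
second = process_batch(conn, [("m2", "b"), ("m3", "c")], record)
```

The saving comes from paying one commit per batch instead of one per message. The cost is that a single failing message rolls back the whole batch, so a batch that keeps failing may need to be reprocessed message by message.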
Complementary Patterns
- Outbox Pattern: For transactional event publishing (opposite direction)
- Saga Pattern: For distributed transactions spanning multiple services
- Circuit Breaker: Protect against downstream failures during message processing
- Dead Letter Queue: Handle messages that fail repeatedly after retries
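As one illustration of how a dead letter queue can sit next to the inbox, the sketch below counts failed attempts in `retry_count` and diverts a message once a limit is reached. Everything here is an assumption made for the example: the `MAX_RETRIES` threshold, the `dead` status, the `record_failure` function, and the `dead_letter` table that stands in for a real queue.

```python
import sqlite3

MAX_RETRIES = 3  # hypothetical limit; tune per workload

def record_failure(conn: sqlite3.Connection, message_id: str, payload: str) -> str:
    """Count a failed attempt and return 'retry' or 'dead_letter'."""
    with conn:  # the counter update and the dead-letter write commit together
        cur = conn.execute(
            "UPDATE inbox SET status = 'failed', retry_count = retry_count + 1 "
            "WHERE message_id = ?",
            (message_id,),
        )
        if cur.rowcount == 0:
            # First failure for this message: no inbox row exists yet
            conn.execute(
                "INSERT INTO inbox (message_id, payload, status, retry_count) "
                "VALUES (?, ?, 'failed', 1)",
                (message_id, payload),
            )
        retries = conn.execute(
            "SELECT retry_count FROM inbox WHERE message_id = ?", (message_id,)
        ).fetchone()[0]
        if retries >= MAX_RETRIES:
            conn.execute(
                "INSERT INTO dead_letter (message_id, payload) VALUES (?, ?)",
                (message_id, payload),
            )
            conn.execute(
                "UPDATE inbox SET status = 'dead' WHERE message_id = ?",
                (message_id,),
            )
            return "dead_letter"
    return "retry"

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inbox (message_id TEXT PRIMARY KEY, payload TEXT NOT NULL, "
    "status TEXT NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("CREATE TABLE dead_letter (message_id TEXT PRIMARY KEY, payload TEXT NOT NULL)")

outcomes = [record_failure(conn, "m1", "{}") for _ in range(3)]
```

A consumer would call something like this after a failed attempt and acknowledge the message to the broker once it has been dead-lettered, so that a poison message stops blocking the messages behind it.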
Conclusion
The Transactional Inbox pattern solves the duplicate message problem in distributed systems by leveraging database transactions for idempotency. It adds a database write per message, but for critical operations that cost buys exactly-once processing semantics for everything the handler writes in the same transaction.
For Principal Engineers designing event-driven microservices, this pattern should be a standard tool in the architecture toolkit, especially when working with Go or Python services that process Kafka, RabbitMQ, or SQS messages.