Transactional Inbox Pattern for Idempotent Message Processing

The Problem: Duplicate Message Handling

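In distributed systems using message queues (Kafka, RabbitMQ, SQS), messages can be delivered more than once because of network failures, consumer crashes, or acknowledgment timeouts. Without proper handling, duplicate processing corrupts business state. For example, a payment can be charged twice, an order created twice, or a counter incremented twice.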

The at-least-once delivery guarantee common in message brokers requires consumers to be idempotent: processing the same message multiple times must produce the same result as processing it once.
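Here is a toy illustration with hypothetical names, using an in-memory balance as the business state. The naive handler credits a redelivered message twice. The idempotent handler remembers which message IDs it has already applied.

```python
# Toy model: each message credits an account balance.
def naive_handler(state: dict, message_id: str, amount: int) -> None:
    # Applies the credit on every delivery, including redeliveries.
    state["balance"] += amount


def idempotent_handler(state: dict, message_id: str, amount: int) -> None:
    # Applies the credit only the first time this message ID is seen.
    if message_id in state["seen"]:
        return
    state["balance"] += amount
    state["seen"].add(message_id)


deliveries = [("msg-1", 100), ("msg-1", 100)]  # the broker redelivers msg-1

naive = {"balance": 0, "seen": set()}
safe = {"balance": 0, "seen": set()}
for mid, amt in deliveries:
    naive_handler(naive, mid, amt)
    idempotent_handler(safe, mid, amt)

print(naive["balance"], safe["balance"])  # 200 100
```

The remembered set here is lost on a crash and is not updated atomically with the balance. Closing those two gaps is what the inbox table is for.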

The Solution: Transactional Inbox Pattern

The Transactional Inbox pattern ensures idempotent message processing by:

  1. Recording incoming messages in an “inbox” table within the same transaction as business logic
  2. Checking the inbox before processing to detect duplicates
  3. Atomically updating both business data and inbox status

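This leverages database ACID properties to give effectively exactly-once processing at the application level, even with at-least-once message delivery. The guarantee holds only when the business writes happen in the same database and the same transaction as the inbox write. Side effects outside that database, such as HTTP calls or emails, are not covered.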
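The following is a minimal sketch of the three steps using Python's built-in sqlite3 module. It assumes a hypothetical `accounts` table as the business data. The inbox insert, the balance update, and the status change all share one transaction, so a crash before commit leaves no trace and the redelivery is applied exactly once. `ON CONFLICT ... DO NOTHING` requires SQLite 3.24 or newer.

```python
import sqlite3
from datetime import datetime, timezone

# Hypothetical schema: the inbox plus one business table ("accounts").
SCHEMA = """
CREATE TABLE inbox (
    message_id   TEXT PRIMARY KEY,
    status       TEXT NOT NULL,
    processed_at TEXT
);
CREATE TABLE accounts (id TEXT PRIMARY KEY, balance INTEGER NOT NULL);
"""


def process_message(conn, message_id, account_id, amount, fail=False):
    """Apply a credit once per message_id; return False for duplicates."""
    with conn:  # one transaction: commit on normal exit, rollback on exception
        # Steps 1-2: record the message; a duplicate ID inserts nothing.
        cur = conn.execute(
            "INSERT INTO inbox (message_id, status) VALUES (?, 'pending') "
            "ON CONFLICT(message_id) DO NOTHING",
            (message_id,),
        )
        if cur.rowcount == 0:
            return False  # a committed row exists, so it was already processed
        # Business write on the same connection, hence the same transaction.
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            (amount, account_id),
        )
        if fail:
            raise RuntimeError("simulated crash before commit")
        # Step 3: mark processed; commits together with the balance update.
        conn.execute(
            "UPDATE inbox SET status = 'processed', processed_at = ? "
            "WHERE message_id = ?",
            (datetime.now(timezone.utc).isoformat(), message_id),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
with conn:
    conn.execute("INSERT INTO accounts VALUES ('acct-1', 0)")

try:
    process_message(conn, "msg-1", "acct-1", 100, fail=True)
except RuntimeError:
    pass  # rolled back: no inbox row and no balance change survive

print(process_message(conn, "msg-1", "acct-1", 100))  # True: redelivery applied
print(process_message(conn, "msg-1", "acct-1", 100))  # False: duplicate skipped
print(conn.execute("SELECT balance FROM accounts").fetchone()[0])  # 100
```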

When to Use This Pattern

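Use the Transactional Inbox when:

  1. Messages arrive with at-least-once delivery and a duplicate would corrupt business state
  2. The handler's business writes go to the same transactional database as the inbox table
  3. Each message carries a stable unique ID, or one can be derived (for example, a Kafka topic-partition-offset)

Don’t use when:

  1. The handler is already naturally idempotent, such as an upsert keyed by a business ID
  2. The side effects live outside the database (HTTP calls, emails, other services) and cannot join its transaction
  3. The datastore does not support ACID transactions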

Architecture and Implementation

Core Components

  1. Inbox Table: Records all received messages with unique identifiers
  2. Message Handler: Business logic that processes messages
  3. Transaction Coordinator: Ensures atomic inbox updates and business operations
  4. Cleanup Process: Removes old processed messages to prevent unbounded growth

Implementation in Go

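package inbox

import (
    "context"
    "database/sql"
    "errors"
    "time"
)

// InboxMessage represents a row in the inbox table
type InboxMessage struct {
    MessageID   string     `db:"message_id"`
    Payload     []byte     `db:"payload"`
    ReceivedAt  time.Time  `db:"received_at"`
    ProcessedAt *time.Time `db:"processed_at"`
    Status      string     `db:"status"` // pending, processed, failed
}

// MessageProcessor handles business logic. It receives the inbox transaction
// so that its writes commit or roll back together with the inbox row.
type MessageProcessor interface {
    Process(ctx context.Context, tx *sql.Tx, payload []byte) error
}

// InboxService manages idempotent message processing
type InboxService struct {
    db        *sql.DB
    processor MessageProcessor
}

// NewInboxService wires a database handle and a processor together
func NewInboxService(db *sql.DB, processor MessageProcessor) *InboxService {
    return &InboxService{db: db, processor: processor}
}

// ProcessMessage implements the transactional inbox pattern. It returns nil
// for messages that were already processed.
func (s *InboxService) ProcessMessage(ctx context.Context, messageID string, payload []byte) error {
    err := s.processInTx(ctx, messageID, payload)
    if err == nil {
        return nil
    }
    // The processing transaction was rolled back, so record the failure in a
    // separate transaction where it survives for monitoring and retries.
    if ferr := s.recordFailure(ctx, messageID, payload); ferr != nil {
        return errors.Join(err, ferr) // errors.Join needs Go 1.20+
    }
    return err
}

func (s *InboxService) processInTx(ctx context.Context, messageID string, payload []byte) error {
    // Start transaction
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op after a successful Commit

    // Create the inbox row if it does not exist yet. ON CONFLICT DO NOTHING
    // turns a duplicate ID into a no-op instead of a primary-key error.
    // The payload is sent as a string because some drivers encode []byte
    // parameters as binary data, which a JSONB column rejects.
    if _, err := tx.ExecContext(ctx,
        `INSERT INTO inbox (message_id, payload, received_at, status)
         VALUES ($1, $2, $3, 'pending')
         ON CONFLICT (message_id) DO NOTHING`,
        messageID, string(payload), time.Now(),
    ); err != nil {
        return err
    }

    // Idempotency check: lock the row so concurrent deliveries of the same
    // message are serialized instead of both passing the check
    var status string
    if err := tx.QueryRowContext(ctx,
        "SELECT status FROM inbox WHERE message_id = $1 FOR UPDATE",
        messageID,
    ).Scan(&status); err != nil {
        return err
    }
    if status == "processed" {
        return nil // Idempotent - already done
    }

    // New ('pending') or previously 'failed' message: run the business logic
    // on the same transaction
    if err := s.processor.Process(ctx, tx, payload); err != nil {
        return err // the deferred Rollback discards the business writes too
    }

    // Mark as processed
    if _, err := tx.ExecContext(ctx,
        "UPDATE inbox SET status = 'processed', processed_at = $1 WHERE message_id = $2",
        time.Now(), messageID,
    ); err != nil {
        return err
    }

    // Commit transaction - both business logic and inbox update succeed together
    return tx.Commit()
}

// recordFailure marks the message as failed and increments retry_count in its
// own transaction. A row that is already 'processed' is left untouched.
func (s *InboxService) recordFailure(ctx context.Context, messageID string, payload []byte) error {
    _, err := s.db.ExecContext(ctx,
        `INSERT INTO inbox (message_id, payload, received_at, status, retry_count)
         VALUES ($1, $2, $3, 'failed', 1)
         ON CONFLICT (message_id) DO UPDATE
         SET status = 'failed', retry_count = COALESCE(inbox.retry_count, 0) + 1
         WHERE inbox.status <> 'processed'`,
        messageID, string(payload), time.Now(),
    )
    return err
}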

// Cleanup removes old processed messages
func (s *InboxService) Cleanup(ctx context.Context, retentionDays int) error {
    _, err := s.db.ExecContext(ctx,
        "DELETE FROM inbox WHERE status = 'processed' AND processed_at < $1",
        time.Now().AddDate(0, 0, -retentionDays),
    )
    return err
}
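The Cleanup method above deletes every expired row in a single statement. On a large inbox, that one statement can hold locks and write a lot of log in a single transaction. A common alternative deletes in bounded batches and commits each batch separately. The sketch below uses sqlite3, with a hypothetical `batch_size` parameter, and stores timestamps as ISO-8601 UTC text so that string comparison follows time order.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def cleanup_in_batches(conn, retention_days, batch_size=1000):
    """Delete processed inbox rows older than retention_days, one batch per transaction."""
    cutoff = (datetime.now(timezone.utc) - timedelta(days=retention_days)).isoformat()
    total = 0
    while True:
        with conn:  # each batch commits on its own, keeping transactions short
            cur = conn.execute(
                """DELETE FROM inbox WHERE message_id IN (
                       SELECT message_id FROM inbox
                       WHERE status = 'processed' AND processed_at < ?
                       LIMIT ?)""",
                (cutoff, batch_size),
            )
        total += cur.rowcount
        if cur.rowcount < batch_size:  # partial batch: nothing older remains
            return total


# Usage: 25 expired rows, one recent processed row, one failed row.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inbox (message_id TEXT PRIMARY KEY, status TEXT, processed_at TEXT)")
now = datetime.now(timezone.utc)
old = (now - timedelta(days=60)).isoformat()
rows = [(f"old-{i}", "processed", old) for i in range(25)]
rows += [("new-1", "processed", now.isoformat()), ("fail-1", "failed", None)]
with conn:
    conn.executemany("INSERT INTO inbox VALUES (?, ?, ?)", rows)

print(cleanup_in_batches(conn, retention_days=30, batch_size=10))  # 25
```

Small batches trade total cleanup time for shorter lock windows. The right `batch_size` depends on the database and workload.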

Database Schema

CREATE TABLE inbox (
    message_id VARCHAR(255) PRIMARY KEY,
    payload JSONB NOT NULL,
    received_at TIMESTAMP NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMP,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    retry_count INTEGER DEFAULT 0,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Index for cleanup queries
CREATE INDEX idx_inbox_processed ON inbox(processed_at) 
WHERE status = 'processed';

-- Index for retry processing
CREATE INDEX idx_inbox_status ON inbox(status) 
WHERE status IN ('pending', 'failed');
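The partial index idx_inbox_status suggests a sweeper that re-drives unfinished messages. The sketch below targets SQLite, which also supports partial indexes, and assumes a hypothetical `max_retries` cap so that poison messages stop being retried. The query repeats the index predicate exactly, which is what lets the planner consider a partial index.

```python
import sqlite3

SCHEMA = """
CREATE TABLE inbox (
    message_id  TEXT PRIMARY KEY,
    payload     TEXT NOT NULL,
    received_at TEXT NOT NULL,
    status      TEXT NOT NULL DEFAULT 'pending',
    retry_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX idx_inbox_status ON inbox(status)
WHERE status IN ('pending', 'failed');
"""


def fetch_retry_candidates(conn, max_retries=5, limit=100):
    """Return (message_id, payload) for unfinished messages under the retry cap, oldest first."""
    return conn.execute(
        """SELECT message_id, payload FROM inbox
           WHERE status IN ('pending', 'failed') AND retry_count < ?
           ORDER BY received_at
           LIMIT ?""",
        (max_retries, limit),
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
with conn:
    conn.executemany(
        "INSERT INTO inbox (message_id, payload, received_at, status, retry_count) "
        "VALUES (?, ?, ?, ?, ?)",
        [
            ("m1", "{}", "2024-01-01T00:00:00", "failed", 1),
            ("m2", "{}", "2024-01-01T00:01:00", "processed", 0),
            ("m3", "{}", "2024-01-01T00:02:00", "failed", 5),
            ("m4", "{}", "2024-01-01T00:03:00", "pending", 0),
        ],
    )

print(fetch_retry_candidates(conn))  # [('m1', '{}'), ('m4', '{}')]
```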

Python Implementation with SQLAlchemy

from datetime import datetime, timedelta
import logging

from sqlalchemy import Column, String, DateTime, Integer, JSON, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

Base = declarative_base()

class InboxMessage(Base):
    __tablename__ = 'inbox'
    
    message_id = Column(String(255), primary_key=True)
    payload = Column(JSON, nullable=False)
    received_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)
    status = Column(String(20), default='pending')
    retry_count = Column(Integer, default=0)

class InboxService:
    def __init__(self, db_url: str, processor):
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)
        # processor.process(session, payload) must do its writes through the
        # given session so they share the inbox transaction
        self.processor = processor
        self.logger = logging.getLogger(__name__)
    
    def process_message(self, message_id: str, payload: dict) -> bool:
        """Process message idempotently using transactional inbox pattern."""
        with self.Session() as session:
            try:
                # Check for an existing row and lock it so concurrent
                # deliveries of the same message are serialized
                inbox_msg = (
                    session.query(InboxMessage)
                    .filter_by(message_id=message_id)
                    .with_for_update()
                    .first()
                )
                
                if inbox_msg is not None and inbox_msg.status == 'processed':
                    self.logger.info(f"Message {message_id} already processed")
                    return True  # Idempotent
                
                if inbox_msg is None:
                    # Insert new message; flush sends the INSERT now, so a
                    # concurrent duplicate fails here on the primary key
                    inbox_msg = InboxMessage(
                        message_id=message_id,
                        payload=payload,
                        status='pending'
                    )
                    session.add(inbox_msg)
                    session.flush()
                # Otherwise the row is 'failed' from an earlier attempt: retry it
                
                # Process business logic within the same transaction
                self.processor.process(session, payload)
                
                # Mark as processed
                inbox_msg.status = 'processed'
                inbox_msg.processed_at = datetime.utcnow()
                
                session.commit()
                return True
                
            except Exception as e:
                # Discard business writes and inbox changes together
                session.rollback()
                self.logger.error(f"Failed to process message {message_id}: {e}")
                self._record_failure(message_id, payload)
                raise
    
    def _record_failure(self, message_id: str, payload: dict) -> None:
        """Mark the message as failed in a fresh transaction (the processing one was rolled back)."""
        try:
            with self.Session() as session:
                msg = session.get(InboxMessage, message_id, with_for_update=True)
                if msg is None:
                    session.add(InboxMessage(
                        message_id=message_id,
                        payload=payload,
                        status='failed',
                        retry_count=1,
                    ))
                elif msg.status != 'processed':
                    msg.status = 'failed'
                    msg.retry_count = (msg.retry_count or 0) + 1
                session.commit()
        except Exception:
            # Failure bookkeeping must not mask the original processing error
            self.logger.exception(f"Could not record failure for message {message_id}")
    
    def cleanup(self, retention_days: int = 30):
        """Remove old processed messages."""
        cutoff = datetime.utcnow() - timedelta(days=retention_days)
        with self.Session() as session:
            deleted = session.query(InboxMessage).filter(
                InboxMessage.status == 'processed',
                InboxMessage.processed_at < cutoff
            ).delete()
            session.commit()
            self.logger.info(f"Cleaned up {deleted} processed messages")

Integration with Message Brokers

Kafka Consumer Example (Go)

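package inbox

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// consumeKafkaMessages assumes the consumer was created with
// "enable.auto.commit": false, so offsets advance only via CommitMessage.
func consumeKafkaMessages(consumer *kafka.Consumer, inboxService *InboxService) {
    for {
        msg, err := consumer.ReadMessage(-1) // block until a message arrives
        if err != nil {
            log.Printf("Consumer error: %v", err)
            continue
        }

        // Use topic, partition and offset as the unique message ID
        messageID := fmt.Sprintf("%s-%d-%d", *msg.TopicPartition.Topic,
            msg.TopicPartition.Partition, msg.TopicPartition.Offset)

        // Retry in place: skipping the commit alone would not trigger
        // redelivery, because the consumer keeps reading and a later commit
        // also covers this offset. This loop retries forever; production code
        // usually caps attempts and routes poison messages to a dead-letter topic.
        for {
            err = inboxService.ProcessMessage(context.Background(), messageID, msg.Value)
            if err == nil {
                break
            }
            log.Printf("Failed to process message %s: %v", messageID, err)
            time.Sleep(time.Second)
        }

        // Commit offset only after the inbox transaction has committed
        if _, err := consumer.CommitMessage(msg); err != nil {
            log.Printf("Offset commit failed: %v", err)
        }
    }
}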
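An ID built from topic, partition and offset deduplicates broker redeliveries of the same record. It does not catch a producer that republishes the same event, because the republished copy lands at a new offset. One option is to prefer a producer-assigned ID when the message carries one, as sketched below. The `message-id` header name is a hypothetical convention: Kafka headers are arbitrary key/value pairs, so the name is whatever producers agree on.

```python
from typing import Optional, Sequence, Tuple


def dedup_key(topic: str, partition: int, offset: int,
              headers: Optional[Sequence[Tuple[str, bytes]]] = None) -> str:
    """Prefer a producer-assigned ID; fall back to the topic-partition-offset coordinate."""
    for key, value in headers or ():
        if key == "message-id" and value:  # hypothetical header name
            return value.decode("utf-8")
    return f"{topic}-{partition}-{offset}"


print(dedup_key("orders", 3, 42))                                # orders-3-42
print(dedup_key("orders", 3, 57, [("message-id", b"ord-9f1")]))  # ord-9f1
```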

Trade-offs and Considerations

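Advantages

  1. Duplicate detection and business writes commit or roll back atomically
  2. Works with any broker that offers at-least-once delivery
  3. The inbox table keeps a record of recently received messages for debugging

Disadvantages

  1. Adds at least one extra write and index update per message
  2. The inbox table grows continuously and needs a cleanup process
  3. Only side effects stored in the same database are protected
  4. Requires a stable, unique message ID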

Performance Optimization

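  1. Batch Processing: Process multiple messages in a single transaction to amortize commit cost; a failure in any one message rolls back the whole batch
  2. Partitioning: Shard inbox table by message ID hash for write scalability
  3. Async Cleanup: Run cleanup as background job during off-peak hours
  4. Read Replicas: Serve monitoring and reporting queries on the inbox from replicas; the duplicate check itself must run on the primary inside the processing transaction, because replication lag can hide a just-committed row
  5. TTL/Expiry: Automate cleanup with time-range partitions in Postgres (drop whole old partitions) or native TTL where available, such as DynamoDB TTL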
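Item 1 (Batch Processing) can be sketched with sqlite3 as follows. Several messages are claimed and applied in one transaction, so the commit cost is paid once, but a single failing message rolls back the whole batch. The `apply` callback and the `events` table are hypothetical stand-ins for business logic. Rows are inserted directly as 'processed' because they only become visible if the whole transaction commits.

```python
import sqlite3
from typing import Callable, Iterable, Tuple


def process_batch(conn: sqlite3.Connection,
                  messages: Iterable[Tuple[str, str]],
                  apply: Callable[[sqlite3.Connection, str], None]) -> int:
    """Process (message_id, payload) pairs in one transaction; return how many were new."""
    applied = 0
    with conn:  # a single commit for the whole batch, or a single rollback
        for message_id, payload in messages:
            cur = conn.execute(
                "INSERT INTO inbox (message_id, status) VALUES (?, 'processed') "
                "ON CONFLICT(message_id) DO NOTHING",
                (message_id,),
            )
            if cur.rowcount == 0:
                continue  # duplicate, possibly within this same batch
            apply(conn, payload)
            applied += 1
    return applied


def record_event(c: sqlite3.Connection, payload: str) -> None:
    # Hypothetical business logic that rejects one poison payload.
    if payload == "bad":
        raise ValueError("poison message")
    c.execute("INSERT INTO events (payload) VALUES (?)", (payload,))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE inbox (message_id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("CREATE TABLE events (payload TEXT NOT NULL)")

print(process_batch(conn, [("a", "x"), ("b", "y"), ("a", "x")], record_event))  # 2
try:
    process_batch(conn, [("c", "z"), ("d", "bad")], record_event)
except ValueError:
    pass
print(conn.execute("SELECT COUNT(*) FROM inbox").fetchone()[0])  # 2: 'c' was rolled back too
```

The second call shows the cost of batching: message `c` was valid but is discarded along with the poison message and must be redelivered.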

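Complementary Patterns

  1. Transactional Outbox: the producer-side counterpart, which writes outgoing events to an outbox table in the same transaction as the business data and publishes them afterwards
  2. Dead-Letter Queue: parks messages that keep failing so they do not block the consumer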

Conclusion

The Transactional Inbox pattern elegantly solves the duplicate message problem in distributed systems by leveraging database transactions for idempotency. While it adds database overhead, the effectively exactly-once processing it guarantees for state stored in the same database is invaluable for critical operations.

For Principal Engineers designing event-driven microservices, this pattern should be a standard tool in the architecture toolkit, especially when working with Go or Python services that process Kafka, RabbitMQ, or SQS messages.