Saga Pattern for Distributed Transactions

Overview

The Saga pattern is a design pattern for managing distributed transactions across multiple microservices without relying on traditional two-phase commit protocols. Instead of locking resources across services, Sagas break long-lived transactions into a sequence of local transactions, each with a corresponding compensating transaction to handle failures.

The Problem

In microservices architectures, maintaining data consistency across services is challenging. Traditional ACID transactions with distributed locking don’t scale well and create tight coupling. Consider an e-commerce order workflow:

  1. Reserve inventory
  2. Process payment
  3. Create shipment
  4. Send notification

If step 3 fails after steps 1 and 2 succeed, how do you roll back the inventory reservation and the payment? Distributed transactions such as two-phase commit are complex, slow, and often impossible when integrating with external services.

The Saga Solution

A Saga coordinates a sequence of local transactions through either:

  1. Choreography: Each service publishes events and reacts to events from other services; there is no central coordinator
  2. Orchestration: A central orchestrator tells each service what to do

Each local transaction updates its own database and publishes an event or message. If a step fails, the Saga runs the compensating transactions for the steps that already completed, in reverse order, to undo their changes.
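To make the choreography option concrete, the following is a minimal in-process sketch rather than a production design. It assumes a hypothetical synchronous `Bus` type standing in for a real message broker, and the event names (`OrderPlaced`, `InventoryReserved`, `PaymentFailed`, and so on) are illustrative, not taken from any particular system. Each service only subscribes to events and publishes new ones; no component knows the whole workflow.

```go
package main

// Event is a hypothetical message exchanged between services.
type Event struct {
	Name    string
	OrderID string
}

// Bus is a toy synchronous event bus standing in for a real broker.
type Bus struct {
	handlers map[string][]func(Event)
	Log      []string // every published event name, in publication order
}

func NewBus() *Bus {
	return &Bus{handlers: make(map[string][]func(Event))}
}

// Subscribe registers a handler for one event name.
func (b *Bus) Subscribe(name string, h func(Event)) {
	b.handlers[name] = append(b.handlers[name], h)
}

// Publish records the event and delivers it to all subscribers.
func (b *Bus) Publish(e Event) {
	b.Log = append(b.Log, e.Name)
	for _, h := range b.handlers[e.Name] {
		h(e)
	}
}

// RunChoreography wires three illustrative services together and places
// one order. paymentOK controls whether the payment step succeeds.
// It returns the names of all events that were published.
func RunChoreography(paymentOK bool) []string {
	bus := NewBus()

	// Inventory service: reserves stock for a new order and releases it
	// (the compensating transaction) if payment later fails.
	bus.Subscribe("OrderPlaced", func(e Event) {
		bus.Publish(Event{Name: "InventoryReserved", OrderID: e.OrderID})
	})
	bus.Subscribe("PaymentFailed", func(e Event) {
		bus.Publish(Event{Name: "InventoryReleased", OrderID: e.OrderID})
	})

	// Payment service: reacts to the reservation event, not to a coordinator.
	bus.Subscribe("InventoryReserved", func(e Event) {
		if paymentOK {
			bus.Publish(Event{Name: "PaymentProcessed", OrderID: e.OrderID})
		} else {
			bus.Publish(Event{Name: "PaymentFailed", OrderID: e.OrderID})
		}
	})

	// Shipping service: acts only once payment has gone through.
	bus.Subscribe("PaymentProcessed", func(e Event) {
		bus.Publish(Event{Name: "ShipmentCreated", OrderID: e.OrderID})
	})

	bus.Publish(Event{Name: "OrderPlaced", OrderID: "order-1"})
	return bus.Log
}
```

Calling `RunChoreography(false)` shows the compensation path: the payment failure event triggers the inventory release without any central component deciding to do so. With a real broker, delivery would be asynchronous and possibly duplicated, which this sketch does not model.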

When to Use

Use the Saga pattern when:

Avoid when:

Implementation: Orchestration Approach (Go)

Here’s a practical implementation using the orchestration pattern with Go:

package saga

import (
    "context"
    "fmt"
    "log"
)

// Step represents a single step in the saga
type Step struct {
    Name       string
    Action     func(ctx context.Context) error
    Compensate func(ctx context.Context) error
}

// Orchestrator manages the saga execution
type Orchestrator struct {
    steps []Step
}

func NewOrchestrator() *Orchestrator {
    return &Orchestrator{
        steps: make([]Step, 0),
    }
}

func (o *Orchestrator) AddStep(step Step) {
    o.steps = append(o.steps, step)
}

// Execute runs the saga with automatic compensation on failure
func (o *Orchestrator) Execute(ctx context.Context) error {
    completedSteps := make([]Step, 0)
    
    // Execute each step
    for _, step := range o.steps {
        log.Printf("Executing step: %s", step.Name)
        
        if err := step.Action(ctx); err != nil {
            log.Printf("Step %s failed: %v. Starting compensation...", step.Name, err)
            
            // Compensate in reverse order
            if compensateErr := o.compensate(ctx, completedSteps); compensateErr != nil {
                return fmt.Errorf("saga failed and compensation failed: original error: %w, compensation error: %v", 
                    err, compensateErr)
            }
            
            return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
        }
        
        completedSteps = append(completedSteps, step)
    }
    
    log.Println("Saga completed successfully")
    return nil
}

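// compensate undoes the completed steps in reverse order. A failing
// compensation is logged and the remaining compensations still run;
// the first compensation error (if any) is returned to the caller.
func (o *Orchestrator) compensate(ctx context.Context, completedSteps []Step) error {
    var firstErr error

    for i := len(completedSteps) - 1; i >= 0; i-- {
        step := completedSteps[i]
        if step.Compensate == nil {
            continue // nothing to undo for this step
        }
        log.Printf("Compensating step: %s", step.Name)

        if err := step.Compensate(ctx); err != nil {
            // Log and keep going so that earlier steps are still undone
            log.Printf("Warning: compensation failed for step %s: %v", step.Name, err)
            if firstErr == nil {
                firstErr = fmt.Errorf("compensating step %s: %w", step.Name, err)
            }
        }
    }

    return firstErr
}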

Real-World Example: Order Processing

package main

import (
    "context"
    "fmt"

    "saga" // the orchestrator package defined above; adjust to your module path
)

type OrderService struct {
    inventoryService *InventoryService
    paymentService   *PaymentService
    shippingService  *ShippingService
}

type OrderRequest struct {
    OrderID   string
    ProductID string
    Amount    float64
}

func (s *OrderService) ProcessOrder(ctx context.Context, req OrderRequest) error {
    orchestrator := saga.NewOrchestrator()
    
    var reservationID, paymentID, shipmentID string
    
    // Step 1: Reserve Inventory
    orchestrator.AddStep(saga.Step{
        Name: "ReserveInventory",
        Action: func(ctx context.Context) error {
            id, err := s.inventoryService.Reserve(ctx, req.ProductID, 1)
            if err != nil {
                return fmt.Errorf("inventory reservation failed: %w", err)
            }
            reservationID = id
            return nil
        },
        Compensate: func(ctx context.Context) error {
            if reservationID == "" {
                return nil
            }
            return s.inventoryService.CancelReservation(ctx, reservationID)
        },
    })
    
    // Step 2: Process Payment
    orchestrator.AddStep(saga.Step{
        Name: "ProcessPayment",
        Action: func(ctx context.Context) error {
            id, err := s.paymentService.Charge(ctx, req.Amount)
            if err != nil {
                return fmt.Errorf("payment failed: %w", err)
            }
            paymentID = id
            return nil
        },
        Compensate: func(ctx context.Context) error {
            if paymentID == "" {
                return nil
            }
            return s.paymentService.Refund(ctx, paymentID)
        },
    })
    
    // Step 3: Create Shipment
    orchestrator.AddStep(saga.Step{
        Name: "CreateShipment",
        Action: func(ctx context.Context) error {
            id, err := s.shippingService.CreateShipment(ctx, req.OrderID)
            if err != nil {
                return fmt.Errorf("shipment creation failed: %w", err)
            }
            shipmentID = id
            return nil
        },
        Compensate: func(ctx context.Context) error {
            if shipmentID == "" {
                return nil
            }
            return s.shippingService.CancelShipment(ctx, shipmentID)
        },
    })
    
    return orchestrator.Execute(ctx)
}

Python Implementation with Async/Await

For Python microservices using async frameworks:

from typing import Callable, List
import logging

logger = logging.getLogger(__name__)

class SagaStep:
    def __init__(
        self,
        name: str,
        action: Callable,
        compensate: Callable,
    ):
        self.name = name
        self.action = action
        self.compensate = compensate

class SagaOrchestrator:
    def __init__(self):
        self.steps: List[SagaStep] = []
    
    def add_step(self, step: SagaStep):
        self.steps.append(step)
    
    async def execute(self) -> bool:
        completed_steps = []
        
        try:
            for step in self.steps:
                logger.info(f"Executing step: {step.name}")
                await step.action()
                completed_steps.append(step)
                
        except Exception as e:
            logger.error(f"Saga failed: {e}. Starting compensation...")
            await self._compensate(completed_steps)
            raise
        
        logger.info("Saga completed successfully")
        return True
    
    async def _compensate(self, completed_steps: List[SagaStep]):
        # Compensate in reverse order
        for step in reversed(completed_steps):
            try:
                logger.info(f"Compensating step: {step.name}")
                await step.compensate()
            except Exception as e:
                logger.error(f"Compensation failed for {step.name}: {e}")

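# Example usage. inventory_service and payment_service are assumed to be
# async service clients defined elsewhere; they are not part of this snippet.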
async def process_order(order_id: str, amount: float):
    saga = SagaOrchestrator()
    
    inventory_id = None
    payment_id = None
    
    async def reserve_inventory():
        nonlocal inventory_id
        inventory_id = await inventory_service.reserve(order_id)
    
    async def compensate_inventory():
        if inventory_id:
            await inventory_service.release(inventory_id)
    
    async def process_payment():
        nonlocal payment_id
        payment_id = await payment_service.charge(amount)
    
    async def compensate_payment():
        if payment_id:
            await payment_service.refund(payment_id)
    
    saga.add_step(SagaStep("reserve_inventory", reserve_inventory, compensate_inventory))
    saga.add_step(SagaStep("process_payment", process_payment, compensate_payment))
    
    await saga.execute()

State Persistence and Recovery

For production systems, persist saga state to handle service restarts:

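package saga

import (
    "context"
    "fmt"
    "log"
    "time"
)

type SagaState struct {
    SagaID         string
    CurrentStep    int
    CompletedSteps []string
    Status         string // "running", "completed", "compensating", "failed"
    CreatedAt      time.Time
    UpdatedAt      time.Time
}

// SagaRepository is the storage interface this example assumes.
type SagaRepository interface {
    Get(ctx context.Context, sagaID string) (*SagaState, error)
    Update(ctx context.Context, state *SagaState) error
}

type PersistentOrchestrator struct {
    steps      []Step
    repository SagaRepository
}

func (o *PersistentOrchestrator) Execute(ctx context.Context, sagaID string) error {
    state, err := o.repository.Get(ctx, sagaID)
    if err != nil {
        // Simplification: any lookup error is treated as "saga not found".
        // A real repository should distinguish not-found from other failures.
        state = &SagaState{
            SagaID:    sagaID,
            Status:    "running",
            CreatedAt: time.Now(),
        }
    }

    // Resume from the last persisted step
    for i := state.CurrentStep; i < len(o.steps); i++ {
        step := o.steps[i]

        if err := step.Action(ctx); err != nil {
            state.Status = "compensating"
            if saveErr := o.save(ctx, state); saveErr != nil {
                log.Printf("Warning: could not persist saga %s: %v", sagaID, saveErr)
            }
            if compErr := o.compensate(ctx, state); compErr != nil {
                return fmt.Errorf("saga %s failed at step %s: %w (compensation error: %v)", sagaID, step.Name, err, compErr)
            }
            // Report the original failure even when compensation succeeded
            return fmt.Errorf("saga %s failed at step %s: %w", sagaID, step.Name, err)
        }

        state.CurrentStep = i + 1
        state.CompletedSteps = append(state.CompletedSteps, step.Name)
        if err := o.save(ctx, state); err != nil {
            return fmt.Errorf("persisting saga %s after step %s: %w", sagaID, step.Name, err)
        }
    }

    state.Status = "completed"
    return o.save(ctx, state)
}

// save stamps UpdatedAt and writes the state to the repository.
func (o *PersistentOrchestrator) save(ctx context.Context, state *SagaState) error {
    state.UpdatedAt = time.Now()
    return o.repository.Update(ctx, state)
}

// compensate undoes steps [0, CurrentStep) in reverse order, then marks the saga "failed".
func (o *PersistentOrchestrator) compensate(ctx context.Context, state *SagaState) error {
    var firstErr error
    for i := state.CurrentStep - 1; i >= 0; i-- {
        if err := o.steps[i].Compensate(ctx); err != nil && firstErr == nil {
            firstErr = fmt.Errorf("compensating step %s: %w", o.steps[i].Name, err)
        }
    }
    state.Status = "failed"
    if err := o.save(ctx, state); err != nil && firstErr == nil {
        firstErr = err
    }
    return firstErr
}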

Trade-offs

Advantages:

Disadvantages:

Best Practices

  1. Make operations idempotent: Services may receive duplicate messages
  2. Design compensating transactions carefully: They must reliably undo the forward transaction
  3. Use correlation IDs: Track saga execution across services
  4. Implement timeouts: Don’t wait forever for failed services
  5. Monitor saga state: Track success rates, compensation frequency, and step durations
  6. Handle partial failures: Compensations themselves can fail
  7. Persist saga state: Enable recovery after service restarts
  8. Use semantic locks: Prevent concurrent modifications (e.g., “reservation” instead of pessimistic lock)
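The first practice, idempotency, can be sketched with an idempotency key. The example below is a simplified assumption of how a payment handler might deduplicate requests: `IdempotentCharger`, its in-memory map, and the generated payment ID format are all hypothetical, and a real service would keep the key table in durable storage and call an actual payment provider.

```go
package main

import "sync"

// IdempotentCharger is a hypothetical payment handler that remembers
// the result of each request by its idempotency key.
type IdempotentCharger struct {
	mu      sync.Mutex
	results map[string]string // idempotency key -> payment ID
	charges int               // number of charges actually performed
}

func NewIdempotentCharger() *IdempotentCharger {
	return &IdempotentCharger{results: make(map[string]string)}
}

// Charge performs the charge at most once per key. A duplicate request
// with the same key returns the original payment ID without charging again.
func (c *IdempotentCharger) Charge(key string, amountCents int64) string {
	c.mu.Lock()
	defer c.mu.Unlock()

	if id, ok := c.results[key]; ok {
		return id // duplicate delivery: reuse the earlier result
	}

	// A real handler would call the payment provider with amountCents here.
	c.charges++
	id := "pay-" + key // illustrative ID format
	c.results[key] = id
	return id
}

// Charges reports how many charges were actually performed.
func (c *IdempotentCharger) Charges() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.charges
}
```

A natural key in a saga is the saga ID combined with the step name, for example `order-42/ProcessPayment`, so that a retried step maps to the same key. The same approach applies to compensations such as refunds, which may also be delivered more than once.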

Conclusion

The Saga pattern is essential for maintaining consistency in distributed systems without sacrificing availability and scalability. While it introduces complexity through compensating transactions and eventual consistency, it’s often the only practical approach for managing multi-service workflows. The orchestration approach provides better visibility and control, making it preferable for complex business processes where monitoring and debugging are critical.