Saga Pattern for Distributed Transactions

Overview

The Saga pattern is a design pattern for managing distributed transactions across multiple microservices without relying on traditional two-phase commit protocols. Instead of a single atomic transaction, a saga breaks down a business transaction into a sequence of local transactions, each with a corresponding compensating transaction to undo changes if something goes wrong.

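In distributed systems where each service owns its own database, ACID guarantees cannot span service boundaries without a distributed protocol such as two-phase commit, which requires every participant to support it and can block while the coordinator is unavailable. The Saga pattern instead accepts eventual consistency: intermediate states are briefly visible to other transactions, and business invariants are restored by compensating actions rather than by a rollback.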

When to Use

The Saga pattern is ideal when:

Avoid this pattern when:

Two Approaches: Choreography vs Orchestration

Choreography-Based Saga

Services emit events and listen to events from other services, coordinating through a message bus without a central controller.

Pros:

Cons:

Orchestration-Based Saga

A central orchestrator explicitly calls services and manages the saga flow.

Pros:

Cons:

Implementation Example: Order Processing with Go

Let’s implement an orchestration-based saga for an e-commerce order that involves:

  1. Reserve inventory
  2. Process payment
  3. Create shipment
package saga

import (
    "context"
    "fmt"
)

// SagaStep represents a single step in the saga
type SagaStep struct {
    Name        string
    Action      func(ctx context.Context, data interface{}) error
    Compensate  func(ctx context.Context, data interface{}) error
}

// Saga orchestrates a series of steps with compensation
type Saga struct {
    steps []SagaStep
}

// NewSaga creates a new saga instance
func NewSaga() *Saga {
    return &Saga{
        steps: make([]SagaStep, 0),
    }
}

// AddStep adds a step to the saga
func (s *Saga) AddStep(step SagaStep) *Saga {
    s.steps = append(s.steps, step)
    return s
}

// Execute runs the saga, compensating on failure
func (s *Saga) Execute(ctx context.Context, data interface{}) error {
    completedSteps := make([]int, 0)
    
    // Execute forward flow
    for i, step := range s.steps {
        fmt.Printf("Executing step: %s\n", step.Name)
        
        if err := step.Action(ctx, data); err != nil {
            fmt.Printf("Step %s failed: %v\n", step.Name, err)
            
            // Compensate in reverse order
            s.compensate(ctx, data, completedSteps)
            
            return fmt.Errorf("saga failed at step %s: %w", step.Name, err)
        }
        
        completedSteps = append(completedSteps, i)
    }
    
    fmt.Println("Saga completed successfully")
    return nil
}

// compensate runs compensation in reverse order
func (s *Saga) compensate(ctx context.Context, data interface{}, completedSteps []int) {
    fmt.Println("Starting compensation...")
    
    // Compensate in reverse order
    for i := len(completedSteps) - 1; i >= 0; i-- {
        step := s.steps[completedSteps[i]]
        
        fmt.Printf("Compensating step: %s\n", step.Name)
        
        if err := step.Compensate(ctx, data); err != nil {
            // Log compensation failure - this is critical
            fmt.Printf("CRITICAL: Compensation failed for %s: %v\n", step.Name, err)
            // In production, alert operations team
        }
    }
}

// OrderData represents the saga data
type OrderData struct {
    OrderID           string
    UserID            string
    Items             []string
    PaymentID         string
    ShipmentID        string
    InventoryReserved bool
}

// Example usage with order processing
func ProcessOrder(ctx context.Context, order *OrderData) error {
    saga := NewSaga().
        AddStep(SagaStep{
            Name: "ReserveInventory",
            Action: func(ctx context.Context, data interface{}) error {
                order := data.(*OrderData)
                // Call inventory service
                fmt.Printf("Reserving inventory for order %s\n", order.OrderID)
                // Simulate success
                order.InventoryReserved = true
                return nil
            },
            Compensate: func(ctx context.Context, data interface{}) error {
                order := data.(*OrderData)
                fmt.Printf("Releasing inventory for order %s\n", order.OrderID)
                // Call inventory service to release
                order.InventoryReserved = false
                return nil
            },
        }).
        AddStep(SagaStep{
            Name: "ProcessPayment",
            Action: func(ctx context.Context, data interface{}) error {
                order := data.(*OrderData)
                fmt.Printf("Processing payment for order %s\n", order.OrderID)
                // Call payment service
                order.PaymentID = "pay_12345"
                return nil
            },
            Compensate: func(ctx context.Context, data interface{}) error {
                order := data.(*OrderData)
                fmt.Printf("Refunding payment %s\n", order.PaymentID)
                // Call payment service to refund
                return nil
            },
        }).
        AddStep(SagaStep{
            Name: "CreateShipment",
            Action: func(ctx context.Context, data interface{}) error {
                order := data.(*OrderData)
                fmt.Printf("Creating shipment for order %s\n", order.OrderID)
                // Call shipping service
                order.ShipmentID = "ship_67890"
                return nil
            },
            Compensate: func(ctx context.Context, data interface{}) error {
                order := data.(*OrderData)
                fmt.Printf("Cancelling shipment %s\n", order.ShipmentID)
                // Call shipping service to cancel
                return nil
            },
        })
    
    return saga.Execute(ctx, order)
}

Python Implementation with Async/Await

For Python-based microservices, here’s an async saga implementation:

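from dataclasses import dataclass
from typing import Awaitable, Callable, List
import asyncio

# An async step function receives the shared saga context dict
StepFn = Callable[[dict], Awaitable[None]]

@dataclass
class SagaStep:
    name: str
    action: StepFn
    compensate: StepFn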

class AsyncSaga:
    def __init__(self):
        self.steps: List[SagaStep] = []
    
    def add_step(self, step: SagaStep) -> 'AsyncSaga':
        self.steps.append(step)
        return self
    
    async def execute(self, context: dict) -> bool:
        completed_steps = []
        
        try:
            for step in self.steps:
                print(f"Executing: {step.name}")
                await step.action(context)
                completed_steps.append(step)
            
            print("Saga completed successfully")
            return True
            
        except Exception as e:
            print(f"Saga failed: {e}")
            await self._compensate(context, completed_steps)
            return False
    
    async def _compensate(self, context: dict, completed_steps: List[SagaStep]):
        print("Starting compensation...")
        
        for step in reversed(completed_steps):
            try:
                print(f"Compensating: {step.name}")
                await step.compensate(context)
            except Exception as e:
                print(f"CRITICAL: Compensation failed for {step.name}: {e}")
                # Alert operations team

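# Example usage
async def reserve_inventory(context: dict):
    print(f"Reserving inventory for order {context['order_id']}")
    # Call inventory service
    context['inventory_reserved'] = True

async def release_inventory(context: dict):
    print(f"Releasing inventory for order {context['order_id']}")
    context['inventory_reserved'] = False

async def process_payment(context: dict):
    print(f"Processing payment for order {context['order_id']}")
    # Call payment service
    context['payment_id'] = 'pay_12345'

async def refund_payment(context: dict):
    # Idempotent: only refund if a payment was actually recorded
    if context.get('payment_id'):
        print(f"Refunding payment {context['payment_id']}")

async def main():
    saga = (
        AsyncSaga()
        .add_step(SagaStep("ReserveInventory", reserve_inventory, release_inventory))
        .add_step(SagaStep("ProcessPayment", process_payment, refund_payment))
    )
    succeeded = await saga.execute({'order_id': 'order_001'})
    print(f"Saga succeeded: {succeeded}")

if __name__ == "__main__":
    asyncio.run(main())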

Key Trade-offs

Benefits

Challenges

Best Practices

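  1. Make compensations idempotent: Retries and redelivered messages can invoke the same compensation more than once, so running it twice must have the same effect as running it once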
  2. Log everything: Comprehensive logging is essential for debugging distributed sagas
  3. Use correlation IDs: Track the entire saga flow across services
  4. Handle compensation failures: Have alerts and manual intervention procedures
  5. Design for failure: Assume any step can fail at any time
  6. Timeout management: Set appropriate timeouts for each step
  7. Monitoring: Track saga success rates, duration, and failure points
  8. State persistence: Store saga state to recover from orchestrator failures

Monitoring and Observability

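// Part of package saga; the file also needs "time" in its import list.

// SagaMetrics captures timing and progress for one saga run
type SagaMetrics struct {
    StartTime      time.Time
    CompletedSteps int
    FailedStep     string
    TotalDuration  time.Duration
}

// ExecuteWithMetrics runs the saga like Execute but records how many steps
// completed and which step failed. Following Go convention, error is last.
func (s *Saga) ExecuteWithMetrics(ctx context.Context, data interface{}) (SagaMetrics, error) {
    metrics := SagaMetrics{StartTime: time.Now()}
    completedSteps := make([]int, 0, len(s.steps))

    for i, step := range s.steps {
        if err := step.Action(ctx, data); err != nil {
            metrics.FailedStep = step.Name
            s.compensate(ctx, data, completedSteps)
            metrics.TotalDuration = time.Since(metrics.StartTime)
            // Record metrics to monitoring system
            return metrics, fmt.Errorf("saga failed at step %s: %w", step.Name, err)
        }
        completedSteps = append(completedSteps, i)
        metrics.CompletedSteps++
    }

    metrics.TotalDuration = time.Since(metrics.StartTime)
    // Record metrics to monitoring system
    return metrics, nil
}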

Conclusion

The Saga pattern is a standard way to coordinate business transactions that span multiple microservices when a single ACID transaction is not available. It adds complexity in the form of compensation logic and eventual consistency, but in exchange each service keeps its own local transactions and no participant blocks waiting on a global coordinator. As a rule of thumb, choreography suits simple flows with few participants where service autonomy matters most, while orchestration suits longer or more complex flows where an explicit, centrally visible workflow is worth the cost of an extra coordinator.