Circuit Breaker Pattern for Resilient Distributed Services

In distributed systems, failures are inevitable. A single slow or failing service can cascade through your entire system, creating widespread outages. The Circuit Breaker pattern provides an elegant solution by preventing cascading failures and allowing systems to gracefully degrade.

What is the Circuit Breaker Pattern?

Inspired by electrical circuit breakers that prevent electrical overload, the software Circuit Breaker pattern monitors for failures and “opens the circuit” when failure rates exceed a threshold. This prevents the application from repeatedly attempting operations likely to fail, giving failing services time to recover while maintaining system stability.
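
The paragraph above speaks of failure rates, while the implementations later in this article count consecutive failures. As a rough illustration of the rate-based variant, the following sketch keeps a fixed-size window of recent outcomes. The class name `FailureRateWindow` and the parameters `size`, `min_calls`, and `threshold` are assumptions made for this example and are not part of the implementations below.

```python
from collections import deque


class FailureRateWindow:
    """Tracks the outcomes of the most recent calls and reports the failure rate."""

    def __init__(self, size: int, min_calls: int):
        # True = failure, False = success; old entries fall off automatically.
        self.outcomes = deque(maxlen=size)
        self.min_calls = min_calls

    def record(self, failed: bool) -> None:
        self.outcomes.append(failed)

    def failure_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return sum(self.outcomes) / len(self.outcomes)

    def should_open(self, threshold: float) -> bool:
        # Require a minimum sample so a single early failure cannot trip the breaker.
        if len(self.outcomes) < self.min_calls:
            return False
        return self.failure_rate() >= threshold


window = FailureRateWindow(size=10, min_calls=5)
for failed in [False, True, True, False, True, True]:
    window.record(failed)

print(window.failure_rate())     # 4 failures out of 6 recorded calls
print(window.should_open(0.5))   # True
```

A rate over a window tolerates the occasional success in the middle of an outage, which a consecutive-failure counter would treat as a reset.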

The Three States

  1. Closed State (Normal Operation)

    • Requests pass through to the service
    • Failures are counted
    • When failure threshold is exceeded, transitions to Open
  2. Open State (Failing Fast)

    • Requests immediately fail without calling the service
    • Callers return fallback responses or cached data instead
    • After a timeout period, transitions to Half-Open
  3. Half-Open State (Testing Recovery)

    • Limited number of requests are allowed through
    • If requests succeed, transitions back to Closed
    • If requests fail, returns to Open
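
The transitions listed above can be written down as a small lookup table. This is only a sketch of the state machine, with no counting or timing; the `Event` names are invented for this example.

```python
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class Event(Enum):
    THRESHOLD_EXCEEDED = "threshold_exceeded"  # too many failures while closed
    TIMEOUT_ELAPSED = "timeout_elapsed"        # the open period is over
    PROBE_SUCCEEDED = "probe_succeeded"        # enough trial requests succeeded
    PROBE_FAILED = "probe_failed"              # a trial request failed


TRANSITIONS = {
    (State.CLOSED, Event.THRESHOLD_EXCEEDED): State.OPEN,
    (State.OPEN, Event.TIMEOUT_ELAPSED): State.HALF_OPEN,
    (State.HALF_OPEN, Event.PROBE_SUCCEEDED): State.CLOSED,
    (State.HALF_OPEN, Event.PROBE_FAILED): State.OPEN,
}


def next_state(state: State, event: Event) -> State:
    # Any (state, event) pair that is not listed leaves the state unchanged.
    return TRANSITIONS.get((state, event), state)


# Walk through one outage and recovery.
state = State.CLOSED
for event in [Event.THRESHOLD_EXCEEDED, Event.TIMEOUT_ELAPSED,
              Event.PROBE_FAILED, Event.TIMEOUT_ELAPSED, Event.PROBE_SUCCEEDED]:
    state = next_state(state, event)
    print(event.value, "->", state.value)
```

Note that there is no direct path from Open to Closed: recovery always passes through Half-Open.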

When to Use Circuit Breakers

Ideal Use Cases

When NOT to Use

Implementation in Go

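Here’s a Circuit Breaker implementation in Go. It trips on consecutive failures and uses a generation counter so that results from requests started before a state change are ignored: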

package circuitbreaker

import (
    "context"
    "errors"
    "sync"
    "time"
)

var (
    ErrCircuitOpen     = errors.New("circuit breaker is open")
    ErrTooManyRequests = errors.New("too many requests in half-open state")
)

type State int

const (
    StateClosed State = iota
    StateHalfOpen
    StateOpen
)

type Config struct {
    MaxRequests       uint32        // Max requests allowed in half-open state
    Interval          time.Duration // Period to reset failure count in closed state
    Timeout           time.Duration // Period to transition from open to half-open
    FailureThreshold  uint32        // Number of failures to open circuit
    SuccessThreshold  uint32        // Number of successes to close circuit
}

type CircuitBreaker struct {
    config      Config
    state       State
    generation  uint64
    counts      *counters
    expiry      time.Time
    mu          sync.RWMutex
}

type counters struct {
    requests             uint32
    totalSuccesses       uint32
    totalFailures        uint32
    consecutiveSuccesses uint32
    consecutiveFailures  uint32
}

func NewCircuitBreaker(config Config) *CircuitBreaker {
    return &CircuitBreaker{
        config: config,
        state:  StateClosed,
        counts: &counters{},
        expiry: time.Now().Add(config.Interval),
    }
}

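// Execute runs fn if the breaker allows it and records the outcome.
// fn is expected to honor ctx itself; Execute only rejects calls whose
// context is already done, before they count as a request.
func (cb *CircuitBreaker) Execute(ctx context.Context, fn func() error) error {
    if err := ctx.Err(); err != nil {
        return err
    }

    generation, err := cb.beforeRequest()
    if err != nil {
        return err
    }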

    defer func() {
        if r := recover(); r != nil {
            cb.afterRequest(generation, false)
            panic(r)
        }
    }()

    err = fn()
    cb.afterRequest(generation, err == nil)
    return err
}

func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    now := time.Now()
    state, generation := cb.currentState(now)

    if state == StateOpen {
        return generation, ErrCircuitOpen
    }

    if state == StateHalfOpen && cb.counts.requests >= cb.config.MaxRequests {
        return generation, ErrTooManyRequests
    }

    cb.counts.requests++
    return generation, nil
}

func (cb *CircuitBreaker) afterRequest(generation uint64, success bool) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    now := time.Now()
    state, currentGen := cb.currentState(now)

    if generation != currentGen {
        return
    }

    if success {
        cb.onSuccess(state)
    } else {
        cb.onFailure(state)
    }
}

func (cb *CircuitBreaker) onSuccess(state State) {
    cb.counts.totalSuccesses++
    cb.counts.consecutiveSuccesses++
    cb.counts.consecutiveFailures = 0

    if state == StateHalfOpen && 
       cb.counts.consecutiveSuccesses >= cb.config.SuccessThreshold {
        cb.setState(StateClosed)
    }
}

func (cb *CircuitBreaker) onFailure(state State) {
    cb.counts.totalFailures++
    cb.counts.consecutiveFailures++
    cb.counts.consecutiveSuccesses = 0

    if state == StateClosed && 
       cb.counts.consecutiveFailures >= cb.config.FailureThreshold {
        cb.setState(StateOpen)
    } else if state == StateHalfOpen {
        cb.setState(StateOpen)
    }
}

func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
    switch cb.state {
    case StateClosed:
        if now.After(cb.expiry) {
            cb.resetCounts()
            cb.expiry = now.Add(cb.config.Interval)
        }
    case StateOpen:
        if now.After(cb.expiry) {
            cb.setState(StateHalfOpen)
        }
    }
    return cb.state, cb.generation
}

func (cb *CircuitBreaker) setState(state State) {
    cb.generation++
    cb.resetCounts()
    
    switch state {
    case StateClosed:
        cb.expiry = time.Now().Add(cb.config.Interval)
    case StateOpen:
        cb.expiry = time.Now().Add(cb.config.Timeout)
    }
    
    cb.state = state
}

func (cb *CircuitBreaker) resetCounts() {
    cb.counts = &counters{}
}

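// Usage example. callExternalService and handleFallback are placeholders
// for application code.

// Create one breaker per downstream service and share it across calls.
var serviceBreaker = NewCircuitBreaker(Config{
    MaxRequests:      5,
    Interval:         10 * time.Second,
    Timeout:          60 * time.Second,
    FailureThreshold: 5,
    SuccessThreshold: 2,
})

func fetchFromService() error {
    err := serviceBreaker.Execute(context.Background(), func() error {
        return callExternalService()
    })

    // Both errors mean the breaker rejected the call without running it.
    if errors.Is(err, ErrCircuitOpen) || errors.Is(err, ErrTooManyRequests) {
        // Return cached data or fallback response
        return handleFallback()
    }
    return err
}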

Implementation in Python

The same design in Python, which is particularly useful for services such as ML inference pipelines:

from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Any, Optional
import threading
import functools

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreakerConfig:
    max_requests: int = 5
    interval: timedelta = timedelta(seconds=10)
    timeout: timedelta = timedelta(seconds=60)
    failure_threshold: int = 5
    success_threshold: int = 2

class CircuitBreakerOpenError(Exception):
    pass

class CircuitBreaker:
    def __init__(self, config: CircuitBreakerConfig):
        self.config = config
        self.state = State.CLOSED
        self.generation = 0
        self.consecutive_failures = 0
        self.consecutive_successes = 0
        self.requests = 0
        self.expiry = datetime.now() + config.interval
        self.lock = threading.RLock()
    
    def __call__(self, func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return self.execute(lambda: func(*args, **kwargs))
        return wrapper
    
    def execute(self, fn: Callable) -> Any:
        generation = self._before_request()
        
        try:
            result = fn()
            self._after_request(generation, success=True)
            return result
        except Exception:
            self._after_request(generation, success=False)
            raise
    
    def _before_request(self) -> int:
        with self.lock:
            now = datetime.now()
            state, generation = self._current_state(now)
            
            if state == State.OPEN:
                raise CircuitBreakerOpenError("Circuit breaker is open")
            
            if state == State.HALF_OPEN and self.requests >= self.config.max_requests:
                raise CircuitBreakerOpenError("Too many requests in half-open state")
            
            self.requests += 1
            return generation
    
    def _after_request(self, generation: int, success: bool):
        with self.lock:
            now = datetime.now()
            state, current_gen = self._current_state(now)
            
            if generation != current_gen:
                return
            
            if success:
                self._on_success(state)
            else:
                self._on_failure(state)
    
    def _on_success(self, state: State):
        self.consecutive_successes += 1
        self.consecutive_failures = 0
        
        if (state == State.HALF_OPEN and 
            self.consecutive_successes >= self.config.success_threshold):
            self._set_state(State.CLOSED)
    
    def _on_failure(self, state: State):
        self.consecutive_failures += 1
        self.consecutive_successes = 0
        
        if (state == State.CLOSED and 
            self.consecutive_failures >= self.config.failure_threshold):
            self._set_state(State.OPEN)
        elif state == State.HALF_OPEN:
            self._set_state(State.OPEN)
    
    def _current_state(self, now: datetime) -> tuple[State, int]:
        if self.state == State.CLOSED and now > self.expiry:
            self._reset_counts()
            self.expiry = now + self.config.interval
        elif self.state == State.OPEN and now > self.expiry:
            self._set_state(State.HALF_OPEN)
        
        return self.state, self.generation
    
    def _set_state(self, state: State):
        self.generation += 1
        self._reset_counts()
        
        if state == State.CLOSED:
            self.expiry = datetime.now() + self.config.interval
        elif state == State.OPEN:
            self.expiry = datetime.now() + self.config.timeout
        
        self.state = state
    
    def _reset_counts(self):
        self.requests = 0
        self.consecutive_successes = 0
        self.consecutive_failures = 0

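# Usage with decorator (requires the third-party requests package)
import requests

config = CircuitBreakerConfig(
    failure_threshold=5,
    timeout=timedelta(seconds=60),
    success_threshold=2
)
cb = CircuitBreaker(config)

@cb
def call_ml_inference_api(data):
    # Call to external ML inference service; the timeout turns a hung
    # connection into an exception the breaker can count
    response = requests.post("https://ml-api.example.com/predict", json=data, timeout=5)
    # Raise on 4xx/5xx so the breaker counts HTTP errors as failures
    response.raise_for_status()
    return response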

# Usage with fallback (get_cached_prediction is a placeholder for your own cache lookup)
try:
    result = call_ml_inference_api({"features": [1, 2, 3]})
except CircuitBreakerOpenError:
    result = get_cached_prediction()
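
One possible shape for the cached fallback used above is a wrapper that remembers the last successful result per key and serves it when the breaker rejects a call. This is a minimal sketch under stated assumptions: `BreakerOpen` stands in for `CircuitBreakerOpenError` so the example runs on its own, and `LastGoodCache`, `predict`, and the `flags` dictionary are hypothetical names.

```python
from typing import Any, Callable, Dict, Hashable


class BreakerOpen(Exception):
    """Stand-in for CircuitBreakerOpenError so this sketch is self-contained."""


class LastGoodCache:
    """Wraps a call and serves the last successful result when the breaker is open."""

    def __init__(self, call: Callable[[Hashable], Any], default: Any = None):
        self.call = call
        self.default = default
        self.last_good: Dict[Hashable, Any] = {}

    def get(self, key: Hashable) -> Any:
        try:
            value = self.call(key)
        except BreakerOpen:
            # The breaker rejected the call: serve stale data if we have any.
            return self.last_good.get(key, self.default)
        self.last_good[key] = value
        return value


# Simulated breaker-protected call; flipping flags["open"] mimics the circuit opening.
flags = {"open": False}


def predict(key):
    if flags["open"]:
        raise BreakerOpen("circuit breaker is open")
    return f"prediction-for-{key}"


cache = LastGoodCache(predict, default="no-prediction")
fresh = cache.get("user-1")      # real call, result is remembered
flags["open"] = True
stale = cache.get("user-1")      # served from the last good result
missing = cache.get("user-2")    # nothing cached, falls back to the default
print(fresh, stale, missing)
```

Only the breaker's own rejection triggers the fallback here; other exceptions still propagate, so genuine errors are not silently masked by stale data.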

Trade-offs and Considerations

Advantages

Disadvantages

Best Practices

  1. Tune per service: Different services need different thresholds and timeouts
  2. Implement fallbacks: Always have graceful degradation strategies
  3. Monitor state changes: Alert on circuit opens and extended open periods
  4. Use adaptive thresholds: Consider dynamic adjustment based on traffic patterns
  5. Combine with retries: Retry transient failures with exponential backoff, and let the circuit breaker stop the retries once the failure threshold is reached
  6. Test failure scenarios: Regularly verify circuit breaker behavior in chaos testing
  7. Consider bulkheads: Combine with thread pool isolation so one slow dependency cannot exhaust shared resources
  8. Emit metrics: Track success rates, response times, and state transitions
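
As an illustration of combining retries with a breaker, the sketch below retries a call with capped exponential backoff and gives up immediately when the breaker reports that it is open. It assumes `fn` is already wrapped by a circuit breaker, so every failed attempt counts toward the breaker's threshold. `BreakerOpen`, `backoff_delays`, and `call_with_retries` are hypothetical names, and the `sleep` and `jitter` parameters exist only to make the example testable.

```python
import random
import time


class BreakerOpen(Exception):
    """Stand-in for the breaker's "circuit open" error."""


def backoff_delays(attempts, base, cap):
    # The delay before retry i is base * 2**i, capped at `cap` seconds.
    return [min(cap, base * (2 ** i)) for i in range(attempts)]


def call_with_retries(fn, retries=3, base=0.1, cap=2.0,
                      sleep=time.sleep, jitter=random.random):
    delays = backoff_delays(retries, base, cap)
    attempt = 0
    while True:
        try:
            return fn()
        except BreakerOpen:
            raise  # never retry against an open circuit
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted, surface the last error
            # Jitter: sleep a random fraction of the computed delay.
            sleep(delays[attempt] * jitter())
            attempt += 1


# Demo: a call that fails twice and then succeeds. The sleeps are recorded
# instead of executed, and jitter is fixed so the delays are visible.
calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


slept = []
result = call_with_retries(flaky, sleep=slept.append, jitter=lambda: 1.0)
print(result, slept)   # ok [0.1, 0.2]
```

Placing the retry loop outside the breaker keeps the two concerns separate: the backoff handles short blips, and the breaker ends the retrying once the dependency is clearly down.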

Integration with Observability

Circuit breakers are most effective when integrated with comprehensive monitoring:

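// Example with metrics. metrics, log, and alerts stand in for your own
// telemetry clients; serviceName is an extra field on CircuitBreaker, and
// State needs a String() method for the %s verb to print a readable name.
// setState runs with cb.mu held, as in the implementation above.
func (cb *CircuitBreaker) setState(state State) {
    oldState := cb.state
    cb.state = state
    cb.generation++
    cb.resetCounts()

    switch state {
    case StateClosed:
        cb.expiry = time.Now().Add(cb.config.Interval)
    case StateOpen:
        cb.expiry = time.Now().Add(cb.config.Timeout)
    }

    // Emit metrics
    metrics.Increment(fmt.Sprintf("circuit_breaker.state.%s", state))

    // Log state transitions
    log.WithFields(log.Fields{
        "old_state": oldState,
        "new_state": state,
        "service":   cb.serviceName,
    }).Warn("Circuit breaker state changed")

    // Alert on prolonged open state
    if state == StateOpen {
        time.AfterFunc(5*time.Minute, func() {
            // The callback runs on its own goroutine, so take the lock
            // before reading the state.
            cb.mu.RLock()
            stillOpen := cb.state == StateOpen
            cb.mu.RUnlock()
            if stillOpen {
                alerts.Send("Circuit breaker stuck open", cb.serviceName)
            }
        })
    }
}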
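
For the Python implementation, a similar hook could be a small monitor object that the breaker notifies on every state change. The sketch below is an assumption about how such a hook might look, not part of the class shown earlier: `BreakerMonitor`, `on_state_change`, and `stuck_open` are hypothetical names, states are passed as plain strings, and timestamps are passed in explicitly (for example from `time.monotonic()`) so that no background timer is needed.

```python
from collections import Counter
from typing import Optional


class BreakerMonitor:
    """Counts state transitions and reports how long the breaker has been unhealthy."""

    def __init__(self, service: str, max_open_seconds: float):
        self.service = service
        self.max_open_seconds = max_open_seconds
        self.transitions = Counter()
        self.opened_at: Optional[float] = None

    def on_state_change(self, old: str, new: str, now: float) -> None:
        self.transitions[f"{old}->{new}"] += 1
        if new == "open" and self.opened_at is None:
            # Remember the start of the outage, not each re-open after a failed probe.
            self.opened_at = now
        elif new == "closed":
            self.opened_at = None  # recovered
        # half_open keeps opened_at: the service is not yet proven healthy.

    def stuck_open(self, now: float) -> bool:
        return (self.opened_at is not None
                and now - self.opened_at >= self.max_open_seconds)


monitor = BreakerMonitor("ml-api", max_open_seconds=300)
monitor.on_state_change("closed", "open", now=0)
monitor.on_state_change("open", "half_open", now=60)
monitor.on_state_change("half_open", "open", now=61)
print(monitor.stuck_open(now=120))   # False
print(monitor.stuck_open(now=300))   # True
print(dict(monitor.transitions))
```

Measuring from the first open, rather than from the most recent one, means a breaker that keeps flapping between Open and Half-Open still raises the alert.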

Conclusion

The Circuit Breaker pattern is essential for building resilient distributed systems. By preventing cascading failures and providing automatic recovery testing, it enables systems to handle partial failures gracefully, a fundamental requirement in microservice architectures. It adds complexity, but for production systems handling critical workloads, fast failure and self-healing usually justify the implementation overhead.