Two-Phase Commit Protocol for Distributed Transactions

The Problem

In distributed systems, you often need to perform operations across multiple databases or services atomically - either all operations succeed, or none do. A classic example: transferring money between accounts stored in different databases, or creating an order while updating inventory in separate systems.

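Without coordination, a failure partway through leaves the systems inconsistent: the debit in one database commits while the credit in the other fails, or the order is created but the inventory is never reserved.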

Local database transactions don’t span network boundaries, and eventual consistency patterns don’t work when you need immediate guarantees.

What is Two-Phase Commit (2PC)?

Two-Phase Commit is a distributed algorithm that ensures atomic transaction commitment across multiple participants. It divides the transaction into two phases:

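Phase 1 - Prepare (Voting Phase):
The coordinator sends PREPARE to every participant. Each participant performs its part of the work, makes it durable without committing, and votes YES (it can commit on request) or NO (it cannot).

Phase 2 - Commit (Decision Phase):
If every participant voted YES, the coordinator durably logs a COMMIT decision and sends COMMIT to all of them; if any participant voted NO or did not answer in time, it sends ABORT instead. Participants apply the decision, release their locks, and acknowledge.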

The protocol guarantees atomicity: all participants commit or all abort.
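The rules a single participant follows can be summarized as a small state machine. This is an illustrative sketch only, using hypothetical names (`ParticipantState`, `Event`, `next`); the property it encodes is that a participant may abort on its own only before voting YES, and after that it must wait for the coordinator's decision.

```go
package main

import "fmt"

// ParticipantState is a hypothetical enumeration of one participant's 2PC states.
type ParticipantState int

const (
	Init      ParticipantState = iota // no PREPARE handled yet
	Prepared                          // voted YES; must wait for the decision
	Committed                         // applied the COMMIT decision
	Aborted                           // voted NO or applied the ABORT decision
)

// Event is something the participant does or receives.
type Event int

const (
	VoteYes Event = iota
	VoteNo
	RecvCommit
	RecvAbort
)

// next returns the state after ev, or an error if 2PC forbids the transition.
func next(s ParticipantState, ev Event) (ParticipantState, error) {
	switch {
	case s == Init && ev == VoteYes:
		return Prepared, nil
	case s == Init && (ev == VoteNo || ev == RecvAbort):
		// Before voting YES a participant may abort unilaterally.
		return Aborted, nil
	case s == Prepared && ev == RecvCommit:
		return Committed, nil
	case s == Prepared && ev == RecvAbort:
		return Aborted, nil
	case s == Committed && ev == RecvCommit, s == Aborted && ev == RecvAbort:
		// A decision resent during recovery must be a no-op.
		return s, nil
	}
	// Everything else, e.g. Prepared+VoteNo or Committed+RecvAbort, is illegal.
	return s, fmt.Errorf("invalid transition from state %d on event %d", s, ev)
}
```

Note that there is no transition out of `Prepared` that the participant can take by itself; that missing edge is exactly why 2PC is a blocking protocol.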

When to Use Two-Phase Commit

Good Use Cases:

When to Avoid:

How It Works: Detailed Protocol

The Happy Path

Client          Coordinator         Service A         Service B
  |                 |                   |                 |
  |--Transaction--->|                   |                 |
  |                 |                   |                 |
  |                 |----PREPARE------->|                 |
  |                 |----PREPARE------------------------->|
  |                 |                   |                 |
  |                 |<-----YES----------|                 |
  |                 |<-----YES----------------------------|
  |                 |                   |                 |
  |                 |----COMMIT-------->|                 |
  |                 |----COMMIT-------------------------->|
  |                 |                   |                 |
  |                 |<-----ACK----------|                 |
  |                 |<-----ACK----------------------------|
  |<--Success-------|                   |                 |

The Failure Path

Client          Coordinator         Service A         Service B
  |                 |                   |                 |
  |--Transaction--->|                   |                 |
  |                 |                   |                 |
  |                 |----PREPARE------->|                 |
  |                 |----PREPARE------------------------->|
  |                 |                   |                 |
  |                 |<-----YES----------|                 |
  |                 |<-----NO-----------------------------|  (validation failed)
  |                 |                   |                 |
  |                 |----ABORT--------->|                 |
  |                 |----ABORT---------------------------->|
  |                 |                   |                 |
  |                 |<-----ACK----------|                 |
  |                 |<-----ACK----------------------------|
  |<--Failure-------|                   |                 |
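Both paths follow from one decision rule, which can be written as a pure function. This is a minimal sketch with hypothetical names (`Decision`, `decide`); it treats a participant that never answers, for example because it crashed or its PREPARE timed out, exactly like a NO vote.

```go
package main

// Decision is the coordinator's outcome for one transaction (hypothetical type).
type Decision string

const (
	Commit Decision = "COMMIT"
	Abort  Decision = "ABORT"
)

// decide applies the 2PC rule: COMMIT only if every expected participant
// answered and voted YES. A missing entry in votes counts as NO.
func decide(expected []string, votes map[string]bool) Decision {
	for _, name := range expected {
		yes, answered := votes[name]
		if !answered || !yes {
			return Abort
		}
	}
	return Commit
}
```

Because a missing vote aborts, the coordinator never has to wait indefinitely in phase 1; the blocking problem only appears after participants have voted YES.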

Implementation Example in Go

Here’s a practical 2PC implementation for coordinating database updates:

package twophasecommit

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

// TransactionID uniquely identifies a distributed transaction
type TransactionID string

// Participant represents a service that can participate in 2PC.
// A nil error from Prepare is a YES vote; a non-nil error is a NO vote.
type Participant interface {
    Prepare(ctx context.Context, txID TransactionID) error
    Commit(ctx context.Context, txID TransactionID) error
    Abort(ctx context.Context, txID TransactionID) error
}

// Coordinator manages the two-phase commit protocol
type Coordinator struct {
    participants []Participant
    timeout      time.Duration
    log          TransactionLog
}

// TransactionLog provides durable storage for transaction state
type TransactionLog interface {
    LogPreparing(txID TransactionID) error
    LogCommitting(txID TransactionID) error
    LogAborting(txID TransactionID) error
    LogCommitted(txID TransactionID) error
    LogAborted(txID TransactionID) error
}

func NewCoordinator(participants []Participant, timeout time.Duration, log TransactionLog) *Coordinator {
    return &Coordinator{
        participants: participants,
        timeout:      timeout,
        log:          log,
    }
}

// Execute runs a distributed transaction using 2PC
func (c *Coordinator) Execute(ctx context.Context, txID TransactionID) error {
    // Log that we're starting the prepare phase
    if err := c.log.LogPreparing(txID); err != nil {
        return fmt.Errorf("failed to log prepare phase: %w", err)
    }

    // Phase 1: Prepare
    if err := c.preparePhase(ctx, txID); err != nil {
        // If prepare fails, abort the transaction
        c.abortPhase(ctx, txID)
        return fmt.Errorf("prepare phase failed: %w", err)
    }

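    // Durably record the commit decision: this write is the commit point
    if err := c.log.LogCommitting(txID); err != nil {
        // No commit decision was recorded and no participant has been told to
        // commit, so aborting is still safe and releases the prepared locks
        // (assuming the failed write left no COMMITTING record behind)
        _ = c.abortPhase(ctx, txID)
        return fmt.Errorf("failed to log commit decision, transaction aborted: %w", err)
    }

    // Phase 2: Commit
    if err := c.commitPhase(ctx, txID); err != nil {
        // The decision is already durable, so the outcome is COMMIT and aborting
        // is no longer allowed. Participants that did not acknowledge must be
        // retried (e.g. by the recovery process) until they commit.
        return fmt.Errorf("commit phase incomplete, retry required: %w", err)
    }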

    // Log successful completion
    if err := c.log.LogCommitted(txID); err != nil {
        // Transaction is committed, log failure is non-critical
        return fmt.Errorf("transaction committed but logging failed: %w", err)
    }

    return nil
}

func (c *Coordinator) preparePhase(ctx context.Context, txID TransactionID) error {
    ctx, cancel := context.WithTimeout(ctx, c.timeout)
    defer cancel()

    type result struct {
        index int
        err   error
    }

    results := make(chan result, len(c.participants))
    var wg sync.WaitGroup

    // Send prepare to all participants in parallel
    for i, participant := range c.participants {
        wg.Add(1)
        go func(index int, p Participant) {
            defer wg.Done()
            err := p.Prepare(ctx, txID)
            results <- result{index: index, err: err}
        }(i, participant)
    }

    // Wait for all participants to respond
    wg.Wait()
    close(results)

    // Check if all participants voted YES
    for res := range results {
        if res.err != nil {
            return fmt.Errorf("participant %d voted NO: %w", res.index, res.err)
        }
    }

    return nil
}

func (c *Coordinator) commitPhase(ctx context.Context, txID TransactionID) error {
    ctx, cancel := context.WithTimeout(ctx, c.timeout)
    defer cancel()

    var wg sync.WaitGroup
    // Not named "errors": that would shadow the errors package used below
    errCh := make(chan error, len(c.participants))

    // Send commit to all participants
    for _, participant := range c.participants {
        wg.Add(1)
        go func(p Participant) {
            defer wg.Done()
            if err := p.Commit(ctx, txID); err != nil {
                errCh <- err
            }
        }(participant)
    }

    wg.Wait()
    close(errCh)

    // Collect any errors
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }

    if len(errs) > 0 {
        // errors.Join requires Go 1.20+
        return fmt.Errorf("commit phase had %d failures: %w", len(errs), errors.Join(errs...))
    }

    return nil
}

func (c *Coordinator) abortPhase(ctx context.Context, txID TransactionID) error {
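    // Record the abort decision. If this write fails, still send ABORT: with no
    // COMMITTING record, recovery treats the transaction as aborted anyway.
    logErr := c.log.LogAborting(txID)

    ctx, cancel := context.WithTimeout(ctx, c.timeout)
    defer cancel()

    var wg sync.WaitGroup

    // Send abort to all participants (best effort)
    for _, participant := range c.participants {
        wg.Add(1)
        go func(p Participant) {
            defer wg.Done()
            _ = p.Abort(ctx, txID) // errors ignored; recovery re-sends ABORT
        }(participant)
    }

    wg.Wait()

    if logErr != nil {
        return fmt.Errorf("failed to log abort: %w", logErr)
    }
    return c.log.LogAborted(txID)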
}

Example Participant Implementation

package twophasecommit

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "sync"
)

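// DatabaseParticipant wraps a database connection to participate in 2PC.
// Simplification: a "prepared" transaction here is just an open sql.Tx held
// in memory, so the YES vote does not survive a crash of this process. A
// production participant would use the database's own prepare support
// (PREPARE TRANSACTION in PostgreSQL, XA PREPARE in MySQL).
type DatabaseParticipant struct {
    db           *sql.DB
    preparedTxns map[TransactionID]*sql.Tx
    mu           sync.Mutex
}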

func NewDatabaseParticipant(db *sql.DB) *DatabaseParticipant {
    return &DatabaseParticipant{
        db:           db,
        preparedTxns: make(map[TransactionID]*sql.Tx),
    }
}

func (p *DatabaseParticipant) Prepare(ctx context.Context, txID TransactionID) error {
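    // Begin a database transaction. database/sql rolls a Tx back as soon as the
    // context passed to BeginTx is canceled, and the coordinator cancels its
    // prepare-phase context when that phase returns, so the Tx must outlive ctx.
    tx, err := p.db.BeginTx(context.Background(), nil)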
    if err != nil {
        return fmt.Errorf("failed to begin transaction: %w", err)
    }

    // Perform the business logic (example: debit account 123 by 100).
    // Placeholder syntax is driver-specific: "?" for MySQL/SQLite, "$1" for PostgreSQL.
    _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - 100 WHERE id = ?", 123)
    if err != nil {
        tx.Rollback()
        return fmt.Errorf("failed to execute update: %w", err)
    }

    // Store the transaction for later commit/abort
    p.mu.Lock()
    p.preparedTxns[txID] = tx
    p.mu.Unlock()

    // Return success (vote YES)
    return nil
}

func (p *DatabaseParticipant) Commit(ctx context.Context, txID TransactionID) error {
    p.mu.Lock()
    tx, exists := p.preparedTxns[txID]
    delete(p.preparedTxns, txID)
    p.mu.Unlock()

    if !exists {
        return errors.New("transaction not found")
    }

    return tx.Commit()
}

func (p *DatabaseParticipant) Abort(ctx context.Context, txID TransactionID) error {
    p.mu.Lock()
    tx, exists := p.preparedTxns[txID]
    delete(p.preparedTxns, txID)
    p.mu.Unlock()

    if !exists {
        return nil  // Already aborted or never prepared
    }

    return tx.Rollback()
}
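The `DatabaseParticipant` above keeps its prepared work in an open `sql.Tx`, so its YES vote is lost if the process crashes. A sketch of a participant built on PostgreSQL's prepared transactions instead, assuming a PostgreSQL driver is registered with `database/sql` and the server has `max_prepared_transactions` set above zero (the default is 0). `PGParticipant`, `quoteGID`, and the `work` callback are hypothetical names, and transaction IDs are plain strings to keep the example self-contained.

```go
package main

import (
	"context"
	"database/sql"
	"strings"
)

// quoteGID renders a transaction identifier as a SQL string literal.
// PREPARE TRANSACTION does not take bind parameters, so the GID is inlined;
// doubling single quotes keeps it one literal.
func quoteGID(gid string) string {
	return "'" + strings.ReplaceAll(gid, "'", "''") + "'"
}

// PGParticipant is a hypothetical participant backed by PostgreSQL
// prepared transactions.
type PGParticipant struct {
	db *sql.DB
}

// Prepare runs work inside a transaction on one pinned connection and then
// persists it with PREPARE TRANSACTION, so the YES vote survives a crash.
func (p *PGParticipant) Prepare(ctx context.Context, gid string, work func(context.Context, *sql.Conn) error) error {
	// BEGIN and PREPARE TRANSACTION must run on the same session.
	conn, err := p.db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close() // returns the connection to the pool

	if _, err := conn.ExecContext(ctx, "BEGIN"); err != nil {
		return err
	}
	if err := work(ctx, conn); err != nil {
		// Vote NO; use a fresh context so the rollback runs even if ctx was canceled.
		_, _ = conn.ExecContext(context.Background(), "ROLLBACK")
		return err
	}
	// On success the server stores the transaction under gid and the session
	// leaves transaction mode; if it fails, PostgreSQL rolls the transaction back.
	_, err = conn.ExecContext(ctx, "PREPARE TRANSACTION "+quoteGID(gid))
	return err
}

// Commit finishes a prepared transaction from any connection, even after a restart.
func (p *PGParticipant) Commit(ctx context.Context, gid string) error {
	_, err := p.db.ExecContext(ctx, "COMMIT PREPARED "+quoteGID(gid))
	return err
}

// Abort discards a prepared transaction from any connection.
func (p *PGParticipant) Abort(ctx context.Context, gid string) error {
	_, err := p.db.ExecContext(ctx, "ROLLBACK PREPARED "+quoteGID(gid))
	return err
}
```

Because the prepared transaction lives on the server, the coordinator's recovery process can finish it long after the original connection is gone. `COMMIT PREPARED` fails if the identifier no longer exists, which on a resent COMMIT usually means the participant already committed; recovery code should handle that case explicitly.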

Implementation in Python with Microservices

from typing import List, Protocol
from dataclasses import dataclass
from enum import Enum
import asyncio
import logging

class TransactionState(Enum):
    PREPARING = "preparing"
    COMMITTING = "committing"
    ABORTING = "aborting"
    COMMITTED = "committed"
    ABORTED = "aborted"

@dataclass
class TransactionID:
    id: str

class Participant(Protocol):
    async def prepare(self, tx_id: TransactionID) -> bool:
        """Returns True if ready to commit, False otherwise"""
        ...
    
    async def commit(self, tx_id: TransactionID) -> None:
        """Commits the transaction"""
        ...
    
    async def abort(self, tx_id: TransactionID) -> None:
        """Aborts the transaction"""
        ...

class TransactionLog:
    """Durable log for transaction state"""
    
    async def log_state(self, tx_id: TransactionID, state: TransactionState) -> None:
        # In production, write to durable storage (database, file, etc.)
        logging.info(f"Transaction {tx_id.id}: {state.value}")

class Coordinator:
    def __init__(
        self, 
        participants: List[Participant], 
        log: TransactionLog,
        timeout: float = 30.0
    ):
        self.participants = participants
        self.log = log
        self.timeout = timeout
    
    async def execute(self, tx_id: TransactionID) -> bool:
        """
        Executes a distributed transaction using 2PC.
        Returns True if committed, False if aborted.
        """
        decision_logged = False
        try:
            # Log prepare phase
            await self.log.log_state(tx_id, TransactionState.PREPARING)

            # Phase 1: Prepare
            if not await self._prepare_phase(tx_id):
                await self._abort_phase(tx_id)
                return False

            # Durably log the commit decision; from here on the outcome is COMMIT
            await self.log.log_state(tx_id, TransactionState.COMMITTING)
            decision_logged = True

            # Phase 2: Commit
            await self._commit_phase(tx_id)

            # Log success
            await self.log.log_state(tx_id, TransactionState.COMMITTED)
            return True

        except Exception as e:
            logging.error(f"Transaction {tx_id.id} failed: {e}")
            if decision_logged:
                # Aborting now would break atomicity: the decision is already
                # COMMIT, so recovery must finish the commit instead
                return True
            await self._abort_phase(tx_id)
            return False
    
    async def _prepare_phase(self, tx_id: TransactionID) -> bool:
        """Returns True if all participants vote YES"""
        try:
            # Send prepare to all participants with timeout
            results = await asyncio.wait_for(
                asyncio.gather(
                    *[p.prepare(tx_id) for p in self.participants],
                    return_exceptions=True
                ),
                timeout=self.timeout
            )
            
            # Check if all participants voted YES
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    logging.error(f"Participant {i} failed: {result}")
                    return False
                if not result:
                    logging.info(f"Participant {i} voted NO")
                    return False
            
            return True
            
        except asyncio.TimeoutError:
            logging.error(f"Prepare phase timed out for transaction {tx_id.id}")
            return False
    
    async def _commit_phase(self, tx_id: TransactionID) -> None:
        """Sends commit to all participants"""
        results = await asyncio.gather(
            *[p.commit(tx_id) for p in self.participants],
            return_exceptions=True
        )
        
        # Log failures; the decision is already COMMIT, so failed participants
        # must be retried (e.g. by recovery) rather than aborted
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logging.error(f"Participant {i} commit failed: {result}")
    
    async def _abort_phase(self, tx_id: TransactionID) -> None:
        """Sends abort to all participants"""
        await self.log.log_state(tx_id, TransactionState.ABORTING)
        
        # Best effort abort
        await asyncio.gather(
            *[p.abort(tx_id) for p in self.participants],
            return_exceptions=True
        )
        
        await self.log.log_state(tx_id, TransactionState.ABORTED)

Trade-offs and Limitations

Advantages

Strong Consistency: Guarantees atomicity across distributed systems
Simple to Understand: Clear protocol with well-defined semantics
Immediate Consistency: No eventual consistency window
Works with Existing Databases: Builds on the prepare support many databases already provide (XA PREPARE in MySQL, PREPARE TRANSACTION in PostgreSQL)

Disadvantages

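Blocking Protocol: A participant that has voted YES cannot commit or abort on its own; it holds its locks until it learns the coordinator’s decision
Single Point of Failure: If the coordinator crashes after participants have voted YES, they stay blocked until it recovers
Poor Performance: At least two network round trips plus forced log writes, with locks held for the whole protocol
Scalability Issues: Latency is set by the slowest participant and any single failure aborts the transaction, so both worsen as participants are added
Availability Impact: One slow or failed participant delays or aborts the transaction for everyone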

Comparison with Alternatives

Pattern          Consistency   Availability   Performance   Complexity
2PC              Strong        Low            Poor          Medium
Saga             Eventual      High           Good          High
3PC              Strong        Medium         Poor          High
Event Sourcing   Eventual      High           Good          High

Production Considerations

Timeout Handling

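Set aggressive timeouts for the prepare phase, where an unanswered PREPARE simply counts as a NO vote, but retry COMMIT and ABORT messages on transient failures: once the decision is logged, it must eventually reach every participant.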

const (
    prepareTimeout = 5 * time.Second
    commitTimeout  = 10 * time.Second
    maxRetries     = 3
)

Transaction Log Durability

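The coordinator’s transaction log MUST be durable: every record, above all the COMMITTING decision, has to reach stable storage before the coordinator sends the messages that depend on it.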

Failure Recovery

Implement recovery procedures for coordinator crashes:

  1. Read transaction log on startup
  2. For transactions in COMMITTING state: retry commit
  3. For transactions in PREPARING state: timeout and abort
  4. For transactions in ABORTING state: retry abort
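These recovery steps amount to a mapping from each transaction's last logged state to an action. A minimal sketch with hypothetical names (`Action`, `recoveryAction`, `plan`), assuming the log has already been replayed into a map from transaction ID to last state:

```go
package main

import "sort"

// Action is what recovery does for one logged transaction.
type Action string

const (
	RetryCommit Action = "retry COMMIT"
	SendAbort   Action = "send ABORT"
	Nothing     Action = "nothing"
)

// recoveryAction maps the last logged state to the recovery step.
func recoveryAction(lastState string) Action {
	switch lastState {
	case "COMMITTING":
		// The commit decision is durable: finish it, never abort.
		return RetryCommit
	case "PREPARING", "ABORTING":
		// No commit decision was recorded, so aborting is safe.
		return SendAbort
	default:
		// COMMITTED or ABORTED: the transaction already finished.
		return Nothing
	}
}

// plan groups transactions that still need work by action, in sorted ID order.
func plan(lastStates map[string]string) map[Action][]string {
	ids := make([]string, 0, len(lastStates))
	for id := range lastStates {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	out := make(map[Action][]string)
	for _, id := range ids {
		if a := recoveryAction(lastStates[id]); a != Nothing {
			out[a] = append(out[a], id)
		}
	}
	return out
}
```

Sending the actual messages would reuse the coordinator's commit and abort phases, so participants must treat a repeated COMMIT or ABORT as a no-op.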

Monitoring and Alerting

Track these metrics:

Conclusion

Two-Phase Commit provides strong consistency guarantees for distributed transactions but comes with significant availability and performance costs. Use it when you absolutely need atomic commits across systems and can tolerate the blocking behavior.

For most modern distributed systems, consider alternative patterns like Sagas or Event Sourcing that offer better availability and scalability. Reserve 2PC for specific use cases where the consistency guarantees justify the trade-offs - typically financial systems, booking engines, or critical inventory management.

When implementing 2PC, invest heavily in monitoring, alerting, and recovery procedures. The protocol’s blocking nature means failures are particularly painful, so operational excellence is critical.