Two-Phase Commit Protocol for Distributed Transactions
The Problem
In distributed systems, you often need to perform operations across multiple databases or services atomically - either all operations succeed, or none do. A classic example: transferring money between accounts stored in different databases, or creating an order while updating inventory in separate systems.
Without coordination, you face consistency problems:
- Account A is debited but Account B is never credited (one database fails)
- Order is created but inventory isn’t decremented (network partition)
- Partial state changes leave the system in an invalid state
Local database transactions don’t span network boundaries, and eventual consistency patterns don’t work when you need immediate guarantees.
What is Two-Phase Commit (2PC)?
Two-Phase Commit is a distributed algorithm that ensures atomic transaction commitment across multiple participants. It divides the transaction into two phases:
Phase 1 - Prepare (Voting Phase):
- Coordinator asks all participants: “Can you commit this transaction?”
- Each participant prepares the transaction, locks resources, and responds YES or NO
- A participant that votes YES must be able to commit or roll back according to the coordinator's decision, even after a crash, so it durably records its prepared state before voting
Phase 2 - Commit (Decision Phase):
- If ALL participants vote YES, coordinator sends COMMIT to everyone
- If ANY participant votes NO (or times out), coordinator sends ABORT to everyone
- Participants execute the decision and acknowledge completion
The protocol guarantees atomicity: all participants commit or all abort.
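The coordinator's decision rule fits in a few lines. The sketch below assumes votes arrive as a mapping from participant name to `True` (YES) or `False` (NO), and treats a participant with no recorded vote as timed out. The names `decide`, `Decision`, and the participant names are illustrative, not from any library:

```python
from enum import Enum
from typing import Dict, Iterable


class Decision(Enum):
    COMMIT = "commit"
    ABORT = "abort"


def decide(expected: Iterable[str], votes: Dict[str, bool]) -> Decision:
    """Phase 2 decision: COMMIT only if every expected participant voted YES.

    A participant missing from `votes` is assumed to have timed out,
    which counts as a NO vote.
    """
    for name in expected:
        if not votes.get(name, False):
            return Decision.ABORT
    return Decision.COMMIT


participants = ["accounts-db", "ledger-db"]  # hypothetical participant names
print(decide(participants, {"accounts-db": True, "ledger-db": True}))   # Decision.COMMIT
print(decide(participants, {"accounts-db": True, "ledger-db": False}))  # Decision.ABORT
print(decide(participants, {"accounts-db": True}))                      # Decision.ABORT (timeout)
```

A single missing or negative vote is enough to abort; the coordinator never needs to weigh votes against each other.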
When to Use Two-Phase Commit
Good Use Cases:
- Financial transactions requiring strict consistency across services
- Inventory management where overselling is unacceptable
- Booking systems that must prevent double-booking
- Cross-database schema migrations that need atomic deployment
- Legacy system integration where you can’t modify source systems for sagas
When to Avoid:
- High-latency networks - 2PC requires multiple round trips
- Systems requiring high availability - 2PC is a blocking protocol
- Long-running transactions - holding locks causes contention
- Systems with frequent failures - recovery is complex
- When eventual consistency is acceptable - use sagas instead
How It Works: Detailed Protocol
The Happy Path
```text
Client        Coordinator           Service A          Service B
  |                |                    |                  |
  |--Transaction-->|                    |                  |
  |                |                    |                  |
  |                |-----PREPARE------->|                  |
  |                |-----PREPARE-------------------------->|
  |                |                    |                  |
  |                |<------YES----------|                  |
  |                |<------YES-----------------------------|
  |                |                    |                  |
  |                |-----COMMIT-------->|                  |
  |                |-----COMMIT--------------------------->|
  |                |                    |                  |
  |                |<------ACK----------|                  |
  |                |<------ACK-----------------------------|
  |<--Success------|                    |                  |
```
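From a single participant's point of view, the happy path is a small state machine. The sketch below encodes the legal transitions as a lookup table; the state names, event names, and the `ParticipantStateMachine` class are illustrative assumptions. The rule it enforces is that a PREPARED participant can no longer decide on its own: it may only move to COMMITTED or ABORTED when the coordinator says so.

```python
from enum import Enum


class PState(Enum):
    INIT = "init"
    PREPARED = "prepared"    # voted YES; must wait for the coordinator's decision
    COMMITTED = "committed"
    ABORTED = "aborted"


# Legal transitions, keyed by (current state, event)
TRANSITIONS = {
    (PState.INIT, "PREPARE_OK"): PState.PREPARED,   # local work done, vote YES
    (PState.INIT, "PREPARE_FAIL"): PState.ABORTED,  # vote NO and abort locally
    (PState.INIT, "ABORT"): PState.ABORTED,         # coordinator aborted before we voted
    (PState.PREPARED, "COMMIT"): PState.COMMITTED,
    (PState.PREPARED, "ABORT"): PState.ABORTED,
    # Repeated decisions (e.g. resent during recovery) are idempotent
    (PState.COMMITTED, "COMMIT"): PState.COMMITTED,
    (PState.ABORTED, "ABORT"): PState.ABORTED,
}


class ParticipantStateMachine:
    def __init__(self) -> None:
        self.state = PState.INIT

    def handle(self, event: str) -> PState:
        key = (self.state, event)
        if key not in TRANSITIONS:
            # e.g. COMMIT without a YES vote, or ABORT after COMMIT
            raise ValueError(f"illegal event {event!r} in state {self.state.name}")
        self.state = TRANSITIONS[key]
        return self.state


p = ParticipantStateMachine()
p.handle("PREPARE_OK")  # PState.PREPARED
p.handle("COMMIT")      # PState.COMMITTED
```

Anything absent from the table, such as ABORT after COMMIT, is rejected rather than silently applied.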
The Failure Path
```text
Client        Coordinator           Service A          Service B
  |                |                    |                  |
  |--Transaction-->|                    |                  |
  |                |                    |                  |
  |                |-----PREPARE------->|                  |
  |                |-----PREPARE-------------------------->|
  |                |                    |                  |
  |                |<------YES----------|                  |
  |                |<------NO------------------------------|  (validation failed)
  |                |                    |                  |
  |                |-----ABORT--------->|                  |
  |                |-----ABORT---------------------------->|
  |                |                    |                  |
  |                |<------ACK----------|                  |
  |                |<------ACK-----------------------------|
  |<--Failure------|                    |                  |
```
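Both paths can be reproduced with a small in-memory simulation that records messages in the same order as the diagrams. This is a synchronous sketch, assuming participants are plain objects with a `will_vote_yes` flag. `SimParticipant` and `run_2pc` are illustrative names, and a real coordinator would send messages concurrently over the network:

```python
from typing import List, Tuple


class SimParticipant:
    def __init__(self, name: str, will_vote_yes: bool) -> None:
        self.name = name
        self.will_vote_yes = will_vote_yes
        self.state = "init"

    def prepare(self) -> bool:
        if self.will_vote_yes:
            self.state = "prepared"
        return self.will_vote_yes

    def commit(self) -> None:
        self.state = "committed"

    def abort(self) -> None:
        self.state = "aborted"


def run_2pc(participants: List[SimParticipant]) -> Tuple[bool, List[str]]:
    """Runs both phases and returns (committed?, message trace)."""
    trace: List[str] = []

    # Phase 1: send every PREPARE, then collect every vote
    for p in participants:
        trace.append(f"PREPARE -> {p.name}")
    votes = []
    for p in participants:
        vote = p.prepare()
        trace.append(f"{'YES' if vote else 'NO'} <- {p.name}")
        votes.append(vote)

    # Phase 2: one decision, sent to everyone
    commit = all(votes)
    for p in participants:
        if commit:
            p.commit()
        else:
            p.abort()
        trace.append(f"{'COMMIT' if commit else 'ABORT'} -> {p.name}")
    for p in participants:
        trace.append(f"ACK <- {p.name}")
    return commit, trace


ok, trace = run_2pc([SimParticipant("A", True), SimParticipant("B", False)])
print(ok)  # False
print("\n".join(trace))
```

With Service B voting NO, the trace matches the failure diagram: both services receive ABORT, including Service A, which had voted YES.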
Implementation Example in Go
Here’s a practical 2PC implementation for coordinating database updates:
```go
package twophasecommit

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// TransactionID uniquely identifies a distributed transaction
type TransactionID string

// Participant represents a service that can participate in 2PC.
// A nil error from Prepare is a YES vote; any error is a NO vote.
// Commit and Abort must be idempotent, because recovery may resend them.
type Participant interface {
	Prepare(ctx context.Context, txID TransactionID) error
	Commit(ctx context.Context, txID TransactionID) error
	Abort(ctx context.Context, txID TransactionID) error
}

// Coordinator manages the two-phase commit protocol
type Coordinator struct {
	participants []Participant
	timeout      time.Duration
	log          TransactionLog
}

// TransactionLog provides durable storage for transaction state
type TransactionLog interface {
	LogPreparing(txID TransactionID) error
	LogCommitting(txID TransactionID) error
	LogAborting(txID TransactionID) error
	LogCommitted(txID TransactionID) error
	LogAborted(txID TransactionID) error
}

func NewCoordinator(participants []Participant, timeout time.Duration, log TransactionLog) *Coordinator {
	return &Coordinator{
		participants: participants,
		timeout:      timeout,
		log:          log,
	}
}

// Execute runs a distributed transaction using 2PC
func (c *Coordinator) Execute(ctx context.Context, txID TransactionID) error {
	// Log that we're starting the prepare phase
	if err := c.log.LogPreparing(txID); err != nil {
		return fmt.Errorf("failed to log prepare phase: %w", err)
	}

	// Phase 1: Prepare. Any NO vote or timeout aborts the transaction.
	if err := c.preparePhase(ctx, txID); err != nil {
		if abortErr := c.abortPhase(ctx, txID); abortErr != nil {
			return fmt.Errorf("prepare phase failed: %w (abort incomplete: %v)", err, abortErr)
		}
		return fmt.Errorf("prepare phase failed: %w", err)
	}

	// Durably record the COMMIT decision. Until this write succeeds,
	// no decision exists, so aborting is still safe.
	if err := c.log.LogCommitting(txID); err != nil {
		if abortErr := c.abortPhase(ctx, txID); abortErr != nil {
			// Participants may stay prepared until recovery resolves them from the log
			return fmt.Errorf("failed to log commit decision: %w (abort incomplete: %v)", err, abortErr)
		}
		return fmt.Errorf("failed to log commit decision, transaction aborted: %w", err)
	}

	// Phase 2: Commit. The decision is now irreversible.
	if err := c.commitPhase(ctx, txID); err != nil {
		// Never abort here: participants that did not acknowledge must be
		// retried (e.g. by recovery, which finds COMMITTING in the log) until they commit
		return fmt.Errorf("commit phase incomplete, retry required: %w", err)
	}

	// Log successful completion
	if err := c.log.LogCommitted(txID); err != nil {
		// The transaction is committed; without this record, recovery
		// resends COMMIT, which idempotent participants simply acknowledge
		return fmt.Errorf("transaction committed but logging failed: %w", err)
	}
	return nil
}

func (c *Coordinator) preparePhase(ctx context.Context, txID TransactionID) error {
	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	type result struct {
		index int
		err   error
	}
	results := make(chan result, len(c.participants))
	var wg sync.WaitGroup

	// Send PREPARE to all participants in parallel
	for i, participant := range c.participants {
		wg.Add(1)
		go func(index int, p Participant) {
			defer wg.Done()
			results <- result{index: index, err: p.Prepare(ctx, txID)}
		}(i, participant)
	}

	// Wait for every vote. Prepare must honor ctx, so a slow participant
	// times out (and counts as NO) instead of blocking forever.
	wg.Wait()
	close(results)

	// Commit only if every participant voted YES
	for res := range results {
		if res.err != nil {
			return fmt.Errorf("participant %d voted NO: %w", res.index, res.err)
		}
	}
	return nil
}

func (c *Coordinator) commitPhase(ctx context.Context, txID TransactionID) error {
	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	var wg sync.WaitGroup
	// Named errCh so it does not shadow the errors package
	errCh := make(chan error, len(c.participants))

	// Send COMMIT to all participants in parallel
	for i, participant := range c.participants {
		wg.Add(1)
		go func(index int, p Participant) {
			defer wg.Done()
			if err := p.Commit(ctx, txID); err != nil {
				errCh <- fmt.Errorf("participant %d: %w", index, err)
			}
		}(i, participant)
	}
	wg.Wait()
	close(errCh)

	// Combine all failures into one error; errors.Join (Go 1.20+)
	// returns nil when there are none
	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

func (c *Coordinator) abortPhase(ctx context.Context, txID TransactionID) error {
	// Record the ABORT decision before telling any participant about it
	if err := c.log.LogAborting(txID); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	var wg sync.WaitGroup
	// Send ABORT to all participants (best effort)
	for _, participant := range c.participants {
		wg.Add(1)
		go func(p Participant) {
			defer wg.Done()
			// Errors are ignored here; a production coordinator would retry
			// until every participant acknowledges before logging ABORTED
			_ = p.Abort(ctx, txID)
		}(participant)
	}
	wg.Wait()

	return c.log.LogAborted(txID)
}
```
Example Participant Implementation
```go
package twophasecommit

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sync"
)

// DatabaseParticipant wraps a database connection to participate in 2PC.
//
// Caveat: a "prepared" transaction here lives only in process memory. If the
// process crashes after voting YES, the open transaction is rolled back,
// which breaks the 2PC promise. Durable alternatives include PostgreSQL's
// PREPARE TRANSACTION and MySQL's XA PREPARE.
type DatabaseParticipant struct {
	db           *sql.DB
	preparedTxns map[TransactionID]*sql.Tx
	mu           sync.Mutex
}

func NewDatabaseParticipant(db *sql.DB) *DatabaseParticipant {
	return &DatabaseParticipant{
		db:           db,
		preparedTxns: make(map[TransactionID]*sql.Tx),
	}
}

func (p *DatabaseParticipant) Prepare(ctx context.Context, txID TransactionID) error {
	// Begin with a context that outlives this call: database/sql rolls a
	// transaction back when its BeginTx context is canceled, and the
	// coordinator cancels the prepare-phase context as soon as voting ends.
	tx, err := p.db.BeginTx(context.Background(), nil)
	if err != nil {
		return fmt.Errorf("failed to begin transaction: %w", err)
	}

	// Perform the business logic (example: debit account 123 by 100).
	// The "?" placeholder is MySQL/SQLite style; PostgreSQL drivers use $1.
	_, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - 100 WHERE id = ?", 123)
	if err != nil {
		_ = tx.Rollback()
		return fmt.Errorf("failed to execute update: %w", err)
	}

	// Keep the open transaction (and its row locks) until COMMIT or ABORT arrives
	p.mu.Lock()
	p.preparedTxns[txID] = tx
	p.mu.Unlock()

	// Return success (vote YES)
	return nil
}

func (p *DatabaseParticipant) Commit(ctx context.Context, txID TransactionID) error {
	p.mu.Lock()
	tx, exists := p.preparedTxns[txID]
	delete(p.preparedTxns, txID)
	p.mu.Unlock()

	if !exists {
		// A COMMIT resent by recovery also lands here; a production participant
		// would remember committed IDs and acknowledge instead of failing
		return errors.New("transaction not found")
	}
	return tx.Commit()
}

func (p *DatabaseParticipant) Abort(ctx context.Context, txID TransactionID) error {
	p.mu.Lock()
	tx, exists := p.preparedTxns[txID]
	delete(p.preparedTxns, txID)
	p.mu.Unlock()

	if !exists {
		return nil // Already aborted or never prepared
	}
	return tx.Rollback()
}
```
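The in-memory `preparedTxns` map is this participant's weak spot. PostgreSQL can instead persist the prepared state with `PREPARE TRANSACTION 'gid'`, resolved later by `COMMIT PREPARED 'gid'` or `ROLLBACK PREPARED 'gid'`; the server must have `max_prepared_transactions` set above zero. PostgreSQL requires the identifier to be a string literal shorter than 200 bytes, not a bind parameter, so it has to be built into the SQL text. The helper below, sketched in Python, assumes transaction IDs are restricted to a safe character set (an illustrative policy). `prepared_txn_sql` is a hypothetical name:

```python
import re
from typing import NamedTuple

# Illustrative policy: only characters that need no escaping, under 200 bytes
_SAFE_GID = re.compile(r"[A-Za-z0-9_.:-]{1,199}")


class PreparedTxnSQL(NamedTuple):
    prepare: str   # run inside the open transaction, in place of COMMIT
    commit: str    # run later when the coordinator decides COMMIT
    rollback: str  # run later when the coordinator decides ABORT


def prepared_txn_sql(gid: str) -> PreparedTxnSQL:
    """Builds the three PostgreSQL statements for one global transaction ID."""
    if not _SAFE_GID.fullmatch(gid):
        raise ValueError(f"unsafe transaction id: {gid!r}")
    literal = f"'{gid}'"
    return PreparedTxnSQL(
        prepare=f"PREPARE TRANSACTION {literal}",
        commit=f"COMMIT PREPARED {literal}",
        rollback=f"ROLLBACK PREPARED {literal}",
    )


stmts = prepared_txn_sql("tx-42")
print(stmts.prepare)  # PREPARE TRANSACTION 'tx-42'
print(stmts.commit)   # COMMIT PREPARED 'tx-42'
```

After `PREPARE TRANSACTION`, the transaction is detached from the session and survives restarts, holding its locks until resolved. `COMMIT PREPARED` does not need to run in the same session.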
Implementation in Python with Microservices
```python
import asyncio
import logging
from dataclasses import dataclass
from enum import Enum
from typing import List, Protocol


class TransactionState(Enum):
    PREPARING = "preparing"
    COMMITTING = "committing"
    ABORTING = "aborting"
    COMMITTED = "committed"
    ABORTED = "aborted"


@dataclass(frozen=True)  # frozen makes IDs hashable, so they can key dicts
class TransactionID:
    id: str


class Participant(Protocol):
    async def prepare(self, tx_id: TransactionID) -> bool:
        """Returns True if ready to commit (YES vote), False otherwise"""
        ...

    async def commit(self, tx_id: TransactionID) -> None:
        """Commits the transaction; must be idempotent"""
        ...

    async def abort(self, tx_id: TransactionID) -> None:
        """Aborts the transaction; must be idempotent"""
        ...


class TransactionLog:
    """Transaction state log. This placeholder only writes to the application log."""

    async def log_state(self, tx_id: TransactionID, state: TransactionState) -> None:
        # In production, write to durable storage (database, file, etc.)
        logging.info(f"Transaction {tx_id.id}: {state.value}")


class Coordinator:
    def __init__(
        self,
        participants: List[Participant],
        log: TransactionLog,
        timeout: float = 30.0,
    ):
        self.participants = participants
        self.log = log
        self.timeout = timeout

    async def execute(self, tx_id: TransactionID) -> bool:
        """
        Executes a distributed transaction using 2PC.
        Returns True if committed, False if aborted.
        """
        decided_commit = False
        try:
            # Log prepare phase
            await self.log.log_state(tx_id, TransactionState.PREPARING)

            # Phase 1: Prepare
            if not await self._prepare_phase(tx_id):
                await self._abort_phase(tx_id)
                return False

            # Log the COMMIT decision; from here on it cannot be reversed
            await self.log.log_state(tx_id, TransactionState.COMMITTING)
            decided_commit = True

            # Phase 2: Commit
            await self._commit_phase(tx_id)

            # Log success
            await self.log.log_state(tx_id, TransactionState.COMMITTED)
            return True
        except Exception as e:
            if decided_commit:
                # Never abort after the COMMIT decision is logged;
                # recovery resends COMMIT to anyone who missed it
                logging.error(f"Transaction {tx_id.id} committed, but finishing failed: {e}")
                return True
            logging.error(f"Transaction {tx_id.id} failed: {e}")
            await self._abort_phase(tx_id)
            return False

    async def _prepare_phase(self, tx_id: TransactionID) -> bool:
        """Returns True only if every participant votes YES before the timeout"""
        try:
            # Send PREPARE to all participants concurrently; on timeout,
            # wait_for cancels the calls that are still pending
            results = await asyncio.wait_for(
                asyncio.gather(
                    *[p.prepare(tx_id) for p in self.participants],
                    return_exceptions=True,
                ),
                timeout=self.timeout,
            )
        except asyncio.TimeoutError:
            logging.error(f"Prepare phase timed out for transaction {tx_id.id}")
            return False

        # An exception (including cancellation) or a False result is a NO vote
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                logging.error(f"Participant {i} failed: {result!r}")
                return False
            if not result:
                logging.info(f"Participant {i} voted NO")
                return False
        return True

    async def _commit_phase(self, tx_id: TransactionID) -> None:
        """Sends COMMIT to all participants"""
        results = await asyncio.gather(
            *[p.commit(tx_id) for p in self.participants],
            return_exceptions=True,
        )
        # The decision stays COMMIT even if delivery fails; failed
        # participants must be retried until they acknowledge
        for i, result in enumerate(results):
            if isinstance(result, BaseException):
                logging.error(f"Participant {i} commit failed: {result!r}")

    async def _abort_phase(self, tx_id: TransactionID) -> None:
        """Sends ABORT to all participants"""
        await self.log.log_state(tx_id, TransactionState.ABORTING)
        # Best effort abort
        await asyncio.gather(
            *[p.abort(tx_id) for p in self.participants],
            return_exceptions=True,
        )
        await self.log.log_state(tx_id, TransactionState.ABORTED)
```
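To exercise this coordinator without real services, a participant only needs the three async methods of the `Participant` protocol. Below is a minimal in-memory sketch, assuming a participant that can be configured to vote NO. `InMemoryParticipant` is an illustrative name, and the block redefines `TransactionID` so it runs on its own. Both `commit` and `abort` tolerate repeats, since a recovering coordinator may resend a decision:

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class TransactionID:
    id: str


class InMemoryParticipant:
    """Satisfies the Participant protocol; state lives in memory only."""

    def __init__(self, name: str, vote_yes: bool = True) -> None:
        self.name = name
        self.vote_yes = vote_yes
        self.states: Dict[TransactionID, str] = {}

    async def prepare(self, tx_id: TransactionID) -> bool:
        await asyncio.sleep(0)  # stand-in for a network call
        self.states[tx_id] = "prepared" if self.vote_yes else "aborted"
        return self.vote_yes

    async def commit(self, tx_id: TransactionID) -> None:
        state = self.states.get(tx_id)
        if state == "committed":
            return  # duplicate COMMIT: already done
        if state != "prepared":
            raise RuntimeError(f"{self.name}: COMMIT for {tx_id.id} in state {state}")
        self.states[tx_id] = "committed"

    async def abort(self, tx_id: TransactionID) -> None:
        if self.states.get(tx_id) == "committed":
            raise RuntimeError(f"{self.name}: ABORT after COMMIT for {tx_id.id}")
        self.states[tx_id] = "aborted"  # also fine if never prepared


async def demo() -> Dict[str, str]:
    tx = TransactionID("tx-1")
    a, b = InMemoryParticipant("A"), InMemoryParticipant("B")
    votes = await asyncio.gather(a.prepare(tx), b.prepare(tx))
    if all(votes):
        await asyncio.gather(a.commit(tx), b.commit(tx))
    return {"A": a.states[tx], "B": b.states[tx]}


print(asyncio.run(demo()))  # {'A': 'committed', 'B': 'committed'}
```

Passing two such participants to the `Coordinator` above and awaiting `execute(...)` would drive the same sequence of calls.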
Trade-offs and Limitations
Advantages
✅ Strong Consistency: Guarantees atomicity across distributed systems
✅ Simple to Understand: Clear protocol with well-defined semantics
✅ Immediate Consistency: No eventual consistency window
✅ Works with Existing Databases: Leverages local transaction support
Disadvantages
❌ Blocking Protocol: Participants that voted YES hold their locks until they learn the coordinator's decision
❌ Single Point of Failure: If the coordinator crashes after participants vote YES, they stay blocked (in doubt) until it recovers
❌ Poor Performance: At least two network round trips per transaction, with locks held across both
❌ Scalability Issues: Doesn't scale to large numbers of participants
❌ Availability Impact: One slow or failed participant delays or aborts the whole transaction
Comparison with Alternatives
| Pattern | Consistency | Availability | Performance | Complexity |
|---|---|---|---|---|
| 2PC | Strong | Low | Poor | Medium |
| Saga | Eventual | High | Good | High |
| 3PC | Strong | Medium | Poor | High |
| Event Sourcing | Eventual | High | Good | High |
Production Considerations
Timeout Handling
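Set aggressive timeouts, and retry transient failures. Bound the retries during the prepare phase, where giving up simply means aborting. Once the COMMIT decision is logged, delivery must be retried until every participant acknowledges:
```go
const (
	prepareTimeout = 5 * time.Second  // deadline for collecting PREPARE votes
	commitTimeout  = 10 * time.Second // deadline for each COMMIT/ABORT delivery attempt
	maxRetries     = 3                // retry budget for transient PREPARE failures only
)
```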
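The two retry policies might look like this, assuming each message send is an async callable. `retry_bounded` gives up after a fixed budget, which suits PREPARE. `retry_until_acked` keeps going with capped exponential backoff, which suits COMMIT and ABORT after the decision is logged. The function names, backoff constants, and the choice of `ConnectionError` and timeouts as the "transient" signals are assumptions for illustration:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_bounded(
    call: Callable[[], Awaitable[T]], attempts: int, timeout: float, base_delay: float = 0.1
) -> T:
    """Retries transient failures up to `attempts` times, then re-raises."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(call(), timeout)
        except (ConnectionError, asyncio.TimeoutError):
            if attempt == attempts - 1:
                raise  # budget exhausted: the coordinator treats this as a NO vote
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")


async def retry_until_acked(
    call: Callable[[], Awaitable[None]],
    timeout: float,
    base_delay: float = 0.1,
    max_delay: float = 30.0,
) -> int:
    """Retries until success (or cancellation); returns the attempts used."""
    attempt = 0
    while True:
        attempt += 1
        try:
            await asyncio.wait_for(call(), timeout)
            return attempt
        except (ConnectionError, asyncio.TimeoutError):
            await asyncio.sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
```

With the Go constants above, PREPARE would use something like `retry_bounded(send_prepare, attempts=3, timeout=5.0)` and COMMIT `retry_until_acked(send_commit, timeout=10.0)`, where `send_prepare` and `send_commit` are hypothetical senders.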
Transaction Log Durability
The coordinator’s transaction log MUST be durable. Use:
- Replicated database with synchronous replication
- Distributed log like Kafka, with producer `acks=all` and `min.insync.replicas` of at least 2, so an acknowledged record survives a broker failure
- File-based log with fsync before proceeding
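For the file-based option, the essential detail is that a record counts as written only after `os.fsync` returns. Below is a minimal append-only sketch, assuming one `tx_id<TAB>state` line per record. The `FileTransactionLog` name and record format are illustrative. On replay it skips an unterminated final line, which is what a crash mid-write can leave behind. A production log would also fsync the containing directory after first creating the file:

```python
import os
from typing import Dict


class FileTransactionLog:
    """Append-only coordinator log; every record is fsynced before returning."""

    def __init__(self, path: str) -> None:
        self.path = path

    def log_state(self, tx_id: str, state: str) -> None:
        if any(ch in tx_id + state for ch in "\t\n"):
            raise ValueError("tx_id and state must not contain tabs or newlines")
        data = f"{tx_id}\t{state}\n".encode("utf-8")
        # Opened per record for simplicity; O_APPEND keeps writes at the end
        fd = os.open(self.path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
        try:
            while data:  # os.write may write fewer bytes than requested
                written = os.write(fd, data)
                data = data[written:]
            os.fsync(fd)  # the record is durable only after this returns
        finally:
            os.close(fd)

    def last_states(self) -> Dict[str, str]:
        """Replays the log; later records for a transaction override earlier ones."""
        states: Dict[str, str] = {}
        if not os.path.exists(self.path):
            return states
        with open(self.path, encoding="utf-8") as f:
            for line in f:
                if not line.endswith("\n"):
                    break  # unterminated final record left by a crash mid-write
                tx_id, _, state = line[:-1].partition("\t")
                states[tx_id] = state
        return states
```

`log_state` is synchronous. To back the async `TransactionLog` from the Python coordinator, it could be called through `asyncio.to_thread` with `tx_id.id` and `state.value`.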
Failure Recovery
Implement recovery procedures for coordinator crashes:
- Read transaction log on startup
- For transactions in COMMITTING state: retry commit
- For transactions in PREPARING state: abort (no COMMIT decision was logged, so aborting is safe)
- For transactions in ABORTING state: retry abort
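These recovery rules reduce to a lookup from each transaction's last logged state to an action. The sketch assumes the log has already been replayed into a `{tx_id: last_state}` mapping using the state names from the Python coordinator. `plan_recovery` is an illustrative name. Transactions that reached COMMITTED or ABORTED need no action:

```python
from typing import Dict, List, Optional, Tuple

# Last logged state -> action the restarted coordinator must take
RECOVERY_ACTIONS: Dict[str, Optional[str]] = {
    "preparing": "abort",    # no COMMIT decision was logged, so aborting is safe
    "committing": "commit",  # decision is COMMIT: resend until every participant acks
    "aborting": "abort",     # finish the abort
    "committed": None,       # done
    "aborted": None,         # done
}


def plan_recovery(last_states: Dict[str, str]) -> List[Tuple[str, str]]:
    """Returns (tx_id, action) pairs for transactions that are not finished."""
    plan = []
    for tx_id, state in sorted(last_states.items()):
        if state not in RECOVERY_ACTIONS:
            raise ValueError(f"unknown state {state!r} for {tx_id}")
        action = RECOVERY_ACTIONS[state]
        if action is not None:
            plan.append((tx_id, action))
    return plan


print(plan_recovery({"tx-1": "committing", "tx-2": "preparing", "tx-3": "committed"}))
# [('tx-1', 'commit'), ('tx-2', 'abort')]
```

Because recovery may resend a decision a participant already applied, this only works if participants treat repeated COMMIT and ABORT messages idempotently.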
Monitoring and Alerting
Track these metrics:
- Transaction latency (p50, p95, p99)
- Prepare phase failures and reasons
- Commit phase failures (critical!)
- Transaction timeouts
- Coordinator failures requiring recovery
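Latency percentiles are easy to get subtly wrong by hand. The sketch below uses the nearest-rank method over an in-memory window of samples, alongside simple outcome counters. It assumes the samples fit in memory; in practice, a metrics library's histogram would usually replace it. `TxMetrics`, `percentile`, and the outcome labels are illustrative names:

```python
import math
from collections import Counter
from typing import Dict, List


def percentile(samples: List[float], p: int) -> float:
    """Nearest-rank percentile: the smallest sample covering p% of all samples."""
    if not samples or not 0 < p <= 100:
        raise ValueError("need samples and 0 < p <= 100")
    ordered = sorted(samples)
    rank = math.ceil(p * len(ordered) / 100)  # 1-based rank
    return ordered[rank - 1]


class TxMetrics:
    def __init__(self) -> None:
        self.latencies_ms: List[float] = []
        # e.g. "committed", "prepare_no", "commit_failure", "timeout"
        self.outcomes: Counter = Counter()

    def record(self, latency_ms: float, outcome: str) -> None:
        self.latencies_ms.append(latency_ms)
        self.outcomes[outcome] += 1

    def summary(self) -> Dict[str, float]:
        return {
            "p50": percentile(self.latencies_ms, 50),
            "p95": percentile(self.latencies_ms, 95),
            "p99": percentile(self.latencies_ms, 99),
            **self.outcomes,
        }
```

Keeping commit-phase failures as their own outcome label makes it straightforward to alert on them separately from ordinary prepare-phase aborts.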
Conclusion
Two-Phase Commit provides strong consistency guarantees for distributed transactions but comes with significant availability and performance costs. Use it when you absolutely need atomic commits across systems and can tolerate the blocking behavior.
For most modern distributed systems, consider alternative patterns like Sagas or Event Sourcing that offer better availability and scalability. Reserve 2PC for specific use cases where the consistency guarantees justify the trade-offs - typically financial systems, booking engines, or critical inventory management.
When implementing 2PC, invest heavily in monitoring, alerting, and recovery procedures. The protocol’s blocking nature means failures are particularly painful, so operational excellence is critical.