Circuit Breaker

Preventing cascade failures

What is a Circuit Breaker?

A circuit breaker prevents cascading failures by stopping calls to a failing service. Like an electrical circuit breaker, it "trips" once failures reach a threshold. While it is tripped, calls are rejected immediately, which gives the failing service time to recover.

The Problem

Without circuit breakers:

  • Cascading failures: One slow service can take down every service that depends on it
  • Resource exhaustion: Threads/connections blocked waiting for timeouts
  • No recovery time: Continuously hammering a failing service
  • Poor user experience: Long waits instead of fast failures
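
To make the resource-exhaustion point concrete, here is a back-of-the-envelope sketch based on Little's law: the average number of calls held open equals the arrival rate multiplied by how long each call is held. The function name and the numbers in the note below are illustrative assumptions, not measurements.

```rust
use std::time::Duration;

/// Average number of calls held open at once (Little's law: L = rate * hold time).
/// Both inputs are illustrative; plug in your own traffic and timeout values.
fn blocked_calls(requests_per_sec: f64, hold_time: Duration) -> f64 {
    requests_per_sec * hold_time.as_secs_f64()
}
```

With 100 requests per second and a 30 second timeout, `blocked_calls(100.0, Duration::from_secs(30))` gives 3000 calls waiting at once. If an open circuit rejects each call in about a millisecond, the same load holds roughly 0.1 calls on average.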

Example Code

use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

/// Circuit breaker states
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation, requests allowed
    Closed,
    /// Failures exceeded threshold, requests blocked
    Open,
    /// Testing if service recovered
    HalfOpen,
}

/// Circuit breaker configuration
#[derive(Debug, Clone)]
pub struct CircuitBreakerConfig {
    /// Failures needed to open circuit
    pub failure_threshold: u32,
    /// Successes needed to close circuit from half-open
    pub success_threshold: u32,
    /// Time to wait before testing (open -> half-open)
    pub timeout: Duration,
    /// Failure count resets when the gap since the previous failure exceeds this
    pub failure_window: Duration,
}

impl Default for CircuitBreakerConfig {
    fn default() -> Self {
        CircuitBreakerConfig {
            failure_threshold: 5,
            success_threshold: 2,
            timeout: Duration::from_secs(30),
            failure_window: Duration::from_secs(60),
        }
    }
}

/// Circuit breaker implementation
pub struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: RwLock<CircuitState>,
    failure_count: AtomicU32,
    success_count: AtomicU32,
    last_failure_time: RwLock<Option<Instant>>,
    opened_at: RwLock<Option<Instant>>,
}

impl CircuitBreaker {
    pub fn new(config: CircuitBreakerConfig) -> Self {
        CircuitBreaker {
            config,
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicU32::new(0),
            success_count: AtomicU32::new(0),
            last_failure_time: RwLock::new(None),
            opened_at: RwLock::new(None),
        }
    }

    /// Get current state, moving Open -> HalfOpen once `timeout` has elapsed
    pub async fn state(&self) -> CircuitState {
        let mut state = self.state.write().await;

        // Check if we should transition from Open to HalfOpen
        if *state == CircuitState::Open {
            if let Some(opened) = *self.opened_at.read().await {
                if opened.elapsed() >= self.config.timeout {
                    *state = CircuitState::HalfOpen;
                    self.success_count.store(0, Ordering::SeqCst);
                }
            }
        }

        *state
    }

    /// Check if request is allowed
    pub async fn allow_request(&self) -> bool {
        match self.state().await {
            CircuitState::Closed => true,
            CircuitState::Open => false,
            CircuitState::HalfOpen => true, // Allow test requests (no limit on concurrent probes)
        }
    }

    /// Record a successful call
    pub async fn record_success(&self) {
        let mut state = self.state.write().await;

        match *state {
            CircuitState::Closed => {
                // Reset failure count on success
                self.failure_count.store(0, Ordering::SeqCst);
            }
            CircuitState::HalfOpen => {
                let successes = self.success_count.fetch_add(1, Ordering::SeqCst) + 1;
                if successes >= self.config.success_threshold {
                    // Service recovered, close circuit
                    *state = CircuitState::Closed;
                    self.failure_count.store(0, Ordering::SeqCst);
                    self.success_count.store(0, Ordering::SeqCst);
                    *self.opened_at.write().await = None;
                }
            }
            CircuitState::Open => {
                // A call admitted before the circuit opened finished late; ignore it
            }
        }
    }

    /// Record a failed call
    pub async fn record_failure(&self) {
        let mut state = self.state.write().await;

        match *state {
            CircuitState::Closed => {
                // Check if we should reset the failure window
                let mut last_failure = self.last_failure_time.write().await;
                if let Some(last) = *last_failure {
                    if last.elapsed() > self.config.failure_window {
                        self.failure_count.store(0, Ordering::SeqCst);
                    }
                }
                *last_failure = Some(Instant::now());

                let failures = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
                if failures >= self.config.failure_threshold {
                    // Too many failures, open circuit
                    *state = CircuitState::Open;
                    *self.opened_at.write().await = Some(Instant::now());
                }
            }
            CircuitState::HalfOpen => {
                // Test request failed, back to open
                *state = CircuitState::Open;
                *self.opened_at.write().await = Some(Instant::now());
                self.success_count.store(0, Ordering::SeqCst);
            }
            CircuitState::Open => {
                // Already open, nothing to do
            }
        }
    }

    /// Execute a fallible operation through the circuit breaker
    pub async fn call<T, E, F, Fut>(&self, operation: F) -> Result<T, CircuitError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        if !self.allow_request().await {
            return Err(CircuitError::Open);
        }

        match operation().await {
            Ok(value) => {
                self.record_success().await;
                Ok(value)
            }
            Err(e) => {
                self.record_failure().await;
                Err(CircuitError::ServiceError(e))
            }
        }
    }
}

/// Errors from circuit breaker
#[derive(Debug)]
pub enum CircuitError<E> {
    /// Circuit is open, request not attempted
    Open,
    /// Underlying service error
    ServiceError(E),
}

impl<E: std::fmt::Display> std::fmt::Display for CircuitError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CircuitError::Open => write!(f, "Circuit breaker is open"),
            CircuitError::ServiceError(e) => write!(f, "Service error: {}", e),
        }
    }
}

/// Circuit breaker with metrics
pub struct MeteredCircuitBreaker {
    inner: CircuitBreaker,
    total_calls: AtomicU64,
    successful_calls: AtomicU64,
    failed_calls: AtomicU64,
    rejected_calls: AtomicU64,
}

impl MeteredCircuitBreaker {
    pub fn new(config: CircuitBreakerConfig) -> Self {
        MeteredCircuitBreaker {
            inner: CircuitBreaker::new(config),
            total_calls: AtomicU64::new(0),
            successful_calls: AtomicU64::new(0),
            failed_calls: AtomicU64::new(0),
            rejected_calls: AtomicU64::new(0),
        }
    }

    pub async fn call<T, E, F, Fut>(&self, operation: F) -> Result<T, CircuitError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        self.total_calls.fetch_add(1, Ordering::Relaxed);

        if !self.inner.allow_request().await {
            self.rejected_calls.fetch_add(1, Ordering::Relaxed);
            return Err(CircuitError::Open);
        }

        match operation().await {
            Ok(value) => {
                self.inner.record_success().await;
                self.successful_calls.fetch_add(1, Ordering::Relaxed);
                Ok(value)
            }
            Err(e) => {
                self.inner.record_failure().await;
                self.failed_calls.fetch_add(1, Ordering::Relaxed);
                Err(CircuitError::ServiceError(e))
            }
        }
    }

    pub fn metrics(&self) -> CircuitMetrics {
        CircuitMetrics {
            total: self.total_calls.load(Ordering::Relaxed),
            successful: self.successful_calls.load(Ordering::Relaxed),
            failed: self.failed_calls.load(Ordering::Relaxed),
            rejected: self.rejected_calls.load(Ordering::Relaxed),
        }
    }
}

#[derive(Debug)]
pub struct CircuitMetrics {
    pub total: u64,
    pub successful: u64,
    pub failed: u64,
    pub rejected: u64,
}

impl CircuitMetrics {
    /// Fraction of all calls (rejected ones included) that succeeded
    pub fn success_rate(&self) -> f64 {
        if self.total == 0 {
            1.0
        } else {
            self.successful as f64 / self.total as f64
        }
    }

    /// Fraction of all calls (rejected ones included) that reached the service and failed
    pub fn error_rate(&self) -> f64 {
        if self.total == 0 {
            0.0
        } else {
            self.failed as f64 / self.total as f64
        }
    }
}

/// Per-service circuit breaker registry
pub struct CircuitBreakerRegistry {
    breakers: RwLock<std::collections::HashMap<String, Arc<CircuitBreaker>>>,
    default_config: CircuitBreakerConfig,
}

impl CircuitBreakerRegistry {
    pub fn new(default_config: CircuitBreakerConfig) -> Self {
        CircuitBreakerRegistry {
            breakers: RwLock::new(std::collections::HashMap::new()),
            default_config,
        }
    }

    pub async fn get(&self, service: &str) -> Arc<CircuitBreaker> {
        // Check if exists
        {
            let breakers = self.breakers.read().await;
            if let Some(cb) = breakers.get(service) {
                return Arc::clone(cb);
            }
        }

        // Create new
        let mut breakers = self.breakers.write().await;
        breakers
            .entry(service.to_string())
            .or_insert_with(|| Arc::new(CircuitBreaker::new(self.default_config.clone())))
            .clone()
    }
}

fn main() {
    println!("Circuit Breaker Pattern");

    let config = CircuitBreakerConfig {
        failure_threshold: 5,
        success_threshold: 2,
        timeout: Duration::from_secs(30),
        failure_window: Duration::from_secs(60),
    };

    println!("Config: {:?}", config);
    println!("\nState transitions:");
    println!("  Closed -> Open: After {} failures", config.failure_threshold);
    println!("  Open -> HalfOpen: After {:?}", config.timeout);
    println!("  HalfOpen -> Closed: After {} successes", config.success_threshold);
    println!("  HalfOpen -> Open: On any failure");
}

Why This Works

  1. Fail fast: Returns immediately when circuit is open
  2. Recovery time: Gives failing service time to recover
  3. Gradual recovery: Half-open state tests service before full traffic
  4. Isolation: Prevents one service failure from affecting others
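
The first three points can be seen in a stripped-down, synchronous sketch. This is not the async implementation above: `MiniBreaker` is a hypothetical single-threaded type with no locks or atomics, and it closes after a single successful probe rather than counting up to `success_threshold`.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Simplified single-threaded breaker (hypothetical, for illustration only).
struct MiniBreaker {
    state: State,
    failures: u32,
    failure_threshold: u32,
    timeout: Duration,
    opened_at: Option<Instant>,
}

impl MiniBreaker {
    fn new(failure_threshold: u32, timeout: Duration) -> Self {
        MiniBreaker { state: State::Closed, failures: 0, failure_threshold, timeout, opened_at: None }
    }

    /// Run `op` unless the circuit is open. `None` means the call was
    /// rejected without touching the service (fail fast).
    fn call<T, E>(&mut self, op: impl FnOnce() -> Result<T, E>) -> Option<Result<T, E>> {
        if self.state == State::Open {
            match self.opened_at {
                // Recovery time is over: let one probe through.
                Some(t) if t.elapsed() >= self.timeout => self.state = State::HalfOpen,
                _ => return None,
            }
        }
        let result = op();
        match (&result, self.state) {
            // Any success closes the circuit and clears the failure count.
            (Ok(_), _) => {
                self.state = State::Closed;
                self.failures = 0;
                self.opened_at = None;
            }
            // A failed probe reopens the circuit immediately.
            (Err(_), State::HalfOpen) => self.trip(),
            (Err(_), _) => {
                self.failures += 1;
                if self.failures >= self.failure_threshold {
                    self.trip();
                }
            }
        }
        Some(result)
    }

    fn trip(&mut self) {
        self.state = State::Open;
        self.opened_at = Some(Instant::now());
    }
}
```

With a threshold of 2, two failing calls open the circuit, the next call returns `None` without running the operation, and once `timeout` has passed a successful call closes the circuit again.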

State Transitions

        failure_threshold reached
    ┌──────────────────────────────────┐
    │                                  ▼
┌───────┐                         ┌────────┐
│CLOSED │                         │  OPEN  │
└───────┘                         └────────┘
    ▲                                  │
    │                                  │ timeout elapsed
    │   success_threshold reached      ▼
    │                            ┌──────────┐
    └────────────────────────────│HALF-OPEN │
                                 └──────────┘
                                      │
                       any failure    │
                           ┌──────────┘
                           ▼
                       ┌────────┐
                       │  OPEN  │
                       └────────┘
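
The diagram can also be written as a pure transition function, which is easy to unit test in isolation. The `Event` enum and `next_state` are illustrative names that do not appear in the example code; events that do not apply to the current state leave it unchanged.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Events that can move the circuit between states (hypothetical names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Event {
    FailureThresholdReached,
    TimeoutElapsed,
    SuccessThresholdReached,
    Failure,
}

/// Returns the next state, or the same state when the event does not apply.
fn next_state(state: State, event: Event) -> State {
    match (state, event) {
        (State::Closed, Event::FailureThresholdReached) => State::Open,
        (State::Open, Event::TimeoutElapsed) => State::HalfOpen,
        (State::HalfOpen, Event::SuccessThresholdReached) => State::Closed,
        (State::HalfOpen, Event::Failure) => State::Open,
        (unchanged, _) => unchanged,
    }
}
```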

⚠️ Anti-patterns

// DON'T: Circuit breaker without timeout
struct BadCircuit {
    open: bool, // Once open, stays open forever!
}

// DON'T: Shared circuit for unrelated services
let shared_cb = CircuitBreaker::new(config);
call_service_a(&shared_cb).await; // Failure here...
call_service_b(&shared_cb).await; // ...affects unrelated service B

// DON'T: Circuit breaker without fallback
let result = cb.call(|| fetch_data()).await;
// Returns error to user when circuit is open

// DO: Provide fallback
let result = match cb.call(|| fetch_data()).await {
    Ok(data) => data,
    Err(CircuitError::Open) => get_cached_data(), // Fallback
    Err(CircuitError::ServiceError(e)) => return Err(e),
};
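
The fallback rule can be factored into a small helper that is testable without a running service. This is a sketch under assumptions: `with_fallback` is a hypothetical function, and `CircuitError` is redeclared here only to keep the snippet self-contained.

```rust
/// Mirrors the `CircuitError` enum from the example code.
#[derive(Debug, PartialEq)]
enum CircuitError<E> {
    Open,
    ServiceError(E),
}

/// Resolve a breaker result, using `cached` only when the call was rejected
/// because the circuit was open. Real service errors are still returned.
fn with_fallback<T, E, F>(result: Result<T, CircuitError<E>>, cached: F) -> Result<T, E>
where
    F: FnOnce() -> T,
{
    match result {
        Ok(data) => Ok(data),
        Err(CircuitError::Open) => Ok(cached()),
        Err(CircuitError::ServiceError(e)) => Err(e),
    }
}
```

Falling back only on `CircuitError::Open` is a design choice: it keeps genuine service errors visible to the caller instead of masking them with stale data.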

Exercises

  1. Implement sliding window failure counting
  2. Add circuit breaker events (open/close notifications)
  3. Create health check endpoint exposing circuit states
  4. Implement circuit breaker with different failure categories
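
As a possible starting point for exercise 1, the sketch below keeps a timestamp per failure and drops those older than the window. `SlidingWindow` is a hypothetical type, and wiring it into `CircuitBreaker` is left as the exercise. The current time is passed in as a parameter so the behavior can be tested without sleeping.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Remembers recent failure timestamps and forgets those older than `window`.
struct SlidingWindow {
    window: Duration,
    failures: VecDeque<Instant>,
}

impl SlidingWindow {
    fn new(window: Duration) -> Self {
        SlidingWindow { window, failures: VecDeque::new() }
    }

    /// Record a failure at `now` and return how many failures are inside the window.
    fn record_failure(&mut self, now: Instant) -> usize {
        self.failures.push_back(now);
        self.evict(now);
        self.failures.len()
    }

    /// Drop timestamps from the front until the oldest one is within the window.
    fn evict(&mut self, now: Instant) {
        while let Some(&oldest) = self.failures.front() {
            if now.saturating_duration_since(oldest) > self.window {
                self.failures.pop_front();
            } else {
                break;
            }
        }
    }
}
```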
