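Preventing cascading failures
A circuit breaker prevents cascading failures by stopping calls to a failing service. Like an electrical circuit breaker, it "trips" once failures reach a configured threshold, rejecting calls immediately instead of letting them pile up, which gives the failing service time to recover.
Without circuit breakers, callers keep sending requests to the failing service and waiting for them to time out, which ties up their own threads and connections until they fail too, and the failure spreads upstream. The implementation below stops this by failing fast while the circuit is open: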
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
/// Where a [`CircuitBreaker`] currently sits in its lifecycle.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Healthy: every request is let through.
    Closed,
    /// Tripped: requests are rejected without being attempted.
    Open,
    /// Probing: requests are let through to find out whether the service has recovered.
    HalfOpen,
}
/// Tuning knobs for a [`CircuitBreaker`].
#[derive(Clone, Debug)]
pub struct CircuitBreakerConfig {
    /// Failures needed to trip the circuit open (counted within `failure_window`).
    pub failure_threshold: u32,
    /// Half-open successes needed before the circuit closes again.
    pub success_threshold: u32,
    /// How long the circuit stays open before it moves to half-open.
    pub timeout: Duration,
    /// Time span over which failures are counted while closed.
    pub failure_window: Duration,
}

impl Default for CircuitBreakerConfig {
    /// 5 failures to trip, 2 successes to close, 30 s open timeout, 60 s failure window.
    fn default() -> Self {
        let open_timeout = Duration::from_secs(30);
        let window = Duration::from_secs(60);
        Self {
            failure_threshold: 5,
            success_threshold: 2,
            timeout: open_timeout,
            failure_window: window,
        }
    }
}
/// Circuit breaker implementation.
///
/// `state` is guarded by a `tokio::sync::RwLock`. Every write to the counters and to the
/// timestamp fields happens while the `state` write lock is held, so transitions see a
/// consistent view. Locks are always taken in the order `state`, then
/// `window_started_at`, then `opened_at`.
pub struct CircuitBreaker {
    config: CircuitBreakerConfig,
    state: RwLock<CircuitState>,
    /// Failures counted in the current window while `Closed`.
    failure_count: AtomicU32,
    /// Successes counted while `HalfOpen`.
    success_count: AtomicU32,
    /// When the current failure window began (the first failure of the current streak).
    window_started_at: RwLock<Option<Instant>>,
    /// When the circuit last tripped to `Open`; cleared when it closes again.
    opened_at: RwLock<Option<Instant>>,
}

impl CircuitBreaker {
    /// Creates a breaker in the `Closed` state with zeroed counters.
    pub fn new(config: CircuitBreakerConfig) -> Self {
        CircuitBreaker {
            config,
            state: RwLock::new(CircuitState::Closed),
            failure_count: AtomicU32::new(0),
            success_count: AtomicU32::new(0),
            window_started_at: RwLock::new(None),
            opened_at: RwLock::new(None),
        }
    }

    /// Returns `true` once `config.timeout` has elapsed since the circuit opened.
    async fn open_timeout_elapsed(&self) -> bool {
        match *self.opened_at.read().await {
            Some(opened) => opened.elapsed() >= self.config.timeout,
            None => false,
        }
    }

    /// Returns the current state.
    ///
    /// An `Open` circuit is first promoted to `HalfOpen` (with the success counter reset)
    /// once `config.timeout` has elapsed since it opened.
    pub async fn state(&self) -> CircuitState {
        // Fast path: a shared lock suffices unless an Open -> HalfOpen transition is due.
        // Taking the write lock unconditionally would serialise every `allow_request`
        // call through this breaker.
        {
            let state = self.state.read().await;
            if *state != CircuitState::Open || !self.open_timeout_elapsed().await {
                return *state;
            }
        }
        // Slow path: re-check under the exclusive lock, because another task may have
        // changed the state between dropping the read lock and acquiring this one.
        let mut state = self.state.write().await;
        if *state == CircuitState::Open && self.open_timeout_elapsed().await {
            *state = CircuitState::HalfOpen;
            self.success_count.store(0, Ordering::SeqCst);
        }
        *state
    }

    /// Returns whether a request may be attempted right now.
    ///
    /// Every request is admitted while `HalfOpen`; this breaker does not cap the number
    /// of concurrent probe requests.
    pub async fn allow_request(&self) -> bool {
        match self.state().await {
            CircuitState::Closed => true,
            CircuitState::Open => false,
            CircuitState::HalfOpen => true, // Allow test requests
        }
    }

    /// Records a successful call.
    ///
    /// In `Closed` this clears the failure streak. In `HalfOpen`, reaching
    /// `config.success_threshold` successes closes the circuit.
    pub async fn record_success(&self) {
        let mut state = self.state.write().await;
        match *state {
            CircuitState::Closed => {
                // Failures must be consecutive: any success ends the streak.
                self.failure_count.store(0, Ordering::SeqCst);
            }
            CircuitState::HalfOpen => {
                let successes = self.success_count.fetch_add(1, Ordering::SeqCst) + 1;
                if successes >= self.config.success_threshold {
                    // Service recovered, close circuit
                    *state = CircuitState::Closed;
                    self.failure_count.store(0, Ordering::SeqCst);
                    self.success_count.store(0, Ordering::SeqCst);
                    *self.opened_at.write().await = None;
                }
            }
            CircuitState::Open => {
                // A call admitted earlier can finish after the circuit opened; ignore it.
            }
        }
    }

    /// Records a failed call.
    ///
    /// In `Closed`, the circuit opens once `config.failure_threshold` failures land within
    /// one `config.failure_window`. In `HalfOpen`, any failure reopens it.
    pub async fn record_failure(&self) {
        let mut state = self.state.write().await;
        match *state {
            CircuitState::Closed => {
                let now = Instant::now();
                let mut window_start = self.window_started_at.write().await;
                // Measure the window from its start, not from the previous failure.
                // Otherwise failures spaced just under `failure_window` apart would keep
                // the count growing forever.
                let window_expired = match *window_start {
                    Some(start) => now.duration_since(start) > self.config.failure_window,
                    None => true,
                };
                // Also start a fresh window on the first failure of a new streak
                // (successes and closing the circuit reset the count to zero).
                if window_expired || self.failure_count.load(Ordering::SeqCst) == 0 {
                    self.failure_count.store(0, Ordering::SeqCst);
                    *window_start = Some(now);
                }
                let failures = self.failure_count.fetch_add(1, Ordering::SeqCst) + 1;
                if failures >= self.config.failure_threshold {
                    // Too many failures, open circuit
                    *state = CircuitState::Open;
                    *self.opened_at.write().await = Some(now);
                }
            }
            CircuitState::HalfOpen => {
                // Test request failed, back to open
                *state = CircuitState::Open;
                *self.opened_at.write().await = Some(Instant::now());
                self.success_count.store(0, Ordering::SeqCst);
            }
            CircuitState::Open => {
                // Already open, nothing to do
            }
        }
    }

    /// Executes a fallible operation through the circuit breaker.
    ///
    /// # Errors
    /// Returns `CircuitError::Open` without calling `operation` when the circuit rejects
    /// the request. Returns `CircuitError::ServiceError(e)` when `operation` fails; the
    /// failure is recorded first.
    pub async fn call<T, E, F, Fut>(&self, operation: F) -> Result<T, CircuitError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        if !self.allow_request().await {
            return Err(CircuitError::Open);
        }
        match operation().await {
            Ok(value) => {
                self.record_success().await;
                Ok(value)
            }
            Err(e) => {
                self.record_failure().await;
                Err(CircuitError::ServiceError(e))
            }
        }
    }
}
/// Errors returned when running an operation through a circuit breaker.
#[derive(Debug)]
pub enum CircuitError<E> {
    /// The circuit is open, so the operation was not attempted.
    Open,
    /// The operation ran and failed with this error.
    ServiceError(E),
}

impl<E: std::fmt::Display> std::fmt::Display for CircuitError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CircuitError::Open => write!(f, "Circuit breaker is open"),
            CircuitError::ServiceError(e) => write!(f, "Service error: {}", e),
        }
    }
}

/// Makes `CircuitError` a proper error type, so it can be propagated with `?` into
/// `Box<dyn std::error::Error>` and similar.
///
/// `source()` is left at its default (`None`) because `Display` already embeds the
/// inner error's message. Returning it from `source()` as well would print it twice in
/// error-chain reports.
impl<E: std::error::Error> std::error::Error for CircuitError<E> {}
/// Circuit breaker with metrics
pub struct MeteredCircuitBreaker {
inner: CircuitBreaker,
total_calls: AtomicU64,
successful_calls: AtomicU64,
failed_calls: AtomicU64,
rejected_calls: AtomicU64,
}
impl MeteredCircuitBreaker {
pub fn new(config: CircuitBreakerConfig) -> Self {
MeteredCircuitBreaker {
inner: CircuitBreaker::new(config),
total_calls: AtomicU64::new(0),
successful_calls: AtomicU64::new(0),
failed_calls: AtomicU64::new(0),
rejected_calls: AtomicU64::new(0),
}
}
pub async fn call<T, E, F, Fut>(&self, operation: F) -> Result<T, CircuitError<E>>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
self.total_calls.fetch_add(1, Ordering::Relaxed);
if !self.inner.allow_request().await {
self.rejected_calls.fetch_add(1, Ordering::Relaxed);
return Err(CircuitError::Open);
}
match operation().await {
Ok(value) => {
self.inner.record_success().await;
self.successful_calls.fetch_add(1, Ordering::Relaxed);
Ok(value)
}
Err(e) => {
self.inner.record_failure().await;
self.failed_calls.fetch_add(1, Ordering::Relaxed);
Err(CircuitError::ServiceError(e))
}
}
}
pub fn metrics(&self) -> CircuitMetrics {
CircuitMetrics {
total: self.total_calls.load(Ordering::Relaxed),
successful: self.successful_calls.load(Ordering::Relaxed),
failed: self.failed_calls.load(Ordering::Relaxed),
rejected: self.rejected_calls.load(Ordering::Relaxed),
}
}
}
/// Point-in-time call counts taken from a `MeteredCircuitBreaker`.
///
/// `total` is incremented before a call is admitted, so it also counts rejected calls
/// and calls that are still in flight.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CircuitMetrics {
    /// Calls attempted through the breaker, including rejected and in-flight ones.
    pub total: u64,
    /// Calls whose operation returned `Ok`.
    pub successful: u64,
    /// Calls whose operation ran and returned `Err`.
    pub failed: u64,
    /// Calls rejected because the circuit was open.
    pub rejected: u64,
}

impl CircuitMetrics {
    /// Fraction of `total` that succeeded; `1.0` when no calls have been made.
    pub fn success_rate(&self) -> f64 {
        self.fraction_of_total(self.successful).unwrap_or(1.0)
    }

    /// Fraction of `total` whose operation failed; `0.0` when no calls have been made.
    pub fn error_rate(&self) -> f64 {
        self.fraction_of_total(self.failed).unwrap_or(0.0)
    }

    /// Fraction of `total` rejected by an open circuit; `0.0` when no calls have been made.
    pub fn rejection_rate(&self) -> f64 {
        self.fraction_of_total(self.rejected).unwrap_or(0.0)
    }

    /// `part / total`, or `None` when `total` is zero (avoids a NaN from `0.0 / 0.0`).
    fn fraction_of_total(&self, part: u64) -> Option<f64> {
        (self.total != 0).then(|| part as f64 / self.total as f64)
    }
}
/// Hands out one shared [`CircuitBreaker`] per service name, creating each on first use
/// from `default_config`.
pub struct CircuitBreakerRegistry {
    breakers: RwLock<std::collections::HashMap<String, Arc<CircuitBreaker>>>,
    default_config: CircuitBreakerConfig,
}

impl CircuitBreakerRegistry {
    /// Creates an empty registry; breakers are built lazily by [`Self::get`].
    pub fn new(default_config: CircuitBreakerConfig) -> Self {
        let breakers = RwLock::new(std::collections::HashMap::new());
        CircuitBreakerRegistry {
            breakers,
            default_config,
        }
    }

    /// Returns the breaker registered for `service`, creating it if absent.
    pub async fn get(&self, service: &str) -> Arc<CircuitBreaker> {
        // Fast path under the shared lock; the read guard is a temporary dropped at the
        // end of this statement, before the write lock below is requested.
        let existing = self.breakers.read().await.get(service).map(Arc::clone);
        if let Some(breaker) = existing {
            return breaker;
        }

        // Slow path: `entry` re-checks under the exclusive lock in case another task
        // inserted the same service in the meantime.
        let mut map = self.breakers.write().await;
        let breaker = map
            .entry(service.to_string())
            .or_insert_with(|| Arc::new(CircuitBreaker::new(self.default_config.clone())));
        Arc::clone(breaker)
    }
}
fn main() {
println!("Circuit Breaker Pattern");
let config = CircuitBreakerConfig {
failure_threshold: 5,
success_threshold: 2,
timeout: Duration::from_secs(30),
failure_window: Duration::from_secs(60),
};
println!("Config: {:?}", config);
println!("\nState transitions:");
println!(" Closed -> Open: After {} failures", config.failure_threshold);
println!(" Open -> HalfOpen: After {:?}", config.timeout);
println!(" HalfOpen -> Closed: After {} successes", config.success_threshold);
println!(" HalfOpen -> Open: On any failure");
}
      failure_threshold reached
    ┌─────────────────────────────┐
    │                             ▼
┌────────┐                    ┌────────┐
│ CLOSED │                    │  OPEN  │◄─────┐
└────────┘                    └────────┘      │
    ▲                             │           │
    │                             │ timeout   │ any failure
    │                             │ elapsed   │
    │ success_threshold reached   ▼           │
    │                       ┌───────────┐     │
    └───────────────────────│ HALF-OPEN │─────┘
                            └───────────┘
// DON'T: Circuit breaker without timeout
// This type records only *whether* the circuit is open, not *when* it opened, so nothing
// can ever decide it is time to probe the service again. Compare `CircuitBreaker`, which
// stores `opened_at` and moves to `HalfOpen` once `config.timeout` has elapsed.
struct BadCircuit {
open: bool, // Once open, stays open forever!
}
// DON'T: Shared circuit for unrelated services
let shared_cb = CircuitBreaker::new(config);
call_service_a(&shared_cb).await; // Failure here...
call_service_b(&shared_cb).await; // ...affects unrelated service B
// DO: Give each downstream service its own breaker, e.g. by fetching them from a
// `CircuitBreakerRegistry` with `get("service-a")` and `get("service-b")`.
// DON'T: Circuit breaker without fallback
let result = cb.call(|| fetch_data()).await;
// Returns error to user when circuit is open
// DO: Provide fallback
// `CircuitError::Open` means the operation was never attempted, so serving a fallback
// is safe; a `ServiceError` carries the real failure and is propagated unchanged.
let result = match cb.call(|| fetch_data()).await {
Ok(data) => data,
Err(CircuitError::Open) => get_cached_data(), // Fallback
Err(CircuitError::ServiceError(e)) => return Err(e),
};
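The implementation above (everything up to and including `fn main`) can be run in the official Rust Playground, which provides the `tokio` crate it depends on. The DON'T/DO fragments are illustrative and do not compile on their own.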