
Bulkhead Pattern

Isolating failures with resource pools

Level: advanced
Tags: bulkhead, isolation, semaphore

What is the Bulkhead Pattern?

The bulkhead pattern isolates failures by partitioning resources. Like ship bulkheads that prevent water from flooding the entire vessel, software bulkheads prevent one failing component from exhausting shared resources.

The Problem

Without isolation:

  • Thread pool exhaustion: One slow endpoint consumes all threads
  • Connection pool starvation: One service uses all database connections
  • Memory pressure: One runaway process affects all others
  • Cascading failures: One bad dependency takes down everything

Example Code

use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use std::time::Duration;
use std::collections::HashMap;

/// Semaphore-based bulkhead
pub struct Bulkhead {
    name: String,
    semaphore: Arc<Semaphore>,
    max_concurrent: usize,
}

impl Bulkhead {
    pub fn new(name: impl Into<String>, max_concurrent: usize) -> Self {
        Bulkhead {
            name: name.into(),
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            max_concurrent,
        }
    }

    /// Try to acquire a permit, returning immediately if unavailable
    pub fn try_acquire(&self) -> Option<BulkheadPermit> {
        self.semaphore
            .clone()
            .try_acquire_owned()
            .ok()
            .map(|permit| BulkheadPermit { _permit: permit })
    }

    /// Acquire with timeout
    pub async fn acquire_timeout(
        &self,
        timeout: Duration,
    ) -> Result<BulkheadPermit, BulkheadError> {
        match tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()).await {
            Ok(Ok(permit)) => Ok(BulkheadPermit { _permit: permit }),
            Ok(Err(_)) => Err(BulkheadError::Closed),
            Err(_) => Err(BulkheadError::Timeout),
        }
    }

    /// Execute operation within bulkhead
    pub async fn execute<T, E, F, Fut>(
        &self,
        timeout: Duration,
        operation: F,
    ) -> Result<T, BulkheadError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        let _permit = self.acquire_timeout(timeout).await
            .map_err(|e| match e {
                BulkheadError::Timeout => BulkheadError::Timeout,
                BulkheadError::Closed => BulkheadError::Closed,
                // acquire_timeout never produces a service error
                BulkheadError::ServiceError(()) => unreachable!(),
            })?;

        operation().await.map_err(BulkheadError::ServiceError)
    }

    /// Current number of available permits
    pub fn available(&self) -> usize {
        self.semaphore.available_permits()
    }

    /// Check if bulkhead is full
    pub fn is_full(&self) -> bool {
        self.available() == 0
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn max_concurrent(&self) -> usize {
        self.max_concurrent
    }
}

/// Permit that releases on drop
pub struct BulkheadPermit {
    _permit: OwnedSemaphorePermit,
}

#[derive(Debug)]
pub enum BulkheadError<E = ()> {
    Timeout,
    Closed,
    ServiceError(E),
}

impl<E: std::fmt::Display> std::fmt::Display for BulkheadError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BulkheadError::Timeout => write!(f, "Bulkhead timeout"),
            BulkheadError::Closed => write!(f, "Bulkhead closed"),
            BulkheadError::ServiceError(e) => write!(f, "Service error: {}", e),
        }
    }
}

/// Thread pool bulkhead
pub struct ThreadPoolBulkhead {
    name: String,
    runtime: tokio::runtime::Runtime,
    max_threads: usize,
}

impl ThreadPoolBulkhead {
    pub fn new(name: impl Into<String>, max_threads: usize) -> Self {
        let name = name.into();
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(max_threads)
            .thread_name(name.clone())
            .enable_all()
            .build()
            .expect("Failed to create runtime");

        ThreadPoolBulkhead {
            name,
            runtime,
            max_threads,
        }
    }

    pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: std::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.runtime.spawn(future)
    }

    pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
        self.runtime.block_on(future)
    }
}

/// Bulkhead registry for managing multiple bulkheads
pub struct BulkheadRegistry {
    bulkheads: HashMap<String, Arc<Bulkhead>>,
}

impl BulkheadRegistry {
    pub fn new() -> Self {
        BulkheadRegistry {
            bulkheads: HashMap::new(),
        }
    }

    pub fn register(&mut self, name: impl Into<String>, max_concurrent: usize) -> Arc<Bulkhead> {
        let name = name.into();
        let bulkhead = Arc::new(Bulkhead::new(name.clone(), max_concurrent));
        self.bulkheads.insert(name, bulkhead.clone());
        bulkhead
    }

    pub fn get(&self, name: &str) -> Option<Arc<Bulkhead>> {
        self.bulkheads.get(name).cloned()
    }

    pub fn status(&self) -> Vec<BulkheadStatus> {
        self.bulkheads
            .values()
            .map(|b| BulkheadStatus {
                name: b.name().to_string(),
                max_concurrent: b.max_concurrent(),
                available: b.available(),
                in_use: b.max_concurrent() - b.available(),
            })
            .collect()
    }
}

impl Default for BulkheadRegistry {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug)]
pub struct BulkheadStatus {
    pub name: String,
    pub max_concurrent: usize,
    pub available: usize,
    pub in_use: usize,
}

/// Connection pool bulkhead with per-service limits
pub struct ConnectionPoolBulkhead<C> {
    pools: HashMap<String, Pool<C>>,
}

struct Pool<C> {
    connections: Vec<C>,
    max_size: usize,
    semaphore: Arc<Semaphore>,
}

impl<C: Clone> ConnectionPoolBulkhead<C> {
    pub fn new() -> Self {
        ConnectionPoolBulkhead {
            pools: HashMap::new(),
        }
    }

    pub fn register_pool(&mut self, name: impl Into<String>, max_size: usize, connector: impl Fn() -> C) {
        let connections: Vec<C> = (0..max_size).map(|_| connector()).collect();
        self.pools.insert(name.into(), Pool {
            connections,
            max_size,
            semaphore: Arc::new(Semaphore::new(max_size)),
        });
    }

    pub async fn acquire(&self, pool_name: &str) -> Option<PooledConnection<C>> {
        let pool = self.pools.get(pool_name)?;
        let permit = pool.semaphore.clone().acquire_owned().await.ok()?;

        // In a real implementation, check out an idle connection from the
        // pool; here we simply clone the first one for illustration.
        pool.connections.first().map(|conn| PooledConnection {
            connection: conn.clone(),
            _permit: permit,
        })
    }
}

impl<C: Clone> Default for ConnectionPoolBulkhead<C> {
    fn default() -> Self {
        Self::new()
    }
}

pub struct PooledConnection<C> {
    pub connection: C,
    _permit: OwnedSemaphorePermit,
}

/// Rate-limited bulkhead (combines rate limiting with concurrency)
pub struct RateLimitedBulkhead {
    bulkhead: Bulkhead,
    rate_limiter: Arc<tokio::sync::Mutex<RateLimiter>>,
}

struct RateLimiter {
    tokens: f64,
    max_tokens: f64,
    refill_rate: f64,
    last_refill: std::time::Instant,
}

impl RateLimiter {
    fn new(max_tokens: f64, refill_per_second: f64) -> Self {
        RateLimiter {
            tokens: max_tokens,
            max_tokens,
            refill_rate: refill_per_second,
            last_refill: std::time::Instant::now(),
        }
    }

    fn try_acquire(&mut self) -> bool {
        self.refill();
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }

    fn refill(&mut self) {
        let now = std::time::Instant::now();
        let elapsed = now.duration_since(self.last_refill).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.max_tokens);
        self.last_refill = now;
    }
}

impl RateLimitedBulkhead {
    pub fn new(
        name: impl Into<String>,
        max_concurrent: usize,
        max_rate: f64,
        rate_per_second: f64,
    ) -> Self {
        RateLimitedBulkhead {
            bulkhead: Bulkhead::new(name, max_concurrent),
            rate_limiter: Arc::new(tokio::sync::Mutex::new(
                RateLimiter::new(max_rate, rate_per_second)
            )),
        }
    }

    pub async fn execute<T, E, F, Fut>(
        &self,
        timeout: Duration,
        operation: F,
    ) -> Result<T, BulkheadError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // Check the rate limit first; a rate-limited call is reported as
        // Timeout for simplicity (a dedicated variant would be clearer).
        {
            let mut limiter = self.rate_limiter.lock().await;
            if !limiter.try_acquire() {
                return Err(BulkheadError::Timeout);
            }
        }

        // Then check concurrency
        self.bulkhead.execute(timeout, operation).await
    }
}

fn main() {
    println!("Bulkhead Pattern Examples");

    // Create bulkheads for different services
    let mut registry = BulkheadRegistry::new();

    registry.register("database", 10);    // Max 10 concurrent DB operations
    registry.register("external-api", 5); // Max 5 concurrent API calls
    registry.register("file-io", 3);      // Max 3 concurrent file operations

    println!("\nBulkhead Status:");
    for status in registry.status() {
        println!(
            "  {}: {}/{} available",
            status.name, status.available, status.max_concurrent
        );
    }
}

Why This Works

  1. Resource isolation: Each service has dedicated resources
  2. Failure containment: One service can't starve others
  3. Predictable capacity: Known limits for capacity planning
  4. Graceful degradation: Shed load instead of crashing

Bulkhead Types

| Type | Isolates | Use Case |
|------|----------|----------|
| Semaphore | Concurrent operations | Most common, lightweight |
| Thread Pool | CPU/threads | CPU-bound work |
| Connection Pool | Connections | Database, HTTP clients |
| Process | Memory/CPU | Complete isolation |

⚠️ Anti-patterns

// DON'T: Shared resources without limits
async fn handle_request() {
    // Everyone fights for the same connections!
    let conn = global_pool.get().await;
}

// DON'T: Bulkhead too large
let bulkhead = Bulkhead::new("api", 10000); // Defeats the purpose

// DON'T: Bulkhead too small
let bulkhead = Bulkhead::new("critical", 1); // Bottleneck

// DON'T: Same bulkhead for different priorities
let shared = Bulkhead::new("all", 10);
critical_operation(&shared).await;    // Might wait for...
background_job(&shared).await;        // ...this low-priority job

// DO: Separate bulkheads by priority
let critical = Bulkhead::new("critical", 8);
let background = Bulkhead::new("background", 2);

Exercises

  1. Implement adaptive bulkhead that adjusts size based on latency
  2. Add bulkhead metrics (queue depth, wait time)
  3. Create priority-based bulkhead with multiple queues
  4. Implement bulkhead with request queueing and fairness
