Isolating failures with resource pools
The bulkhead pattern isolates failures by partitioning resources. Like ship bulkheads that prevent water from flooding the entire vessel, software bulkheads prevent one failing component from exhausting shared resources.
Without isolation, a single slow or failing dependency can exhaust every shared thread and connection, taking down unrelated features with it. The implementation below gives each resource its own bounded pool:
use std::sync::Arc;
use tokio::sync::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};
use std::time::Duration;
use std::collections::HashMap;
/// Semaphore-based bulkhead: caps the number of operations that may run
/// concurrently against one resource, isolating it from the rest of the system.
pub struct Bulkhead {
    /// Human-readable label, used by the registry and status reports.
    name: String,
    /// Permit pool; one permit is held per in-flight operation.
    semaphore: Arc<Semaphore>,
    /// Total permits the semaphore was created with.
    max_concurrent: usize,
}
impl Bulkhead {
    /// Create a bulkhead allowing at most `max_concurrent` simultaneous operations.
    pub fn new(name: impl Into<String>, max_concurrent: usize) -> Self {
        Bulkhead {
            name: name.into(),
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
            max_concurrent,
        }
    }

    /// Try to acquire a permit, returning immediately if unavailable.
    ///
    /// Returns `None` both when the bulkhead is full and when the semaphore
    /// has been closed.
    pub fn try_acquire(&self) -> Option<BulkheadPermit> {
        self.semaphore
            .clone()
            .try_acquire_owned()
            .ok()
            .map(|permit| BulkheadPermit { _permit: permit })
    }

    /// Acquire a permit, waiting up to `timeout`.
    ///
    /// # Errors
    /// - `BulkheadError::Timeout` if no permit became available in time.
    /// - `BulkheadError::Closed` if the semaphore was closed.
    pub async fn acquire_timeout(
        &self,
        timeout: Duration,
    ) -> Result<BulkheadPermit, BulkheadError> {
        match tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()).await {
            Ok(Ok(permit)) => Ok(BulkheadPermit { _permit: permit }),
            Ok(Err(_)) => Err(BulkheadError::Closed),
            Err(_) => Err(BulkheadError::Timeout),
        }
    }

    /// Execute `operation` within the bulkhead, holding a permit for its duration.
    ///
    /// # Errors
    /// Propagates `Timeout`/`Closed` from permit acquisition, and wraps the
    /// operation's own error in `BulkheadError::ServiceError`.
    pub async fn execute<T, E, F, Fut>(
        &self,
        timeout: Duration,
        operation: F,
    ) -> Result<T, BulkheadError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // acquire_timeout yields BulkheadError<()>; re-tag its variants so the
        // error type parameter matches the caller's `E`. The previous version
        // used a catch-all arm that silently converted any other variant into
        // `Timeout`, which would misreport a real error class if one were ever
        // added — match exhaustively instead.
        let _permit = match self.acquire_timeout(timeout).await {
            Ok(permit) => permit,
            Err(BulkheadError::Timeout) => return Err(BulkheadError::Timeout),
            Err(BulkheadError::Closed) => return Err(BulkheadError::Closed),
            // acquire_timeout never constructs ServiceError.
            Err(BulkheadError::ServiceError(())) => unreachable!(),
        };
        operation().await.map_err(BulkheadError::ServiceError)
    }

    /// Current number of available permits.
    pub fn available(&self) -> usize {
        self.semaphore.available_permits()
    }

    /// Check if the bulkhead is full (no permits free right now).
    /// Note: this is a racy snapshot — a permit may free up immediately after.
    pub fn is_full(&self) -> bool {
        self.available() == 0
    }

    /// The bulkhead's label.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The configured concurrency limit.
    pub fn max_concurrent(&self) -> usize {
        self.max_concurrent
    }
}
/// Permit that releases its bulkhead slot on drop.
pub struct BulkheadPermit {
    /// Owned permit; dropping it returns the slot to the semaphore.
    _permit: OwnedSemaphorePermit,
}
/// Error returned by bulkhead operations.
///
/// `E` is the wrapped service error type; it defaults to `()` for the
/// acquisition-only paths, which can never produce a service error.
#[derive(Debug)]
pub enum BulkheadError<E = ()> {
    /// No permit became available before the deadline.
    Timeout,
    /// The underlying semaphore was closed.
    Closed,
    /// The guarded operation itself failed.
    ServiceError(E),
}

impl<E: std::fmt::Display> std::fmt::Display for BulkheadError<E> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            BulkheadError::Timeout => write!(f, "Bulkhead timeout"),
            BulkheadError::Closed => write!(f, "Bulkhead closed"),
            BulkheadError::ServiceError(e) => write!(f, "Service error: {}", e),
        }
    }
}

// Implement the standard Error trait so BulkheadError composes with `?`,
// `Box<dyn Error>`, and error-reporting crates.
impl<E: std::fmt::Debug + std::fmt::Display> std::error::Error for BulkheadError<E> {}
/// Thread pool bulkhead: isolates blocking or CPU-bound work on a dedicated
/// tokio runtime with a fixed number of worker threads.
pub struct ThreadPoolBulkhead {
    /// Label; also used as the worker-thread name.
    name: String,
    /// Dedicated runtime whose threads are not shared with other components.
    runtime: tokio::runtime::Runtime,
    /// Number of worker threads the runtime was built with.
    max_threads: usize,
}
impl ThreadPoolBulkhead {
    /// Build a dedicated multi-thread runtime with exactly `max_threads` workers.
    ///
    /// # Panics
    /// Panics if the runtime cannot be created.
    pub fn new(name: impl Into<String>, max_threads: usize) -> Self {
        // Convert once up front: the original called `name.to_string()` (not
        // available on `impl Into<String>`) and then moved `name` a second
        // time into the struct — neither compiled.
        let name = name.into();
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(max_threads)
            .thread_name(name.clone())
            .build()
            .expect("Failed to create runtime");
        ThreadPoolBulkhead {
            name,
            runtime,
            max_threads,
        }
    }

    /// Spawn a future onto this bulkhead's dedicated runtime.
    pub fn spawn<F>(&self, future: F) -> tokio::task::JoinHandle<F::Output>
    where
        F: std::future::Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.runtime.spawn(future)
    }

    /// Run a future to completion on this runtime, blocking the calling thread.
    pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
        self.runtime.block_on(future)
    }

    /// The bulkhead's label (accessor added for consistency with `Bulkhead`).
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The configured worker-thread count.
    pub fn max_threads(&self) -> usize {
        self.max_threads
    }
}
/// Bulkhead registry for managing multiple bulkheads
pub struct BulkheadRegistry {
    /// Bulkheads keyed by name; values are `Arc`-shared so callers can hold
    /// a handle while the registry keeps reporting its status.
    bulkheads: HashMap<String, Arc<Bulkhead>>,
}
impl BulkheadRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        BulkheadRegistry {
            bulkheads: HashMap::new(),
        }
    }

    /// Create and store a bulkhead under `name`, returning a shared handle.
    /// Registering the same name again replaces the previous bulkhead.
    pub fn register(&mut self, name: impl Into<String>, max_concurrent: usize) -> Arc<Bulkhead> {
        let key = name.into();
        let handle = Arc::new(Bulkhead::new(key.clone(), max_concurrent));
        self.bulkheads.insert(key, Arc::clone(&handle));
        handle
    }

    /// Look up a bulkhead by name.
    pub fn get(&self, name: &str) -> Option<Arc<Bulkhead>> {
        self.bulkheads.get(name).cloned()
    }

    /// Snapshot the current state of every registered bulkhead.
    pub fn status(&self) -> Vec<BulkheadStatus> {
        let mut report = Vec::with_capacity(self.bulkheads.len());
        for bulkhead in self.bulkheads.values() {
            let limit = bulkhead.max_concurrent();
            let free = bulkhead.available();
            report.push(BulkheadStatus {
                name: bulkhead.name().to_string(),
                max_concurrent: limit,
                available: free,
                in_use: limit - free,
            });
        }
        report
    }
}
impl Default for BulkheadRegistry {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time snapshot of one bulkhead's utilization.
#[derive(Debug)]
pub struct BulkheadStatus {
    /// Bulkhead label.
    pub name: String,
    /// Configured concurrency limit.
    pub max_concurrent: usize,
    /// Permits currently free.
    pub available: usize,
    /// Permits currently held (`max_concurrent - available`).
    pub in_use: usize,
}
/// Connection pool bulkhead with per-service limits
pub struct ConnectionPoolBulkhead<C> {
    /// One pool per downstream service, keyed by service name.
    pools: HashMap<String, Pool<C>>,
}

/// A single bounded pool of connections of type `C`.
struct Pool<C> {
    /// Pre-created connections (this sketch never rotates them; see `acquire`).
    connections: Vec<C>,
    /// Upper bound on simultaneously checked-out connections.
    max_size: usize,
    /// Gate limiting concurrent checkouts to `max_size`.
    semaphore: Arc<Semaphore>,
}
impl<C: Clone> ConnectionPoolBulkhead<C> {
    /// Create a bulkhead with no pools registered.
    pub fn new() -> Self {
        let pools = HashMap::new();
        ConnectionPoolBulkhead { pools }
    }

    /// Register a pool named `name`, eagerly creating `max_size` connections
    /// via `connector` and gating checkouts at `max_size`.
    pub fn register_pool(&mut self, name: impl Into<String>, max_size: usize, connector: impl Fn() -> C) {
        let mut connections = Vec::with_capacity(max_size);
        for _ in 0..max_size {
            connections.push(connector());
        }
        let pool = Pool {
            connections,
            max_size,
            semaphore: Arc::new(Semaphore::new(max_size)),
        };
        self.pools.insert(name.into(), pool);
    }

    /// Check out a connection from `pool_name`, waiting for a free slot.
    /// Returns `None` for an unknown pool, a closed semaphore, or an empty pool.
    pub async fn acquire(&self, pool_name: &str) -> Option<PooledConnection<'_, C>> {
        let pool = self.pools.get(pool_name)?;
        let permit = pool.semaphore.clone().acquire_owned().await.ok()?;
        // In real implementation, get actual connection from pool
        let conn = pool.connections.first()?;
        Some(PooledConnection {
            connection: conn.clone(),
            _permit: permit,
        })
    }
}
impl<C: Clone> Default for ConnectionPoolBulkhead<C> {
fn default() -> Self {
Self::new()
}
}
/// A checked-out connection; its pool slot is freed when this is dropped.
///
/// NOTE(review): the lifetime parameter `'a` is never used by any field, which
/// is a compile error in Rust (E0392). Fixing it requires either removing `'a`
/// (and updating `acquire`'s return type) or adding a
/// `PhantomData<&'a ()>` field (and updating the constructor in `acquire`) —
/// a coordinated two-site change; flagged here rather than half-fixed.
pub struct PooledConnection<'a, C> {
    /// The checked-out connection (cloned from the pool in this sketch).
    pub connection: C,
    /// Held permit; dropping it releases the pool slot back to the semaphore.
    _permit: OwnedSemaphorePermit,
}
/// Rate-limited bulkhead (combines rate limiting with concurrency)
pub struct RateLimitedBulkhead {
    /// Concurrency gate.
    bulkhead: Bulkhead,
    /// Token-bucket limiter guarding request admission rate; behind an async
    /// mutex because `execute` takes `&self`.
    rate_limiter: Arc<tokio::sync::Mutex<RateLimiter>>,
}
/// Token-bucket rate limiter: the bucket refills continuously at
/// `refill_rate` tokens per second up to `max_tokens`, and each admitted
/// request spends one token.
struct RateLimiter {
    tokens: f64,
    max_tokens: f64,
    refill_rate: f64,
    last_refill: std::time::Instant,
}

impl RateLimiter {
    /// Start with a full bucket.
    fn new(max_tokens: f64, refill_per_second: f64) -> Self {
        let now = std::time::Instant::now();
        RateLimiter {
            tokens: max_tokens,
            max_tokens,
            refill_rate: refill_per_second,
            last_refill: now,
        }
    }

    /// Spend one token if available; returns whether admission succeeded.
    fn try_acquire(&mut self) -> bool {
        self.refill();
        let admitted = self.tokens >= 1.0;
        if admitted {
            self.tokens -= 1.0;
        }
        admitted
    }

    /// Credit tokens for the time elapsed since the last refill, capped at capacity.
    fn refill(&mut self) {
        let now = std::time::Instant::now();
        let credit = now.duration_since(self.last_refill).as_secs_f64() * self.refill_rate;
        self.last_refill = now;
        self.tokens = (self.tokens + credit).min(self.max_tokens);
    }
}
impl RateLimitedBulkhead {
    /// Build a bulkhead with `max_concurrent` slots and a token bucket holding
    /// up to `max_rate` tokens, refilled at `rate_per_second` tokens/sec.
    pub fn new(
        name: impl Into<String>,
        max_concurrent: usize,
        max_rate: f64,
        rate_per_second: f64,
    ) -> Self {
        let limiter = RateLimiter::new(max_rate, rate_per_second);
        RateLimitedBulkhead {
            bulkhead: Bulkhead::new(name, max_concurrent),
            rate_limiter: Arc::new(tokio::sync::Mutex::new(limiter)),
        }
    }

    /// Run `operation` only if both the rate limit and the concurrency limit allow.
    ///
    /// NOTE(review): rate-limit rejection is reported as `BulkheadError::Timeout`
    /// because the error enum has no dedicated rate-limit variant, so callers
    /// cannot distinguish the two causes.
    pub async fn execute<T, E, F, Fut>(
        &self,
        timeout: Duration,
        operation: F,
    ) -> Result<T, BulkheadError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // Admission control first; the mutex guard is a temporary dropped at
        // the end of this statement, so the lock is never held across the
        // operation's await.
        let admitted = self.rate_limiter.lock().await.try_acquire();
        if !admitted {
            return Err(BulkheadError::Timeout);
        }
        // Then enforce the concurrency cap.
        self.bulkhead.execute(timeout, operation).await
    }
}
/// Demo: register one bulkhead per dependency and print their utilization.
fn main() {
    println!("Bulkhead Pattern Examples");
    // Create bulkheads for different services
    let mut registry = BulkheadRegistry::new();
    registry.register("database", 10); // Max 10 concurrent DB operations
    registry.register("external-api", 5); // Max 5 concurrent API calls
    registry.register("file-io", 3); // Max 3 concurrent file operations
    println!("\nBulkhead Status:");
    let snapshot = registry.status();
    for entry in &snapshot {
        println!(
            " {}: {}/{} available",
            entry.name, entry.available, entry.max_concurrent
        );
    }
}
| Type | Isolates | Use Case |
|------|----------|----------|
| Semaphore | Concurrent operations | Most common, lightweight |
| Thread Pool | CPU/threads | CPU-bound work |
| Connection Pool | Connections | Database, HTTP clients |
| Process | Memory/CPU | Complete isolation |
// DON'T: Shared resources without limits
async fn handle_request() {
// Everyone fights for the same connections!
let conn = global_pool.get().await;
}
// DON'T: Bulkhead too large
let bulkhead = Bulkhead::new("api", 10000); // Defeats the purpose
// DON'T: Bulkhead too small
let bulkhead = Bulkhead::new("critical", 1); // Bottleneck
// DON'T: Same bulkhead for different priorities
let shared = Bulkhead::new("all", 10);
critical_operation(&shared).await; // Might wait for...
background_job(&shared).await; // ...this low-priority job
// DO: Separate bulkheads by priority
let critical = Bulkhead::new("critical", 8);
let background = Bulkhead::new("background", 2);
Run this code in the official Rust Playground