
Select/Join Patterns

Concurrent future composition

intermediate
select, join, tokio

What are Select and Join?

Select and Join are fundamental patterns for composing multiple async operations:
  • Select (select!): Race multiple futures, return when first completes
  • Join (join!): Execute futures concurrently, return when all complete

These patterns transform how you think about concurrency - instead of spawning tasks and coordinating with channels, you compose futures declaratively.

The Core Difference

use tokio::time::{sleep, Duration};

// SELECT: First to complete wins
async fn select_example() {
    tokio::select! {
        _ = sleep(Duration::from_secs(1)) => {
            println!("Fast operation finished first");
        }
        _ = sleep(Duration::from_secs(5)) => {
            println!("Slow operation finished first");
        }
    }
    // Output: "Fast operation finished first"
    // The 5-second sleep is CANCELLED
}

// JOIN: Wait for all to complete
async fn join_example() {
    // join! returns a tuple of outputs; sleep yields (), so nothing is bound here
    tokio::join!(
        sleep(Duration::from_secs(1)),
        sleep(Duration::from_secs(5))
    );
    // Waits for BOTH (5 seconds total, not 6)
    println!("Both operations completed");
}

When to Use Which?

| Pattern | Use Case | Behavior |
|---------|----------|----------|
| select! | Timeouts, first-wins races, shutdown signals | Cancels unfinished branches |
| join! | Concurrent execution on one task, aggregating results | Waits for all branches |
| try_join! | Concurrent execution with error propagation | Short-circuits on first error |

Cancellation Safety: The Critical Concept

Cancellation happens when a future in select! is dropped before it completes. Not all futures are safe to cancel:

use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};

// ✅ CANCELLATION SAFE: dropping a pending sleep loses nothing
async fn safe_operation() {
    sleep(Duration::from_secs(1)).await;
}

// ❌ NOT CANCELLATION SAFE: dropping this future mid-read loses data
async fn unsafe_operation(socket: &mut TcpStream) -> std::io::Result<Vec<u8>> {
    let mut buffer = Vec::new();
    // If the future is dropped here, bytes already read into `buffer` are lost
    socket.read_to_end(&mut buffer).await?;
    Ok(buffer)
}

Rule: Use select! only with cancellation-safe futures, or explicitly handle cleanup.

---

Real-World Example 1: Timeout Pattern (Systems Programming)

Problem: Prevent operations from hanging indefinitely

use tokio::time::{timeout, Duration};

/// Database query with timeout protection
pub struct Database {
    pool: sqlx::PgPool,
    default_timeout: Duration,
}

impl Database {
    /// Execute query with timeout
    pub async fn query_with_timeout<T>(
        &self,
        query: &str,
        timeout_duration: Duration,
    ) -> Result<T, QueryError> 
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
    {
        // Pattern 1: Using tokio::time::timeout
        match timeout(timeout_duration, self.execute_query(query)).await {
            Ok(Ok(result)) => Ok(result),
            Ok(Err(db_err)) => Err(QueryError::Database(db_err)),
            Err(_) => Err(QueryError::Timeout),
        }
    }

    /// Same pattern using select! for more control
    pub async fn query_with_custom_timeout<T>(
        &self,
        query: &str,
    ) -> Result<T, QueryError>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
    {
        tokio::select! {
            // Main operation
            result = self.execute_query::<T>(query) => {
                result.map_err(QueryError::Database)
            }
            // Timeout branch
            _ = tokio::time::sleep(self.default_timeout) => {
                // Log timeout for monitoring
                tracing::warn!(
                    query = query,
                    timeout_ms = self.default_timeout.as_millis(),
                    "Database query timed out"
                );
                Err(QueryError::Timeout)
            }
        }
    }

    async fn execute_query<T>(&self, query: &str) -> sqlx::Result<T>
    where
        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
    {
        sqlx::query_as(query)
            .fetch_one(&self.pool)
            .await
    }
}

#[derive(Debug, thiserror::Error)]
pub enum QueryError {
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    
    #[error("Query timed out")]
    Timeout,
}

/// Health check with timeout
pub struct HealthChecker {
    endpoints: Vec<String>,
    timeout: Duration,
}

impl HealthChecker {
    /// Check single endpoint health with timeout
    pub async fn check_endpoint(&self, url: &str) -> HealthStatus {
        tokio::select! {
            // Health check HTTP request
            result = self.perform_health_check(url) => {
                match result {
                    Ok(_) => HealthStatus::Healthy,
                    Err(e) => HealthStatus::Unhealthy(e.to_string()),
                }
            }
            
            // Timeout after configured duration
            _ = tokio::time::sleep(self.timeout) => {
                HealthStatus::Timeout
            }
        }
    }

    /// Check all endpoints concurrently with individual timeouts
    pub async fn check_all(&self) -> Vec<(String, HealthStatus)> {
        // Create futures for all checks
        let checks = self.endpoints.iter().map(|url| {
            let url = url.clone();
            async move {
                let status = self.check_endpoint(&url).await;
                (url, status)
            }
        });

        // Execute all concurrently
        futures::future::join_all(checks).await
    }

    async fn perform_health_check(&self, url: &str) -> reqwest::Result<()> {
        reqwest::get(url).await?.error_for_status()?;
        Ok(())
    }
}

#[derive(Debug, Clone)]
pub enum HealthStatus {
    Healthy,
    Unhealthy(String),
    Timeout,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_timeout_triggers() {
        let checker = HealthChecker {
            endpoints: vec![],
            timeout: Duration::from_millis(100),
        };

        // Simulate slow endpoint
        let status = tokio::select! {
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                HealthStatus::Healthy
            }
            _ = tokio::time::sleep(checker.timeout) => {
                HealthStatus::Timeout
            }
        };

        assert!(matches!(status, HealthStatus::Timeout));
    }
}

Why this works:
  • Timeout protection: Prevents hanging on slow databases/networks
  • Cancellation: Dropping the query or request future when the timer fires stops the client from waiting; the server may still finish work it has already started, so this suits reads and idempotent calls
  • Monitoring integration: Log timeouts for alerting
  • Graceful degradation: Return error instead of hanging forever

---

Real-World Example 2: Multi-Source Data Aggregation (Web/Backend)

Problem: Fetch data from multiple APIs concurrently

use serde::{Deserialize, Serialize};
use std::time::{Duration, Instant};
use tokio::time::timeout;

/// Aggregated user profile from multiple sources
#[derive(Debug, Serialize)]
pub struct UserProfile {
    user_info: UserInfo,
    activity: UserActivity,
    preferences: UserPreferences,
    fetch_time_ms: u128,
}

#[derive(Debug, Serialize, Deserialize)]
struct UserInfo {
    id: u64,
    name: String,
    email: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct UserActivity {
    last_login: String,
    total_actions: u64,
}

#[derive(Debug, Serialize, Deserialize)]
struct UserPreferences {
    theme: String,
    notifications: bool,
}

/// Service for aggregating user data from multiple microservices
pub struct UserAggregator {
    client: reqwest::Client,
    user_service_url: String,
    activity_service_url: String,
    preferences_service_url: String,
}

impl UserAggregator {
    /// Fetch complete user profile concurrently
    pub async fn get_user_profile(
        &self,
        user_id: u64,
    ) -> Result<UserProfile, AggregationError> {
        let start = Instant::now();

        // Pattern 1: try_join! - fail fast on any error
        let (user_info, activity, preferences) = tokio::try_join!(
            self.fetch_user_info(user_id),
            self.fetch_user_activity(user_id),
            self.fetch_user_preferences(user_id),
        )?;

        Ok(UserProfile {
            user_info,
            activity,
            preferences,
            fetch_time_ms: start.elapsed().as_millis(),
        })
    }

    /// Fetch profile with partial results on failure
    pub async fn get_user_profile_resilient(
        &self,
        user_id: u64,
    ) -> Result<PartialUserProfile, AggregationError> {
        let start = Instant::now();

        // Pattern 2: join! with Result - collect all errors
        let (user_result, activity_result, prefs_result) = tokio::join!(
            self.fetch_user_info(user_id),
            self.fetch_user_activity(user_id),
            self.fetch_user_preferences(user_id),
        );

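        // User info is required, others are optional
        let user_info = user_result?;

        // Split each optional result into its value and its error.
        // (Calling .ok() and then .err() on the same Result would use a moved value.)
        let mut errors = Vec::new();
        let activity = activity_result
            .map_err(|e| errors.push(AggregationError::Http(e)))
            .ok();
        let preferences = prefs_result
            .map_err(|e| errors.push(AggregationError::Http(e)))
            .ok();

        Ok(PartialUserProfile {
            user_info,
            activity,
            preferences,
            fetch_time_ms: start.elapsed().as_millis(),
            errors,
        })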
    }

    /// Fetch with timeout per service
    pub async fn get_user_profile_with_timeouts(
        &self,
        user_id: u64,
        timeout_per_service: Duration,
    ) -> Result<PartialUserProfile, AggregationError> {
        let start = Instant::now();

        // Pattern 3: Combine join! with timeout for each future
        let (user_result, activity_result, prefs_result) = tokio::join!(
            timeout(timeout_per_service, self.fetch_user_info(user_id)),
            timeout(timeout_per_service, self.fetch_user_activity(user_id)),
            timeout(timeout_per_service, self.fetch_user_preferences(user_id)),
        );

        // Handle timeouts
        let user_info = user_result
            .map_err(|_| AggregationError::Timeout("user_info".to_string()))?
            .map_err(AggregationError::Http)?;

        Ok(PartialUserProfile {
            user_info,
            activity: activity_result.ok().and_then(Result::ok),
            preferences: prefs_result.ok().and_then(Result::ok),
            fetch_time_ms: start.elapsed().as_millis(),
            errors: vec![],
        })
    }

    async fn fetch_user_info(&self, user_id: u64) -> Result<UserInfo, reqwest::Error> {
        self.client
            .get(&format!("{}/users/{}", self.user_service_url, user_id))
            .send()
            .await?
            .json()
            .await
    }

    async fn fetch_user_activity(
        &self,
        user_id: u64,
    ) -> Result<UserActivity, reqwest::Error> {
        self.client
            .get(&format!(
                "{}/activity/{}",
                self.activity_service_url, user_id
            ))
            .send()
            .await?
            .json()
            .await
    }

    async fn fetch_user_preferences(
        &self,
        user_id: u64,
    ) -> Result<UserPreferences, reqwest::Error> {
        self.client
            .get(&format!(
                "{}/preferences/{}",
                self.preferences_service_url, user_id
            ))
            .send()
            .await?
            .json()
            .await
    }
}

#[derive(Debug, Serialize)]
pub struct PartialUserProfile {
    user_info: UserInfo,
    activity: Option<UserActivity>,
    preferences: Option<UserPreferences>,
    fetch_time_ms: u128,
    errors: Vec<AggregationError>,
}

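// reqwest::Error does not implement Serialize, so the enum cannot derive it.
// Serialize each error as its Display string instead.
#[derive(Debug, thiserror::Error)]
pub enum AggregationError {
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),

    #[error("Service timeout: {0}")]
    Timeout(String),
}

impl Serialize for AggregationError {
    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        serializer.collect_str(self)
    }
}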

/// Performance comparison: Sequential vs Concurrent
pub mod benchmarks {
    use super::*;

    pub async fn sequential_fetch(aggregator: &UserAggregator, user_id: u64) -> Duration {
        let start = Instant::now();

        // Sequential: 300ms + 200ms + 100ms = 600ms total
        let _ = aggregator.fetch_user_info(user_id).await;
        let _ = aggregator.fetch_user_activity(user_id).await;
        let _ = aggregator.fetch_user_preferences(user_id).await;

        start.elapsed()
    }

    pub async fn concurrent_fetch(aggregator: &UserAggregator, user_id: u64) -> Duration {
        let start = Instant::now();

        // Concurrent: max(300ms, 200ms, 100ms) = 300ms total
        let _ = tokio::join!(
            aggregator.fetch_user_info(user_id),
            aggregator.fetch_user_activity(user_id),
            aggregator.fetch_user_preferences(user_id),
        );

        start.elapsed()
    }
}

Key insights:
  • try_join! for strict mode: Fail fast if any service fails
  • join! for resilient mode: Collect partial results
  • Timeout per service: Prevent slow services from blocking everything
  • Speedup: Concurrent fetching takes about as long as the slowest call (300ms) instead of the sum (600ms), roughly 2x for the latencies assumed in the benchmark comments

---

Real-World Example 3: Event Loop with Multiple Sources (Network Programming)

Problem: Build custom event loop listening to multiple async sources

use tokio::sync::mpsc;
use tokio::signal;
use std::net::SocketAddr;
use std::time::Duration;

/// Multi-source event loop for network server
pub struct EventLoop {
    /// TCP listener for incoming connections
    listener: tokio::net::TcpListener,
    /// Channel for internal events
    internal_events: mpsc::Receiver<InternalEvent>,
    /// Timer for periodic tasks
    tick_interval: tokio::time::Interval,
    /// Server state
    state: ServerState,
}

#[derive(Debug)]
enum InternalEvent {
    ConnectionClosed(SocketAddr),
    MetricsRequest(tokio::sync::oneshot::Sender<Metrics>),
    ConfigReload,
}

#[derive(Default)]
struct ServerState {
    active_connections: usize,
    total_requests: u64,
}

#[derive(Debug, Clone)]
struct Metrics {
    active_connections: usize,
    total_requests: u64,
    uptime_seconds: u64,
}

impl EventLoop {
    pub async fn new(addr: &str) -> std::io::Result<Self> {
        let listener = tokio::net::TcpListener::bind(addr).await?;
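        // NOTE: `_tx` is dropped when `new` returns, which closes the channel and
        // disables the internal-event branch. A real server would keep the sender
        // and hand clones of it to connection tasks.
        let (_tx, rx) = mpsc::channel(100);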

        Ok(EventLoop {
            listener,
            internal_events: rx,
            tick_interval: tokio::time::interval(Duration::from_secs(10)),
            state: ServerState::default(),
        })
    }

    /// Main event loop - select from multiple sources
    pub async fn run(mut self) -> std::io::Result<()> {
        println!("Event loop started");

        loop {
            tokio::select! {
                // Branch 1: Accept new TCP connection
                result = self.listener.accept() => {
                    match result {
                        Ok((socket, addr)) => {
                            println!("New connection from: {}", addr);
                            self.state.active_connections += 1;
                            self.handle_connection(socket, addr);
                        }
                        Err(e) => {
                            eprintln!("Accept error: {}", e);
                        }
                    }
                }

                // Branch 2: Internal event from channels
                Some(event) = self.internal_events.recv() => {
                    self.handle_internal_event(event);
                }

                // Branch 3: Periodic tick for maintenance
                _ = self.tick_interval.tick() => {
                    self.periodic_maintenance();
                }

                // Branch 4: Graceful shutdown on Ctrl+C
                _ = signal::ctrl_c() => {
                    println!("Shutdown signal received");
                    self.shutdown().await;
                    break;
                }
            }
        }

        Ok(())
    }

    fn handle_connection(&mut self, socket: tokio::net::TcpStream, addr: SocketAddr) {
        // Spawn task to handle connection
        tokio::spawn(async move {
            // Handle connection...
        });
    }

    fn handle_internal_event(&mut self, event: InternalEvent) {
        match event {
            InternalEvent::ConnectionClosed(addr) => {
                self.state.active_connections = self.state.active_connections.saturating_sub(1);
                println!("Connection closed: {}", addr);
            }
            InternalEvent::MetricsRequest(response) => {
                let metrics = Metrics {
                    active_connections: self.state.active_connections,
                    total_requests: self.state.total_requests,
                    uptime_seconds: 0, // Would track uptime
                };
                let _ = response.send(metrics);
            }
            InternalEvent::ConfigReload => {
                println!("Reloading configuration...");
            }
        }
    }

    fn periodic_maintenance(&mut self) {
        println!(
            "Periodic tick - Active connections: {}",
            self.state.active_connections
        );
        // Cleanup, metrics, health checks, etc.
    }

    async fn shutdown(&mut self) {
        println!("Performing graceful shutdown...");
        // Close connections, flush buffers, etc.
    }
}

/// Advanced: Priority-based select with biased polling
pub struct PriorityEventLoop {
    high_priority: mpsc::Receiver<PriorityEvent>,
    normal_priority: mpsc::Receiver<PriorityEvent>,
    low_priority: mpsc::Receiver<PriorityEvent>,
}

#[derive(Debug)]
enum PriorityEvent {
    Critical(String),
    Normal(String),
    Background(String),
}

impl PriorityEventLoop {
    pub async fn run(mut self) {
        loop {
            // Add `biased;` to tokio::select! to poll branches in declaration order
            tokio::select! {
                biased;  // Enable biased polling - checks in order

                // Check high priority first
                Some(event) = self.high_priority.recv() => {
                    println!("HIGH PRIORITY: {:?}", event);
                }

                // Then normal priority
                Some(event) = self.normal_priority.recv() => {
                    println!("Normal priority: {:?}", event);
                }

                // Finally low priority
                Some(event) = self.low_priority.recv() => {
                    println!("Low priority: {:?}", event);
                }

                // All channels closed
                else => {
                    println!("All channels closed, shutting down");
                    break;
                }
            }
        }
    }
}

/// Channel multiplexing - select from multiple channels
pub async fn multiplex_channels() {
    let (tx1, mut rx1) = mpsc::channel::<String>(10);
    let (tx2, mut rx2) = mpsc::channel::<String>(10);
    let (tx3, mut rx3) = mpsc::channel::<String>(10);

    // Spawn producers
    tokio::spawn(async move {
        tx1.send("From channel 1".to_string()).await.ok();
    });

    tokio::spawn(async move {
        tx2.send("From channel 2".to_string()).await.ok();
    });

    tokio::spawn(async move {
        tx3.send("From channel 3".to_string()).await.ok();
    });

    // Consume from all channels
    let mut count = 0;
    while count < 3 {
        tokio::select! {
            Some(msg) = rx1.recv() => {
                println!("Channel 1: {}", msg);
                count += 1;
            }
            Some(msg) = rx2.recv() => {
                println!("Channel 2: {}", msg);
                count += 1;
            }
            Some(msg) = rx3.recv() => {
                println!("Channel 3: {}", msg);
                count += 1;
            }
        }
    }
}
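
Why this pattern:
  • Unified event handling: Single loop for multiple async sources
  • Fair scheduling: By default select! picks a random branch to poll first, so no ready branch is starved
  • Priority support: Add biased; as the first line of select! to poll branches in declaration order (a constantly busy high-priority channel can then starve the others)
  • Graceful shutdown: Shutdown signal integrated into main loop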

---

Real-World Example 4: Graceful Shutdown (Production Systems)

Problem: Coordinate shutdown across multiple tasks

use tokio::signal;
use tokio::sync::broadcast;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

/// Graceful shutdown coordinator
pub struct ShutdownCoordinator {
    /// Broadcast channel for shutdown signal
    shutdown_tx: broadcast::Sender<()>,
    /// Track if shutdown initiated
    shutdown_initiated: Arc<AtomicBool>,
}

impl ShutdownCoordinator {
    pub fn new() -> Self {
        let (shutdown_tx, _) = broadcast::channel(1);
        
        ShutdownCoordinator {
            shutdown_tx,
            shutdown_initiated: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Initiate graceful shutdown
    pub fn shutdown(&self) {
        if self.shutdown_initiated.swap(true, Ordering::SeqCst) {
            return; // Already shutting down
        }

        println!("Initiating graceful shutdown...");
        let _ = self.shutdown_tx.send(());
    }

    /// Get shutdown receiver for tasks
    pub fn subscribe(&self) -> broadcast::Receiver<()> {
        self.shutdown_tx.subscribe()
    }

    /// Wait for Ctrl+C and initiate shutdown
    pub async fn wait_for_signal(&self) {
        signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
        self.shutdown();
    }
}

/// Worker that respects graceful shutdown
pub struct Worker {
    id: usize,
    shutdown_rx: broadcast::Receiver<()>,
}

impl Worker {
    pub async fn run(mut self) {
        println!("Worker {} started", self.id);

        loop {
            tokio::select! {
                // Main work loop. do_work takes the id by value: a `&self`
                // borrow here would conflict with the `&mut` borrow of
                // `shutdown_rx` in the next branch.
                _ = Self::do_work(self.id) => {
                    // Continue processing
                }

                // Shutdown signal received
                _ = self.shutdown_rx.recv() => {
                    println!("Worker {} received shutdown signal", self.id);
                    self.cleanup().await;
                    break;
                }
            }
        }

        println!("Worker {} stopped", self.id);
    }

    async fn do_work(_id: usize) {
        // Simulate work
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    async fn cleanup(&self) {
        println!("Worker {} cleaning up...", self.id);
        // Flush buffers, close connections, save state
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

/// HTTP server with graceful shutdown
pub struct HttpServer {
    listener: tokio::net::TcpListener,
    shutdown_rx: broadcast::Receiver<()>,
    active_requests: Arc<AtomicUsize>,
}

use std::sync::atomic::AtomicUsize;

impl HttpServer {
    pub async fn run(mut self) {
        println!("HTTP server listening");

        loop {
            tokio::select! {
                // Accept new connections
                result = self.listener.accept() => {
                    match result {
                        Ok((socket, addr)) => {
                            let counter = self.active_requests.clone();
                            counter.fetch_add(1, Ordering::SeqCst);
                            
                            tokio::spawn(async move {
                                // Handle request
                                Self::handle_request(socket).await;
                                counter.fetch_sub(1, Ordering::SeqCst);
                            });
                        }
                        Err(e) => eprintln!("Accept error: {}", e),
                    }
                }

                // Shutdown signal
                _ = self.shutdown_rx.recv() => {
                    println!("HTTP server received shutdown signal");
                    break;
                }
            }
        }

        // Wait for in-flight requests to complete
        self.drain_requests().await;
        println!("HTTP server stopped");
    }

    async fn handle_request(socket: tokio::net::TcpStream) {
        // Handle HTTP request
    }

    async fn drain_requests(&self) {
        let timeout = Duration::from_secs(30);
        let start = Instant::now();

        while self.active_requests.load(Ordering::SeqCst) > 0 {
            if start.elapsed() > timeout {
                println!(
                    "Shutdown timeout: {} requests still active",
                    self.active_requests.load(Ordering::SeqCst)
                );
                return; // Timed out: skip the "All requests drained" message
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        println!("All requests drained");
    }
}

/// Complete application with graceful shutdown
pub async fn run_application() {
    let coordinator = Arc::new(ShutdownCoordinator::new());

    // Spawn workers
    for i in 0..4 {
        let worker = Worker {
            id: i,
            shutdown_rx: coordinator.subscribe(),
        };
        tokio::spawn(worker.run());
    }

    // Wait for shutdown signal
    coordinator.wait_for_signal().await;

    // Give tasks time to clean up
    tokio::time::sleep(Duration::from_secs(2)).await;
    println!("Application shutdown complete");
}

/// Pattern: Work with timeout OR shutdown
pub async fn work_with_timeout_or_shutdown(
    mut shutdown: broadcast::Receiver<()>,
) -> Result<(), WorkError> {
    let work = async {
        // Long-running work
        tokio::time::sleep(Duration::from_secs(60)).await;
        Ok::<_, WorkError>(())
    };

    tokio::select! {
        result = work => result,
        _ = shutdown.recv() => {
            println!("Work interrupted by shutdown");
            Err(WorkError::Interrupted)
        }
        _ = tokio::time::sleep(Duration::from_secs(10)) => {
            println!("Work timed out");
            Err(WorkError::Timeout)
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum WorkError {
    #[error("Work interrupted")]
    Interrupted,

    #[error("Work timed out")]
    Timeout,
}
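
Shutdown pattern benefits:
  • Coordinated: All subscribed tasks receive the same broadcast shutdown signal
  • Graceful: Tasks can clean up before exiting
  • Timeout: drain_requests stops waiting after 30 seconds, so stuck requests cannot block shutdown
  • Signal handling: The code above reacts to Ctrl+C only; handling SIGTERM on Unix needs an additional listener such as tokio::signal::unix::signal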

---

Real-World Example 5: Load Balancing / Hedged Requests (Performance)

Problem: Reduce tail latency by racing requests to multiple backends

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Client that sends hedged requests to reduce latency
pub struct HedgedClient {
    backends: Vec<String>,
    client: reqwest::Client,
    hedging_delay: Duration,
}

impl HedgedClient {
    /// Send request with hedging - race to multiple backends
    pub async fn request_with_hedging(&self, path: &str) -> Result<Response, RequestError> {
        if self.backends.is_empty() {
            return Err(RequestError::NoBackends);
        }

        // Start with first backend
        let mut primary = Box::pin(self.send_to_backend(&self.backends[0], path));

        // Wait for hedging delay
        tokio::select! {
            // Primary completes within hedging delay
            result = &mut primary => {
                return result;
            }

            // Hedging delay elapsed - send backup request
            _ = tokio::time::sleep(self.hedging_delay) => {
                // Continue to hedge logic below
            }
        }

        // Primary is slow - race with backup
        if self.backends.len() > 1 {
            let backup = self.send_to_backend(&self.backends[1], path);

            tokio::select! {
                result = primary => result,
                result = backup => result,
            }
        } else {
            primary.await
        }
    }

    /// Send request to fastest backend (full racing)
    pub async fn request_fastest(&self, path: &str) -> Result<Response, RequestError> {
        if self.backends.is_empty() {
            return Err(RequestError::NoBackends);
        }

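        // Create futures for all backends. select_all requires Unpin futures,
        // and async fn futures are not Unpin, so box and pin each one.
        let requests: Vec<_> = self
            .backends
            .iter()
            .map(|backend| Box::pin(self.send_to_backend(backend, path)))
            .collect();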

        // Race all requests - first to complete wins
        // Note: This is simplified; production would use FuturesUnordered
        match requests.len() {
            1 => requests.into_iter().next().unwrap().await,
            2 => {
                let mut iter = requests.into_iter();
                let r1 = iter.next().unwrap();
                let r2 = iter.next().unwrap();

                tokio::select! {
                    result = r1 => result,
                    result = r2 => result,
                }
            }
            _ => {
                // For 3+ backends, use futures::future::select_all
                use futures::future::select_all;
                let (result, _index, _remaining) = select_all(requests).await;
                result
            }
        }
    }

    async fn send_to_backend(
        &self,
        backend: &str,
        path: &str,
    ) -> Result<Response, RequestError> {
        let url = format!("{}{}", backend, path);

        self.client
            .get(&url)
            .send()
            .await
            .map_err(RequestError::Http)?
            .json()
            .await
            .map_err(RequestError::Http)
    }
}

#[derive(Debug, serde::Deserialize)]
pub struct Response {
    data: String,
}

#[derive(Debug, thiserror::Error)]
pub enum RequestError {
    #[error("No backends available")]
    NoBackends,

    #[error("HTTP error: {0}")]
    Http(reqwest::Error),
}

/// Load balancer with health checking
pub struct LoadBalancer {
    backends: Arc<Vec<Backend>>,
}

#[derive(Clone)]
struct Backend {
    url: String,
    healthy: Arc<AtomicBool>,
}

impl LoadBalancer {
    /// Execute request on first healthy backend
    pub async fn execute_with_failover(
        &self,
        path: &str,
    ) -> Result<Response, RequestError> {
        // Try backends in order until one succeeds
        for backend in self.backends.iter() {
            if !backend.healthy.load(Ordering::SeqCst) {
                continue;
            }

            match self.try_backend(backend, path).await {
                Ok(response) => return Ok(response),
                Err(e) => {
                    eprintln!("Backend {} failed: {}", backend.url, e);
                    // Mark unhealthy
                    backend.healthy.store(false, Ordering::SeqCst);
                    continue;
                }
            }
        }

        Err(RequestError::NoBackends)
    }

    /// Execute on all healthy backends, return fastest
    pub async fn execute_racing(&self, path: &str) -> Result<Response, RequestError> {
        let healthy_backends: Vec<_> = self
            .backends
            .iter()
            .filter(|b| b.healthy.load(Ordering::SeqCst))
            .collect();

        if healthy_backends.is_empty() {
            return Err(RequestError::NoBackends);
        }

        // Create futures for all healthy backends, boxed and pinned because
        // select_all requires Unpin futures
        let requests: Vec<_> = healthy_backends
            .iter()
            .map(|backend| Box::pin(self.try_backend(backend, path)))
            .collect();

        // Use select_all to race them
        use futures::future::select_all;
        let (result, _index, _remaining) = select_all(requests).await;

        result
    }

    async fn try_backend(&self, backend: &Backend, path: &str) -> Result<Response, RequestError> {
        let client = reqwest::Client::new();
        let url = format!("{}{}", backend.url, path);

        client
            .get(&url)
            .timeout(Duration::from_millis(500))
            .send()
            .await
            .map_err(RequestError::Http)?
            .json()
            .await
            .map_err(RequestError::Http)
    }

    /// Background health checking
    pub async fn health_check_loop(self: Arc<Self>) {
        let mut interval = tokio::time::interval(Duration::from_secs(5));

        loop {
            interval.tick().await;

            for backend in self.backends.iter() {
                let backend = backend.clone();
                let lb = self.clone();

                tokio::spawn(async move {
                    match lb.check_health(&backend).await {
                        Ok(true) => {
                            backend.healthy.store(true, Ordering::SeqCst);
                        }
                        _ => {
                            backend.healthy.store(false, Ordering::SeqCst);
                        }
                    }
                });
            }
        }
    }

    async fn check_health(&self, backend: &Backend) -> Result<bool, RequestError> {
        let client = reqwest::Client::new();
        let url = format!("{}/health", backend.url);

        match client.get(&url).send().await {
            Ok(resp) => Ok(resp.status().is_success()),
            Err(_) => Ok(false),
        }
    }
}

/// Benchmarking: Sequential vs Racing
pub mod latency_tests {
    use super::*;

    pub async fn measure_sequential(backends: &[String], path: &str) -> Duration {
        let start = Instant::now();
        let client = reqwest::Client::new();

        // Try backends sequentially
        for backend in backends {
            let url = format!("{}{}", backend, path);
            if client.get(&url).send().await.is_ok() {
                break;
            }
        }

        start.elapsed()
    }

    pub async fn measure_racing(backends: &[String], path: &str) -> Duration {
        let start = Instant::now();
        let client = reqwest::Client::new();

        // Race all backends (select_all panics on an empty list and
        // requires `Unpin` futures, hence Box::pin)
        let requests: Vec<_> = backends
            .iter()
            .map(|backend| {
                let url = format!("{}{}", backend, path);
                Box::pin(client.get(&url).send())
            })
            .collect();

        use futures::future::select_all;
        let _ = select_all(requests).await;

        start.elapsed()
    }

    // Typical results:
    // Sequential with failures: ~900ms (try 3 backends @ 300ms each)
    // Racing: 300ms (fastest backend wins)
    // Latency improvement: ~3x
}
Hedging benefits:
  • Reduced P99 latency: Slow backends don't dominate tail latency
  • Improved reliability: Automatic failover to healthy backends
  • Cost trade-off: More load on backends, better user experience
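  • Production use: Hedged requests are an established tail-latency technique in large-scale services, described in Google's "The Tail at Scale" (Dean and Barroso, 2013)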
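
Failover in a race depends on how failures are treated: taking the first future to finish also returns the first failure. As a rough, runtime-free sketch of a race where the first success wins, the following uses only the standard library. `first_ok`, `after_polls` and the busy-polling `block_on` are hypothetical helpers written for this illustration, not part of tokio or of the load balancer above; in real code `futures::future::select_ok` provides this behavior.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFut<T, E> = Pin<Box<dyn Future<Output = Result<T, E>>>>;

/// Hypothetical helper: resolve with the first `Ok`, or with the last
/// error seen once every future has failed. Panics on an empty list.
async fn first_ok<T, E>(mut futs: Vec<BoxFut<T, E>>) -> Result<T, E> {
    assert!(!futs.is_empty(), "need at least one future");
    poll_fn(move |cx| {
        let mut i = 0;
        while i < futs.len() {
            let polled = futs[i].as_mut().poll(cx);
            match polled {
                // Winner: the remaining futures are dropped with the closure
                Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
                Poll::Ready(Err(e)) => {
                    // Failed backend: stop polling it, keep racing the others
                    drop(futs.swap_remove(i));
                    if futs.is_empty() {
                        return Poll::Ready(Err(e));
                    }
                }
                Poll::Pending => i += 1,
            }
        }
        Poll::Pending
    })
    .await
}

/// Stand-in for a slow request: Pending `n` times, then `value`.
fn after_polls<T>(mut n: u32, value: T) -> impl Future<Output = T> {
    let mut value = Some(value);
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(value.take().expect("polled after completion"))
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor, sufficient for futures that never wait on real I/O.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Racing a fast-failing future against two slower successes with `block_on(first_ok(...))` yields the quicker success rather than the early error, which is the failover behavior the list above describes.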

---

Deep Dive: How Select and Join Work

The select! Macro Expansion

// What you write:
tokio::select! {
    result = future1 => { handle_result1(result); }
    result = future2 => { handle_result2(result); }
}

// Roughly expands to (simplified; tokio uses an internal RNG, fastrand stands in here):
{
    use std::future::Future;
    use std::task::Poll;

    // Which branch completed, carrying its output
    enum Out<A, B> { First(A), Second(B) }

    // Pin both futures on the stack (no heap allocation)
    let mut future1 = std::pin::pin!(future1);
    let mut future2 = std::pin::pin!(future2);

    let out = std::future::poll_fn(|cx| {
        // Randomly choose which branch is polled first (fair scheduling)
        let first_goes_first = fastrand::bool();
        for turn in 0..2 {
            if (turn == 0) == first_goes_first {
                if let Poll::Ready(result) = future1.as_mut().poll(cx) {
                    return Poll::Ready(Out::First(result));
                }
            } else if let Poll::Ready(result) = future2.as_mut().poll(cx) {
                return Poll::Ready(Out::Second(result));
            }
        }

        // Both pending
        Poll::Pending
    }).await;

    // Handlers run after the race is decided, which is why they may use .await
    match out {
        Out::First(result) => { handle_result1(result); }
        Out::Second(result) => { handle_result2(result); }
    }
}
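Key mechanics:
  • Random polling order: A random starting branch on each poll keeps one always-ready branch from starving the others
  • Pinning: Futures are pinned in place so they can be polled without being moved
  • Cancellation: When one branch completes, the other futures are dropped
  • Single waker: Every branch is polled with the same task waker, so progress on any branch wakes the whole select!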
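
To make the cancellation step observable, here is a minimal std-only sketch of a two-branch select. `select2`, `Either`, `DropFlag`, `slow`, `after_polls` and `block_on` are hypothetical names introduced for this illustration; the sketch always polls the first branch first (comparable to `biased;`), whereas tokio randomizes the starting branch.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Which branch finished first.
#[derive(Debug, PartialEq)]
enum Either<A, B> {
    First(A),
    Second(B),
}

/// Hypothetical two-branch select: poll both, return the first `Ready`.
async fn select2<A: Future, B: Future>(a: A, b: B) -> Either<A::Output, B::Output> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    // When this function returns, `a` and `b` are dropped: the loser is cancelled.
    poll_fn(|cx| {
        if let Poll::Ready(v) = a.as_mut().poll(cx) {
            return Poll::Ready(Either::First(v));
        }
        if let Poll::Ready(v) = b.as_mut().poll(cx) {
            return Poll::Ready(Either::Second(v));
        }
        Poll::Pending
    })
    .await
}

/// Sets a shared flag when dropped, so the test can see the cancellation.
struct DropFlag(Rc<Cell<bool>>);
impl Drop for DropFlag {
    fn drop(&mut self) {
        self.0.set(true);
    }
}

/// A slow branch that owns a guard across its await point.
async fn slow(flag: DropFlag) -> &'static str {
    let _guard = flag;
    after_polls(100, ()).await;
    "slow"
}

/// Pending `n` times, then `value`.
fn after_polls<T>(mut n: u32, value: T) -> impl Future<Output = T> {
    let mut value = Some(value);
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(value.take().expect("polled after completion"))
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor, sufficient for this demonstration.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Running `select2(after_polls(2, "fast"), slow(...))` returns `Either::First("fast")`, and the guard held by `slow` is dropped even though `slow` never reached its return value. That drop is all "cancellation" means at this level.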

The join! Macro Expansion

// What you write:
let (res1, res2) = tokio::join!(future1, future2);

// Roughly expands to:
{
    use std::future::Future;
    use std::task::Poll;
    
    let mut future1 = std::pin::pin!(future1);
    let mut future2 = std::pin::pin!(future2);
    
    let mut result1 = None;
    let mut result2 = None;
    
    std::future::poll_fn(|cx| {
        // Poll future1 if not completed
        if result1.is_none() {
            if let Poll::Ready(res) = future1.as_mut().poll(cx) {
                result1 = Some(res);
            }
        }
        
        // Poll future2 if not completed
        if result2.is_none() {
            if let Poll::Ready(res) = future2.as_mut().poll(cx) {
                result2 = Some(res);
            }
        }
        
        // Both ready?
        match (result1.take(), result2.take()) {
            (Some(r1), Some(r2)) => Poll::Ready((r1, r2)),
            (r1, r2) => {
                // Put back partial results
                result1 = r1;
                result2 = r2;
                Poll::Pending
            }
        }
    }).await
}
Key mechanics:
  • All futures polled: Every poll cycle polls all incomplete futures
  • No cancellation: All futures run to completion
  • Result storage: Completed results stored until all ready
  • Efficient: No spawning, no channels, just polling
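
The effect of polling every incomplete future in each cycle can be measured by counting executor polls. The sketch below is std-only and rests on assumptions: `join2`, `after_polls` and `block_on_counting` are hypothetical helpers for this illustration, and "polls" stand in for elapsed time.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical two-future join: poll both until both have produced a value.
async fn join2<A: Future, B: Future>(a: A, b: B) -> (A::Output, B::Output) {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut ra, mut rb) = (None, None);
    poll_fn(|cx| {
        if ra.is_none() {
            if let Poll::Ready(v) = a.as_mut().poll(cx) {
                ra = Some(v);
            }
        }
        if rb.is_none() {
            if let Poll::Ready(v) = b.as_mut().poll(cx) {
                rb = Some(v);
            }
        }
        if ra.is_some() && rb.is_some() {
            Poll::Ready((ra.take().unwrap(), rb.take().unwrap()))
        } else {
            Poll::Pending
        }
    })
    .await
}

/// Pending `n` times, then `value` (a stand-in for an I/O wait).
fn after_polls<T>(mut n: u32, value: T) -> impl Future<Output = T> {
    let mut value = Some(value);
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(value.take().expect("polled after completion"))
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor that also reports how many polls were needed.
fn block_on_counting<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

With waits of 3 and 5 polls, `join2` finishes after 6 executor polls (the longer wait plus the final poll), while awaiting the same two futures one after the other takes 9. The cost of a join tracks the slowest future, not the sum.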

Polling Order and Fairness

/// Fair select - random poll order
tokio::select! {
    _ = future1 => { /* ... */ }
    _ = future2 => { /* ... */ }
}

/// Biased select - deterministic order (future1 always checked first)
tokio::select! {
    biased;
    _ = future1 => { /* future1 has priority */ }
    _ = future2 => { /* future2 only if future1 pending */ }
}
When to use biased:
  • Priority queues: High-priority events should be checked first
  • Optimization: Skip the randomization overhead when a fixed polling order is acceptable
  • Determinism: Testing scenarios requiring predictable behavior
Danger: Biased select can starve lower-priority branches if higher branches are always ready.
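
The starvation risk can be shown with a small simulation that needs no futures at all. This is a sketch under assumptions: `pick_branch` and `wins` are hypothetical helpers, and a rotating start index stands in for tokio's random start so that the counts are deterministic.

```rust
/// Return the index of the first ready branch, scanning from `start`
/// and wrapping around. Using `start = 0` every time models `biased;`.
fn pick_branch(ready: &[bool], start: usize) -> Option<usize> {
    let n = ready.len();
    (0..n).map(|k| (start + k) % n).find(|&i| ready[i])
}

/// Count how often each branch wins over `rounds` selections when the
/// starting index for each round is chosen by `next_start`.
fn wins(ready: &[bool], rounds: usize, mut next_start: impl FnMut(usize) -> usize) -> Vec<usize> {
    let mut counts = vec![0; ready.len()];
    for round in 0..rounds {
        if let Some(i) = pick_branch(ready, next_start(round)) {
            counts[i] += 1;
        }
    }
    counts
}
```

When both branches are always ready, `wins(&[true, true], 100, |_| 0)` gives the second branch zero wins, while a varying start such as `|round| round % 2` splits the wins evenly. A biased select only reaches the lower branch in rounds where the higher one is not ready.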

Cancellation Safety Deep Dive

What is cancellation safety?

A future is cancellation safe if dropping it before completion doesn't lose data or leave inconsistent state.

// ✅ CANCELLATION SAFE: No state lost
async fn safe_sleep() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}
// Dropping this is fine - timer just stops

// ❌ NOT CANCELLATION SAFE: Partial read lost
async fn unsafe_read(socket: &mut TcpStream) -> Vec<u8> {
    let mut buffer = vec![0u8; 1024];
    // read_exact may need several reads to fill the buffer
    socket.read_exact(&mut buffer).await.unwrap();
    buffer
}
// If cancelled mid-read, the bytes already consumed from the socket
// are dropped along with `buffer` and cannot be read again!

// ✅ FIXED: Make it cancellation safe
// The caller owns `total`, so bytes read before a cancellation survive.
// (A single AsyncReadExt::read call is documented as cancel safe;
// read_exact is documented as NOT cancel safe, so keep it out of select!)
async fn safe_read(socket: &mut TcpStream, total: &mut Vec<u8>) {
    let mut buffer = [0u8; 1024];

    loop {
        // try_read never waits, so it cannot be cancelled halfway
        match socket.try_read(&mut buffer) {
            Ok(0) => break, // EOF
            Ok(n) => total.extend_from_slice(&buffer[..n]),
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                // Wait for readable (no data is consumed while waiting)
                socket.readable().await.unwrap();
            }
            Err(e) => panic!("{}", e),
        }
    }
}
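Common cancellation-unsafe patterns:
  • AsyncReadExt::read_exact - bytes already read into the buffer cannot be accounted for
  • AsyncWriteExt::write_all - part of the buffer may already be written, and a retry starts again from the beginning
  • Updating shared state in several steps with an .await in between
  • Multi-step operations without transactions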
How to handle:
  1. Check documentation: Most tokio methods document cancellation safety
  2. Use atomic operations: For state updates
  3. Buffer intermediate results: Don't lose partial data
  4. Explicit cleanup: Use drop guards if needed
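
The "explicit cleanup" step can be sketched with a drop guard that undoes a half-finished operation when its future is cancelled. Everything here is hypothetical and std-only: `Ledger`, `Reservation`, `transfer`, `after_polls` and `poll_then_drop` exist only for this illustration, and dropping the future after a fixed number of polls stands in for losing a `select!` race.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, Default, PartialEq)]
struct Ledger {
    reserved: u32,
    committed: u32,
}

/// Guard that rolls the reservation back unless `commit` was called.
struct Reservation {
    ledger: Rc<RefCell<Ledger>>,
    amount: u32,
    done: bool,
}

impl Reservation {
    fn new(ledger: Rc<RefCell<Ledger>>, amount: u32) -> Self {
        ledger.borrow_mut().reserved += amount;
        Reservation { ledger, amount, done: false }
    }

    fn commit(mut self) {
        {
            let mut l = self.ledger.borrow_mut();
            l.reserved -= self.amount;
            l.committed += self.amount;
        }
        self.done = true; // Drop still runs, but skips the rollback
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        if !self.done {
            self.ledger.borrow_mut().reserved -= self.amount; // roll back
        }
    }
}

/// Two-step operation with an await point between the steps.
async fn transfer(ledger: Rc<RefCell<Ledger>>, amount: u32) {
    let reservation = Reservation::new(ledger, amount);
    after_polls(3, ()).await; // stand-in for slow I/O; may be cancelled here
    reservation.commit();
}

/// Pending `n` times, then `value`.
fn after_polls<T>(mut n: u32, value: T) -> impl Future<Output = T> {
    let mut value = Some(value);
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(value.take().expect("polled after completion"))
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll at most `polls` times, then drop the future (simulated cancellation).
fn poll_then_drop<F: Future>(fut: F, polls: u32) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    for _ in 0..polls {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return Some(out);
        }
    }
    None
}
```

If `transfer` is dropped after a single poll, the guard's `Drop` returns the reserved amount and the ledger is back in its initial state. If it runs to completion, the amount ends up committed.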

Memory Layout and State Management

// Conceptual state of a two-branch select!
// (the macros keep the futures inline in the enclosing async state machine; nothing is boxed)
use std::future::Future;

pub struct Select2<F1, F2> {
    future1: F1,
    future2: F2,
    // No result storage - drops losing branch immediately
}

// Conceptual state of a two-future join!
pub struct Join2<F1: Future, F2: Future> {
    future1: F1,
    future2: F2,
    result1: Option<F1::Output>,
    result2: Option<F2::Output>,
    // Stores results until both complete
}
Memory implications:
  • select!: Only stores futures, not results (lower memory)
  • join!: Stores all results (higher memory for large outputs)
  • Pinning: Both pin the futures in place, without heap allocation

try_join! for Fallible Operations

use tokio::try_join;

/// Fetch data from multiple sources, fail fast on error
async fn fetch_all() -> Result<(Data1, Data2, Data3), Error> {
    // If ANY future returns Err, immediately:
    // 1. Cancel other futures
    // 2. Return that error
    try_join!(
        fetch_data1(),
        fetch_data2(),
        fetch_data3(),
    )
}

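// Contrast: join! followed by `?` is NOT equivalent.
// It has the same return type, but it waits for all three futures
// to finish before any error is reported.
async fn fetch_all_join_then_check() -> Result<(Data1, Data2, Data3), Error> {
    let (r1, r2, r3) = tokio::join!(
        fetch_data1(),
        fetch_data2(),
        fetch_data3(),
    );
    
    // Errors surface only here, after every future has completed
    Ok((r1?, r2?, r3?))
}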
Use try_join! when:
  • All results are needed (can't proceed with partial)
  • Errors should stop all work immediately
  • Example: Database transaction across multiple tables
Use join! when:
  • Partial results are acceptable
  • Errors should be collected, not short-circuit
  • Example: Aggregating data from multiple optional sources
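
The short-circuit that separates `try_join!` from `join!` can be sketched without a runtime. The helpers `try_join2`, `after_polls` and `block_on_counting` below are hypothetical and std-only, and executor polls stand in for elapsed time.

```rust
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical two-future try-join: return the first error immediately,
/// otherwise wait for both successes.
async fn try_join2<A, B, T, U, E>(a: A, b: B) -> Result<(T, U), E>
where
    A: Future<Output = Result<T, E>>,
    B: Future<Output = Result<U, E>>,
{
    let mut a = pin!(a);
    let mut b = pin!(b);
    let (mut ra, mut rb) = (None, None);
    poll_fn(|cx| {
        if ra.is_none() {
            match a.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => ra = Some(v),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), // short-circuit
                Poll::Pending => {}
            }
        }
        if rb.is_none() {
            match b.as_mut().poll(cx) {
                Poll::Ready(Ok(v)) => rb = Some(v),
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), // short-circuit
                Poll::Pending => {}
            }
        }
        match (ra.take(), rb.take()) {
            (Some(x), Some(y)) => Poll::Ready(Ok((x, y))),
            (x, y) => {
                // Put back whatever has completed so far
                ra = x;
                rb = y;
                Poll::Pending
            }
        }
    })
    .await
}

/// Pending `n` times, then `value`.
fn after_polls<T>(mut n: u32, value: T) -> impl Future<Output = T> {
    let mut value = Some(value);
    poll_fn(move |cx| {
        if n == 0 {
            Poll::Ready(value.take().expect("polled after completion"))
        } else {
            n -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    })
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polling executor that also reports how many polls were needed.
fn block_on_counting<F: Future>(fut: F) -> (F::Output, u32) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}
```

When one future fails after 2 pending polls and the other would need 50, `try_join2` returns the error on the third poll and the slow future is dropped unfinished. A plain join would have waited for all 50.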

---

When to Use Select and Join

Use select! when:

| Scenario | Why |
|----------|-----|
| Timeouts | Race operation with timeout future |
| First-wins | First successful response from multiple sources |
| Shutdown signals | Interrupt work when shutdown signaled |
| User input + background work | Respond to input while processing |
| Multiple event sources | TCP accept, timer, channel, signal |

Use join! when:

| Scenario | Why |
|----------|-----|
| Parallel execution | Multiple independent operations |
| Aggregating results | Need all results to proceed |
| Performance optimization | Concurrent fetching instead of sequential |
| Batch operations | Process multiple items simultaneously |

Use try_join! when:

| Scenario | Why |
|----------|-----|
| All-or-nothing | Need all results or fail entirely |
| Transactional operations | Database writes across tables |
| Service initialization | Multiple services must all start |

DON'T use select! when:

| Anti-pattern | Problem | Solution |
|--------------|---------|----------|
| Sequential dependencies | Results depend on each other | Chain with .await |
| Cancellation-unsafe futures | Data loss on cancellation | Use join! or handle cleanup |
| Need all results | Select only gives one | Use join! instead |
| Complex racing logic | Hard to maintain | Use futures::future::select_all or FuturesUnordered |

---

⚠️ Anti-patterns and Pitfalls

⚠️ Anti-pattern 1: Not Understanding Cancellation

// ❌ WRONG: Lost data on cancellation
async fn dangerous_select(mut socket: TcpStream) {
    let mut buffer = Vec::new();
    
    tokio::select! {
        _ = socket.read_to_end(&mut buffer) => {
            println!("Read {} bytes", buffer.len());
        }
        _ = tokio::time::sleep(Duration::from_secs(1)) => {
            println!("Timeout");
            // PROBLEM: If timeout wins, the read is cancelled and
            // whatever had arrived so far is thrown away unused
        }
    }
}

// ✅ CORRECT: Handle partial results
async fn safe_select(mut socket: TcpStream) {
    let mut buffer = Vec::new();
    let mut total_read = 0;
    
    // One deadline for the whole read; creating the sleep inside the
    // loop would restart the timer after every chunk
    let timeout = tokio::time::sleep(Duration::from_secs(1));
    tokio::pin!(timeout);
    
    loop {
        let mut chunk = vec![0u8; 1024];
        
        tokio::select! {
            // AsyncReadExt::read is cancel safe: if the timeout wins,
            // this call has not consumed any data
            result = socket.read(&mut chunk) => {
                match result {
                    Ok(0) => break,  // EOF
                    Ok(n) => {
                        buffer.extend_from_slice(&chunk[..n]);
                        total_read += n;
                    }
                    Err(e) => panic!("{}", e),
                }
            }
            _ = &mut timeout => {
                println!("Timeout, but we saved {} bytes", total_read);
                break;
            }
        }
    }
    
    println!("Total read: {}", buffer.len());
}

⚠️ Anti-pattern 2: Race Conditions in Select Branches

// ❌ WRONG: Shared state race condition
use std::sync::Arc;
use tokio::sync::Mutex;

async fn racy_select(counter: Arc<Mutex<u64>>) {
    tokio::select! {
        _ = async {
            let mut count = counter.lock().await;
            *count += 1;
            // Lock dropped here
        } => {
            // Branch 1
        }
        _ = async {
            let mut count = counter.lock().await;
            *count += 1;
            // Lock dropped here
        } => {
            // Branch 2
        }
    }
    // PROBLEM: Only one branch ever completes!
    // A branch still waiting for the lock when the other finishes is
    // dropped, so its increment silently never happens
}

// ✅ CORRECT: Atomic operations for shared state
use std::sync::atomic::{AtomicU64, Ordering};

async fn safe_select(counter: Arc<AtomicU64>) {
    tokio::select! {
        _ = async {
            counter.fetch_add(1, Ordering::SeqCst);
        } => {
            // Branch 1 - no .await inside, so the increment can never be
            // interrupted halfway by cancellation
        }
        _ = async {
            counter.fetch_add(1, Ordering::SeqCst);
        } => {
            // Branch 2 - runs only if it is polled first; otherwise it is
            // dropped before doing any work
        }
    }
}

⚠️ Anti-pattern 3: Using select! When join! is Needed

// ❌ WRONG: select! when you need all results
async fn wrong_pattern() {
    tokio::select! {
        user = fetch_user() => {
            // Only got user, missing orders!
        }
        orders = fetch_orders() => {
            // Only got orders, missing user!
        }
    }
}

// ✅ CORRECT: Use join! for all results
async fn correct_pattern() {
    let (user, orders) = tokio::join!(
        fetch_user(),
        fetch_orders(),
    );
    // Have both user and orders
}

⚠️ Anti-pattern 4: Ignoring Partial Results in join!

// ❌ WRONG: All-or-nothing with join!
async fn wasteful_join() {
    let (user, activity, prefs) = tokio::join!(
        fetch_user(),
        fetch_activity(),
        fetch_preferences(),
    );
    
    // PROBLEM: If any fetch fails, we throw away successful fetches!
    let user = user.unwrap();
    let activity = activity.unwrap();
    let prefs = prefs.unwrap();
}

// ✅ CORRECT: Use partial results
async fn smart_join() -> Result<Profile, Error> {
    let (user, activity, prefs) = tokio::join!(
        fetch_user(),
        fetch_activity(),
        fetch_preferences(),
    );
    
    // Use what we got
    let user = user?;  // User is required
    let activity = activity.ok();  // Activity is optional
    let prefs = prefs.ok();  // Preferences optional
    
    Ok(Profile { user, activity, prefs })
}

⚠️ Anti-pattern 5: Not Handling Biased Polling

// ❌ WRONG: Biased select starves low-priority branch
async fn starving_select() {
    let (tx, mut rx) = mpsc::channel(100);
    
    // Spam high-priority channel
    tokio::spawn(async move {
        loop {
            tx.send("high").await.ok();
        }
    });
    
    // Created once, outside the loop, so it keeps its schedule
    let mut tick = tokio::time::interval(Duration::from_millis(100));
    
    loop {
        tokio::select! {
            biased;
            
            // High priority - almost ALWAYS ready
            Some(msg) = rx.recv() => {
                println!("High: {}", msg);
            }
            
            // Low priority - only polled when the channel is empty,
            // so it may NEVER run!
            _ = tick.tick() => {
                println!("Low priority task");
            }
        }
    }
}

// ✅ CORRECT: Fair select or rate limiting
async fn fair_select(mut rx: mpsc::Receiver<&'static str>) {
    let mut tick = tokio::time::interval(Duration::from_millis(100));
    
    loop {
        tokio::select! {
            // No 'biased' - fair scheduling
            
            Some(msg) = rx.recv() => {
                println!("High: {}", msg);
            }
            
            _ = tick.tick() => {
                println!("Low priority task - will eventually run");
            }
        }
    }
}

---

Performance Characteristics

Overhead of select! Polling

// Benchmarking select! vs direct await
use std::time::Instant;

async fn bench_direct_await() -> Duration {
    let start = Instant::now();
    
    for _ in 0..10000 {
        tokio::time::sleep(Duration::from_nanos(1)).await;
    }
    
    start.elapsed()
}

async fn bench_select_overhead() -> Duration {
    let start = Instant::now();
    
    for _ in 0..10000 {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_nanos(1)) => {}
        }
    }
    
    start.elapsed()
}

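// Illustrative results (not measured output; the ratio is the point):
// Direct await: 50ms
// Select overhead: 55ms
// Overhead: ~10% for single-branch select
// Caveat: tokio timers have millisecond granularity, so absolute times
// for this loop are dominated by the timer, not by select! itself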
select! overhead sources:
  • Random polling order generation
  • Multiple future pinning
  • Branch selection logic
  • Waker registration for all branches
Optimization: For hot paths, prefer direct .await over single-branch select!

Memory Usage for Concurrent Futures

// Memory comparison
use std::mem::size_of_val;

async fn memory_comparison() {
    // Sequential: Only stores current future
    let sequential = async {
        let r1 = fetch_large_data().await;
        let r2 = fetch_large_data().await;
        (r1, r2)
    };
    
    // join!: Stores both futures + results
    let concurrent = async {
        tokio::join!(
            fetch_large_data(),
            fetch_large_data(),
        )
    };
    
    // If each future is 1KB and each result is 1MB:
    // Sequential: 1KB (one future at a time) + 1MB (result 1) + 1MB (result 2) = ~2MB peak
    // Concurrent: 2KB (both futures) + 2MB (both results) = ~2MB peak
    
    // As written, both versions return (r1, r2), so both hold ~2MB at the end.
    // Sequential only saves memory if r1 is processed and dropped
    // before the second fetch starts
}

async fn fetch_large_data() -> Vec<u8> {
    vec![0u8; 1024 * 1024]  // 1MB
}
Memory implications:
  • join!: Peak memory = sum of all futures + all results
  • Sequential: Peak memory can be as low as one future + the largest single result, provided each result is dropped before the next fetch
  • Trade-off: Speed vs memory
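
How much of this memory sits inside the future itself depends on what is held across an `.await`. As a small std-only sketch (the functions `holds_buffer`, `heap_buffer` and `sizes` are hypothetical), `size_of_val` shows that a stack buffer kept across an await becomes part of the future's state, that storing two such futures side by side (as a join must) needs room for both, and that a heap-allocated result does not enlarge the future.

```rust
use std::mem::size_of_val;

/// Keeps a 1 KiB stack buffer alive across an await point,
/// so the buffer is stored inside the future's state machine.
async fn holds_buffer() -> u8 {
    let buf = [7u8; 1024];
    std::future::ready(()).await;
    buf[buf.len() - 1]
}

/// Produces 1 MiB on the heap; the future itself only holds small state.
async fn heap_buffer() -> Vec<u8> {
    std::future::ready(()).await;
    vec![0u8; 1024 * 1024]
}

/// Sizes in bytes of: one future, two futures stored together, a heap-result future.
fn sizes() -> (usize, usize, usize) {
    let one = holds_buffer();
    let both = (holds_buffer(), holds_buffer()); // what a join has to store
    let heap = heap_buffer();
    (size_of_val(&one), size_of_val(&both), size_of_val(&heap))
}
```

None of these futures is polled; the sizes are fixed at compile time. The exact numbers depend on the compiler, but the first is at least 1024 bytes and the pair at least 2048, while the heap variant stays far below that.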

Comparison: Sequential vs join! vs spawn

use std::time::Instant;

/// Sequential execution
async fn sequential_fetch() -> Duration {
    let start = Instant::now();
    
    let _r1 = fetch_data(100).await;
    let _r2 = fetch_data(100).await;
    let _r3 = fetch_data(100).await;
    
    start.elapsed()
    // Total: 300ms (100 + 100 + 100)
}

/// Concurrent with join!
async fn join_fetch() -> Duration {
    let start = Instant::now();
    
    let (_r1, _r2, _r3) = tokio::join!(
        fetch_data(100),
        fetch_data(100),
        fetch_data(100),
    );
    
    start.elapsed()
    // Total: 100ms (max of all)
}

/// Concurrent with spawn
async fn spawn_fetch() -> Duration {
    let start = Instant::now();
    
    let h1 = tokio::spawn(fetch_data(100));
    let h2 = tokio::spawn(fetch_data(100));
    let h3 = tokio::spawn(fetch_data(100));
    
    let _ = tokio::try_join!(h1, h2, h3).unwrap();
    
    start.elapsed()
    // Total: ~100ms + a small spawn and scheduling overhead
}

async fn fetch_data(delay_ms: u64) -> String {
    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    "data".to_string()
}
Performance comparison:

| Method | Time | Overhead | Use Case |
|--------|------|----------|----------|
| Sequential | 300ms | None | Dependencies between operations |
| join! | ~100ms | Negligible (polling only) | Concurrent, same task |
| spawn | ~100ms | Small (task allocation and scheduling) | Concurrent, need parallelism |

When to use spawn vs join!:
  • join!: CPU-light async I/O, no parallelism needed
  • spawn: Work that should run in parallel on other worker threads; move CPU-heavy or blocking sections to spawn_blocking

Cancellation Cost

// Measuring cancellation overhead
async fn bench_cancellation() {
    let start = Instant::now();
    
    for _ in 0..10000 {
        tokio::select! {
            _ = expensive_future() => {}
            _ = tokio::time::sleep(Duration::from_nanos(1)) => {
                // Cancels expensive_future
            }
        }
    }
    
    println!("Cancellation overhead: {:?}", start.elapsed());
}

async fn expensive_future() {
    // Simulate complex future with resources
    let _large_buffer = vec![0u8; 1024 * 1024];
    tokio::time::sleep(Duration::from_secs(10)).await;
}

// Cost of cancellation:
// - Drop expensive_future and all its state
// - Deallocate large_buffer
// - Cleanup any registered wakers
// Typical: 1-10µs depending on future complexity

---

Exercises

Beginner: Implement Timeout Wrapper with select!

Task: Create a generic timeout wrapper for any future.
/// Wrap any future with a timeout
pub async fn with_timeout<F, T>(
    future: F,
    timeout: Duration,
) -> Result<T, TimeoutError>
where
    F: std::future::Future<Output = T>,
{
    // TODO: Use select! to race future with timeout
    // Return Ok(result) if future completes
    // Return Err(TimeoutError) if timeout elapses
    
    todo!()
}

#[derive(Debug)]
pub struct TimeoutError;

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_completes_before_timeout() {
        let result = with_timeout(
            async { 42 },
            Duration::from_secs(1),
        ).await;
        
        assert_eq!(result.unwrap(), 42);
    }

    #[tokio::test]
    async fn test_timeout_triggers() {
        let result = with_timeout(
            async {
                tokio::time::sleep(Duration::from_secs(10)).await;
                42
            },
            Duration::from_millis(100),
        ).await;
        
        assert!(result.is_err());
    }
}
Solution:
pub async fn with_timeout<F, T>(
    future: F,
    timeout: Duration,
) -> Result<T, TimeoutError>
where
    F: std::future::Future<Output = T>,
{
    tokio::select! {
        result = future => Ok(result),
        _ = tokio::time::sleep(timeout) => Err(TimeoutError),
    }
}

Intermediate: Build Multi-Endpoint Health Checker

Task: Create a health checker that tests multiple endpoints concurrently.
use std::collections::HashMap;

#[derive(Debug, Clone)]
pub enum HealthStatus {
    Healthy { latency_ms: u128 },
    Unhealthy { error: String },
    Timeout,
}

pub struct MultiHealthChecker {
    endpoints: HashMap<String, String>,
    timeout: Duration,
}

impl MultiHealthChecker {
    pub fn new(timeout: Duration) -> Self {
        MultiHealthChecker {
            endpoints: HashMap::new(),
            timeout,
        }
    }

    pub fn add_endpoint(&mut self, name: String, url: String) {
        self.endpoints.insert(name, url);
    }

    /// Check all endpoints concurrently
    /// Return map of endpoint name -> status
    pub async fn check_all(&self) -> HashMap<String, HealthStatus> {
        // TODO: 
        // 1. Create futures for checking each endpoint
        // 2. Use futures::future::join_all to run concurrently
        // 3. Each check should have individual timeout using select!
        // 4. Measure latency for healthy endpoints
        
        todo!()
    }

    async fn check_endpoint(&self, url: &str) -> HealthStatus {
        // TODO:
        // 1. Measure start time
        // 2. Use select! to race HTTP request with timeout
        // 3. Return appropriate HealthStatus
        
        todo!()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_health_checker() {
        let mut checker = MultiHealthChecker::new(Duration::from_secs(5));
        checker.add_endpoint("google".to_string(), "https://google.com".to_string());
        checker.add_endpoint("example".to_string(), "https://example.com".to_string());
        
        let results = checker.check_all().await;
        assert_eq!(results.len(), 2);
        
        for (name, status) in results {
            println!("{}: {:?}", name, status);
        }
    }
}
Solution:
impl MultiHealthChecker {
    pub async fn check_all(&self) -> HashMap<String, HealthStatus> {
        let checks: Vec<_> = self
            .endpoints
            .iter()
            .map(|(name, url)| {
                let name = name.clone();
                let url = url.clone();
                async move {
                    let status = self.check_endpoint(&url).await;
                    (name, status)
                }
            })
            .collect();

        futures::future::join_all(checks)
            .await
            .into_iter()
            .collect()
    }

    async fn check_endpoint(&self, url: &str) -> HealthStatus {
        let start = Instant::now();

        let result = tokio::select! {
            result = reqwest::get(url) => result,
            _ = tokio::time::sleep(self.timeout) => {
                return HealthStatus::Timeout;
            }
        };

        match result {
            Ok(resp) if resp.status().is_success() => HealthStatus::Healthy {
                latency_ms: start.elapsed().as_millis(),
            },
            Ok(resp) => HealthStatus::Unhealthy {
                error: format!("HTTP {}", resp.status()),
            },
            Err(e) => HealthStatus::Unhealthy {
                error: e.to_string(),
            },
        }
    }
}

Advanced: Create Custom Select Combinator with Priority

Task: Build a custom select combinator that supports priority levels.
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;

/// Select combinator with priority - higher priority futures checked first
pub struct PrioritySelect<F1, F2, F3> {
    high_priority: Pin<Box<F1>>,
    medium_priority: Pin<Box<F2>>,
    low_priority: Pin<Box<F3>>,
}

pub enum PriorityResult<T1, T2, T3> {
    High(T1),
    Medium(T2),
    Low(T3),
}

impl<F1, F2, F3> PrioritySelect<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    pub fn new(high: F1, medium: F2, low: F3) -> Self {
        PrioritySelect {
            high_priority: Box::pin(high),
            medium_priority: Box::pin(medium),
            low_priority: Box::pin(low),
        }
    }
}

impl<F1, F2, F3> Future for PrioritySelect<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    type Output = PriorityResult<F1::Output, F2::Output, F3::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // TODO:
        // 1. Always poll high_priority first
        // 2. If high is Ready, return immediately
        // 3. Then poll medium_priority
        // 4. If medium is Ready, return immediately
        // 5. Finally poll low_priority
        // 6. If all Pending, return Pending
        
        todo!()
    }
}

// Helper function to use it
pub fn priority_select<F1, F2, F3>(
    high: F1,
    medium: F2,
    low: F3,
) -> PrioritySelect<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    PrioritySelect::new(high, medium, low)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_first_ready_wins() {
        let result = priority_select(
            async { tokio::time::sleep(Duration::from_millis(10)).await; "high" },
            async { tokio::time::sleep(Duration::from_millis(5)).await; "medium" },
            async { tokio::time::sleep(Duration::from_millis(1)).await; "low" },
        )
        .await;

        // Priority only decides between futures that are ready in the same
        // poll. Here the low-priority future is ready first (1ms), so it is
        // expected to win despite its priority.
        match result {
            PriorityResult::High(_) => println!("High priority"),
            PriorityResult::Medium(_) => println!("Medium priority"),
            PriorityResult::Low(v) => {
                assert_eq!(v, "low");
                println!("Low priority completed first");
            }
        }
    }
}
Solution:
impl<F1, F2, F3> Future for PrioritySelect<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    type Output = PriorityResult<F1::Output, F2::Output, F3::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check high priority first
        if let Poll::Ready(output) = self.high_priority.as_mut().poll(cx) {
            return Poll::Ready(PriorityResult::High(output));
        }

        // Then medium priority
        if let Poll::Ready(output) = self.medium_priority.as_mut().poll(cx) {
            return Poll::Ready(PriorityResult::Medium(output));
        }

        // Finally low priority
        if let Poll::Ready(output) = self.low_priority.as_mut().poll(cx) {
            return Poll::Ready(PriorityResult::Low(output));
        }

        // All pending
        Poll::Pending
    }
}

---

Real-World Usage Examples

tokio::select! Examples

use tokio::sync::mpsc;
use tokio::signal;

/// Complete server with select! patterns
pub async fn run_server() {
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
    
    // Main server loop
    tokio::select! {
        // Handle signals
        _ = signal::ctrl_c() => {
            println!("Ctrl+C received");
        }
        
        // Wait for explicit shutdown
        _ = shutdown_rx.recv() => {
            println!("Shutdown signal received");
        }
        
        // Or run for maximum duration
        _ = tokio::time::sleep(Duration::from_secs(3600)) => {
            println!("Max runtime reached");
        }
    }
    
    println!("Server shutting down");
}

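/// select! with an else branch: it runs when every branch is disabled
/// because its pattern failed to match (here: recv() returned None)
pub async fn select_with_else() {
    let (tx, mut rx) = mpsc::channel::<String>(10);
    drop(tx); // close the channel so recv() returns None
    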
    tokio::select! {
        Some(msg) = rx.recv() => {
            println!("Received: {}", msg);
        }
        else => {
            println!("Channel closed, no more messages");
        }
    }
}

futures::select! Differences

use futures::select;
use futures::FutureExt;  // for .fuse()

/// futures::select! requires futures that implement FusedFuture (via .fuse());
/// futures passed by name must also be Unpin, hence pin_mut!
pub async fn futures_select_example() {
    let future1 = async { 1 }.fuse();
    let future2 = async { 2 }.fuse();
    futures::pin_mut!(future1, future2);
    
    select! {
        result = future1 => println!("Future 1: {}", result),
        result = future2 => println!("Future 2: {}", result),
    }
    
    // Key difference: a fused future may be polled again after it has
    // completed, so the same futures can stay in a select! inside a loop;
    // finished ones are simply skipped
}

/// tokio::select! for one-shot selection
pub async fn tokio_select_example() {
    // No .fuse() needed, but futures are consumed
    tokio::select! {
        result = async { 1 } => println!("Result: {}", result),
        result = async { 2 } => println!("Result: {}", result),
    }
}

tokio::try_join! for Errors

/// Initialize multiple services, fail if any fails
pub async fn initialize_services() -> Result<Services, InitError> {
    let (db, cache, queue) = tokio::try_join!(
        init_database(),
        init_cache(),
        init_queue(),
    )?;
    
    Ok(Services { db, cache, queue })
}

struct Services {
    db: Database,
    cache: Cache,
    queue: Queue,
}

#[derive(Debug)]
struct Database;
#[derive(Debug)]
struct Cache;
#[derive(Debug)]
struct Queue;

#[derive(Debug, thiserror::Error)]
enum InitError {
    #[error("Database init failed")]
    Database,
    #[error("Cache init failed")]
    Cache,
    #[error("Queue init failed")]
    Queue,
}

async fn init_database() -> Result<Database, InitError> {
    Ok(Database)
}

async fn init_cache() -> Result<Cache, InitError> {
    Ok(Cache)
}

async fn init_queue() -> Result<Queue, InitError> {
    Ok(Queue)
}

Dynamic Select with FuturesUnordered

use futures::stream::{FuturesUnordered, StreamExt};

/// Process dynamic number of futures concurrently
pub async fn dynamic_select() {
    let mut futures = FuturesUnordered::new();
    
    // Add futures dynamically
    for i in 0..10 {
        futures.push(async move {
            tokio::time::sleep(Duration::from_millis(i * 10)).await;
            i
        });
    }
    
    // Process as they complete
    while let Some(result) = futures.next().await {
        println!("Future {} completed", result);
    }
}

/// Race dynamic number of HTTP requests
pub async fn race_many_requests(urls: Vec<String>) -> Option<String> {
    let mut futures = FuturesUnordered::new();
    
    for url in urls {
        futures.push(async move {
            reqwest::get(&url).await.ok()?.text().await.ok()
        });
    }
    
    // Return first successful result
    while let Some(result) = futures.next().await {
        if let Some(response) = result {
            return Some(response);
        }
    }
    
    None
}

---

Summary

Select and Join are the Swiss Army knife of async composition.

Key Takeaways

  1. select! = Race (first to complete)
     • Use for timeouts, shutdown signals, first-wins
     • Cancels unfinished branches
     • Check cancellation safety!
  2. join! = Concurrent (all to complete)
     • Use for parallel execution, aggregation
     • No cancellation, all run to completion
     • Total time is roughly the slowest operation instead of the sum (3x faster in the three-fetch example above)
  3. try_join! = Fail fast
     • Use for all-or-nothing operations
     • First error cancels everything
     • Fits all-or-nothing workflows, but dropping a future does not undo work it already did
  4. Cancellation Safety is critical
     • Not all futures are safe to cancel
     • Check docs, test thoroughly
     • Use atomic operations for shared state
  5. Performance vs Complexity
     • select! adds a small per-poll overhead (about 10% in the single-branch micro-benchmark above)
     • join! reduces I/O wait from the sum of the operations to the slowest one
     • spawn adds more overhead but enables parallelism

When to Use What

| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|------------|------------|----------|
| Sequential .await | High | Low | Low | Dependencies |
| join! | Low | High | Medium | Concurrent I/O |
| select! | Low | Medium | Medium | Racing, timeouts |
| spawn | Low | Highest | High | CPU parallelism |

Master these patterns and you'll write elegant, high-performance async Rust that scales from prototypes to production.

Now go forth and compose futures like a legendary Rust master! 🦀⚡
