Concurrent future composition
select!): Race multiple futures, return when first completesjoin!): Execute futures concurrently, return when all completeThese patterns transform how you think about concurrency - instead of spawning tasks and coordinating with channels, you compose futures declaratively.
use tokio::time::{sleep, Duration};
// SELECT: First to complete wins
async fn select_example() {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
println!("Fast operation finished first");
}
_ = sleep(Duration::from_secs(5)) => {
println!("Slow operation finished first");
}
}
// Output: "Fast operation finished first"
// The 5-second sleep is CANCELLED
}
// JOIN: Wait for all to complete
async fn join_example() {
let (res1, res2) = tokio::join!(
sleep(Duration::from_secs(1)),
sleep(Duration::from_secs(5))
);
// Waits for BOTH (5 seconds total, not 6)
println!("Both operations completed");
}
| Pattern | Use Case | Behavior |
|---------|----------|----------|
| select! | Timeouts, first-wins races, shutdown signals | Cancels unfinished branches |
| join! | Parallel execution, aggregating results | Waits for all branches |
| try_join! | Parallel with error propagation | Short-circuits on first error |
select! is dropped before completion. Not all futures are safe to cancel:
// โ
CANCELLATION SAFE: Can be safely dropped
async fn safe_operation() {
tokio::time::sleep(Duration::from_secs(1)).await;
}
// โ NOT CANCELLATION SAFE: Dropping loses data
async fn unsafe_operation() {
let mut buffer = Vec::new();
socket.read_to_end(&mut buffer).await; // Partial read is lost!
}
Rule: Use select! only with cancellation-safe futures, or explicitly handle cleanup.
---
use tokio::time::{timeout, Duration};
use std::io;
/// Database query with timeout protection
pub struct Database {
pool: sqlx::PgPool,
default_timeout: Duration,
}
impl Database {
/// Execute query with timeout
pub async fn query_with_timeout<T>(
&self,
query: &str,
timeout_duration: Duration,
) -> Result<T, QueryError>
where
T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
// Pattern 1: Using tokio::timeout
match timeout(timeout_duration, self.execute_query(query)).await {
Ok(Ok(result)) => Ok(result),
Ok(Err(db_err)) => Err(QueryError::Database(db_err)),
Err(_) => Err(QueryError::Timeout),
}
}
/// Same pattern using select! for more control
pub async fn query_with_custom_timeout<T>(
&self,
query: &str,
) -> Result<T, QueryError>
where
T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
tokio::select! {
// Main operation
result = self.execute_query::<T>(query) => {
result.map_err(QueryError::Database)
}
// Timeout branch
_ = tokio::time::sleep(self.default_timeout) => {
// Log timeout for monitoring
tracing::warn!(
query = query,
timeout_ms = self.default_timeout.as_millis(),
"Database query timed out"
);
Err(QueryError::Timeout)
}
}
}
async fn execute_query<T>(&self, query: &str) -> sqlx::Result<T>
where
T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
{
sqlx::query_as(query)
.fetch_one(&self.pool)
.await
}
}
#[derive(Debug, thiserror::Error)]
pub enum QueryError {
#[error("Database error: {0}")]
Database(#[from] sqlx::Error),
#[error("Query timed out")]
Timeout,
}
/// Health check with timeout
pub struct HealthChecker {
endpoints: Vec<String>,
timeout: Duration,
}
impl HealthChecker {
/// Check single endpoint health with timeout
pub async fn check_endpoint(&self, url: &str) -> HealthStatus {
tokio::select! {
// Health check HTTP request
result = self.perform_health_check(url) => {
match result {
Ok(_) => HealthStatus::Healthy,
Err(e) => HealthStatus::Unhealthy(e.to_string()),
}
}
// Timeout after configured duration
_ = tokio::time::sleep(self.timeout) => {
HealthStatus::Timeout
}
}
}
/// Check all endpoints concurrently with individual timeouts
pub async fn check_all(&self) -> Vec<(String, HealthStatus)> {
// Create futures for all checks
let checks = self.endpoints.iter().map(|url| {
let url = url.clone();
async move {
let status = self.check_endpoint(&url).await;
(url, status)
}
});
// Execute all concurrently
futures::future::join_all(checks).await
}
async fn perform_health_check(&self, url: &str) -> reqwest::Result<()> {
reqwest::get(url).await?.error_for_status()?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub enum HealthStatus {
Healthy,
Unhealthy(String),
Timeout,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_timeout_triggers() {
let checker = HealthChecker {
endpoints: vec![],
timeout: Duration::from_millis(100),
};
// Simulate slow endpoint
let status = tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {
HealthStatus::Healthy
}
_ = tokio::time::sleep(checker.timeout) => {
HealthStatus::Timeout
}
};
assert!(matches!(status, HealthStatus::Timeout));
}
}
Why this works:
---
use serde::{Deserialize, Serialize};
use std::time::Instant;
/// Aggregated user profile from multiple sources
#[derive(Debug, Serialize)]
pub struct UserProfile {
user_info: UserInfo,
activity: UserActivity,
preferences: UserPreferences,
fetch_time_ms: u128,
}
#[derive(Debug, Serialize, Deserialize)]
struct UserInfo {
id: u64,
name: String,
email: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct UserActivity {
last_login: String,
total_actions: u64,
}
#[derive(Debug, Serialize, Deserialize)]
struct UserPreferences {
theme: String,
notifications: bool,
}
/// Service for aggregating user data from multiple microservices
pub struct UserAggregator {
client: reqwest::Client,
user_service_url: String,
activity_service_url: String,
preferences_service_url: String,
}
impl UserAggregator {
/// Fetch complete user profile concurrently
pub async fn get_user_profile(
&self,
user_id: u64,
) -> Result<UserProfile, AggregationError> {
let start = Instant::now();
// Pattern 1: try_join! - fail fast on any error
let (user_info, activity, preferences) = tokio::try_join!(
self.fetch_user_info(user_id),
self.fetch_user_activity(user_id),
self.fetch_user_preferences(user_id),
)?;
Ok(UserProfile {
user_info,
activity,
preferences,
fetch_time_ms: start.elapsed().as_millis(),
})
}
/// Fetch profile with partial results on failure
pub async fn get_user_profile_resilient(
&self,
user_id: u64,
) -> Result<PartialUserProfile, AggregationError> {
let start = Instant::now();
// Pattern 2: join! with Result - collect all errors
let (user_result, activity_result, prefs_result) = tokio::join!(
self.fetch_user_info(user_id),
self.fetch_user_activity(user_id),
self.fetch_user_preferences(user_id),
);
// User info is required, others are optional
let user_info = user_result?;
Ok(PartialUserProfile {
user_info,
activity: activity_result.ok(),
preferences: prefs_result.ok(),
fetch_time_ms: start.elapsed().as_millis(),
errors: vec![
activity_result.err(),
prefs_result.err(),
]
.into_iter()
.flatten()
.collect(),
})
}
/// Fetch with timeout per service
pub async fn get_user_profile_with_timeouts(
&self,
user_id: u64,
timeout_per_service: Duration,
) -> Result<PartialUserProfile, AggregationError> {
let start = Instant::now();
// Pattern 3: Combine join! with timeout for each future
let (user_result, activity_result, prefs_result) = tokio::join!(
timeout(timeout_per_service, self.fetch_user_info(user_id)),
timeout(timeout_per_service, self.fetch_user_activity(user_id)),
timeout(timeout_per_service, self.fetch_user_preferences(user_id)),
);
// Handle timeouts
let user_info = user_result
.map_err(|_| AggregationError::Timeout("user_info".to_string()))?
.map_err(AggregationError::Http)?;
Ok(PartialUserProfile {
user_info,
activity: activity_result.ok().and_then(Result::ok),
preferences: prefs_result.ok().and_then(Result::ok),
fetch_time_ms: start.elapsed().as_millis(),
errors: vec![],
})
}
async fn fetch_user_info(&self, user_id: u64) -> Result<UserInfo, reqwest::Error> {
self.client
.get(&format!("{}/users/{}", self.user_service_url, user_id))
.send()
.await?
.json()
.await
}
async fn fetch_user_activity(
&self,
user_id: u64,
) -> Result<UserActivity, reqwest::Error> {
self.client
.get(&format!(
"{}/activity/{}",
self.activity_service_url, user_id
))
.send()
.await?
.json()
.await
}
async fn fetch_user_preferences(
&self,
user_id: u64,
) -> Result<UserPreferences, reqwest::Error> {
self.client
.get(&format!(
"{}/preferences/{}",
self.preferences_service_url, user_id
))
.send()
.await?
.json()
.await
}
}
#[derive(Debug, Serialize)]
pub struct PartialUserProfile {
user_info: UserInfo,
activity: Option<UserActivity>,
preferences: Option<UserPreferences>,
fetch_time_ms: u128,
errors: Vec<AggregationError>,
}
#[derive(Debug, thiserror::Error, Serialize)]
pub enum AggregationError {
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),
#[error("Service timeout: {0}")]
Timeout(String),
}
/// Performance comparison: Sequential vs Concurrent
pub mod benchmarks {
use super::*;
pub async fn sequential_fetch(aggregator: &UserAggregator, user_id: u64) -> Duration {
let start = Instant::now();
// Sequential: 300ms + 200ms + 100ms = 600ms total
let _ = aggregator.fetch_user_info(user_id).await;
let _ = aggregator.fetch_user_activity(user_id).await;
let _ = aggregator.fetch_user_preferences(user_id).await;
start.elapsed()
}
pub async fn concurrent_fetch(aggregator: &UserAggregator, user_id: u64) -> Duration {
let start = Instant::now();
// Concurrent: max(300ms, 200ms, 100ms) = 300ms total
let _ = tokio::join!(
aggregator.fetch_user_info(user_id),
aggregator.fetch_user_activity(user_id),
aggregator.fetch_user_preferences(user_id),
);
start.elapsed()
}
}
Key insights:
---
use tokio::sync::mpsc;
use tokio::signal;
use std::net::SocketAddr;
/// Multi-source event loop for network server
pub struct EventLoop {
/// TCP listener for incoming connections
listener: tokio::net::TcpListener,
/// Channel for internal events
internal_events: mpsc::Receiver<InternalEvent>,
/// Timer for periodic tasks
tick_interval: tokio::time::Interval,
/// Server state
state: ServerState,
}
#[derive(Debug)]
enum InternalEvent {
ConnectionClosed(SocketAddr),
MetricsRequest(tokio::sync::oneshot::Sender<Metrics>),
ConfigReload,
}
#[derive(Default)]
struct ServerState {
active_connections: usize,
total_requests: u64,
}
#[derive(Debug, Clone)]
struct Metrics {
active_connections: usize,
total_requests: u64,
uptime_seconds: u64,
}
impl EventLoop {
pub async fn new(addr: &str) -> std::io::Result<Self> {
let listener = tokio::net::TcpListener::bind(addr).await?;
let (tx, rx) = mpsc::channel(100);
Ok(EventLoop {
listener,
internal_events: rx,
tick_interval: tokio::time::interval(Duration::from_secs(10)),
state: ServerState::default(),
})
}
/// Main event loop - select from multiple sources
pub async fn run(mut self) -> std::io::Result<()> {
println!("Event loop started");
loop {
tokio::select! {
// Branch 1: Accept new TCP connection
result = self.listener.accept() => {
match result {
Ok((socket, addr)) => {
println!("New connection from: {}", addr);
self.state.active_connections += 1;
self.handle_connection(socket, addr);
}
Err(e) => {
eprintln!("Accept error: {}", e);
}
}
}
// Branch 2: Internal event from channels
Some(event) = self.internal_events.recv() => {
self.handle_internal_event(event);
}
// Branch 3: Periodic tick for maintenance
_ = self.tick_interval.tick() => {
self.periodic_maintenance();
}
// Branch 4: Graceful shutdown on Ctrl+C
_ = signal::ctrl_c() => {
println!("Shutdown signal received");
self.shutdown().await;
break;
}
}
}
Ok(())
}
fn handle_connection(&mut self, socket: tokio::net::TcpStream, addr: SocketAddr) {
// Spawn task to handle connection
tokio::spawn(async move {
// Handle connection...
});
}
fn handle_internal_event(&mut self, event: InternalEvent) {
match event {
InternalEvent::ConnectionClosed(addr) => {
self.state.active_connections = self.state.active_connections.saturating_sub(1);
println!("Connection closed: {}", addr);
}
InternalEvent::MetricsRequest(response) => {
let metrics = Metrics {
active_connections: self.state.active_connections,
total_requests: self.state.total_requests,
uptime_seconds: 0, // Would track uptime
};
let _ = response.send(metrics);
}
InternalEvent::ConfigReload => {
println!("Reloading configuration...");
}
}
}
fn periodic_maintenance(&mut self) {
println!(
"Periodic tick - Active connections: {}",
self.state.active_connections
);
// Cleanup, metrics, health checks, etc.
}
async fn shutdown(&mut self) {
println!("Performing graceful shutdown...");
// Close connections, flush buffers, etc.
}
}
/// Advanced: Priority-based select with biased polling
pub struct PriorityEventLoop {
high_priority: mpsc::Receiver<PriorityEvent>,
normal_priority: mpsc::Receiver<PriorityEvent>,
low_priority: mpsc::Receiver<PriorityEvent>,
}
#[derive(Debug)]
enum PriorityEvent {
Critical(String),
Normal(String),
Background(String),
}
impl PriorityEventLoop {
pub async fn run(mut self) {
loop {
// Use select_biased! to prioritize branches
tokio::select! {
biased; // Enable biased polling - checks in order
// Check high priority first
Some(event) = self.high_priority.recv() => {
println!("HIGH PRIORITY: {:?}", event);
}
// Then normal priority
Some(event) = self.normal_priority.recv() => {
println!("Normal priority: {:?}", event);
}
// Finally low priority
Some(event) = self.low_priority.recv() => {
println!("Low priority: {:?}", event);
}
// All channels closed
else => {
println!("All channels closed, shutting down");
break;
}
}
}
}
}
/// Channel multiplexing - select from multiple channels
pub async fn multiplex_channels() {
let (tx1, mut rx1) = mpsc::channel::<String>(10);
let (tx2, mut rx2) = mpsc::channel::<String>(10);
let (tx3, mut rx3) = mpsc::channel::<String>(10);
// Spawn producers
tokio::spawn(async move {
tx1.send("From channel 1".to_string()).await.ok();
});
tokio::spawn(async move {
tx2.send("From channel 2".to_string()).await.ok();
});
tokio::spawn(async move {
tx3.send("From channel 3".to_string()).await.ok();
});
// Consume from all channels
let mut count = 0;
while count < 3 {
tokio::select! {
Some(msg) = rx1.recv() => {
println!("Channel 1: {}", msg);
count += 1;
}
Some(msg) = rx2.recv() => {
println!("Channel 2: {}", msg);
count += 1;
}
Some(msg) = rx3.recv() => {
println!("Channel 3: {}", msg);
count += 1;
}
}
}
}
Why this pattern:
select_biased! for priority queues---
use tokio::sync::broadcast;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
/// Graceful shutdown coordinator
pub struct ShutdownCoordinator {
/// Broadcast channel for shutdown signal
shutdown_tx: broadcast::Sender<()>,
/// Track if shutdown initiated
shutdown_initiated: Arc<AtomicBool>,
}
impl ShutdownCoordinator {
pub fn new() -> Self {
let (shutdown_tx, _) = broadcast::channel(1);
ShutdownCoordinator {
shutdown_tx,
shutdown_initiated: Arc::new(AtomicBool::new(false)),
}
}
/// Initiate graceful shutdown
pub fn shutdown(&self) {
if self.shutdown_initiated.swap(true, Ordering::SeqCst) {
return; // Already shutting down
}
println!("Initiating graceful shutdown...");
let _ = self.shutdown_tx.send(());
}
/// Get shutdown receiver for tasks
pub fn subscribe(&self) -> broadcast::Receiver<()> {
self.shutdown_tx.subscribe()
}
/// Wait for Ctrl+C and initiate shutdown
pub async fn wait_for_signal(&self) {
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
self.shutdown();
}
}
/// Worker that respects graceful shutdown
pub struct Worker {
id: usize,
shutdown_rx: broadcast::Receiver<()>,
}
impl Worker {
pub async fn run(mut self) {
println!("Worker {} started", self.id);
loop {
tokio::select! {
// Main work loop
_ = self.do_work() => {
// Continue processing
}
// Shutdown signal received
_ = self.shutdown_rx.recv() => {
println!("Worker {} received shutdown signal", self.id);
self.cleanup().await;
break;
}
}
}
println!("Worker {} stopped", self.id);
}
async fn do_work(&self) {
// Simulate work
tokio::time::sleep(Duration::from_millis(100)).await;
}
async fn cleanup(&self) {
println!("Worker {} cleaning up...", self.id);
// Flush buffers, close connections, save state
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
/// HTTP server with graceful shutdown
pub struct HttpServer {
listener: tokio::net::TcpListener,
shutdown_rx: broadcast::Receiver<()>,
active_requests: Arc<AtomicUsize>,
}
use std::sync::atomic::AtomicUsize;
impl HttpServer {
pub async fn run(mut self) {
println!("HTTP server listening");
loop {
tokio::select! {
// Accept new connections
result = self.listener.accept() => {
match result {
Ok((socket, addr)) => {
let counter = self.active_requests.clone();
counter.fetch_add(1, Ordering::SeqCst);
tokio::spawn(async move {
// Handle request
Self::handle_request(socket).await;
counter.fetch_sub(1, Ordering::SeqCst);
});
}
Err(e) => eprintln!("Accept error: {}", e),
}
}
// Shutdown signal
_ = self.shutdown_rx.recv() => {
println!("HTTP server received shutdown signal");
break;
}
}
}
// Wait for in-flight requests to complete
self.drain_requests().await;
println!("HTTP server stopped");
}
async fn handle_request(socket: tokio::net::TcpStream) {
// Handle HTTP request
}
async fn drain_requests(&self) {
let timeout = Duration::from_secs(30);
let start = Instant::now();
while self.active_requests.load(Ordering::SeqCst) > 0 {
if start.elapsed() > timeout {
println!(
"Shutdown timeout: {} requests still active",
self.active_requests.load(Ordering::SeqCst)
);
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
println!("All requests drained");
}
}
/// Complete application with graceful shutdown
pub async fn run_application() {
let coordinator = Arc::new(ShutdownCoordinator::new());
// Spawn workers
for i in 0..4 {
let worker = Worker {
id: i,
shutdown_rx: coordinator.subscribe(),
};
tokio::spawn(worker.run());
}
// Wait for shutdown signal
coordinator.wait_for_signal().await;
// Give tasks time to clean up
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Application shutdown complete");
}
/// Pattern: Work with timeout OR shutdown
pub async fn work_with_timeout_or_shutdown(
mut shutdown: broadcast::Receiver<()>,
) -> Result<(), WorkError> {
let work = async {
// Long-running work
tokio::time::sleep(Duration::from_secs(60)).await;
Ok::<_, WorkError>(())
};
tokio::select! {
result = work => result,
_ = shutdown.recv() => {
println!("Work interrupted by shutdown");
Err(WorkError::Interrupted)
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Work timed out");
Err(WorkError::Timeout)
}
}
}
#[derive(Debug, thiserror::Error)]
enum WorkError {
#[error("Work interrupted")]
Interrupted,
#[error("Work timed out")]
Timeout,
}
Shutdown pattern benefits:
---
use std::sync::Arc;
/// Client that sends hedged requests to reduce latency
pub struct HedgedClient {
backends: Vec<String>,
client: reqwest::Client,
hedging_delay: Duration,
}
impl HedgedClient {
/// Send request with hedging - race to multiple backends
pub async fn request_with_hedging(&self, path: &str) -> Result<Response, RequestError> {
if self.backends.is_empty() {
return Err(RequestError::NoBackends);
}
// Start with first backend
let mut primary = Box::pin(self.send_to_backend(&self.backends[0], path));
// Wait for hedging delay
tokio::select! {
// Primary completes within hedging delay
result = &mut primary => {
return result;
}
// Hedging delay elapsed - send backup request
_ = tokio::time::sleep(self.hedging_delay) => {
// Continue to hedge logic below
}
}
// Primary is slow - race with backup
if self.backends.len() > 1 {
let backup = self.send_to_backend(&self.backends[1], path);
tokio::select! {
result = primary => result,
result = backup => result,
}
} else {
primary.await
}
}
/// Send request to fastest backend (full racing)
pub async fn request_fastest(&self, path: &str) -> Result<Response, RequestError> {
if self.backends.is_empty() {
return Err(RequestError::NoBackends);
}
// Create futures for all backends
let requests: Vec<_> = self
.backends
.iter()
.map(|backend| self.send_to_backend(backend, path))
.collect();
// Race all requests - first to complete wins
// Note: This is simplified; production would use FuturesUnordered
match requests.len() {
1 => requests.into_iter().next().unwrap().await,
2 => {
let mut iter = requests.into_iter();
let r1 = iter.next().unwrap();
let r2 = iter.next().unwrap();
tokio::select! {
result = r1 => result,
result = r2 => result,
}
}
_ => {
// For 3+ backends, use futures_util::future::select_all
use futures::future::select_all;
let (result, _index, _remaining) = select_all(requests).await;
result
}
}
}
async fn send_to_backend(
&self,
backend: &str,
path: &str,
) -> Result<Response, RequestError> {
let url = format!("{}{}", backend, path);
self.client
.get(&url)
.send()
.await
.map_err(RequestError::Http)?
.json()
.await
.map_err(RequestError::Http)
}
}
#[derive(Debug, serde::Deserialize)]
pub struct Response {
data: String,
}
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
#[error("No backends available")]
NoBackends,
#[error("HTTP error: {0}")]
Http(reqwest::Error),
}
/// Load balancer with health checking
pub struct LoadBalancer {
backends: Arc<Vec<Backend>>,
}
#[derive(Clone)]
struct Backend {
url: String,
healthy: Arc<AtomicBool>,
}
impl LoadBalancer {
/// Execute request on first healthy backend
pub async fn execute_with_failover(
&self,
path: &str,
) -> Result<Response, RequestError> {
// Try backends in order until one succeeds
for backend in self.backends.iter() {
if !backend.healthy.load(Ordering::SeqCst) {
continue;
}
match self.try_backend(backend, path).await {
Ok(response) => return Ok(response),
Err(e) => {
eprintln!("Backend {} failed: {}", backend.url, e);
// Mark unhealthy
backend.healthy.store(false, Ordering::SeqCst);
continue;
}
}
}
Err(RequestError::NoBackends)
}
/// Execute on all healthy backends, return fastest
pub async fn execute_racing(&self, path: &str) -> Result<Response, RequestError> {
let healthy_backends: Vec<_> = self
.backends
.iter()
.filter(|b| b.healthy.load(Ordering::SeqCst))
.collect();
if healthy_backends.is_empty() {
return Err(RequestError::NoBackends);
}
// Create futures for all healthy backends
let requests: Vec<_> = healthy_backends
.iter()
.map(|backend| self.try_backend(backend, path))
.collect();
// Use select_all to race them
use futures::future::select_all;
let (result, _index, _remaining) = select_all(requests).await;
result
}
async fn try_backend(&self, backend: &Backend, path: &str) -> Result<Response, RequestError> {
let client = reqwest::Client::new();
let url = format!("{}{}", backend.url, path);
client
.get(&url)
.timeout(Duration::from_millis(500))
.send()
.await
.map_err(RequestError::Http)?
.json()
.await
.map_err(RequestError::Http)
}
/// Background health checking
pub async fn health_check_loop(self: Arc<Self>) {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
for backend in self.backends.iter() {
let backend = backend.clone();
let lb = self.clone();
tokio::spawn(async move {
match lb.check_health(&backend).await {
Ok(true) => {
backend.healthy.store(true, Ordering::SeqCst);
}
_ => {
backend.healthy.store(false, Ordering::SeqCst);
}
}
});
}
}
}
async fn check_health(&self, backend: &Backend) -> Result<bool, RequestError> {
let client = reqwest::Client::new();
let url = format!("{}/health", backend.url);
match client.get(&url).send().await {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
}
}
/// Benchmarking: Sequential vs Racing
pub mod latency_tests {
use super::*;
pub async fn measure_sequential(backends: &[String], path: &str) -> Duration {
let start = Instant::now();
let client = reqwest::Client::new();
// Try backends sequentially
for backend in backends {
let url = format!("{}{}", backend, path);
if client.get(&url).send().await.is_ok() {
break;
}
}
start.elapsed()
}
pub async fn measure_racing(backends: &[String], path: &str) -> Duration {
let start = Instant::now();
let client = reqwest::Client::new();
// Race all backends
let requests: Vec<_> = backends
.iter()
.map(|backend| {
let url = format!("{}{}", backend, path);
client.get(&url).send()
})
.collect();
use futures::future::select_all;
let _ = select_all(requests).await;
start.elapsed()
}
// Typical results:
// Sequential with failures: 1000ms (try 3 backends @ 300ms each)
// Racing: 300ms (fastest backend wins)
// Latency improvement: 3.3x
}
Hedging benefits:
---
// What you write:
tokio::select! {
result = future1 => { handle_result1(result); }
result = future2 => { handle_result2(result); }
}
// Roughly expands to:
{
use std::pin::Pin;
use std::task::{Context, Poll};
// Pin both futures
let mut future1 = std::pin::pin!(future1);
let mut future2 = std::pin::pin!(future2);
// Poll them in a loop
std::future::poll_fn(|cx| {
// Randomly choose poll order (fair scheduling)
if fastrand::bool() {
// Try future1 first
if let Poll::Ready(result) = future1.as_mut().poll(cx) {
handle_result1(result);
return Poll::Ready(());
}
if let Poll::Ready(result) = future2.as_mut().poll(cx) {
handle_result2(result);
return Poll::Ready(());
}
} else {
// Try future2 first
if let Poll::Ready(result) = future2.as_mut().poll(cx) {
handle_result2(result);
return Poll::Ready(());
}
if let Poll::Ready(result) = future1.as_mut().poll(cx) {
handle_result1(result);
return Poll::Ready(());
}
}
// Both pending
Poll::Pending
}).await
}
Key mechanics:
// What you write:
let (res1, res2) = tokio::join!(future1, future2);
// Roughly expands to:
{
use std::pin::Pin;
use std::task::{Context, Poll};
let mut future1 = std::pin::pin!(future1);
let mut future2 = std::pin::pin!(future2);
let mut result1 = None;
let mut result2 = None;
std::future::poll_fn(|cx| {
// Poll future1 if not completed
if result1.is_none() {
if let Poll::Ready(res) = future1.as_mut().poll(cx) {
result1 = Some(res);
}
}
// Poll future2 if not completed
if result2.is_none() {
if let Poll::Ready(res) = future2.as_mut().poll(cx) {
result2 = Some(res);
}
}
// Both ready?
match (result1.take(), result2.take()) {
(Some(r1), Some(r2)) => Poll::Ready((r1, r2)),
(r1, r2) => {
// Put back partial results
result1 = r1;
result2 = r2;
Poll::Pending
}
}
}).await
}
Key mechanics:
/// Fair select - random poll order
tokio::select! {
_ = future1 => { /* ... */ }
_ = future2 => { /* ... */ }
}
/// Biased select - deterministic order (future1 always checked first)
tokio::select! {
biased;
_ = future1 => { /* future1 has priority */ }
_ = future2 => { /* future2 only if future1 pending */ }
}
When to use biased:
A future is cancellation safe if dropping it before completion doesn't lose data or leave inconsistent state.
// โ
CANCELLATION SAFE: No state lost
async fn safe_sleep() {
tokio::time::sleep(Duration::from_secs(1)).await;
}
// Dropping this is fine - timer just stops
// โ NOT CANCELLATION SAFE: Partial read lost
async fn unsafe_read(socket: &mut TcpStream) -> Vec<u8> {
let mut buffer = vec![0u8; 1024];
socket.read(&mut buffer).await.unwrap();
buffer
}
// If cancelled mid-read, partial data is lost forever!
// โ
FIXED: Make it cancellation safe
async fn safe_read(socket: &mut TcpStream) -> Vec<u8> {
let mut buffer = vec![0u8; 1024];
// Use tokio::io::AsyncReadExt::read_exact which is documented
// as NOT cancellation safe, so we DON'T use it in select!
// Instead, buffer partial reads
let mut total = vec![];
loop {
match socket.try_read(&mut buffer) {
Ok(0) => break,
Ok(n) => total.extend_from_slice(&buffer[..n]),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Wait for readable
socket.readable().await.unwrap();
}
Err(e) => panic!("{}", e),
}
}
total
}
Common cancellation-unsafe patterns:
AsyncReadExt::read_exact - partial reads lostAsyncWriteExt::write_all - partial writes lostdrop guards if needed// How select! stores state
pub struct Select2<F1, F2> {
future1: std::pin::Pin<Box<F1>>,
future2: std::pin::Pin<Box<F2>>,
// No result storage - drops losing branch immediately
}
// How join! stores state
pub struct Join2<F1, F2> {
future1: std::pin::Pin<Box<F1>>,
future2: std::pin::Pin<Box<F2>>,
result1: Option<F1::Output>,
result2: Option<F2::Output>,
// Stores results until both complete
}
Memory implications:
Pin to keep futures in placeuse tokio::try_join;
/// Fetch data from multiple sources, fail fast on error
async fn fetch_all() -> Result<(Data1, Data2, Data3), Error> {
// If ANY future returns Err, immediately:
// 1. Cancel other futures
// 2. Return that error
try_join!(
fetch_data1(),
fetch_data2(),
fetch_data3(),
)
}
// Expanded behavior:
async fn fetch_all_expanded() -> Result<(Data1, Data2, Data3), Error> {
let (r1, r2, r3) = tokio::join!(
fetch_data1(),
fetch_data2(),
fetch_data3(),
);
// Early return on first error
Ok((r1?, r2?, r3?))
}
Use try_join! when:
---
| Scenario | Why |
|----------|-----|
| Timeouts | Race operation with timeout future |
| First-wins | First successful response from multiple sources |
| Shutdown signals | Interrupt work when shutdown signaled |
| User input + background work | Respond to input while processing |
| Multiple event sources | TCP accept, timer, channel, signal |
| Scenario | Why |
|----------|-----|
| Parallel execution | Multiple independent operations |
| Aggregating results | Need all results to proceed |
| Performance optimization | Concurrent fetching instead of sequential |
| Batch operations | Process multiple items simultaneously |
| Scenario | Why |
|----------|-----|
| All-or-nothing | Need all results or fail entirely |
| Transactional operations | Database writes across tables |
| Service initialization | Multiple services must all start |
| Anti-pattern | Problem | Solution |
|--------------|---------|----------|
| Sequential dependencies | Results depend on each other | Chain with .await |
| Cancellation-unsafe futures | Data loss on cancellation | Use join! or handle cleanup |
| Need all results | Select only gives one | Use join! instead |
| Complex racing logic | Hard to maintain | Use futures::select_all or FuturesUnordered |
---
// โ WRONG: Lost data on cancellation
async fn dangerous_select(mut socket: TcpStream) {
let mut buffer = Vec::new();
tokio::select! {
result = socket.read_to_end(&mut buffer) => {
println!("Read {} bytes", buffer.len());
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
println!("Timeout");
// PROBLEM: If timeout wins, partial read is LOST!
// buffer is dropped with data
}
}
}
// โ
CORRECT: Handle partial results
async fn safe_select(mut socket: TcpStream) {
let mut buffer = Vec::new();
let mut total_read = 0;
loop {
let mut chunk = vec![0u8; 1024];
tokio::select! {
result = socket.read(&mut chunk) => {
match result {
Ok(0) => break, // EOF
Ok(n) => {
buffer.extend_from_slice(&chunk[..n]);
total_read += n;
}
Err(e) => panic!("{}", e),
}
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
println!("Timeout, but we saved {} bytes", total_read);
break;
}
}
}
println!("Total read: {}", buffer.len());
}
// โ WRONG: Shared state race condition
use std::sync::Arc;
use tokio::sync::Mutex;
async fn racy_select(counter: Arc<Mutex<u64>>) {
tokio::select! {
_ = async {
let mut count = counter.lock().await;
*count += 1;
// Lock dropped here
} => {
// Branch 1
}
_ = async {
let mut count = counter.lock().await;
*count += 1;
// Lock dropped here
} => {
// Branch 2
}
}
// PROBLEM: Both branches might acquire lock, but only one completes!
// The other's increment is lost due to cancellation
}
// โ
CORRECT: Atomic operations for shared state
use std::sync::atomic::{AtomicU64, Ordering};
async fn safe_select(counter: Arc<AtomicU64>) {
tokio::select! {
_ = async {
counter.fetch_add(1, Ordering::SeqCst);
} => {
// Branch 1 - increment persists even if cancelled
}
_ = async {
counter.fetch_add(1, Ordering::SeqCst);
} => {
// Branch 2 - increment persists even if cancelled
}
}
}
// โ WRONG: select! when you need all results
async fn wrong_pattern() {
tokio::select! {
user = fetch_user() => {
// Only got user, missing orders!
}
orders = fetch_orders() => {
// Only got orders, missing user!
}
}
}
// โ
CORRECT: Use join! for all results
async fn correct_pattern() {
let (user, orders) = tokio::join!(
fetch_user(),
fetch_orders(),
);
// Have both user and orders
}
// โ WRONG: All-or-nothing with join!
async fn wasteful_join() {
let (user, activity, prefs) = tokio::join!(
fetch_user(),
fetch_activity(),
fetch_preferences(),
);
// PROBLEM: If any fetch fails, we throw away successful fetches!
let user = user.unwrap();
let activity = activity.unwrap();
let prefs = prefs.unwrap();
}
// โ
CORRECT: Use partial results
async fn smart_join() {
let (user, activity, prefs) = tokio::join!(
fetch_user(),
fetch_activity(),
fetch_preferences(),
);
// Use what we got
let user = user?; // User is required
let activity = activity.ok(); // Activity is optional
let prefs = prefs.ok(); // Preferences optional
Ok(Profile { user, activity, prefs })
}
// โ WRONG: Biased select starves low-priority branch
async fn starving_select() {
let (tx, mut rx) = mpsc::channel(100);
// Spam high-priority channel
tokio::spawn(async move {
loop {
tx.send("high").await.ok();
}
});
tokio::select! {
biased;
// High priority - ALWAYS ready
Some(msg) = rx.recv() => {
println!("High: {}", msg);
}
// Low priority - NEVER runs!
_ = tokio::time::sleep(Duration::from_millis(100)) => {
println!("Low priority task");
}
}
}
// โ
CORRECT: Fair select or rate limiting
async fn fair_select() {
let (tx, mut rx) = mpsc::channel(100);
tokio::select! {
// No 'biased' - fair scheduling
Some(msg) = rx.recv() => {
println!("High: {}", msg);
}
_ = tokio::time::sleep(Duration::from_millis(100)) => {
println!("Low priority task - will eventually run");
}
}
}
---
// Benchmarking select! vs direct await
use std::time::Instant;
async fn bench_direct_await() -> Duration {
let start = Instant::now();
for _ in 0..10000 {
tokio::time::sleep(Duration::from_nanos(1)).await;
}
start.elapsed()
}
async fn bench_select_overhead() -> Duration {
let start = Instant::now();
for _ in 0..10000 {
tokio::select! {
_ = tokio::time::sleep(Duration::from_nanos(1)) => {}
}
}
start.elapsed()
}
// Typical results:
// Direct await: 50ms
// Select overhead: 55ms
// Overhead: ~10% for single-branch select
select! overhead sources:
.await over single-branch select!
// Memory comparison
use std::mem::size_of_val;
async fn memory_comparison() {
// Sequential: Only stores current future
let sequential = async {
let r1 = fetch_large_data().await;
let r2 = fetch_large_data().await;
(r1, r2)
};
// join!: Stores both futures + results
let concurrent = async {
tokio::join!(
fetch_large_data(),
fetch_large_data(),
)
};
// If each future is 1KB and result is 1MB:
// Sequential: 1KB (future) + 1MB (result 1) + 1MB (result 2) = ~2MB peak
// Concurrent: 2KB (both futures) + 2MB (both results) = ~2MB peak
// BUT: Concurrent holds both results simultaneously
// Sequential can drop result1 before allocating result2
}
async fn fetch_large_data() -> Vec<u8> {
vec![0u8; 1024 * 1024] // 1MB
}
Memory implications:
use std::time::Instant;
/// Sequential execution
async fn sequential_fetch() -> Duration {
let start = Instant::now();
let _r1 = fetch_data(100).await;
let _r2 = fetch_data(100).await;
let _r3 = fetch_data(100).await;
start.elapsed()
// Total: 300ms (100 + 100 + 100)
}
/// Concurrent with join!
async fn join_fetch() -> Duration {
let start = Instant::now();
let (_r1, _r2, _r3) = tokio::join!(
fetch_data(100),
fetch_data(100),
fetch_data(100),
);
start.elapsed()
// Total: 100ms (max of all)
}
/// Concurrent with spawn
async fn spawn_fetch() -> Duration {
let start = Instant::now();
let h1 = tokio::spawn(fetch_data(100));
let h2 = tokio::spawn(fetch_data(100));
let h3 = tokio::spawn(fetch_data(100));
let _ = tokio::try_join!(h1, h2, h3).unwrap();
start.elapsed()
// Total: 100ms + spawn overhead (~5ms)
}
async fn fetch_data(delay_ms: u64) -> String {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
"data".to_string()
}
Performance comparison:
| Method | Time | Overhead | Use Case |
|--------|------|----------|----------|
| Sequential | 300ms | 0ms | Dependencies between operations |
| join! | 100ms | ~1ms | Concurrent, same task |
| spawn | 105ms | ~5ms | Concurrent, need parallelism |
When to use spawn vs join!:// Measuring cancellation overhead
async fn bench_cancellation() {
let start = Instant::now();
for _ in 0..10000 {
tokio::select! {
_ = expensive_future() => {}
_ = tokio::time::sleep(Duration::from_nanos(1)) => {
// Cancels expensive_future
}
}
}
println!("Cancellation overhead: {:?}", start.elapsed());
}
async fn expensive_future() {
// Simulate complex future with resources
let _large_buffer = vec![0u8; 1024 * 1024];
tokio::time::sleep(Duration::from_secs(10)).await;
}
// Cost of cancellation:
// - Drop expensive_future and all its state
// - Deallocate large_buffer
// - Cleanup any registered wakers
// Typical: 1-10ยตs depending on future complexity
---
/// Wrap any future with a timeout
pub async fn with_timeout<F, T>(
future: F,
timeout: Duration,
) -> Result<T, TimeoutError>
where
F: std::future::Future<Output = T>,
{
// TODO: Use select! to race future with timeout
// Return Ok(result) if future completes
// Return Err(TimeoutError) if timeout elapses
todo!()
}
#[derive(Debug)]
pub struct TimeoutError;
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_completes_before_timeout() {
let result = with_timeout(
async { 42 },
Duration::from_secs(1),
).await;
assert_eq!(result.unwrap(), 42);
}
#[tokio::test]
async fn test_timeout_triggers() {
let result = with_timeout(
async {
tokio::time::sleep(Duration::from_secs(10)).await;
42
},
Duration::from_millis(100),
).await;
assert!(result.is_err());
}
}
Solution:
pub async fn with_timeout<F, T>(
future: F,
timeout: Duration,
) -> Result<T, TimeoutError>
where
F: std::future::Future<Output = T>,
{
tokio::select! {
result = future => Ok(result),
_ = tokio::time::sleep(timeout) => Err(TimeoutError),
}
}
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub enum HealthStatus {
Healthy { latency_ms: u128 },
Unhealthy { error: String },
Timeout,
}
pub struct MultiHealthChecker {
endpoints: HashMap<String, String>,
timeout: Duration,
}
impl MultiHealthChecker {
pub fn new(timeout: Duration) -> Self {
MultiHealthChecker {
endpoints: HashMap::new(),
timeout,
}
}
pub fn add_endpoint(&mut self, name: String, url: String) {
self.endpoints.insert(name, url);
}
/// Check all endpoints concurrently
/// Return map of endpoint name -> status
pub async fn check_all(&self) -> HashMap<String, HealthStatus> {
// TODO:
// 1. Create futures for checking each endpoint
// 2. Use futures::future::join_all to run concurrently
// 3. Each check should have individual timeout using select!
// 4. Measure latency for healthy endpoints
todo!()
}
async fn check_endpoint(&self, url: &str) -> HealthStatus {
// TODO:
// 1. Measure start time
// 2. Use select! to race HTTP request with timeout
// 3. Return appropriate HealthStatus
todo!()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_health_checker() {
let mut checker = MultiHealthChecker::new(Duration::from_secs(5));
checker.add_endpoint("google".to_string(), "https://google.com".to_string());
checker.add_endpoint("example".to_string(), "https://example.com".to_string());
let results = checker.check_all().await;
assert_eq!(results.len(), 2);
for (name, status) in results {
println!("{}: {:?}", name, status);
}
}
}
Solution:
impl MultiHealthChecker {
pub async fn check_all(&self) -> HashMap<String, HealthStatus> {
let checks: Vec<_> = self
.endpoints
.iter()
.map(|(name, url)| {
let name = name.clone();
let url = url.clone();
async move {
let status = self.check_endpoint(&url).await;
(name, status)
}
})
.collect();
futures::future::join_all(checks)
.await
.into_iter()
.collect()
}
async fn check_endpoint(&self, url: &str) -> HealthStatus {
let start = Instant::now();
let result = tokio::select! {
result = reqwest::get(url) => result,
_ = tokio::time::sleep(self.timeout) => {
return HealthStatus::Timeout;
}
};
match result {
Ok(resp) if resp.status().is_success() => HealthStatus::Healthy {
latency_ms: start.elapsed().as_millis(),
},
Ok(resp) => HealthStatus::Unhealthy {
error: format!("HTTP {}", resp.status()),
},
Err(e) => HealthStatus::Unhealthy {
error: e.to_string(),
},
}
}
}
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;
/// Select combinator with priority - higher priority futures checked first
pub struct PrioritySelect<F1, F2, F3> {
high_priority: Pin<Box<F1>>,
medium_priority: Pin<Box<F2>>,
low_priority: Pin<Box<F3>>,
}
pub enum PriorityResult<T1, T2, T3> {
High(T1),
Medium(T2),
Low(T3),
}
impl<F1, F2, F3> PrioritySelect<F1, F2, F3>
where
F1: Future,
F2: Future,
F3: Future,
{
pub fn new(high: F1, medium: F2, low: F3) -> Self {
PrioritySelect {
high_priority: Box::pin(high),
medium_priority: Box::pin(medium),
low_priority: Box::pin(low),
}
}
}
impl<F1, F2, F3> Future for PrioritySelect<F1, F2, F3>
where
F1: Future,
F2: Future,
F3: Future,
{
type Output = PriorityResult<F1::Output, F2::Output, F3::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO:
// 1. Always poll high_priority first
// 2. If high is Ready, return immediately
// 3. Then poll medium_priority
// 4. If medium is Ready, return immediately
// 5. Finally poll low_priority
// 6. If all Pending, return Pending
todo!()
}
}
// Helper function to use it
pub fn priority_select<F1, F2, F3>(
high: F1,
medium: F2,
low: F3,
) -> PrioritySelect<F1, F2, F3>
where
F1: Future,
F2: Future,
F3: Future,
{
PrioritySelect::new(high, medium, low)
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_high_priority_wins() {
let result = priority_select(
async { tokio::time::sleep(Duration::from_millis(10)).await; "high" },
async { tokio::time::sleep(Duration::from_millis(5)).await; "medium" },
async { tokio::time::sleep(Duration::from_millis(1)).await; "low" },
)
.await;
// Even though low completes first, high should win due to priority
// Wait... actually low will complete first!
// This tests that priority polling gives high-priority futures
// more opportunities to complete
match result {
PriorityResult::High(_) => println!("High priority"),
PriorityResult::Medium(_) => println!("Medium priority"),
PriorityResult::Low(v) => {
assert_eq!(v, "low");
println!("Low priority completed first");
}
}
}
}
Solution:
impl<F1, F2, F3> Future for PrioritySelect<F1, F2, F3>
where
F1: Future,
F2: Future,
F3: Future,
{
type Output = PriorityResult<F1::Output, F2::Output, F3::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check high priority first
if let Poll::Ready(output) = self.high_priority.as_mut().poll(cx) {
return Poll::Ready(PriorityResult::High(output));
}
// Then medium priority
if let Poll::Ready(output) = self.medium_priority.as_mut().poll(cx) {
return Poll::Ready(PriorityResult::Medium(output));
}
// Finally low priority
if let Poll::Ready(output) = self.low_priority.as_mut().poll(cx) {
return Poll::Ready(PriorityResult::Low(output));
}
// All pending
Poll::Pending
}
}
---
use tokio::sync::mpsc;
use tokio::signal;
/// Complete server with select! patterns
pub async fn run_server() {
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
// Main server loop
tokio::select! {
// Handle signals
_ = signal::ctrl_c() => {
println!("Ctrl+C received");
}
// Wait for explicit shutdown
_ = shutdown_rx.recv() => {
println!("Shutdown signal received");
}
// Or run for maximum duration
_ = tokio::time::sleep(Duration::from_secs(3600)) => {
println!("Max runtime reached");
}
}
println!("Server shutting down");
}
/// select! with else branch for default action
pub async fn select_with_else() {
let (tx, mut rx) = mpsc::channel::<String>(10);
tokio::select! {
Some(msg) = rx.recv() => {
println!("Received: {}", msg);
}
else => {
println!("Channel closed, no more messages");
}
}
}
use futures::select;
use futures::FutureExt; // for .fuse()
/// futures::select! requires .fuse() for reusable futures
pub async fn futures_select_example() {
let mut future1 = async { 1 }.fuse();
let mut future2 = async { 2 }.fuse();
select! {
result = future1 => println!("Future 1: {}", result),
result = future2 => println!("Future 2: {}", result),
}
// Key difference: futures::select! can be used in loops
// because .fuse() makes futures reusable
}
/// tokio::select! for one-shot selection
pub async fn tokio_select_example() {
// No .fuse() needed, but futures are consumed
tokio::select! {
result = async { 1 } => println!("Result: {}", result),
result = async { 2 } => println!("Result: {}", result),
}
}
/// Initialize multiple services, fail if any fails
pub async fn initialize_services() -> Result<Services, InitError> {
let (db, cache, queue) = tokio::try_join!(
init_database(),
init_cache(),
init_queue(),
)?;
Ok(Services { db, cache, queue })
}
struct Services {
db: Database,
cache: Cache,
queue: Queue,
}
#[derive(Debug)]
struct Database;
#[derive(Debug)]
struct Cache;
#[derive(Debug)]
struct Queue;
#[derive(Debug, thiserror::Error)]
enum InitError {
#[error("Database init failed")]
Database,
#[error("Cache init failed")]
Cache,
#[error("Queue init failed")]
Queue,
}
async fn init_database() -> Result<Database, InitError> {
Ok(Database)
}
async fn init_cache() -> Result<Cache, InitError> {
Ok(Cache)
}
async fn init_queue() -> Result<Queue, InitError> {
Ok(Queue)
}
use futures::stream::{FuturesUnordered, StreamExt};
/// Process dynamic number of futures concurrently
pub async fn dynamic_select() {
let mut futures = FuturesUnordered::new();
// Add futures dynamically
for i in 0..10 {
futures.push(async move {
tokio::time::sleep(Duration::from_millis(i * 10)).await;
i
});
}
// Process as they complete
while let Some(result) = futures.next().await {
println!("Future {} completed", result);
}
}
/// Race dynamic number of HTTP requests
pub async fn race_many_requests(urls: Vec<String>) -> Option<String> {
let mut futures = FuturesUnordered::new();
for url in urls {
futures.push(async move {
reqwest::get(&url).await.ok()?.text().await.ok()
});
}
// Return first successful result
while let Some(result) = futures.next().await {
if let Some(response) = result {
return Some(response);
}
}
None
}
---
| Pattern | Latency | Throughput | Complexity | Use Case |
|---------|---------|------------|------------|----------|
| Sequential .await | High | Low | Low | Dependencies |
| join! | Low | High | Medium | Concurrent I/O |
| select! | Low | Medium | Medium | Racing, timeouts |
| spawn | Low | Highest | High | CPU parallelism |
Now go forth and compose futures like a legendary Rust master! ๐ฆโก
Run this code in the official Rust Playground