Home/Async/Await & Futures/Cancellation & Timeouts

Cancellation & Timeouts

Proper cleanup patterns

advanced
cancellationtimeoutcleanup
🎮 Interactive Playground

What is Cancellation Safety?

Cancellation safety is the property that an async operation can be safely dropped (cancelled) at any .await point without corrupting state or leaking resources.

In async Rust, futures can be cancelled at any time by dropping them. This happens with:

  • tokio::select! when another branch completes first
  • tokio::time::timeout when time expires
  • Manual drop of a JoinHandle
  • Task cancellation via CancellationToken

The Core Problem

use tokio::fs::File;
use tokio::io::AsyncWriteExt;

// UNSAFE: Not cancellation-safe!
async fn write_user_data(id: u64, data: &[u8]) -> std::io::Result<()> {
    let mut file = File::create(format!("user_{}.data", id)).await?;
    
    // If cancelled here, file is created but empty
    file.write_all(b"HEADER:").await?;
    
    // If cancelled here, file has header but incomplete data
    file.write_all(data).await?;
    
    // If cancelled here, data written but not synced to disk
    file.sync_all().await?;
    
    Ok(())
}

// SAFE: Cancellation-safe with atomic rename
async fn write_user_data_safe(id: u64, data: &[u8]) -> std::io::Result<()> {
    use tokio::fs;
    
    let tmp_path = format!("user_{}.tmp", id);
    let final_path = format!("user_{}.data", id);
    
    // Write to temporary file
    let mut file = File::create(&tmp_path).await?;
    file.write_all(b"HEADER:").await?;
    file.write_all(data).await?;
    file.sync_all().await?;
    drop(file); // Close before rename
    
    // Atomic rename - if cancelled before this, no corrupt file
    fs::rename(&tmp_path, &final_path).await?;
    
    Ok(())
}

Cancellation Points

Futures can only be cancelled at .await points:

async fn cancellation_points() {
    // Non-cancellable: runs to completion
    let x = expensive_computation();
    
    // CANCELLATION POINT: Can be dropped here
    some_async_operation().await;
    
    // Non-cancellable: runs to completion
    let y = more_computation(x);
    
    // CANCELLATION POINT: Can be dropped here
    another_async_operation().await;
}
Key insight: Between .await points, code runs to completion. This is why blocking operations in async code are problematic - they can't be cancelled.

---

Real-World Examples

Example 1: Database Transaction Rollback (Web/Backend)

Ensuring database transactions rollback on cancellation is critical for data consistency.

use sqlx::{PgPool, Postgres, Transaction};
use std::ops::{Deref, DerefMut};

/// RAII guard that ensures transaction rollback on drop
struct TransactionGuard<'c> {
    tx: Option<Transaction<'c, Postgres>>,
    committed: bool,
}

impl<'c> TransactionGuard<'c> {
    async fn new(pool: &PgPool) -> Result<Self, sqlx::Error> {
        let tx = pool.begin().await?;
        Ok(Self {
            tx: Some(tx),
            committed: false,
        })
    }
    
    async fn commit(mut self) -> Result<(), sqlx::Error> {
        if let Some(tx) = self.tx.take() {
            tx.commit().await?;
            self.committed = true;
        }
        Ok(())
    }
}

impl<'c> Deref for TransactionGuard<'c> {
    type Target = Transaction<'c, Postgres>;
    
    fn deref(&self) -> &Self::Target {
        self.tx.as_ref().unwrap()
    }
}

impl<'c> DerefMut for TransactionGuard<'c> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.tx.as_mut().unwrap()
    }
}

impl<'c> Drop for TransactionGuard<'c> {
    fn drop(&mut self) {
        if !self.committed && self.tx.is_some() {
            // Transaction will be rolled back when dropped
            // This is synchronous - the connection will rollback
            // the transaction when the connection is returned to the pool
            eprintln!("Warning: Transaction rolled back due to cancellation");
        }
    }
}

/// Transfer money between accounts with cancellation safety
async fn transfer_money(
    pool: &PgPool,
    from_account: i64,
    to_account: i64,
    amount: i64,
) -> Result<(), sqlx::Error> {
    let mut tx = TransactionGuard::new(pool).await?;
    
    // If cancelled during these operations, transaction will rollback
    sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
        .bind(amount)
        .bind(from_account)
        .execute(&mut *tx)
        .await?;
    
    sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
        .bind(amount)
        .bind(to_account)
        .execute(&mut *tx)
        .await?;
    
    // Explicit commit - only here does the transaction become permanent
    tx.commit().await?;
    
    Ok(())
}

/// Web handler with timeout
async fn handle_transfer_request(
    pool: PgPool,
    from: i64,
    to: i64,
    amount: i64,
) -> Result<String, String> {
    use tokio::time::{timeout, Duration};
    
    // 5-second timeout - if exceeded, transaction rolls back automatically
    match timeout(
        Duration::from_secs(5),
        transfer_money(&pool, from, to, amount),
    ).await {
        Ok(Ok(())) => Ok("Transfer completed".to_string()),
        Ok(Err(e)) => Err(format!("Database error: {}", e)),
        Err(_) => {
            // Timeout - transaction was automatically rolled back
            Err("Transfer timed out and was rolled back".to_string())
        }
    }
}
Why this works:
  • TransactionGuard uses RAII to ensure cleanup
  • Drop implementation handles rollback automatically
  • Database connection rollback is synchronous and safe in Drop
  • Explicit commit() is the only way to persist changes
Production usage: This pattern is used in web services handling financial transactions, inventory management, and any multi-step database operations.

Example 2: Atomic File Operations (Systems Programming)

File operations are inherently cancellation-unsafe. Partial writes can corrupt data.

use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use std::path::{Path, PathBuf};
use std::io;

/// Cancellation-safe file writer using atomic rename
pub struct AtomicFileWriter {
    temp_path: PathBuf,
    final_path: PathBuf,
    writer: Option<BufWriter<File>>,
}

impl AtomicFileWriter {
    pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
        let final_path = path.as_ref().to_path_buf();
        let temp_path = final_path.with_extension("tmp");
        
        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&temp_path)
            .await?;
        
        let writer = BufWriter::new(file);
        
        Ok(Self {
            temp_path,
            final_path,
            writer: Some(writer),
        })
    }
    
    pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
        self.writer.as_mut().unwrap().write_all(data).await
    }
    
    /// Commit changes atomically
    pub async fn commit(mut self) -> io::Result<()> {
        if let Some(mut writer) = self.writer.take() {
            // Flush all data to disk
            writer.flush().await?;
            
            // Sync to ensure durability
            writer.get_ref().sync_all().await?;
            
            // Close file before rename
            drop(writer);
            
            // Atomic rename - this is the commit point
            tokio::fs::rename(&self.temp_path, &self.final_path).await?;
        }
        
        Ok(())
    }
}

impl Drop for AtomicFileWriter {
    fn drop(&mut self) {
        if self.writer.is_some() {
            // Cleanup temporary file on cancellation
            // We can't do async work in Drop, so we just log
            eprintln!(
                "Warning: AtomicFileWriter dropped without commit, \
                 temp file may remain: {:?}",
                self.temp_path
            );
            
            // In production, you'd spawn a cleanup task:
            // tokio::spawn(async move {
            //     let _ = tokio::fs::remove_file(temp_path).await;
            // });
        }
    }
}

/// Configuration file writer with timeout
pub async fn write_config_with_timeout<T: serde::Serialize>(
    path: impl AsRef<Path>,
    config: &T,
    timeout_secs: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    use tokio::time::{timeout, Duration};
    
    let write_op = async {
        let mut writer = AtomicFileWriter::new(path).await?;
        
        let json = serde_json::to_string_pretty(config)?;
        writer.write_all(json.as_bytes()).await?;
        
        writer.commit().await?;
        
        Ok::<_, Box<dyn std::error::Error>>(())
    };
    
    timeout(Duration::from_secs(timeout_secs), write_op)
        .await
        .map_err(|_| "Config write timed out")??;
    
    Ok(())
}

/// Example: Multi-file update with rollback on cancellation
pub struct MultiFileTransaction {
    files: Vec<(PathBuf, AtomicFileWriter)>,
}

impl MultiFileTransaction {
    pub fn new() -> Self {
        Self { files: Vec::new() }
    }
    
    pub async fn add_file(
        &mut self,
        path: impl AsRef<Path>,
        content: &[u8],
    ) -> io::Result<()> {
        let path = path.as_ref().to_path_buf();
        let mut writer = AtomicFileWriter::new(&path).await?;
        writer.write_all(content).await?;
        self.files.push((path, writer));
        Ok(())
    }
    
    /// Commit all files atomically (as much as possible)
    pub async fn commit(self) -> io::Result<()> {
        // Commit all files
        for (_, writer) in self.files {
            writer.commit().await?;
        }
        Ok(())
    }
}

impl Drop for MultiFileTransaction {
    fn drop(&mut self) {
        if !self.files.is_empty() {
            eprintln!(
                "Warning: MultiFileTransaction dropped without commit, \
                 {} files not written",
                self.files.len()
            );
        }
    }
}
Key patterns:
  1. Temporary file + atomic rename: Ensures either complete file or no file
  2. Drop guard: Cleans up on cancellation
  3. Explicit commit: Success must be intentional
  4. Sync before rename: Ensures durability
Production usage: Configuration management, log rotation, database write-ahead logs, checkpoint files.

Example 3: HTTP Request with Timeout (Network Programming)

Network operations need timeouts to prevent hanging indefinitely.

use reqwest::{Client, Response};
use std::time::Duration;
use tokio::time::timeout;
use std::sync::Arc;
use tokio::sync::Semaphore;

/// HTTP client with built-in timeout and cancellation
pub struct TimeoutClient {
    client: Client,
    default_timeout: Duration,
    max_concurrent: Arc<Semaphore>,
}

impl TimeoutClient {
    pub fn new(default_timeout: Duration, max_concurrent: usize) -> Self {
        Self {
            client: Client::builder()
                .pool_max_idle_per_host(10)
                .build()
                .unwrap(),
            default_timeout,
            max_concurrent: Arc::new(Semaphore::new(max_concurrent)),
        }
    }
    
    /// Make a GET request with timeout
    pub async fn get_with_timeout(
        &self,
        url: &str,
        timeout_duration: Option<Duration>,
    ) -> Result<Response, RequestError> {
        // Acquire semaphore permit (limits concurrent requests)
        let _permit = self.max_concurrent.acquire().await.unwrap();
        
        let timeout_duration = timeout_duration.unwrap_or(self.default_timeout);
        
        // The actual request with timeout
        let result = timeout(timeout_duration, async {
            self.client
                .get(url)
                .send()
                .await
                .map_err(RequestError::Network)?
                .error_for_status()
                .map_err(RequestError::Http)
        })
        .await;
        
        match result {
            Ok(res) => res,
            Err(_) => Err(RequestError::Timeout),
        }
        // Permit is automatically dropped here, releasing the slot
    }
    
    /// Make multiple requests with individual timeouts
    pub async fn get_many(
        &self,
        urls: &[String],
        per_request_timeout: Duration,
    ) -> Vec<Result<Response, RequestError>> {
        use futures::stream::{self, StreamExt};
        
        // Process requests concurrently
        stream::iter(urls)
            .map(|url| self.get_with_timeout(url, Some(per_request_timeout)))
            .buffer_unordered(10) // Max 10 concurrent
            .collect()
            .await
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RequestError {
    #[error("Request timed out")]
    Timeout,
    
    #[error("Network error: {0}")]
    Network(#[from] reqwest::Error),
    
    #[error("HTTP error: {0}")]
    Http(reqwest::Error),
}

/// Circuit breaker pattern with cancellation
pub struct CircuitBreaker {
    client: TimeoutClient,
    failure_count: Arc<tokio::sync::Mutex<u32>>,
    failure_threshold: u32,
    timeout: Duration,
}

impl CircuitBreaker {
    pub fn new(timeout: Duration, failure_threshold: u32) -> Self {
        Self {
            client: TimeoutClient::new(timeout, 100),
            failure_count: Arc::new(tokio::sync::Mutex::new(0)),
            failure_threshold,
            timeout,
        }
    }
    
    pub async fn call(&self, url: &str) -> Result<Response, RequestError> {
        let failures = *self.failure_count.lock().await;
        
        if failures >= self.failure_threshold {
            return Err(RequestError::Http(
                reqwest::Error::new(
                    reqwest::StatusCode::SERVICE_UNAVAILABLE,
                    "Circuit breaker open",
                ),
            ));
        }
        
        match self.client.get_with_timeout(url, Some(self.timeout)).await {
            Ok(resp) => {
                // Reset on success
                *self.failure_count.lock().await = 0;
                Ok(resp)
            }
            Err(e) => {
                // Increment failure count
                *self.failure_count.lock().await += 1;
                Err(e)
            }
        }
    }
}

/// Example: API aggregation with timeout
pub async fn aggregate_user_data(
    client: &TimeoutClient,
    user_id: u64,
) -> Result<UserData, Box<dyn std::error::Error>> {
    use tokio::time::timeout;
    
    // Aggregate from multiple services with overall timeout
    let aggregate_op = async {
        // Parallel requests with individual timeouts
        let (profile, posts, friends) = tokio::try_join!(
            async {
                let resp = client
                    .get_with_timeout(
                        &format!("https://api.example.com/users/{}", user_id),
                        Some(Duration::from_secs(2)),
                    )
                    .await?;
                resp.json::<Profile>().await.map_err(RequestError::from)
            },
            async {
                let resp = client
                    .get_with_timeout(
                        &format!("https://api.example.com/users/{}/posts", user_id),
                        Some(Duration::from_secs(3)),
                    )
                    .await?;
                resp.json::<Vec<Post>>().await.map_err(RequestError::from)
            },
            async {
                let resp = client
                    .get_with_timeout(
                        &format!("https://api.example.com/users/{}/friends", user_id),
                        Some(Duration::from_secs(2)),
                    )
                    .await?;
                resp.json::<Vec<UserId>>().await.map_err(RequestError::from)
            },
        )?;
        
        Ok::<_, RequestError>(UserData {
            profile,
            posts,
            friends,
        })
    };
    
    // Overall timeout of 5 seconds for all operations
    timeout(Duration::from_secs(5), aggregate_op)
        .await
        .map_err(|_| "Overall aggregation timed out")??;
    
    Ok(UserData {
        profile: Profile::default(),
        posts: vec![],
        friends: vec![],
    })
}

#[derive(Default)]
struct UserData {
    profile: Profile,
    posts: Vec<Post>,
    friends: Vec<UserId>,
}

#[derive(Default, serde::Deserialize)]
struct Profile {}

#[derive(serde::Deserialize)]
struct Post {}

#[derive(serde::Deserialize)]
struct UserId(u64);
Key features:
  1. Nested timeouts: Per-request + overall timeout
  2. Semaphore for backpressure: Limits concurrent requests
  3. Automatic cleanup: Permits released on cancellation
  4. Circuit breaker: Prevents cascade failures
Production usage: API gateways, microservice aggregation, web scrapers, health check systems.

Example 4: Task Cancellation Tree (Production Systems)

Coordinated shutdown of multiple tasks using CancellationToken.

use tokio_util::sync::CancellationToken;
use tokio::time::{sleep, Duration};
use std::sync::Arc;

/// Background worker with graceful shutdown
pub struct BackgroundWorker {
    name: String,
    cancel_token: CancellationToken,
    handle: Option<tokio::task::JoinHandle<()>>,
}

impl BackgroundWorker {
    pub fn new(name: impl Into<String>, cancel_token: CancellationToken) -> Self {
        Self {
            name: name.into(),
            cancel_token,
            handle: None,
        }
    }
    
    /// Start the worker
    pub fn start<F, Fut>(&mut self, work: F)
    where
        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = ()> + Send,
    {
        let name = self.name.clone();
        let token = self.cancel_token.clone();
        
        let handle = tokio::spawn(async move {
            println!("[{}] Starting", name);
            
            work(token).await;
            
            println!("[{}] Shutdown complete", name);
        });
        
        self.handle = Some(handle);
    }
    
    /// Wait for graceful shutdown
    pub async fn join(self) {
        if let Some(handle) = self.handle {
            let _ = handle.await;
        }
    }
}

/// Application with coordinated shutdown
pub struct Application {
    root_token: CancellationToken,
    workers: Vec<BackgroundWorker>,
}

impl Application {
    pub fn new() -> Self {
        Self {
            root_token: CancellationToken::new(),
            workers: Vec::new(),
        }
    }
    
    /// Add a worker to the application
    pub fn add_worker<F, Fut>(&mut self, name: impl Into<String>, work: F)
    where
        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = ()> + Send,
    {
        let child_token = self.root_token.child_token();
        let mut worker = BackgroundWorker::new(name, child_token.clone());
        worker.start(work);
        self.workers.push(worker);
    }
    
    /// Run application with graceful shutdown
    pub async fn run(self) {
        use tokio::signal;
        
        // Wait for shutdown signal
        tokio::select! {
            _ = signal::ctrl_c() => {
                println!("\nReceived Ctrl+C, initiating graceful shutdown...");
            }
            _ = self.root_token.cancelled() => {
                println!("Shutdown requested programmatically");
            }
        }
        
        // Cancel all workers
        self.root_token.cancel();
        
        // Wait for all workers to finish
        for worker in self.workers {
            worker.join().await;
        }
        
        println!("All workers shut down gracefully");
    }
    
    /// Get a handle to trigger shutdown
    pub fn shutdown_handle(&self) -> CancellationToken {
        self.root_token.clone()
    }
}

/// Example workers

async fn database_worker(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                println!("[DB] Closing connections...");
                sleep(Duration::from_millis(500)).await;
                break;
            }
            _ = sleep(Duration::from_secs(1)) => {
                println!("[DB] Processing batch...");
            }
        }
    }
}

async fn http_server_worker(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                println!("[HTTP] Draining active connections...");
                sleep(Duration::from_millis(1000)).await;
                break;
            }
            _ = sleep(Duration::from_millis(500)) => {
                println!("[HTTP] Handling request...");
            }
        }
    }
}

async fn metrics_worker(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                println!("[Metrics] Flushing final metrics...");
                sleep(Duration::from_millis(200)).await;
                break;
            }
            _ = sleep(Duration::from_secs(10)) => {
                println!("[Metrics] Reporting metrics...");
            }
        }
    }
}

/// Example: Complex service with task hierarchy
pub async fn run_service() {
    let mut app = Application::new();
    
    // Add workers
    app.add_worker("database", database_worker);
    app.add_worker("http-server", http_server_worker);
    app.add_worker("metrics", metrics_worker);
    
    // Could add more nested workers
    let shutdown = app.shutdown_handle();
    app.add_worker("health-check", move |cancel| async move {
        loop {
            tokio::select! {
                _ = cancel.cancelled() => break,
                _ = sleep(Duration::from_secs(30)) => {
                    println!("[Health] Checking system health...");
                    // If unhealthy, could trigger shutdown:
                    // shutdown.cancel();
                }
            }
        }
    });
    
    // Run until shutdown signal
    app.run().await;
}
Key patterns:
  1. Parent-child token hierarchy: Cancelling parent cancels all children
  2. Graceful shutdown: Workers finish current work before stopping
  3. Coordinated cleanup: All workers shut down together
  4. Signal handling: Ctrl+C triggers graceful shutdown
Production usage: Web servers, background job processors, streaming pipelines, microservices.

Example 5: Long-Running Computation with Checkpointing

Making expensive computations cancellation-aware with progress saving.

use std::sync::Arc;
use tokio::sync::{Mutex, watch};
use tokio::time::{sleep, Duration, Instant};
use tokio_util::sync::CancellationToken;

/// Progress tracker for long-running operations
#[derive(Clone)]
pub struct Progress {
    current: Arc<Mutex<usize>>,
    total: usize,
    sender: watch::Sender<f64>,
}

impl Progress {
    pub fn new(total: usize) -> (Self, watch::Receiver<f64>) {
        let (sender, receiver) = watch::channel(0.0);
        (
            Self {
                current: Arc::new(Mutex::new(0)),
                total,
                sender,
            },
            receiver,
        )
    }
    
    pub async fn update(&self, current: usize) {
        *self.current.lock().await = current;
        let percent = (current as f64 / self.total as f64) * 100.0;
        let _ = self.sender.send(percent);
    }
    
    pub async fn get(&self) -> usize {
        *self.current.lock().await
    }
}

/// Checkpoint for resumable computation
#[derive(serde::Serialize, serde::Deserialize)]
pub struct Checkpoint {
    processed: usize,
    partial_results: Vec<u64>,
    timestamp: u64,
}

/// Cancellation-aware computation with checkpointing
pub struct ResumableComputation {
    checkpoint_path: String,
    progress: Progress,
    cancel_token: CancellationToken,
}

impl ResumableComputation {
    pub fn new(
        checkpoint_path: String,
        total_items: usize,
        cancel_token: CancellationToken,
    ) -> (Self, watch::Receiver<f64>) {
        let (progress, receiver) = Progress::new(total_items);
        
        (
            Self {
                checkpoint_path,
                progress,
                cancel_token,
            },
            receiver,
        )
    }
    
    /// Load checkpoint if exists
    async fn load_checkpoint(&self) -> Option<Checkpoint> {
        match tokio::fs::read_to_string(&self.checkpoint_path).await {
            Ok(data) => serde_json::from_str(&data).ok(),
            Err(_) => None,
        }
    }
    
    /// Save checkpoint
    async fn save_checkpoint(&self, checkpoint: &Checkpoint) -> std::io::Result<()> {
        let data = serde_json::to_string(checkpoint)?;
        tokio::fs::write(&self.checkpoint_path, data).await
    }
    
    /// Process items with cancellation checks and checkpointing
    pub async fn process_items(
        &self,
        items: Vec<u64>,
    ) -> Result<Vec<u64>, ComputeError> {
        let mut results = Vec::new();
        let mut start_idx = 0;
        
        // Try to resume from checkpoint
        if let Some(checkpoint) = self.load_checkpoint().await {
            println!(
                "Resuming from checkpoint: {} items processed",
                checkpoint.processed
            );
            results = checkpoint.partial_results;
            start_idx = checkpoint.processed;
            self.progress.update(start_idx).await;
        }
        
        let checkpoint_interval = 100; // Checkpoint every 100 items
        let cancellation_check_interval = 10; // Check cancellation every 10 items
        
        for (idx, item) in items.iter().enumerate().skip(start_idx) {
            // Check for cancellation periodically
            if idx % cancellation_check_interval == 0 {
                if self.cancel_token.is_cancelled() {
                    println!("Computation cancelled, saving checkpoint...");
                    
                    let checkpoint = Checkpoint {
                        processed: idx,
                        partial_results: results.clone(),
                        timestamp: std::time::SystemTime::now()
                            .duration_since(std::time::UNIX_EPOCH)
                            .unwrap()
                            .as_secs(),
                    };
                    
                    self.save_checkpoint(&checkpoint).await?;
                    return Err(ComputeError::Cancelled);
                }
            }
            
            // Simulate expensive computation
            let result = expensive_operation(*item).await;
            results.push(result);
            
            // Update progress
            self.progress.update(idx + 1).await;
            
            // Periodic checkpoint
            if idx % checkpoint_interval == 0 {
                let checkpoint = Checkpoint {
                    processed: idx + 1,
                    partial_results: results.clone(),
                    timestamp: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap()
                        .as_secs(),
                };
                
                self.save_checkpoint(&checkpoint).await?;
            }
        }
        
        // Clean up checkpoint on success
        let _ = tokio::fs::remove_file(&self.checkpoint_path).await;
        
        Ok(results)
    }
}

async fn expensive_operation(item: u64) -> u64 {
    // Simulate expensive work
    sleep(Duration::from_millis(10)).await;
    item * 2
}

#[derive(Debug, thiserror::Error)]
pub enum ComputeError {
    #[error("Computation cancelled")]
    Cancelled,
    
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}

/// Example: Running computation with timeout and cancellation
pub async fn run_with_timeout_and_resume() -> Result<(), Box<dyn std::error::Error>> {
    let items: Vec<u64> = (0..1000).collect();
    let cancel_token = CancellationToken::new();
    
    let (computation, mut progress_rx) = ResumableComputation::new(
        "computation.checkpoint".to_string(),
        items.len(),
        cancel_token.clone(),
    );
    
    // Spawn progress reporter
    let progress_task = tokio::spawn(async move {
        while progress_rx.changed().await.is_ok() {
            let progress = *progress_rx.borrow();
            println!("Progress: {:.1}%", progress);
        }
    });
    
    // Run with timeout
    let compute_task = tokio::spawn(async move {
        computation.process_items(items).await
    });
    
    // Simulate timeout after 2 seconds
    tokio::select! {
        result = compute_task => {
            match result? {
                Ok(results) => {
                    println!("Computation completed: {} results", results.len());
                    Ok(())
                }
                Err(ComputeError::Cancelled) => {
                    println!("Computation was cancelled and checkpointed");
                    Ok(())
                }
                Err(e) => Err(Box::new(e) as Box<dyn std::error::Error>),
            }
        }
        _ = sleep(Duration::from_secs(2)) => {
            println!("Timeout reached, cancelling computation...");
            cancel_token.cancel();
            
            // Wait for cleanup
            sleep(Duration::from_millis(500)).await;
            
            println!("Can resume later by running again");
            Ok(())
        }
    }?;
    
    progress_task.abort();
    Ok(())
}
Key features:
  1. Periodic cancellation checks: Don't wait for .await points
  2. Checkpoint saving: Resume from last saved state
  3. Progress reporting: Real-time feedback
  4. Cleanup on completion: Remove checkpoint when done
Production usage: Data processing pipelines, ML training jobs, batch ETL operations, video encoding.

---

Deep Dive: How Cancellation Works

Drop Semantics in Async

Futures in Rust are lazy and can be dropped at any .await point:

use tokio::time::{sleep, Duration};

async fn can_be_cancelled() {
    println!("Starting...");
    
    // This runs to completion (no .await)
    let x = expensive_sync_work();
    
    // CANCELLATION POINT
    // Future can be dropped here
    sleep(Duration::from_secs(1)).await;
    
    // If cancelled above, this never runs
    println!("Finished: {}", x);
}

fn expensive_sync_work() -> u32 {
    // This CANNOT be cancelled once started
    // Runs to completion atomically
    (0..1000000).sum()
}

The Drop Trait and Async

The Drop trait is synchronous - you cannot .await in it:

use tokio::fs::File;

struct AsyncResource {
    file: Option<File>,
}

impl Drop for AsyncResource {
    fn drop(&mut self) {
        // CANNOT DO THIS:
        // self.file.close().await; // Error: cannot await in Drop
        
        // Drop is synchronous - file is closed when dropped
        // but there's no guarantee of when that happens
        if self.file.is_some() {
            eprintln!("Warning: AsyncResource dropped without explicit cleanup");
        }
    }
}

// BETTER: Explicit async cleanup
impl AsyncResource {
    async fn close(mut self) -> std::io::Result<()> {
        if let Some(file) = self.file.take() {
            // Explicit async cleanup
            drop(file);
        }
        Ok(())
    }
}

Cancellation Safety Categories

Cancellation-safe operations:
// Safe: No state mutation until completion
async fn read_file(path: &str) -> std::io::Result<String> {
    tokio::fs::read_to_string(path).await
}

// Safe: Idempotent operation
async fn get_user(id: u64) -> Result<User, Error> {
    reqwest::get(&format!("https://api.example.com/users/{}", id))
        .await?
        .json()
        .await
}

// Safe: Atomic operation
async fn increment_counter(counter: &AtomicU64) {
    counter.fetch_add(1, Ordering::SeqCst);
}
Cancellation-unsafe operations:
// UNSAFE: Partial state mutation
async fn transfer_unsafe(from: &mut Account, to: &mut Account, amount: u64) {
    from.balance -= amount; // If cancelled here, money disappears!
    // CANCELLATION POINT
    expensive_logging().await;
    to.balance += amount; // This might not run
}

// UNSAFE: Multi-step file operation
async fn update_file_unsafe(path: &str, data: &[u8]) -> std::io::Result<()> {
    let mut file = File::create(path).await?; // File created
    // If cancelled here, empty file exists
    file.write_all(data).await?; // Partial data written
    // If cancelled here, incomplete data
    file.sync_all().await?; // Not synced to disk
    Ok(())
}

// UNSAFE: Acquiring without releasing
async fn deadlock_risk(mutex: &Mutex<State>) {
    let guard = mutex.lock().await;
    // If cancelled here, lock might not be released properly
    expensive_operation(&guard).await;
    // guard dropped here
}

Making Operations Cancellation-Safe

Pattern 1: Atomic Commit Point
async fn transfer_safe(
    from: &Mutex<Account>,
    to: &Mutex<Account>,
    amount: u64,
) -> Result<(), Error> {
    // All validation first (no state mutation)
    let mut from_guard = from.lock().await;
    let mut to_guard = to.lock().await;
    
    if from_guard.balance < amount {
        return Err(Error::InsufficientFunds);
    }
    
    // Do expensive work before mutation
    expensive_validation().await?;
    
    // ATOMIC SECTION: No .await between mutations
    from_guard.balance -= amount;
    to_guard.balance += amount;
    // End atomic section
    
    Ok(())
}
Pattern 2: RAII Guards
struct Guard<F: FnOnce()> {
    cleanup: Option<F>,
}

impl<F: FnOnce()> Guard<F> {
    fn new(cleanup: F) -> Self {
        Self {
            cleanup: Some(cleanup),
        }
    }
    
    fn disarm(mut self) {
        self.cleanup = None;
    }
}

impl<F: FnOnce()> Drop for Guard<F> {
    fn drop(&mut self) {
        if let Some(cleanup) = self.cleanup.take() {
            cleanup();
        }
    }
}

async fn with_guard() -> Result<(), Error> {
    let resource = acquire_resource().await?;
    
    // Guard ensures cleanup even on cancellation
    let _guard = Guard::new(|| {
        // This runs on drop, even if cancelled
        eprintln!("Cleaning up resource");
    });
    
    // If cancelled here, guard drops and cleanup runs
    use_resource(&resource).await?;
    
    Ok(())
}
Pattern 3: Try-Commit-Cancel
async fn try_commit_cancel<T, E>(
    operation: impl Future<Output = Result<T, E>>,
    on_cancel: impl FnOnce(),
) -> Result<T, E> {
    struct CancelGuard<F: FnOnce()>(Option<F>);
    
    impl<F: FnOnce()> Drop for CancelGuard<F> {
        fn drop(&mut self) {
            if let Some(f) = self.0.take() {
                f();
            }
        }
    }
    
    let mut guard = CancelGuard(Some(on_cancel));
    let result = operation.await?;
    guard.0 = None; // Disarm on success
    Ok(result)
}

// Usage
async fn create_user(db: &Database, user: User) -> Result<UserId, DbError> {
    let temp_id = db.insert_temp_user(&user).await?;
    
    try_commit_cancel(
        db.commit_user(temp_id),
        || eprintln!("Rollback user creation"),
    )
    .await
}

Timeout Implementation

Tokio's timeout is implemented using select!:

use tokio::time::{sleep, Duration, Instant};
use std::future::Future;

// Simplified version of tokio::time::timeout
pub async fn timeout<F, T>(
    duration: Duration,
    future: F,
) -> Result<T, Elapsed>
where
    F: Future<Output = T>,
{
    tokio::pin!(future);
    
    tokio::select! {
        result = &mut future => Ok(result),
        _ = sleep(duration) => Err(Elapsed),
    }
}

#[derive(Debug)]
pub struct Elapsed;

// Usage
async fn with_timeout() -> Result<String, Box<dyn std::error::Error>> {
    let result = timeout(
        Duration::from_secs(5),
        fetch_data(),
    ).await?;
    
    Ok(result)
}

async fn fetch_data() -> String {
    sleep(Duration::from_secs(10)).await;
    "Data".to_string()
}
How it works:
  1. Both futures are polled concurrently
  2. When sleep completes first, it returns Err(Elapsed)
  3. The future is dropped, triggering cancellation
  4. Any drop guards in future run at this point

CancellationToken Deep Dive

CancellationToken provides structured cancellation:
use tokio_util::sync::CancellationToken;

pub struct CancellationTokenImpl {
    // Simplified implementation
    inner: Arc<tokio::sync::Notify>,
}

impl CancellationTokenImpl {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(tokio::sync::Notify::new()),
        }
    }
    
    pub fn cancel(&self) {
        self.inner.notify_waiters();
    }
    
    pub async fn cancelled(&self) {
        self.inner.notified().await;
    }
    
    pub fn is_cancelled(&self) -> bool {
        // In real implementation, this checks state
        false
    }
    
    pub fn child_token(&self) -> Self {
        // In real implementation, creates parent-child relationship
        Self {
            inner: self.inner.clone(),
        }
    }
}

// Usage pattern
async fn worker(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                println!("Cancellation requested");
                break;
            }
            _ = do_work() => {
                println!("Work completed");
            }
        }
    }
}

async fn do_work() {
    tokio::time::sleep(Duration::from_secs(1)).await;
}
Key features:
  • Parent-child hierarchy: Cancelling parent cancels all children
  • Clone-able: Can be shared across tasks
  • Efficient: Uses Notify internally for wake-up
  • No false positives: Once cancelled, stays cancelled

---

When to Use Cancellation Safety Patterns

Use When:

  1. Timeouts are Required

// API calls, network operations, user-facing requests
   let result = timeout(Duration::from_secs(30), api_call()).await?;

  1. Graceful Shutdown

// Server shutdown, worker cleanup, resource release
   tokio::select! {
       _ = shutdown_signal() => { /* cleanup */ }
       _ = server.run() => { /* normal exit */ }
   }

  1. Resource Management

// Ensure cleanup even on cancellation
   let _guard = Guard::new(|| cleanup_resource());
   risky_operation().await?;

  1. User Cancellation

// Long-running operations that users can cancel
   tokio::select! {
       _ = cancel_button_clicked() => { /* save checkpoint */ }
       result = long_computation() => { /* complete */ }
   }

  1. Fault Tolerance

// Circuit breakers, retry with timeout
   for attempt in 1..=3 {
       match timeout(Duration::from_secs(5), operation()).await {
           Ok(Ok(result)) => return Ok(result),
           Ok(Err(e)) => eprintln!("Attempt {} failed: {}", attempt, e),
           Err(_) => eprintln!("Attempt {} timed out", attempt),
       }
   }

Don't Use When:

  1. Operations Must Complete

// DON'T add timeout if partial completion is dangerous
   // Example: Financial transactions that must be atomic
   database.commit_transaction().await?; // No timeout!

  1. Fast Operations

// DON'T add overhead for operations that always complete quickly
   let x = atomic_counter.fetch_add(1, Ordering::SeqCst);

  1. Already Cancellation-Safe

// DON'T add guards if operation is already safe
   let data = fs::read_to_string(path).await?; // Already safe

---

⚠️ Anti-patterns

1. Relying on Async Drop (Not Guaranteed)

// BAD: Drop won't run async cleanup
struct BadAsyncResource {
    file: File,
}

impl Drop for BadAsyncResource {
    fn drop(&mut self) {
        // This doesn't compile - cannot await in Drop
        // self.file.sync_all().await;
    }
}

// GOOD: Explicit async cleanup method
struct GoodAsyncResource {
    file: Option<File>,
}

impl GoodAsyncResource {
    async fn close(mut self) -> std::io::Result<()> {
        if let Some(mut file) = self.file.take() {
            file.sync_all().await?;
        }
        Ok(())
    }
}

impl Drop for GoodAsyncResource {
    fn drop(&mut self) {
        if self.file.is_some() {
            eprintln!("Warning: Resource not properly closed");
        }
    }
}

2. Partial State Mutation

// BAD: State can be corrupted on cancellation
async fn update_user_bad(user: &mut User, db: &Database) -> Result<(), Error> {
    user.last_login = Utc::now(); // Mutated!
    
    // If cancelled here, memory state doesn't match DB
    db.update_user(user).await?;
    
    Ok(())
}

// GOOD: All-or-nothing update
async fn update_user_good(user_id: u64, db: &Database) -> Result<User, Error> {
    let mut updated = db.get_user(user_id).await?;
    updated.last_login = Utc::now();
    
    // If cancelled before here, no mutation
    db.update_user(&updated).await?;
    
    // Only mutate on success
    Ok(updated)
}

3. Resource Leaks on Cancellation

// BAD: Semaphore permit leaked on cancellation
async fn process_bad(sem: &Semaphore) -> Result<(), Error> {
    let permit = sem.acquire().await.unwrap();
    
    // If cancelled here, permit might not be released properly
    risky_operation().await?;
    
    drop(permit);
    Ok(())
}

// GOOD: RAII ensures release
async fn process_good(sem: &Semaphore) -> Result<(), Error> {
    let _permit = sem.acquire().await.unwrap();
    // Permit automatically released on drop, even if cancelled
    risky_operation().await?;
    Ok(())
}

4. Forgetting to Register Cancellation Tokens

// BAD: Spawned task ignores cancellation
async fn spawn_worker_bad(cancel: CancellationToken) {
    tokio::spawn(async {
        loop {
            do_work().await;
            // This never checks cancel token!
        }
    });
}

// GOOD: Task respects cancellation
async fn spawn_worker_good(cancel: CancellationToken) {
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = cancel.cancelled() => {
                    println!("Worker shutting down");
                    break;
                }
                _ = do_work() => {}
            }
        }
    });
}

5. Blocking in Drop

// BAD: Blocking in Drop delays cancellation
impl Drop for BadResource {
    fn drop(&mut self) {
        // This blocks the executor!
        std::thread::sleep(Duration::from_secs(5));
        // Expensive synchronous cleanup
        self.expensive_cleanup();
    }
}

// GOOD: Fast synchronous cleanup, or spawn blocking task
impl Drop for GoodResource {
    fn drop(&mut self) {
        // Fast cleanup only
        self.fast_cleanup();
        
        // For expensive cleanup, spawn a task
        // (but be aware this may not complete if process exits)
        if self.needs_expensive_cleanup {
            let data = self.cleanup_data.clone();
            tokio::spawn(async move {
                expensive_async_cleanup(data).await;
            });
        }
    }
}

---

Performance Characteristics

Cost of Cancellation Checks

Checking for cancellation has overhead:

use std::time::Instant;

async fn benchmark_cancellation_overhead() {
    let cancel = CancellationToken::new();
    
    // Baseline: no cancellation checks
    let start = Instant::now();
    for _ in 0..1_000_000 {
        // Work without cancellation check
    }
    let baseline = start.elapsed();
    
    // With cancellation checks
    let start = Instant::now();
    for _ in 0..1_000_000 {
        if cancel.is_cancelled() {
            break;
        }
        // Same work
    }
    let with_check = start.elapsed();
    
    println!("Baseline: {:?}", baseline);
    println!("With check: {:?}", with_check);
    println!("Overhead: {:?}", with_check - baseline);
    
    // Typical overhead: 10-50ns per check
    // For 1M iterations: ~10-50ms total
}
Guidelines:
  • Check every 100-1000 iterations for CPU-bound loops
  • Check before/after I/O operations (negligible overhead)
  • Don't check in hot inner loops

CancellationToken Overhead

// Memory overhead
struct Overhead {
    // CancellationToken: Arc<Inner> = 8 bytes
    token: CancellationToken,
    
    // Inner contains:
    // - AtomicBool for cancelled state: 1 byte
    // - Notify for wake-ups: ~24 bytes
    // - Parent reference: 8 bytes
    // Total: ~32 bytes per token
}

// Performance comparison
async fn compare_cancellation_methods() {
    // Method 1: CancellationToken (structured)
    let token = CancellationToken::new();
    let start = Instant::now();
    for _ in 0..10_000 {
        if token.is_cancelled() { break; }
    }
    println!("CancellationToken: {:?}", start.elapsed());
    
    // Method 2: AtomicBool (manual)
    let cancelled = Arc::new(AtomicBool::new(false));
    let start = Instant::now();
    for _ in 0..10_000 {
        if cancelled.load(Ordering::Acquire) { break; }
    }
    println!("AtomicBool: {:?}", start.elapsed());
    
    // Result: Similar performance, but CancellationToken
    // provides better ergonomics and parent-child relationships
}

Timeout Impact

async fn measure_timeout_overhead() {
    use tokio::time::{timeout, Duration, Instant};
    
    // Fast operation without timeout
    let start = Instant::now();
    for _ in 0..1000 {
        fast_operation().await;
    }
    let without_timeout = start.elapsed();
    
    // Fast operation with timeout
    let start = Instant::now();
    for _ in 0..1000 {
        let _ = timeout(Duration::from_secs(1), fast_operation()).await;
    }
    let with_timeout = start.elapsed();
    
    println!("Without timeout: {:?}", without_timeout);
    println!("With timeout: {:?}", with_timeout);
    
    // Overhead: ~100-500ns per timeout call
    // Reason: Setting up timer, select! machinery
}

async fn fast_operation() {
    // Completes immediately
}
Guidelines:
  • Timeout adds ~100-500ns overhead
  • Negligible for I/O operations
  • Avoid for hot loops with microsecond timing

---

Exercises

Beginner: Cancellation-Safe File Writer

Implement a file writer that ensures atomicity even on cancellation.

Task: Create SafeFileWriter that:
  1. Writes to a temporary file
  2. Uses atomic rename on success
  3. Cleans up temp file on cancellation
  4. Ensures data is synced to disk
Template:
pub struct SafeFileWriter {
    // Your fields here
}

impl SafeFileWriter {
    pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
        todo!()
    }
    
    pub async fn write(&mut self, data: &[u8]) -> io::Result<()> {
        todo!()
    }
    
    pub async fn commit(self) -> io::Result<()> {
        todo!()
    }
}

// Test with timeout
#[tokio::test]
async fn test_cancellation() {
    let writer = SafeFileWriter::new("test.txt").await.unwrap();
    
    // Simulate cancellation
    let result = tokio::time::timeout(
        Duration::from_millis(10),
        async {
            for i in 0..1000 {
                writer.write(format!("Line {}\n", i).as_bytes()).await.unwrap();
            }
            writer.commit().await
        },
    ).await;
    
    // File should not exist (cancelled before commit)
    assert!(!Path::new("test.txt").exists());
}
Hints:
  • Use Path::with_extension for temp file
  • Remember to sync before rename
  • Drop guard should clean up temp file

Intermediate: Task Group with Coordinated Cancellation

Build a task group that can cancel all tasks together.

Task: Create TaskGroup that:
  1. Spawns multiple tasks with shared cancellation
  2. Waits for all tasks to complete gracefully
  3. Supports adding tasks after creation
  4. Reports which tasks completed vs cancelled
Template:
pub struct TaskGroup {
    // Your fields here
}

impl TaskGroup {
    pub fn new() -> Self {
        todo!()
    }
    
    pub fn spawn<F>(&mut self, name: String, task: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        todo!()
    }
    
    pub async fn cancel_all(&self) {
        todo!()
    }
    
    pub async fn wait(self) -> TaskGroupResult {
        todo!()
    }
}

pub struct TaskGroupResult {
    pub completed: Vec<String>,
    pub cancelled: Vec<String>,
}

// Test
#[tokio::test]
async fn test_task_group() {
    let mut group = TaskGroup::new();
    
    group.spawn("fast".to_string(), async {
        tokio::time::sleep(Duration::from_millis(100)).await;
    });
    
    group.spawn("slow".to_string(), async {
        tokio::time::sleep(Duration::from_secs(10)).await;
    });
    
    tokio::time::sleep(Duration::from_millis(200)).await;
    group.cancel_all().await;
    
    let result = group.wait().await;
    assert_eq!(result.completed.len(), 1);
    assert_eq!(result.cancelled.len(), 1);
}
Hints:
  • Use CancellationToken for coordination
  • Store JoinHandle for each task
  • Tasks should check token in their loops

Advanced: Resumable Long-Running Computation

Create a computation that can be cancelled and resumed.

Task: Implement ResumableTask that:
  1. Processes items in batches
  2. Saves checkpoints periodically
  3. Can be cancelled mid-processing
  4. Resumes from last checkpoint
  5. Reports progress in real-time
Template:
pub struct ResumableTask<T> {
    // Your fields here
}

impl<T> ResumableTask<T>
where
    T: Serialize + DeserializeOwned,
{
    pub fn new(
        items: Vec<T>,
        checkpoint_path: PathBuf,
        cancel_token: CancellationToken,
    ) -> Self {
        todo!()
    }
    
    pub async fn process<F, R>(
        &mut self,
        processor: F,
    ) -> Result<Vec<R>, TaskError>
    where
        F: Fn(T) -> Future<Output = R>,
        R: Serialize + DeserializeOwned,
    {
        todo!()
    }
    
    pub fn progress(&self) -> f64 {
        todo!()
    }
}

#[derive(Debug, Error)]
pub enum TaskError {
    #[error("Task cancelled")]
    Cancelled,
    
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
}

// Test
#[tokio::test]
async fn test_resumable_task() {
    let items: Vec<u64> = (0..1000).collect();
    let cancel = CancellationToken::new();
    
    let mut task = ResumableTask::new(
        items.clone(),
        PathBuf::from("test.checkpoint"),
        cancel.clone(),
    );
    
    // Start processing
    let handle = tokio::spawn(async move {
        task.process(|x| async move { x * 2 }).await
    });
    
    // Cancel after short time
    tokio::time::sleep(Duration::from_millis(100)).await;
    cancel.cancel();
    
    let result = handle.await.unwrap();
    assert!(matches!(result, Err(TaskError::Cancelled)));
    
    // Resume
    let mut task2 = ResumableTask::new(
        items,
        PathBuf::from("test.checkpoint"),
        CancellationToken::new(),
    );
    
    let result = task2.process(|x| async move { x * 2 }).await.unwrap();
    assert_eq!(result.len(), 1000);
}
Hints:
  • Checkpoint every N items
  • Check cancellation between batches
  • Load checkpoint in constructor
  • Use serde_json for serialization

---

Real-World Usage

Tokio Timeout

use tokio::time::{timeout, Duration};

// Basic timeout
let result = timeout(
    Duration::from_secs(5),
    expensive_operation(),
).await;

match result {
    Ok(value) => println!("Success: {:?}", value),
    Err(_) => println!("Operation timed out"),
}

// Timeout with fallback
let value = timeout(
    Duration::from_secs(5),
    fetch_from_primary(),
)
.await
.unwrap_or_else(|_| fetch_from_cache());

CancellationToken

use tokio_util::sync::CancellationToken;

// Parent-child relationship
let parent = CancellationToken::new();
let child1 = parent.child_token();
let child2 = parent.child_token();

// Spawn workers
tokio::spawn(worker(child1));
tokio::spawn(worker(child2));

// Cancel all
parent.cancel();

async fn worker(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => break,
            _ = do_work() => {}
        }
    }
}

Axum Request Cancellation

use axum::{extract::Request, response::Response};

async fn handler(req: Request) -> Response {
    // Axum automatically cancels handler if client disconnects
    
    tokio::select! {
        result = long_operation() => {
            // Normal completion
            Response::new(result.into())
        }
        _ = req.body().on_upgrade() => {
            // Client disconnected
            Response::new("Cancelled".into())
        }
    }
}

Tonic gRPC Cancellation

use tonic::{Request, Response, Status};

async fn process_stream(
    request: Request<StreamRequest>,
) -> Result<Response<StreamResponse>, Status> {
    // Tonic provides cancellation via request extensions
    let cancel = request.extensions().get::<CancellationToken>().cloned();
    
    tokio::select! {
        result = process_data() => Ok(Response::new(result)),
        _ = cancel.cancelled() => Err(Status::cancelled("Request cancelled")),
    }
}

async-scoped for Structured Concurrency

use async_scoped::TokioScope;

async fn parallel_with_scope() -> Result<(), Box<dyn std::error::Error>> {
    TokioScope::scope_and_block(|s| {
        s.spawn(async {
            println!("Task 1");
        });
        
        s.spawn(async {
            println!("Task 2");
        });
        
        // All tasks automatically cancelled if scope exits early
    });
    
    Ok(())
}

---

Further Reading

  1. Tokio Documentation
  1. Blog Posts
  1. Libraries
  1. Academic Papers
  • "Structured Concurrency" by Martin Sústrik
  • "Async Drop" RFC #1850
  1. Production Examples
  • Linkerd2-proxy cancellation handling
  • Vector pipeline shutdown
  • Axum graceful shutdown examples

---

Summary

Cancellation safety is critical for building reliable async systems:

Key Takeaways:
  • Futures can be cancelled at any .await point
  • Use RAII guards to ensure cleanup
  • Prefer atomic operations and explicit commit points
  • CancellationToken provides structured cancellation
  • Always consider what happens on cancellation
Best Practices:
  1. Make operations cancellation-safe by default
  2. Use atomic commits for multi-step operations
  3. Implement explicit cleanup methods (not just Drop)
  4. Test cancellation paths thoroughly
  5. Document cancellation behavior

Mastering cancellation safety is the difference between toy async code and production-ready systems that handle timeouts, shutdowns, and failures gracefully.

🎮 Try it Yourself

🎮

Cancellation & Timeouts - Playground

Run this code in the official Rust Playground