Proper cleanup patterns
A future is *cancellation-safe* if it can be dropped at any `.await` point without corrupting state or leaking resources.
In async Rust, futures can be cancelled at any time by dropping them. This happens with:
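- `tokio::select!`, when another branch completes first
- `tokio::time::timeout`, when the time limit expires
- `JoinHandle::abort()`, when a spawned task is aborted
- `CancellationToken`, when cooperative cancellation is requested

use tokio::fs::File;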
use tokio::io::AsyncWriteExt;
// UNSAFE: Not cancellation-safe!
// Writes `HEADER:` followed by `data` directly into `user_<id>.data`.
// Every `.await` below is a point where the future can be dropped, and each one
// leaves the destination file in a different partial state. Contrast with
// `write_user_data_safe`, which stages the bytes in a temporary file first.
async fn write_user_data(id: u64, data: &[u8]) -> std::io::Result<()> {
// `File::create` truncates an existing file, so old contents are gone from here on.
let mut file = File::create(format!("user_{}.data", id)).await?;
// If cancelled here, file is created but empty
file.write_all(b"HEADER:").await?;
// If cancelled here, file has header but incomplete data
file.write_all(data).await?;
// If cancelled here, data written but not synced to disk
file.sync_all().await?;
Ok(())
}
// SAFE: Cancellation-safe with atomic rename
/// Write `HEADER:` followed by `data` to `user_<id>.data` without ever exposing
/// a partially written destination file.
///
/// The bytes go to `user_<id>.tmp` first. That file is synced to disk and only
/// then renamed over the destination.
///
/// - If the future is dropped before the rename, the destination is untouched.
///   A stray `.tmp` file may remain, because a dropped future cannot run async
///   cleanup.
/// - If any step returns an error, the temporary file is removed (best effort)
///   before the error is returned.
///
/// # Errors
/// Returns the first I/O error from creating, writing, syncing, or renaming.
async fn write_user_data_safe(id: u64, data: &[u8]) -> std::io::Result<()> {
    use tokio::fs;

    let tmp_path = format!("user_{}.tmp", id);
    let final_path = format!("user_{}.data", id);

    let outcome = async {
        // Write to temporary file
        let mut file = File::create(&tmp_path).await?;
        file.write_all(b"HEADER:").await?;
        file.write_all(data).await?;
        file.sync_all().await?;
        drop(file); // Close before rename
        // Atomic rename - if cancelled before this, no corrupt file
        fs::rename(&tmp_path, &final_path).await
    }
    .await;

    if outcome.is_err() {
        // Don't leave a half-written temp file behind when a step fails.
        // This is best effort: a failure here would only hide the original error.
        let _ = fs::remove_file(&tmp_path).await;
    }
    outcome
}
Futures can only be cancelled at .await points:
/// Illustrates where an async fn can be dropped. The helper functions called
/// here are placeholders that are not defined in this example.
async fn cancellation_points() {
// Non-cancellable: runs to completion
let x = expensive_computation();
// CANCELLATION POINT: Can be dropped here
some_async_operation().await;
// Non-cancellable: runs to completion
let y = more_computation(x);
// CANCELLATION POINT: Can be dropped here
another_async_operation().await;
}
Key insight: Between .await points, code runs to completion. This is why blocking operations in async code are problematic - they can't be cancelled.
---
Ensuring database transactions rollback on cancellation is critical for data consistency.
use sqlx::{PgPool, Postgres, Transaction};
use std::ops::{Deref, DerefMut};
/// RAII guard that ensures transaction rollback on drop.
///
/// Wraps an `sqlx` Postgres transaction. The transaction is persisted only by
/// [`commit`](Self::commit). If the guard is dropped first, the wrapped
/// transaction is dropped with it and rolled back. The drop can happen
/// because an error was returned early with `?` or because the future was
/// cancelled.
struct TransactionGuard<'c> {
    // `Some` until `commit` takes it out. Its presence at drop time means
    // "not committed".
    tx: Option<Transaction<'c, Postgres>>,
}

impl<'c> TransactionGuard<'c> {
    /// Begin a new transaction on `pool`.
    ///
    /// # Errors
    /// Propagates the error from `pool.begin()`.
    async fn new(pool: &PgPool) -> Result<Self, sqlx::Error> {
        let tx = pool.begin().await?;
        Ok(Self { tx: Some(tx) })
    }

    /// Commit the transaction, consuming the guard.
    ///
    /// # Errors
    /// Propagates the commit error. The transaction has been consumed either
    /// way, so `Drop` stays silent.
    async fn commit(mut self) -> Result<(), sqlx::Error> {
        match self.tx.take() {
            Some(tx) => tx.commit().await,
            None => Ok(()),
        }
    }
}

impl<'c> Deref for TransactionGuard<'c> {
    type Target = Transaction<'c, Postgres>;

    fn deref(&self) -> &Self::Target {
        // `commit` consumes the guard, so `tx` is always present while it is usable.
        self.tx
            .as_ref()
            .expect("TransactionGuard used after commit")
    }
}

impl<'c> DerefMut for TransactionGuard<'c> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.tx
            .as_mut()
            .expect("TransactionGuard used after commit")
    }
}

impl<'c> Drop for TransactionGuard<'c> {
    fn drop(&mut self) {
        if self.tx.is_some() {
            // Reached on cancellation AND on early error returns, so the message
            // must not blame cancellation alone. The rollback itself happens when
            // `tx` is dropped right after this; Drop is synchronous, so we cannot
            // await an explicit rollback here.
            eprintln!("Warning: Transaction dropped without commit; rolling back");
        }
    }
}
/// Transfer money between accounts with cancellation safety
async fn transfer_money(
pool: &PgPool,
from_account: i64,
to_account: i64,
amount: i64,
) -> Result<(), sqlx::Error> {
let mut tx = TransactionGuard::new(pool).await?;
// If cancelled during these operations, transaction will rollback
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2")
.bind(amount)
.bind(from_account)
.execute(&mut *tx)
.await?;
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE id = $2")
.bind(amount)
.bind(to_account)
.execute(&mut *tx)
.await?;
// Explicit commit - only here does the transaction become permanent
tx.commit().await?;
Ok(())
}
/// Web handler with timeout.
///
/// Runs `transfer_money` under a 5-second budget and maps each outcome to a
/// user-facing message. If the budget runs out, the transfer future is
/// dropped. Its `TransactionGuard` goes with it, so the transaction is rolled
/// back.
async fn handle_transfer_request(
    pool: PgPool,
    from: i64,
    to: i64,
    amount: i64,
) -> Result<String, String> {
    use tokio::time::{timeout, Duration};

    let budget = Duration::from_secs(5);
    let transfer = transfer_money(&pool, from, to, amount);

    match timeout(budget, transfer).await {
        // Deadline hit first: the in-flight transfer was dropped (rolled back).
        Err(_elapsed) => Err("Transfer timed out and was rolled back".to_string()),
        Ok(Err(e)) => Err(format!("Database error: {}", e)),
        Ok(Ok(())) => Ok("Transfer completed".to_string()),
    }
}
Why this works:
- `TransactionGuard` uses RAII to ensure cleanup.
- The `Drop` implementation handles rollback automatically.
- `Drop` runs whenever the future is cancelled, so cleanup happens on every exit path.
- `commit()` is the only way to persist changes.

File operations are inherently cancellation-unsafe: partial writes can corrupt data.
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncWriteExt, BufWriter};
use std::path::{Path, PathBuf};
use std::io;
/// Cancellation-safe file writer using atomic rename.
///
/// Data is written to a sibling temporary file named `<destination>.tmp`.
/// Only [`commit`](Self::commit) renames it over the destination. If the
/// writer is dropped without a successful commit, the destination is never
/// touched and the temporary file is removed (best effort). A dropped writer
/// can come from an error, a timeout, or any other cancellation.
pub struct AtomicFileWriter {
    temp_path: PathBuf,
    final_path: PathBuf,
    // `Some` until `commit` takes it.
    writer: Option<BufWriter<File>>,
    // Set only after the rename succeeded. Until then, `Drop` cleans up the temp file.
    committed: bool,
}

impl AtomicFileWriter {
    /// Create (or truncate) the temporary file for `path`.
    ///
    /// # Errors
    /// Propagates the error from opening the temporary file.
    pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
        let final_path = path.as_ref().to_path_buf();

        // Append `.tmp` rather than using `with_extension("tmp")`. Replacing the
        // extension causes two problems:
        // - `a.json` and `a.yaml` would share `a.tmp`.
        // - `a.tmp` would map onto itself, so the destination would be written in
        //   place and the write would no longer be atomic.
        let mut temp_name = final_path.clone().into_os_string();
        temp_name.push(".tmp");
        let temp_path = PathBuf::from(temp_name);

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&temp_path)
            .await?;

        Ok(Self {
            temp_path,
            final_path,
            writer: Some(BufWriter::new(file)),
            committed: false,
        })
    }

    /// Append `data` to the (buffered) temporary file.
    pub async fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
        // `commit` consumes `self`, so the writer is always present here.
        self.writer
            .as_mut()
            .expect("AtomicFileWriter used after commit")
            .write_all(data)
            .await
    }

    /// Commit changes atomically: flush, fsync, then rename over the destination.
    ///
    /// # Errors
    /// Propagates flush, sync, or rename errors. On error the temporary file is
    /// removed when the writer is dropped, and the destination is left unchanged.
    pub async fn commit(mut self) -> io::Result<()> {
        if let Some(mut writer) = self.writer.take() {
            // Flush all data to disk
            writer.flush().await?;
            // Sync to ensure durability
            writer.get_ref().sync_all().await?;
            // Close file before rename
            drop(writer);
            // Atomic rename - this is the commit point
            tokio::fs::rename(&self.temp_path, &self.final_path).await?;
        }
        self.committed = true;
        Ok(())
    }
}

impl Drop for AtomicFileWriter {
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        if self.writer.is_some() {
            eprintln!(
                "Warning: AtomicFileWriter dropped without commit, \
                 removing temp file: {:?}",
                self.temp_path
            );
        }
        // Close our handle first: some platforms refuse to delete an open file.
        drop(self.writer.take());
        // We can't await in Drop, but unlinking one file is a single quick
        // syscall, so doing it synchronously is acceptable. This is best effort:
        // the file may already be gone, or still be held open on some platforms.
        let _ = std::fs::remove_file(&self.temp_path);
    }
}
/// Configuration file writer with timeout.
///
/// Serializes `config` as pretty-printed JSON and writes it to `path` through
/// an `AtomicFileWriter`. The whole operation is bounded by `timeout_secs`. If
/// the deadline passes, the write future is dropped before its commit, so
/// `path` is left unchanged.
///
/// # Errors
/// Returns any I/O or serialization error, or "Config write timed out".
pub async fn write_config_with_timeout<T: serde::Serialize>(
    path: impl AsRef<Path>,
    config: &T,
    timeout_secs: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    use tokio::time::{timeout, Duration};

    let outcome = timeout(Duration::from_secs(timeout_secs), async {
        let mut writer = AtomicFileWriter::new(path).await?;
        let json = serde_json::to_string_pretty(config)?;
        writer.write_all(json.as_bytes()).await?;
        writer.commit().await?;
        Ok::<_, Box<dyn std::error::Error>>(())
    })
    .await;

    match outcome {
        // Finished in time: pass the inner result (success or error) through.
        Ok(inner) => inner,
        Err(_) => Err("Config write timed out".into()),
    }
}
/// Example: Multi-file update with rollback on cancellation.
///
/// Each file is staged through its own `AtomicFileWriter`, so nothing is
/// visible until [`commit`](Self::commit). Dropping the transaction before
/// commit leaves every destination untouched.
///
/// The commit itself is *not* atomic across files. Files are renamed one at a
/// time, so a failure part-way leaves the earlier files committed.
pub struct MultiFileTransaction {
    files: Vec<(PathBuf, AtomicFileWriter)>,
}

impl Default for MultiFileTransaction {
    fn default() -> Self {
        Self::new()
    }
}

impl MultiFileTransaction {
    /// Create an empty transaction.
    pub fn new() -> Self {
        Self { files: Vec::new() }
    }

    /// Stage `content` for `path`. The bytes are written to a temporary file now
    /// and published on commit.
    ///
    /// # Errors
    /// Propagates errors from creating or writing the temporary file.
    pub async fn add_file(
        &mut self,
        path: impl AsRef<Path>,
        content: &[u8],
    ) -> io::Result<()> {
        let path = path.as_ref().to_path_buf();
        let mut writer = AtomicFileWriter::new(&path).await?;
        writer.write_all(content).await?;
        self.files.push((path, writer));
        Ok(())
    }

    /// Commit all files atomically (as much as possible), in insertion order.
    ///
    /// # Errors
    /// Stops at the first failing file and returns its error. Files after it
    /// are dropped uncommitted.
    pub async fn commit(mut self) -> io::Result<()> {
        // `Self` implements `Drop`, so its fields cannot be moved out directly
        // (E0509). Taking the list also leaves `files` empty, so `Drop` does not
        // warn after a commit attempt.
        let files = std::mem::take(&mut self.files);
        for (_, writer) in files {
            writer.commit().await?;
        }
        Ok(())
    }
}

impl Drop for MultiFileTransaction {
    fn drop(&mut self) {
        if !self.files.is_empty() {
            eprintln!(
                "Warning: MultiFileTransaction dropped without commit, \
                 {} files not written",
                self.files.len()
            );
        }
    }
}
Key patterns:
Network operations need timeouts to prevent hanging indefinitely.
use reqwest::{Client, Response};
use std::time::Duration;
use tokio::time::timeout;
use std::sync::Arc;
use tokio::sync::Semaphore;
/// HTTP client with built-in timeout and cancellation.
///
/// Every request first takes a permit from a shared semaphore, which bounds
/// concurrency to `max_concurrent`. The request is then raced against a
/// timeout. The permit is an RAII guard held in a local, so dropping the
/// request future at any `.await` releases it.
pub struct TimeoutClient {
    client: Client,
    default_timeout: Duration,
    max_concurrent: Arc<Semaphore>,
}

impl TimeoutClient {
    /// Build a client with a per-request `default_timeout`. At most
    /// `max_concurrent` requests are in flight at once.
    ///
    /// # Panics
    /// Panics if the underlying `reqwest::Client` cannot be built.
    pub fn new(default_timeout: Duration, max_concurrent: usize) -> Self {
        let client = Client::builder()
            .pool_max_idle_per_host(10)
            .build()
            .expect("failed to build reqwest HTTP client");
        Self {
            client,
            default_timeout,
            max_concurrent: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    /// Make a GET request with timeout (`timeout_duration` or the default).
    ///
    /// The timeout and the concurrency permit cover only `send()` and the
    /// status check. Body reads the caller performs afterwards (e.g.
    /// `Response::json`) happen after this returns, outside both.
    ///
    /// # Errors
    /// - [`RequestError::Timeout`] if the deadline passes.
    /// - [`RequestError::Network`] if sending fails.
    /// - [`RequestError::Http`] for an error status.
    pub async fn get_with_timeout(
        &self,
        url: &str,
        timeout_duration: Option<Duration>,
    ) -> Result<Response, RequestError> {
        // The semaphore is owned by `self` and never closed, so this cannot fail.
        let _permit = self
            .max_concurrent
            .acquire()
            .await
            .expect("request semaphore is never closed");
        let timeout_duration = timeout_duration.unwrap_or(self.default_timeout);

        let request = async {
            self.client
                .get(url)
                .send()
                .await
                .map_err(RequestError::Network)?
                .error_for_status()
                .map_err(RequestError::Http)
        };

        match timeout(timeout_duration, request).await {
            Ok(res) => res,
            Err(_) => Err(RequestError::Timeout),
        }
        // `_permit` is dropped on return, releasing the slot.
    }

    /// Make multiple requests with individual timeouts.
    ///
    /// Results are returned in the same order as `urls`, so `results[i]`
    /// belongs to `urls[i]`.
    pub async fn get_many(
        &self,
        urls: &[String],
        per_request_timeout: Duration,
    ) -> Vec<Result<Response, RequestError>> {
        use futures::stream::{self, StreamExt};

        // `buffered` keeps up to 10 requests in flight but yields results in
        // input order. `buffer_unordered` made results impossible to match to URLs.
        stream::iter(urls)
            .map(|url| self.get_with_timeout(url, Some(per_request_timeout)))
            .buffered(10)
            .collect()
            .await
    }
}
#[derive(Debug, thiserror::Error)]
pub enum RequestError {
#[error("Request timed out")]
Timeout,
#[error("Network error: {0}")]
Network(#[from] reqwest::Error),
#[error("HTTP error: {0}")]
Http(reqwest::Error),
}
/// Circuit breaker pattern with cancellation
pub struct CircuitBreaker {
client: TimeoutClient,
failure_count: Arc<tokio::sync::Mutex<u32>>,
failure_threshold: u32,
timeout: Duration,
}
impl CircuitBreaker {
pub fn new(timeout: Duration, failure_threshold: u32) -> Self {
Self {
client: TimeoutClient::new(timeout, 100),
failure_count: Arc::new(tokio::sync::Mutex::new(0)),
failure_threshold,
timeout,
}
}
pub async fn call(&self, url: &str) -> Result<Response, RequestError> {
let failures = *self.failure_count.lock().await;
if failures >= self.failure_threshold {
return Err(RequestError::Http(
reqwest::Error::new(
reqwest::StatusCode::SERVICE_UNAVAILABLE,
"Circuit breaker open",
),
));
}
match self.client.get_with_timeout(url, Some(self.timeout)).await {
Ok(resp) => {
// Reset on success
*self.failure_count.lock().await = 0;
Ok(resp)
}
Err(e) => {
// Increment failure count
*self.failure_count.lock().await += 1;
Err(e)
}
}
}
}
/// Example: API aggregation with timeout.
///
/// Fetches a user's profile, posts and friends concurrently. Each request has
/// its own timeout (2 s / 3 s / 2 s), and the whole aggregation is also capped
/// at 5 s. Hitting the overall cap drops every in-flight request.
///
/// # Errors
/// Returns the first request or decoding error (via `try_join!`), or
/// "Overall aggregation timed out".
pub async fn aggregate_user_data(
    client: &TimeoutClient,
    user_id: u64,
) -> Result<UserData, Box<dyn std::error::Error>> {
    use tokio::time::timeout;

    // Aggregate from multiple services with overall timeout
    let aggregate_op = async {
        // Parallel requests with individual timeouts
        let (profile, posts, friends) = tokio::try_join!(
            async {
                let resp = client
                    .get_with_timeout(
                        &format!("https://api.example.com/users/{}", user_id),
                        Some(Duration::from_secs(2)),
                    )
                    .await?;
                resp.json::<Profile>().await.map_err(RequestError::from)
            },
            async {
                let resp = client
                    .get_with_timeout(
                        &format!("https://api.example.com/users/{}/posts", user_id),
                        Some(Duration::from_secs(3)),
                    )
                    .await?;
                resp.json::<Vec<Post>>().await.map_err(RequestError::from)
            },
            async {
                let resp = client
                    .get_with_timeout(
                        &format!("https://api.example.com/users/{}/friends", user_id),
                        Some(Duration::from_secs(2)),
                    )
                    .await?;
                resp.json::<Vec<UserId>>().await.map_err(RequestError::from)
            },
        )?;
        Ok::<_, RequestError>(UserData {
            profile,
            posts,
            friends,
        })
    };

    // Overall timeout of 5 seconds for all operations. The first `?` handles
    // the timeout, the second the aggregated request error. Return the
    // aggregated data: it used to be discarded in favour of an empty default.
    let user_data = timeout(Duration::from_secs(5), aggregate_op)
        .await
        .map_err(|_| "Overall aggregation timed out")??;
    Ok(user_data)
}
/// Combined result of the three requests made by `aggregate_user_data`.
#[derive(Default)]
struct UserData {
profile: Profile,
posts: Vec<Post>,
friends: Vec<UserId>,
}
/// Placeholder for the profile payload (declares no fields yet).
#[derive(Default, serde::Deserialize)]
struct Profile {}
/// Placeholder for a single post payload (declares no fields yet).
#[derive(serde::Deserialize)]
struct Post {}
/// Newtype around a numeric user id, deserialized from the friends endpoint.
#[derive(serde::Deserialize)]
struct UserId(u64);
Key features:
Coordinated shutdown of multiple tasks using CancellationToken.
use tokio_util::sync::CancellationToken;
use tokio::time::{sleep, Duration};
use std::sync::Arc;
/// Background worker with graceful shutdown.
///
/// Holds a name and the `CancellationToken` its task should watch. Once
/// [`start`](Self::start) has been called, it also holds the task's `JoinHandle`.
pub struct BackgroundWorker {
    name: String,
    cancel_token: CancellationToken,
    handle: Option<tokio::task::JoinHandle<()>>,
}

impl BackgroundWorker {
    /// Create a worker that has not been started yet.
    pub fn new(name: impl Into<String>, cancel_token: CancellationToken) -> Self {
        Self {
            name: name.into(),
            cancel_token,
            handle: None,
        }
    }

    /// Start the worker: spawn `work(token)` on the Tokio runtime, logging
    /// start and completion.
    ///
    /// `Fut` must be `'static` because `tokio::spawn` requires the whole spawned
    /// future to be `'static`, and that future stores `Fut` across its `.await`.
    /// Calling this again replaces the stored handle, which detaches the
    /// previously started task.
    pub fn start<F, Fut>(&mut self, work: F)
    where
        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        let name = self.name.clone();
        let token = self.cancel_token.clone();
        let handle = tokio::spawn(async move {
            println!("[{}] Starting", name);
            work(token).await;
            println!("[{}] Shutdown complete", name);
        });
        self.handle = Some(handle);
    }

    /// Wait for graceful shutdown.
    ///
    /// Returns immediately if the worker was never started. A task that
    /// panicked or was aborted is reported on stderr instead of being ignored.
    pub async fn join(self) {
        if let Some(handle) = self.handle {
            if let Err(err) = handle.await {
                eprintln!("[{}] Worker task did not finish cleanly: {}", self.name, err);
            }
        }
    }
}

/// Application with coordinated shutdown.
///
/// Every worker receives a child of `root_token`, so cancelling the root
/// cancels all workers.
pub struct Application {
    root_token: CancellationToken,
    workers: Vec<BackgroundWorker>,
}

impl Default for Application {
    fn default() -> Self {
        Self::new()
    }
}

impl Application {
    /// Create an application with no workers.
    pub fn new() -> Self {
        Self {
            root_token: CancellationToken::new(),
            workers: Vec::new(),
        }
    }

    /// Add a worker to the application and start it immediately with a child token.
    pub fn add_worker<F, Fut>(&mut self, name: impl Into<String>, work: F)
    where
        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        let mut worker = BackgroundWorker::new(name, self.root_token.child_token());
        worker.start(work);
        self.workers.push(worker);
    }

    /// Run the application with graceful shutdown.
    ///
    /// Waits for Ctrl+C or a programmatic cancel. It then cancels every worker
    /// and joins them in the order they were added.
    pub async fn run(self) {
        use tokio::signal;

        // Wait for shutdown signal
        tokio::select! {
            res = signal::ctrl_c() => {
                match res {
                    Ok(()) => println!("\nReceived Ctrl+C, initiating graceful shutdown..."),
                    Err(err) => {
                        // Failing to install the handler is not a shutdown
                        // request. Fall back to programmatic shutdown only.
                        eprintln!(
                            "Unable to listen for Ctrl+C ({}); waiting for programmatic shutdown",
                            err
                        );
                        self.root_token.cancelled().await;
                        println!("Shutdown requested programmatically");
                    }
                }
            }
            _ = self.root_token.cancelled() => {
                println!("Shutdown requested programmatically");
            }
        }

        // Cancel all workers
        self.root_token.cancel();

        // Wait for all workers to finish
        for worker in self.workers {
            worker.join().await;
        }
        println!("All workers shut down gracefully");
    }

    /// Get a handle to trigger shutdown: cancelling it ends `run`.
    pub fn shutdown_handle(&self) -> CancellationToken {
        self.root_token.clone()
    }
}
/// Example workers
async fn database_worker(cancel: CancellationToken) {
loop {
tokio::select! {
_ = cancel.cancelled() => {
println!("[DB] Closing connections...");
sleep(Duration::from_millis(500)).await;
break;
}
_ = sleep(Duration::from_secs(1)) => {
println!("[DB] Processing batch...");
}
}
}
}
async fn http_server_worker(cancel: CancellationToken) {
loop {
tokio::select! {
_ = cancel.cancelled() => {
println!("[HTTP] Draining active connections...");
sleep(Duration::from_millis(1000)).await;
break;
}
_ = sleep(Duration::from_millis(500)) => {
println!("[HTTP] Handling request...");
}
}
}
}
async fn metrics_worker(cancel: CancellationToken) {
loop {
tokio::select! {
_ = cancel.cancelled() => {
println!("[Metrics] Flushing final metrics...");
sleep(Duration::from_millis(200)).await;
break;
}
_ = sleep(Duration::from_secs(10)) => {
println!("[Metrics] Reporting metrics...");
}
}
}
}
/// Example: Complex service with task hierarchy.
///
/// Registers four workers, then blocks in `Application::run` until shutdown.
pub async fn run_service() {
    let mut app = Application::new();

    // Add workers
    app.add_worker("database", database_worker);
    app.add_worker("http-server", http_server_worker);
    app.add_worker("metrics", metrics_worker);

    // Could add more nested workers. A clone of the root token could be moved
    // into a worker so it can trigger shutdown on its own.
    let _shutdown = app.shutdown_handle();
    app.add_worker("health-check", move |cancel| async move {
        loop {
            let stop = tokio::select! {
                _ = cancel.cancelled() => true,
                _ = sleep(Duration::from_secs(30)) => false,
            };
            if stop {
                break;
            }
            println!("[Health] Checking system health...");
            // If unhealthy, could trigger shutdown:
            // _shutdown.cancel();
        }
    });

    // Run until shutdown signal
    app.run().await;
}
Key patterns:
Making expensive computations cancellation-aware with progress saving.
use std::sync::Arc;
use tokio::sync::{Mutex, watch};
use tokio::time::{sleep, Duration, Instant};
use tokio_util::sync::CancellationToken;
/// Progress tracker for long-running operations.
///
/// Stores the number of completed items and publishes the completion
/// percentage over a `watch` channel.
#[derive(Clone)]
pub struct Progress {
    current: Arc<Mutex<usize>>,
    total: usize,
    sender: watch::Sender<f64>,
}

impl Progress {
    /// Create a tracker for `total` items. Also returns a receiver for
    /// percentage updates, which starts at 0.0.
    pub fn new(total: usize) -> (Self, watch::Receiver<f64>) {
        let (sender, receiver) = watch::channel(0.0);
        (
            Self {
                current: Arc::new(Mutex::new(0)),
                total,
                sender,
            },
            receiver,
        )
    }

    /// Record that `current` items are done and publish the new percentage.
    ///
    /// With `total == 0` there is nothing to do, so progress is reported as
    /// 100%. Dividing by zero would publish NaN or infinity instead.
    pub async fn update(&self, current: usize) {
        *self.current.lock().await = current;
        let percent = if self.total == 0 {
            100.0
        } else {
            (current as f64 / self.total as f64) * 100.0
        };
        // Sending fails only when no receiver is left. Progress reporting is
        // best effort, so that is ignored.
        let _ = self.sender.send(percent);
    }

    /// Number of items recorded as done by the last `update`.
    pub async fn get(&self) -> usize {
        *self.current.lock().await
    }
}
/// Checkpoint for resumable computation
///
/// Persisted as JSON by `ResumableComputation::save_checkpoint` and read back
/// on the next run.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct Checkpoint {
// Number of leading items that were processed when the checkpoint was taken.
processed: usize,
// Outputs for those processed items, in input order.
partial_results: Vec<u64>,
// Seconds since the Unix epoch at which the checkpoint was written.
timestamp: u64,
}
/// Cancellation-aware computation with checkpointing.
///
/// Processes items one by one. It checks the cancellation token periodically
/// and periodically saves a [`Checkpoint`], so an interrupted run can resume
/// where it stopped.
pub struct ResumableComputation {
    checkpoint_path: String,
    progress: Progress,
    cancel_token: CancellationToken,
}

impl ResumableComputation {
    /// Create a computation over `total_items` items. Also returns a receiver
    /// for progress percentages.
    pub fn new(
        checkpoint_path: String,
        total_items: usize,
        cancel_token: CancellationToken,
    ) -> (Self, watch::Receiver<f64>) {
        let (progress, receiver) = Progress::new(total_items);
        (
            Self {
                checkpoint_path,
                progress,
                cancel_token,
            },
            receiver,
        )
    }

    /// Load the checkpoint if it exists and parses. Any failure yields `None`.
    async fn load_checkpoint(&self) -> Option<Checkpoint> {
        let data = tokio::fs::read_to_string(&self.checkpoint_path).await.ok()?;
        serde_json::from_str(&data).ok()
    }

    /// Save the checkpoint atomically: write a sibling temp file, then rename
    /// it over the real one.
    ///
    /// A crash or cancellation mid-write can then never leave a truncated
    /// checkpoint. Such a checkpoint would fail to parse on the next run and
    /// silently restart from zero.
    async fn save_checkpoint(&self, checkpoint: &Checkpoint) -> std::io::Result<()> {
        let data = serde_json::to_string(checkpoint)?;
        let tmp_path = format!("{}.tmp", self.checkpoint_path);
        tokio::fs::write(&tmp_path, data).await?;
        tokio::fs::rename(&tmp_path, &self.checkpoint_path).await
    }

    /// Build a checkpoint for the first `processed` items.
    fn snapshot(processed: usize, results: &[u64]) -> Checkpoint {
        Checkpoint {
            processed,
            partial_results: results.to_vec(),
            // A clock set before the epoch shouldn't abort the computation.
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
        }
    }

    /// Process items with cancellation checks and checkpointing.
    ///
    /// The run resumes from a saved checkpoint when that checkpoint is
    /// consistent with `items`. The checkpoint file is deleted after success.
    ///
    /// # Errors
    /// - [`ComputeError::Cancelled`] after saving a checkpoint, when the token
    ///   is cancelled.
    /// - [`ComputeError::Io`] if a checkpoint cannot be written.
    pub async fn process_items(&self, items: Vec<u64>) -> Result<Vec<u64>, ComputeError> {
        const CHECKPOINT_INTERVAL: usize = 100; // Checkpoint every 100 items
        const CANCELLATION_CHECK_INTERVAL: usize = 10; // Check cancellation every 10 items

        let mut results = Vec::with_capacity(items.len());
        let mut start_idx = 0;

        // Try to resume from checkpoint
        if let Some(checkpoint) = self.load_checkpoint().await {
            // Only trust a checkpoint that fits this input. A stale one from a
            // different run would otherwise skip or duplicate work.
            if checkpoint.processed <= items.len()
                && checkpoint.partial_results.len() == checkpoint.processed
            {
                println!(
                    "Resuming from checkpoint: {} items processed",
                    checkpoint.processed
                );
                start_idx = checkpoint.processed;
                results = checkpoint.partial_results;
                self.progress.update(start_idx).await;
            } else {
                eprintln!("Ignoring checkpoint that does not match the current input");
            }
        }

        for (idx, item) in items.iter().enumerate().skip(start_idx) {
            // Check for cancellation periodically
            if idx % CANCELLATION_CHECK_INTERVAL == 0 && self.cancel_token.is_cancelled() {
                println!("Computation cancelled, saving checkpoint...");
                self.save_checkpoint(&Self::snapshot(idx, &results)).await?;
                return Err(ComputeError::Cancelled);
            }

            results.push(expensive_operation(*item).await);
            let processed = idx + 1;
            self.progress.update(processed).await;

            // Checkpoint after every full batch of CHECKPOINT_INTERVAL items.
            // The old `idx % N == 0` check fired after the very first item.
            if processed % CHECKPOINT_INTERVAL == 0 {
                self.save_checkpoint(&Self::snapshot(processed, &results)).await?;
            }
        }

        // Clean up checkpoint on success (best effort: it may not exist).
        let _ = tokio::fs::remove_file(&self.checkpoint_path).await;
        Ok(results)
    }
}
/// Stand-in for costly per-item work: waits 10 ms, then returns twice the input.
async fn expensive_operation(item: u64) -> u64 {
    let simulated_latency = Duration::from_millis(10);
    sleep(simulated_latency).await;
    2 * item
}
/// Errors returned by `ResumableComputation::process_items`.
#[derive(Debug, thiserror::Error)]
pub enum ComputeError {
// The cancellation token fired; a checkpoint was saved before returning.
#[error("Computation cancelled")]
Cancelled,
// Writing a checkpoint failed.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}
/// Example: Running computation with timeout and cancellation
pub async fn run_with_timeout_and_resume() -> Result<(), Box<dyn std::error::Error>> {
let items: Vec<u64> = (0..1000).collect();
let cancel_token = CancellationToken::new();
let (computation, mut progress_rx) = ResumableComputation::new(
"computation.checkpoint".to_string(),
items.len(),
cancel_token.clone(),
);
// Spawn progress reporter
let progress_task = tokio::spawn(async move {
while progress_rx.changed().await.is_ok() {
let progress = *progress_rx.borrow();
println!("Progress: {:.1}%", progress);
}
});
// Run with timeout
let compute_task = tokio::spawn(async move {
computation.process_items(items).await
});
// Simulate timeout after 2 seconds
tokio::select! {
result = compute_task => {
match result? {
Ok(results) => {
println!("Computation completed: {} results", results.len());
Ok(())
}
Err(ComputeError::Cancelled) => {
println!("Computation was cancelled and checkpointed");
Ok(())
}
Err(e) => Err(Box::new(e) as Box<dyn std::error::Error>),
}
}
_ = sleep(Duration::from_secs(2)) => {
println!("Timeout reached, cancelling computation...");
cancel_token.cancel();
// Wait for cleanup
sleep(Duration::from_millis(500)).await;
println!("Can resume later by running again");
Ok(())
}
}?;
progress_task.abort();
Ok(())
}
Key features:
- Cancellation is only observed at `.await` points.

---
Futures in Rust are lazy and can be dropped at any .await point:
use tokio::time::{sleep, Duration};
/// Prints a start message, runs a synchronous computation, waits one second,
/// then prints the result. The only place this future can be dropped is the
/// `sleep(..).await`.
async fn can_be_cancelled() {
println!("Starting...");
// This runs to completion (no .await)
let x = expensive_sync_work();
// CANCELLATION POINT
// Future can be dropped here
sleep(Duration::from_secs(1)).await;
// If cancelled above, this never runs
println!("Finished: {}", x);
}
/// Simulated CPU-bound work: the sum of `0..1_000_000`, reduced modulo 2^32.
///
/// This CANNOT be cancelled once started. There is no `.await`, so it runs to
/// completion atomically with respect to cancellation.
///
/// The true sum (499_999_500_000) does not fit in a `u32`. A plain `sum()`
/// panics on overflow in debug builds. `wrapping_add` makes the wrap-around
/// explicit and gives the same value in every build profile.
fn expensive_sync_work() -> u32 {
    (0..1_000_000u32).fold(0, u32::wrapping_add)
}
The Drop trait is synchronous - you cannot .await in it:
use tokio::fs::File;
/// Holds an optional Tokio file that should be released explicitly with
/// [`close`](Self::close) rather than by an implicit drop.
struct AsyncResource {
    file: Option<File>,
}

impl Drop for AsyncResource {
    fn drop(&mut self) {
        // CANNOT DO THIS:
        // self.file.close().await; // Error: cannot await in Drop
        // Drop is synchronous - file is closed when dropped
        // but there's no guarantee of when that happens
        if self.file.is_some() {
            eprintln!("Warning: AsyncResource dropped without explicit cleanup");
        }
    }
}

// BETTER: Explicit async cleanup
impl AsyncResource {
    /// Sync the file to disk, then release it.
    ///
    /// Unlike an implicit drop, this waits for the sync to finish and reports
    /// any error to the caller. Taking the file out leaves `file` as `None`, so
    /// the `Drop` warning does not fire.
    ///
    /// # Errors
    /// Propagates the error from `sync_all`.
    async fn close(mut self) -> std::io::Result<()> {
        if let Some(file) = self.file.take() {
            // The actual async cleanup. Previously the file was only dropped,
            // which is no different from an implicit drop.
            file.sync_all().await?;
            drop(file);
        }
        Ok(())
    }
}
// Safe: No state mutation until completion
// Dropping this future mid-read only discards the partially read buffer.
async fn read_file(path: &str) -> std::io::Result<String> {
tokio::fs::read_to_string(path).await
}
// Safe: Idempotent operation
// A GET can simply be retried after cancellation.
// NOTE(review): `User` and `Error` are not defined in this snippet. The final
// `.json().await` is returned without `?`, so `Error` must be `reqwest::Error`
// for this to type-check.
async fn get_user(id: u64) -> Result<User, Error> {
reqwest::get(&format!("https://api.example.com/users/{}", id))
.await?
.json()
.await
}
// Safe: Atomic operation
// The body has no `.await`, so the future completes on its first poll and
// cannot be cancelled part-way.
async fn increment_counter(counter: &AtomicU64) {
counter.fetch_add(1, Ordering::SeqCst);
}
Cancellation-unsafe operations:
// UNSAFE: Partial state mutation
// The debit happens before an `.await`; dropping the future there skips the credit.
async fn transfer_unsafe(from: &mut Account, to: &mut Account, amount: u64) {
from.balance -= amount; // If cancelled at the next .await, money disappears!
// CANCELLATION POINT
expensive_logging().await;
to.balance += amount; // This might not run
}
// UNSAFE: Multi-step file operation
async fn update_file_unsafe(path: &str, data: &[u8]) -> std::io::Result<()> {
let mut file = File::create(path).await?; // File created
// If cancelled here, empty file exists
file.write_all(data).await?; // Partial data written
// If cancelled here, incomplete data
file.sync_all().await?; // Not synced to disk
Ok(())
}
// UNSAFE: Partial updates under a lock (the lock itself is NOT leaked)
// NOTE(review): `expensive_operation` here is a different, undefined helper
// from the `expensive_operation(u64)` defined elsewhere in this document.
async fn deadlock_risk(mutex: &Mutex<State>) {
let guard = mutex.lock().await;
// If cancelled here, `guard` is dropped together with the future's other
// locals, so the lock IS released. The real hazard is that any partial updates
// made through `guard` before this point stay visible to the next lock holder.
expensive_operation(&guard).await;
// guard dropped here
}
/// Move `amount` from `from` to `to` with no cancellation point between the
/// two balance updates.
///
/// Locks are always taken in address order. Without that, two concurrent
/// transfers A→B and B→A could each hold one lock and wait forever for the
/// other. Passing the same account twice is handled separately, because
/// locking one mutex twice would self-deadlock.
///
/// # Errors
/// - `Error::InsufficientFunds` if `from` cannot cover `amount`.
/// - Any error from `expensive_validation`.
async fn transfer_safe(
    from: &Mutex<Account>,
    to: &Mutex<Account>,
    amount: u64,
) -> Result<(), Error> {
    if std::ptr::eq(from, to) {
        // Self-transfer: validate as usual, but the balance does not change.
        let guard = from.lock().await;
        if guard.balance < amount {
            return Err(Error::InsufficientFunds);
        }
        expensive_validation().await?;
        return Ok(());
    }

    // All validation first (no state mutation). Acquire in a global (address)
    // order so every caller agrees on which lock comes first.
    let from_first = (from as *const Mutex<Account>) < (to as *const Mutex<Account>);
    let (mut from_guard, mut to_guard) = if from_first {
        let f = from.lock().await;
        let t = to.lock().await;
        (f, t)
    } else {
        let t = to.lock().await;
        let f = from.lock().await;
        (f, t)
    };

    if from_guard.balance < amount {
        return Err(Error::InsufficientFunds);
    }

    // Do expensive work before mutation. Cancellation here leaves both accounts untouched.
    expensive_validation().await?;

    // ATOMIC SECTION: No .await between mutations
    from_guard.balance -= amount;
    to_guard.balance += amount;
    // End atomic section
    Ok(())
}
Pattern 2: RAII Guards
/// Scope guard: runs `cleanup` exactly once when dropped, unless it was disarmed first.
struct Guard<F: FnOnce()> {
    cleanup: Option<F>,
}

impl<F: FnOnce()> Guard<F> {
    /// Arm a guard around `cleanup`.
    fn new(cleanup: F) -> Self {
        Guard {
            cleanup: Some(cleanup),
        }
    }

    /// Consume the guard without running its cleanup.
    fn disarm(mut self) {
        drop(self.cleanup.take());
    }
}

impl<F: FnOnce()> Drop for Guard<F> {
    fn drop(&mut self) {
        // `take` guarantees the closure can run at most once.
        if let Some(run_cleanup) = self.cleanup.take() {
            run_cleanup();
        }
    }
}
/// Demonstrates a scope guard around a resource.
/// The guard is never disarmed, so "Cleaning up resource" is printed on every
/// exit path: success, an early `?` return, or cancellation.
/// `_guard` is declared after `resource`, so it drops (and runs its cleanup) first.
async fn with_guard() -> Result<(), Error> {
let resource = acquire_resource().await?;
// Guard ensures cleanup even on cancellation
let _guard = Guard::new(|| {
// This runs on drop, even if cancelled
eprintln!("Cleaning up resource");
});
// If cancelled here, guard drops and cleanup runs
use_resource(&resource).await?;
Ok(())
}
Pattern 3: Try-Commit-Cancel
/// Await `operation` and run `on_cancel` if it did not succeed.
///
/// `on_cancel` runs if the returned future is dropped before `operation`
/// resolves, or if `operation` resolves to `Err`. It is discarded without
/// being called when `operation` returns `Ok`.
async fn try_commit_cancel<T, E>(
    operation: impl Future<Output = Result<T, E>>,
    on_cancel: impl FnOnce(),
) -> Result<T, E> {
    /// Calls the wrapped callback on drop unless it was taken out first.
    struct CancelGuard<F: FnOnce()>(Option<F>);

    impl<F: FnOnce()> Drop for CancelGuard<F> {
        fn drop(&mut self) {
            if let Some(callback) = self.0.take() {
                callback();
            }
        }
    }

    let mut armed = CancelGuard(Some(on_cancel));
    match operation.await {
        Ok(value) => {
            // Success: disarm so the callback is dropped without running.
            drop(armed.0.take());
            Ok(value)
        }
        // Failure: `armed` still holds the callback and fires on return.
        Err(err) => Err(err),
    }
}
// Usage
// NOTE(review): `on_cancel` here only logs. Nothing removes whatever
// `insert_temp_user` created, so a real rollback would need to do that (and
// could not `.await` inside the synchronous callback).
// Cancellation during `insert_temp_user` happens before the guard is armed,
// so the callback does not run in that case.
async fn create_user(db: &Database, user: User) -> Result<UserId, DbError> {
let temp_id = db.insert_temp_user(&user).await?;
try_commit_cancel(
db.commit_user(temp_id),
|| eprintln!("Rollback user creation"),
)
.await
}
Tokio's timeout is implemented using select!:
use tokio::time::{sleep, Duration, Instant};
use std::future::Future;
// Simplified version of tokio::time::timeout
/// Race `future` against a `duration` timer.
///
/// Returns `Ok(output)` if the future finishes first. Otherwise returns
/// `Err(Elapsed)`, and the future is dropped (cancelled).
pub async fn timeout<F, T>(duration: Duration, future: F) -> Result<T, Elapsed>
where
    F: Future<Output = T>,
{
    // `select!` pins each branch future itself, so `future` can be passed by value.
    tokio::select! {
        output = future => Ok(output),
        _ = sleep(duration) => Err(Elapsed),
    }
}
/// Error returned by [`timeout`] when the deadline passes before the future completes.
///
/// Implements [`std::error::Error`], so it can be propagated with `?` into
/// `Box<dyn std::error::Error>`. Without that impl, `with_timeout` would not compile.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Elapsed;

impl std::fmt::Display for Elapsed {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("deadline has elapsed")
    }
}

impl std::error::Error for Elapsed {}
// Usage
/// Fetch data under a 5-second budget. The timeout error is boxed by `?`.
async fn with_timeout() -> Result<String, Box<dyn std::error::Error>> {
    let budget = Duration::from_secs(5);
    let data = timeout(budget, fetch_data()).await?;
    Ok(data)
}

/// Deliberately slower (10 s) than the budget above, so the timeout fires.
async fn fetch_data() -> String {
    sleep(Duration::from_secs(10)).await;
    String::from("Data")
}
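How it works:
- `select!` polls the wrapped `future` and a `sleep` timer concurrently.
- If `sleep` completes first, it returns `Err(Elapsed)`.
- The wrapped `future` is then dropped, triggering cancellation.
- The destructors (`Drop`) of values held by the `future` run at this point.

`CancellationToken` provides structured cancellation: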
use tokio_util::sync::CancellationToken;
/// Simplified model of `tokio_util::sync::CancellationToken`.
///
/// Cancellation is sticky: once `cancel` has been called, `is_cancelled`
/// returns `true` and every current or future `cancelled().await` completes.
pub struct CancellationTokenImpl {
    // Simplified implementation
    inner: Arc<TokenState>,
}

/// Shared token state. The flag records cancellation permanently, and the
/// `Notify` wakes tasks that are already waiting.
struct TokenState {
    cancelled: std::sync::atomic::AtomicBool,
    notify: tokio::sync::Notify,
}

impl Default for CancellationTokenImpl {
    fn default() -> Self {
        Self::new()
    }
}

impl CancellationTokenImpl {
    /// Create a token that is not cancelled.
    pub fn new() -> Self {
        Self {
            inner: Arc::new(TokenState {
                cancelled: std::sync::atomic::AtomicBool::new(false),
                notify: tokio::sync::Notify::new(),
            }),
        }
    }

    /// Cancel the token and wake every waiter. Repeated calls are no-ops.
    pub fn cancel(&self) {
        // Set the flag *before* notifying, so anyone who checks afterwards sees it.
        // `notify_waiters` stores no permit, so without the flag a task that
        // starts waiting after this call would wait forever.
        if !self
            .inner
            .cancelled
            .swap(true, std::sync::atomic::Ordering::SeqCst)
        {
            self.inner.notify.notify_waiters();
        }
    }

    /// Complete once the token is cancelled. Returns immediately if it already is.
    pub async fn cancelled(&self) {
        // Create the `Notified` future before checking the flag. It receives
        // `notify_waiters` wake-ups from creation on, so a `cancel` that lands
        // between the check and the await cannot be missed.
        let notified = self.inner.notify.notified();
        if self.is_cancelled() {
            return;
        }
        notified.await;
    }

    /// Whether `cancel` has been called on this token (or a token sharing its state).
    pub fn is_cancelled(&self) -> bool {
        self.inner
            .cancelled
            .load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Still simplified: the "child" shares its parent's state, so cancelling
    /// either one cancels both. The real tokio_util token only propagates
    /// cancellation from parent to child.
    pub fn child_token(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}
// Usage pattern
/// Repeats `do_work` until `cancel` fires. If cancellation wins the race, an
/// in-progress `do_work` call is dropped part-way through.
async fn worker(cancel: CancellationToken) {
    loop {
        let cancelled = tokio::select! {
            _ = cancel.cancelled() => true,
            _ = do_work() => false,
        };
        if cancelled {
            println!("Cancellation requested");
            break;
        }
        println!("Work completed");
    }
}

/// One unit of simulated work: a one-second timer.
async fn do_work() {
    let unit_of_work = Duration::from_secs(1);
    tokio::time::sleep(unit_of_work).await;
}
Key features:
- Uses `Notify` internally for wake-ups.

---
// Illustrative fragments, not a compilable unit: each snippet below is meant
// to sit inside an async fn that returns a `Result`.
// API calls, network operations, user-facing requests
let result = timeout(Duration::from_secs(30), api_call()).await?;
// Server shutdown, worker cleanup, resource release
tokio::select! {
_ = shutdown_signal() => { /* cleanup */ }
_ = server.run() => { /* normal exit */ }
}
// Ensure cleanup even on cancellation
let _guard = Guard::new(|| cleanup_resource());
risky_operation().await?;
// Long-running operations that users can cancel
tokio::select! {
_ = cancel_button_clicked() => { /* save checkpoint */ }
result = long_computation() => { /* complete */ }
}
// Circuit breakers, retry with timeout
// After the third failed attempt the loop falls through; the caller must
// still return an error in that case.
for attempt in 1..=3 {
match timeout(Duration::from_secs(5), operation()).await {
Ok(Ok(result)) => return Ok(result),
Ok(Err(e)) => eprintln!("Attempt {} failed: {}", attempt, e),
Err(_) => eprintln!("Attempt {} timed out", attempt),
}
}
// DON'T add timeout if partial completion is dangerous
// Example: Financial transactions that must be atomic
// (If a timeout dropped this future mid-commit, you could not tell whether the
// commit landed.)
database.commit_transaction().await?; // No timeout!
// DON'T add overhead for operations that always complete quickly
let x = atomic_counter.fetch_add(1, Ordering::SeqCst);
// DON'T add guards if operation is already safe
let data = fs::read_to_string(path).await?; // Already safe
---
// BAD: Drop won't run async cleanup
// `drop` is a plain synchronous fn, so `sync_all` cannot be awaited here. The
// file is just closed implicitly, with no durability guarantee and no way to
// report errors.
struct BadAsyncResource {
file: File,
}
impl Drop for BadAsyncResource {
fn drop(&mut self) {
// This doesn't compile - cannot await in Drop
// self.file.sync_all().await;
}
}
// GOOD: Explicit async cleanup method
/// Wraps an optional file that callers release with `close().await`.
struct GoodAsyncResource {
    file: Option<File>,
}

impl GoodAsyncResource {
    /// Sync the file to disk and release it, returning any sync error.
    /// Taking the file out means `Drop` stays quiet afterwards.
    async fn close(mut self) -> std::io::Result<()> {
        match self.file.take() {
            Some(file) => file.sync_all().await,
            None => Ok(()),
        }
    }
}

impl Drop for GoodAsyncResource {
    fn drop(&mut self) {
        // Still holding a file means `close` was never awaited.
        if self.file.is_some() {
            eprintln!("Warning: Resource not properly closed");
        }
    }
}
// BAD: State can be corrupted on cancellation
// The caller's `user` is mutated before the `.await`. If the future is
// dropped there, the in-memory value and the database disagree.
async fn update_user_bad(user: &mut User, db: &Database) -> Result<(), Error> {
user.last_login = Utc::now(); // Mutated!
// If cancelled here, memory state doesn't match DB
db.update_user(user).await?;
Ok(())
}
// GOOD: All-or-nothing update
// Works on a local copy and hands it back only after the write succeeded, so
// cancelling at either `.await` leaves the caller's state untouched.
// NOTE(review): whether the database write itself is atomic depends on
// `Database::update_user`, which is not shown here.
async fn update_user_good(user_id: u64, db: &Database) -> Result<User, Error> {
let mut updated = db.get_user(user_id).await?;
updated.last_login = Utc::now();
// If cancelled before here, no mutation
db.update_user(&updated).await?;
// Only mutate on success
Ok(updated)
}
// MISLABELED "BAD": the permit is NOT leaked on cancellation.
// `permit` is an RAII guard. If the future is cancelled at the `.await` below,
// or returns early via `?`, the permit is dropped with the future's other
// locals and released. This behaves the same as `process_good`; the explicit
// `drop` only matters if you want to release the permit before the function ends.
async fn process_bad(sem: &Semaphore) -> Result<(), Error> {
let permit = sem.acquire().await.unwrap();
// If cancelled here, `permit` is dropped with the future and released
risky_operation().await?;
drop(permit);
Ok(())
}
// GOOD: RAII ensures release
async fn process_good(sem: &Semaphore) -> Result<(), Error> {
let _permit = sem.acquire().await.unwrap();
// Permit automatically released on drop, even if cancelled
risky_operation().await?;
Ok(())
}
// BAD: Spawned task ignores cancellation
// `cancel` is never used. The detached task loops until the runtime shuts down.
async fn spawn_worker_bad(cancel: CancellationToken) {
tokio::spawn(async {
loop {
do_work().await;
// This never checks cancel token!
}
});
}
// GOOD: Task respects cancellation
// The token is moved into the task and raced against each unit of work; an
// in-progress `do_work` is dropped when cancellation wins.
async fn spawn_worker_good(cancel: CancellationToken) {
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => {
println!("Worker shutting down");
break;
}
_ = do_work() => {}
}
}
});
}
// BAD: Blocking in Drop delays cancellation
// When an async task drops this value, the 5 s sleep stalls that executor
// thread, and every other task scheduled on it waits too.
impl Drop for BadResource {
fn drop(&mut self) {
// This blocks the executor!
std::thread::sleep(Duration::from_secs(5));
// Expensive synchronous cleanup
self.expensive_cleanup();
}
}
// GOOD: Fast synchronous cleanup, or hand slow async cleanup to the runtime
impl Drop for GoodResource {
    fn drop(&mut self) {
        // Fast cleanup only
        self.fast_cleanup();

        // For expensive cleanup, spawn a task (but be aware this may not
        // complete if the process exits).
        //
        // `tokio::spawn` panics when no Tokio runtime is active. That happens
        // when the value is dropped on a plain thread or after the runtime has
        // shut down. Panicking inside `drop` during unwinding aborts the
        // process, so spawn only when a runtime handle is available.
        if self.needs_expensive_cleanup {
            match tokio::runtime::Handle::try_current() {
                Ok(handle) => {
                    let data = self.cleanup_data.clone();
                    handle.spawn(async move {
                        expensive_async_cleanup(data).await;
                    });
                }
                Err(_) => {
                    eprintln!("Warning: no Tokio runtime available; skipping expensive cleanup");
                }
            }
        }
    }
}
---
Checking for cancellation has overhead:
use std::time::Instant;
/// Compare a loop with and without a `CancellationToken::is_cancelled` check.
///
/// `black_box` keeps the otherwise-empty loops from being optimized away. The
/// reported overhead saturates at zero, because timing noise can make the
/// checked loop look faster, and `Duration - Duration` panics on underflow.
async fn benchmark_cancellation_overhead() {
    use std::hint::black_box;

    const ITERATIONS: u32 = 1_000_000;
    let cancel = CancellationToken::new();

    // Baseline: no cancellation checks
    let start = Instant::now();
    for i in 0..ITERATIONS {
        // Work without cancellation check
        black_box(i);
    }
    let baseline = start.elapsed();

    // With cancellation checks
    let start = Instant::now();
    for i in 0..ITERATIONS {
        if cancel.is_cancelled() {
            break;
        }
        // Same work
        black_box(i);
    }
    let with_check = start.elapsed();

    println!("Baseline: {:?}", baseline);
    println!("With check: {:?}", with_check);
    println!("Overhead: {:?}", with_check.saturating_sub(baseline));
    // The per-check cost depends on hardware and build profile: measure a
    // `--release` build on the target machine rather than relying on fixed figures.
}
Guidelines:
// Memory overhead
// NOTE(review): the sizes below are rough estimates of tokio_util internals,
// not measurements. Verify with `std::mem::size_of::<CancellationToken>()`.
// Also note that the listed parts add up to 33 bytes, not the stated total,
// before counting the Arc's own reference-count header.
struct Overhead {
// CancellationToken: Arc<Inner> = 8 bytes
token: CancellationToken,
// Inner contains:
// - AtomicBool for cancelled state: 1 byte
// - Notify for wake-ups: ~24 bytes
// - Parent reference: 8 bytes
// Total: ~32 bytes per token
}
// Performance comparison
/// Times 10,000 cancellation checks with a `CancellationToken` and with a bare
/// `Arc<AtomicBool>` flag, printing the elapsed time of each.
async fn compare_cancellation_methods() {
    // Method 1: CancellationToken (structured).
    // `any` stops at the first `true`, which mirrors the early `break` of a loop.
    let token = CancellationToken::new();
    let timer = Instant::now();
    let _ = (0..10_000).any(|_| token.is_cancelled());
    println!("CancellationToken: {:?}", timer.elapsed());

    // Method 2: a hand-rolled shared flag (manual).
    let flag = Arc::new(AtomicBool::new(false));
    let timer = Instant::now();
    let _ = (0..10_000).any(|_| flag.load(Ordering::Acquire));
    println!("AtomicBool: {:?}", timer.elapsed());

    // Expectation, not a guarantee: both are cheap per check, but how close they are
    // depends on the tokio-util version's `is_cancelled` implementation. What
    // CancellationToken clearly adds is ergonomics: an awaitable `cancelled()` and
    // parent/child tokens.
}
/// Compares 1000 plain awaits of `fast_operation` with 1000 awaits wrapped in
/// `tokio::time::timeout`, printing both elapsed times.
async fn measure_timeout_overhead() {
    use tokio::time::{timeout, Duration, Instant};

    const ROUNDS: usize = 1000;
    // Generous limit: `fast_operation` is ready immediately, so it is never reached.
    let limit = Duration::from_secs(1);

    // Fast operation without timeout
    let t0 = Instant::now();
    for _ in 0..ROUNDS {
        fast_operation().await;
    }
    let without_timeout = t0.elapsed();

    // Fast operation with timeout (the Result is deliberately ignored)
    let t1 = Instant::now();
    for _ in 0..ROUNDS {
        let _ = timeout(limit, fast_operation()).await;
    }
    let with_timeout = t1.elapsed();

    println!("Without timeout: {:?}", without_timeout);
    println!("With timeout: {:?}", with_timeout);
    // Rough figure only: ~100-500ns per `timeout` call.
    // NOTE(review): the extra cost is presumably computing a deadline and building the
    // timer future that `timeout` pairs with the inner future. `timeout` is its own
    // future type rather than a `select!`. TODO confirm against the tokio version in use.
}
/// Stand-in async operation that has no await points, so it is ready on its first poll.
async fn fast_operation() {}
Guidelines:
---
Implement a file writer that ensures atomicity even on cancellation.
Task: Create a `SafeFileWriter` that:
/// Exercise skeleton: a writer whose output becomes visible at its target path only
/// once `commit` succeeds.
///
/// Intended contract, taken from the accompanying test and hint: data is written to a
/// temporary file (e.g. named via `Path::with_extension`). A writer that is dropped or
/// cancelled before `commit` must leave no file at the target path.
pub struct SafeFileWriter {
// Your fields here
}
impl SafeFileWriter {
/// Prepares a writer whose eventual destination is `path`.
pub async fn new(path: impl AsRef<Path>) -> io::Result<Self> {
todo!()
}
/// Appends `data` to the pending, not-yet-committed contents.
pub async fn write(&mut self, data: &[u8]) -> io::Result<()> {
todo!()
}
/// Publishes everything written so far at the target path. Takes `self` by value,
/// so the writer cannot be used after committing.
pub async fn commit(self) -> io::Result<()> {
todo!()
}
}
// Test with timeout
/// A `SafeFileWriter` cancelled before `commit` must leave no file at the target path.
#[tokio::test]
async fn test_cancellation() {
    // Remove leftovers from earlier runs so the final assertion means something.
    let _ = std::fs::remove_file("test.txt");

    // `write` takes `&mut self`, so the binding must be mutable.
    let mut writer = SafeFileWriter::new("test.txt").await.unwrap();

    // Simulate cancellation. The future stalls forever before `commit`, so the timeout
    // always fires first. Without the stall, 1000 small writes can finish inside 10ms,
    // the commit succeeds, and the test fails spuriously.
    let result = tokio::time::timeout(
        Duration::from_millis(10),
        async {
            for i in 0..1000 {
                writer.write(format!("Line {}\n", i).as_bytes()).await.unwrap();
            }
            std::future::pending::<()>().await;
            writer.commit().await
        },
    )
    .await;
    assert!(result.is_err(), "the timeout should have cancelled the write");

    // File should not exist (cancelled before commit)
    assert!(!Path::new("test.txt").exists());
}
Hints:
- Use `Path::with_extension` for the temp file.

Build a task group that can cancel all tasks together.
Task: Create a `TaskGroup` that:
/// Exercise skeleton: a set of named tasks that can be cancelled together.
///
/// Hints from the exercise: coordinate with a `CancellationToken` and keep a
/// `JoinHandle` for each task.
pub struct TaskGroup {
// Your fields here
}
impl TaskGroup {
/// Creates an empty group.
pub fn new() -> Self {
todo!()
}
/// Starts `task` as a member of the group, identified by `name` in the
/// `TaskGroupResult` produced by `wait`.
pub fn spawn<F>(&mut self, name: String, task: F)
where
F: Future<Output = ()> + Send + 'static,
{
todo!()
}
/// Requests cancellation of every task in the group that is still running.
pub async fn cancel_all(&self) {
todo!()
}
/// Waits for every task to finish or be cancelled and reports which was which.
/// Takes `self` by value, so the group cannot be used afterwards.
pub async fn wait(self) -> TaskGroupResult {
todo!()
}
}
/// Outcome of waiting on a `TaskGroup`: the names of tasks that ran to completion
/// and of tasks that were cancelled before finishing.
///
/// Derives `Debug` (useful in failing test assertions), `Default` (an empty result) and
/// equality, so results can be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TaskGroupResult {
    /// Names of tasks that finished on their own.
    pub completed: Vec<String>,
    /// Names of tasks that were stopped by cancellation.
    pub cancelled: Vec<String>,
}
// Test
#[tokio::test]
async fn test_task_group() {
let mut group = TaskGroup::new();
group.spawn("fast".to_string(), async {
tokio::time::sleep(Duration::from_millis(100)).await;
});
group.spawn("slow".to_string(), async {
tokio::time::sleep(Duration::from_secs(10)).await;
});
tokio::time::sleep(Duration::from_millis(200)).await;
group.cancel_all().await;
let result = group.wait().await;
assert_eq!(result.completed.len(), 1);
assert_eq!(result.cancelled.len(), 1);
}
Hints:
- Use a `CancellationToken` for coordination.
- Keep a `JoinHandle` for each task.

Create a computation that can be cancelled and resumed.
Task: Implement a `ResumableTask` that:
pub struct ResumableTask<T> {
// Your fields here
}
impl<T> ResumableTask<T>
where
T: Serialize + DeserializeOwned,
{
pub fn new(
items: Vec<T>,
checkpoint_path: PathBuf,
cancel_token: CancellationToken,
) -> Self {
todo!()
}
pub async fn process<F, R>(
&mut self,
processor: F,
) -> Result<Vec<R>, TaskError>
where
F: Fn(T) -> Future<Output = R>,
R: Serialize + DeserializeOwned,
{
todo!()
}
pub fn progress(&self) -> f64 {
todo!()
}
}
/// Errors returned by `ResumableTask::process`.
#[derive(Debug, Error)]
pub enum TaskError {
/// Processing stopped because the cancellation token was triggered.
#[error("Task cancelled")]
Cancelled,
/// An underlying I/O operation failed. `#[from]` generates `From<io::Error>`, so `?`
/// converts I/O errors into this variant automatically.
#[error("IO error: {0}")]
Io(#[from] io::Error),
}
// Test
/// A run cancelled mid-way reports `Cancelled`; a fresh task over the same items and
/// checkpoint then finishes and returns a result for every item.
#[tokio::test]
async fn test_resumable_task() {
    const CHECKPOINT: &str = "test.checkpoint";
    // A checkpoint left over from an earlier run would let the first pass finish
    // (or skip work) before the cancel below, making the test flaky.
    let _ = std::fs::remove_file(CHECKPOINT);

    let items: Vec<u64> = (0..1000).collect();
    let cancel = CancellationToken::new();
    let mut task = ResumableTask::new(
        items.clone(),
        PathBuf::from(CHECKPOINT),
        cancel.clone(),
    );

    // Start processing. Each item takes ~1ms, so the full run needs ~1s and the cancel
    // at 100ms reliably lands mid-run. Without the delay, 1000 trivial items finish
    // in microseconds and the run completes before it can be cancelled.
    let handle = tokio::spawn(async move {
        task.process(|x| async move {
            tokio::time::sleep(Duration::from_millis(1)).await;
            x * 2
        })
        .await
    });

    // Cancel after short time
    tokio::time::sleep(Duration::from_millis(100)).await;
    cancel.cancel();
    let result = handle.await.unwrap();
    assert!(matches!(result, Err(TaskError::Cancelled)));

    // Resume
    let mut task2 = ResumableTask::new(
        items,
        PathBuf::from(CHECKPOINT),
        CancellationToken::new(),
    );
    let result = task2.process(|x| async move { x * 2 }).await.unwrap();
    assert_eq!(result.len(), 1000);

    let _ = std::fs::remove_file(CHECKPOINT);
}
Hints:
- Use `serde_json` for serialization.

---
use tokio::time::{timeout, Duration};
// Basic timeout
// Awaiting `timeout` yields `Ok(output)` if `expensive_operation()` finished within
// 5 seconds, or `Err(Elapsed)` otherwise. In the `Err` case the inner future is dropped,
// i.e. cancelled at whichever `.await` it was suspended on.
let result = timeout(
Duration::from_secs(5),
expensive_operation(),
).await;
match result {
Ok(value) => println!("Success: {:?}", value),
Err(_) => println!("Operation timed out"),
}
// Timeout with fallback
// NOTE(review): `unwrap_or_else` takes a synchronous closure, so this compiles only if
// `fetch_from_cache()` is a plain (non-async) fn returning the same type as
// `fetch_from_primary()`'s output. If it is async, `match` on the result and `.await` it.
let value = timeout(
Duration::from_secs(5),
fetch_from_primary(),
)
.await
.unwrap_or_else(|_| fetch_from_cache());
use tokio_util::sync::CancellationToken;
// Parent-child relationship
// Tokens from `child_token()` are cancelled when `parent` is cancelled.
let parent = CancellationToken::new();
let child1 = parent.child_token();
let child2 = parent.child_token();
// Spawn workers
tokio::spawn(worker(child1));
tokio::spawn(worker(child2));
// Cancel all
// This only signals the workers. The JoinHandles from `tokio::spawn` above are
// discarded, so nothing waits for the workers to notice and exit. Keep the handles
// (or use a `JoinSet`) if shutdown must wait for them.
parent.cancel();
/// Repeats `do_work()` until `cancel` fires; an in-flight `do_work()` is dropped
/// (cancelled at its current `.await`) as soon as the token is cancelled.
async fn worker(cancel: CancellationToken) {
    loop {
        // Branch order is irrelevant: `select!` picks a random polling order.
        let stop = tokio::select! {
            _ = do_work() => false,
            _ = cancel.cancelled() => true,
        };
        if stop {
            break;
        }
    }
}
use axum::{extract::Request, response::Response};
async fn handler(req: Request) -> Response {
// Axum automatically cancels handler if client disconnects
tokio::select! {
result = long_operation() => {
// Normal completion
Response::new(result.into())
}
_ = req.body().on_upgrade() => {
// Client disconnected
Response::new("Cancelled".into())
}
}
}
use tonic::{Request, Response, Status};
async fn process_stream(
request: Request<StreamRequest>,
) -> Result<Response<StreamResponse>, Status> {
// Tonic provides cancellation via request extensions
let cancel = request.extensions().get::<CancellationToken>().cloned();
tokio::select! {
result = process_data() => Ok(Response::new(result)),
_ = cancel.cancelled() => Err(Status::cancelled("Request cancelled")),
}
}
use async_scoped::TokioScope;
// Spawns two tasks inside an async-scoped `TokioScope`; the value returned by
// `scope_and_block` is discarded and the function always returns `Ok(())`.
// NOTE(review): as its name says, `scope_and_block` blocks the calling thread until the
// scope finishes. Inside an `async fn` that ties up a runtime worker thread, and the
// tokio backend presumably relies on `tokio::task::block_in_place`, which panics on a
// current-thread runtime. TODO confirm against the async-scoped docs before using this
// pattern in async code.
async fn parallel_with_scope() -> Result<(), Box<dyn std::error::Error>> {
TokioScope::scope_and_block(|s| {
s.spawn(async {
println!("Task 1");
});
s.spawn(async {
println!("Task 2");
});
// All tasks automatically cancelled if scope exits early
});
Ok(())
}
---
- `tokio_util::sync::CancellationToken` (crate `tokio-util`)
- `async-scoped` - Structured concurrency

---
Cancellation safety is critical for building reliable async systems:
Key Takeaways:
- A future can be cancelled at any `.await` point.
- `CancellationToken` provides structured cancellation.

Mastering cancellation safety is the difference between toy async code and production-ready systems that handle timeouts, shutdowns, and failures gracefully.
Run this code in the official Rust Playground