
Custom Future Implementation

Building futures from scratch

expert
future, async, poll

What is a Custom Future?

A Custom Future is a manual implementation of Rust's Future trait, giving you complete control over async execution. While async fn and async blocks automatically generate Future implementations, understanding custom futures is essential for:

  • Building async primitives (timers, channels, I/O operations)
  • Performance optimization (zero-allocation futures, specialized state machines)
  • Runtime integration (connecting with event loops, wakers)
  • Deep understanding of how async Rust works under the hood

The Future Trait

use std::task::{Context, Poll};
use std::pin::Pin;

pub trait Future {
    type Output;
    
    // The heart of async: polling for completion
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),    // Task completed with result
    Pending,     // Task not ready, wake me later
}
Key concepts:
  • Poll::Ready(T): Future completed, here's the result
  • Poll::Pending: Not ready yet, I've registered your waker
  • Pin<&mut Self>: Ensures self-referential structs stay put in memory
  • Context: Provides access to the Waker for notifications
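
To make these concepts concrete, here is a minimal sketch of a hand-written future together with a tiny executor, using only the standard library. `Countdown`, `ThreadWaker`, and `block_on` are illustrative names chosen for this example, not part of any runtime; real executors are considerably more involved.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical example future: reports Pending `remaining` times, then Ready.
pub struct Countdown {
    pub remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("liftoff");
        }
        self.remaining -= 1;
        // Nothing external will wake us, so request an immediate re-poll
        // before returning Pending.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, park until woken, repeat.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling `block_on(Countdown { remaining: 3 })` polls the future four times: three polls return Pending (each one wakes the executor first), and the fourth returns Ready.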

How Async/Await Compiles to State Machines

When you write:

async fn fetch_user(id: u64) -> User {
    let response = http_get(&format!("/users/{}", id)).await;
    parse_user(response).await
}

The compiler generates roughly:

enum FetchUserFuture {
    // State 0: Initial
    Initial { id: u64 },
    // State 1: Waiting for HTTP response
    WaitingForHttp { 
        id: u64, 
        http_future: HttpFuture 
    },
    // State 2: Waiting for parse
    WaitingForParse { 
        parse_future: ParseFuture 
    },
    // State 3: Complete
    Complete,
}

impl Future for FetchUserFuture {
    type Output = User;
    
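    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<User> {
        // Pseudocode: the real expansion also handles pin projection,
        // borrow scopes, and enum variant paths that are glossed over here.
        loop {
            match &mut *self {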
                Initial { id } => {
                    // Transition to next state
                    *self = WaitingForHttp { 
                        id: *id, 
                        http_future: http_get(...) 
                    };
                }
                WaitingForHttp { http_future, .. } => {
                    match Pin::new(http_future).poll(cx) {
                        Poll::Ready(response) => {
                            *self = WaitingForParse {
                                parse_future: parse_user(response)
                            };
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }
                WaitingForParse { parse_future } => {
                    match Pin::new(parse_future).poll(cx) {
                        Poll::Ready(user) => {
                            *self = Complete;
                            return Poll::Ready(user);
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }
                Complete => panic!("polled after completion"),
            }
        }
    }
}

This state machine transformation is zero-cost: the future is a plain enum value, so polling it involves no heap allocation and no dynamic dispatch unless you box it.
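
The listing above is deliberately loose, so here is a smaller state machine that does compile, as a sketch under simplifying assumptions. `AddTwo` is a hypothetical hand-written equivalent of `async { a.await + b.await }` whose inner futures are `std::future::Ready` values. Because those are `Unpin`, the states can be moved freely and no pin projection is needed; compiler-generated futures do not have that luxury.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hand-written equivalent of `async { a.await + b.await }` (illustrative).
pub enum AddTwo {
    First { a: Ready<u32>, b: Ready<u32> },
    Second { first: u32, b: Ready<u32> },
    Done,
}

impl Future for AddTwo {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        loop {
            // Take the current state by value so the next one can be written back.
            match std::mem::replace(&mut *self, AddTwo::Done) {
                AddTwo::First { mut a, b } => match Pin::new(&mut a).poll(cx) {
                    Poll::Ready(first) => *self = AddTwo::Second { first, b },
                    Poll::Pending => {
                        *self = AddTwo::First { a, b };
                        return Poll::Pending;
                    }
                },
                AddTwo::Second { first, mut b } => match Pin::new(&mut b).poll(cx) {
                    Poll::Ready(second) => return Poll::Ready(first + second),
                    Poll::Pending => {
                        *self = AddTwo::Second { first, b };
                        return Poll::Pending;
                    }
                },
                AddTwo::Done => panic!("polled after completion"),
            }
        }
    }
}

/// Waker that does nothing; enough for futures that finish on the first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

pub fn add_two(a: u32, b: u32) -> AddTwo {
    AddTwo::First { a: ready(a), b: ready(b) }
}

/// Polls an Unpin future exactly once.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

The whole future is one enum stored inline; the only allocation in this sketch is the `Arc` behind the test waker, which belongs to the driver rather than to the state machine.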

---

Real-World Example 1: Delay Timer Future (Systems Programming)

Problem: Async sleep without spawning threads

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use std::collections::BinaryHeap;
use std::cmp::Ordering;

/// A future that completes after a specified duration
pub struct Delay {
    /// When this delay should complete
    deadline: Instant,
    /// State shared with the timer wheel
    shared: Arc<Mutex<DelayShared>>,
}

struct DelayShared {
    /// Waker to notify when deadline is reached
    waker: Option<Waker>,
    /// Has the delay been triggered?
    triggered: bool,
}

impl Delay {
    pub fn new(duration: Duration) -> Self {
        let deadline = Instant::now() + duration;
        let shared = Arc::new(Mutex::new(DelayShared {
            waker: None,
            triggered: false,
        }));
        
        // Register with global timer wheel
        TIMER_WHEEL.register(deadline, shared.clone());
        
        Delay { deadline, shared }
    }
}

impl Future for Delay {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut shared = self.shared.lock().unwrap();
        
        if shared.triggered {
            // Timer fired, we're done
            return Poll::Ready(());
        }
        
        // Check if already expired (edge case: executor lag)
        if Instant::now() >= self.deadline {
            shared.triggered = true;
            return Poll::Ready(());
        }
        
        // Store waker for later notification
        shared.waker = Some(cx.waker().clone());
        
        Poll::Pending
    }
}

/// Global timer wheel (simplified - production uses hierarchical wheel)
struct TimerWheel {
    heap: Mutex<BinaryHeap<TimerEntry>>,
}

struct TimerEntry {
    deadline: Instant,
    shared: Arc<Mutex<DelayShared>>,
}

impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool {
        self.deadline == other.deadline
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TimerEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse order for min-heap
        other.deadline.cmp(&self.deadline)
    }
}

static TIMER_WHEEL: TimerWheel = TimerWheel {
    heap: Mutex::new(BinaryHeap::new()),
};

impl TimerWheel {
    fn register(&self, deadline: Instant, shared: Arc<Mutex<DelayShared>>) {
        let mut heap = self.heap.lock().unwrap();
        heap.push(TimerEntry { deadline, shared });
    }
    
    /// Called by runtime: process expired timers
    fn process_expired(&self) {
        let now = Instant::now();
        let mut heap = self.heap.lock().unwrap();
        
        while let Some(entry) = heap.peek() {
            if entry.deadline > now {
                break; // No more expired timers
            }
            
            let entry = heap.pop().unwrap();
            let mut shared = entry.shared.lock().unwrap();
            shared.triggered = true;
            
            // Wake the future
            if let Some(waker) = shared.waker.take() {
                waker.wake();
            }
        }
    }
}

// Usage in async code:
async fn rate_limited_operation() {
    for i in 0..10 {
        process_item(i).await;
        // Wait 100ms between operations
        Delay::new(Duration::from_millis(100)).await;
    }
}

async fn timeout_example() {
    let operation = fetch_data();
    let timeout = Delay::new(Duration::from_secs(5));
    
    // Race between operation and timeout
    tokio::select! {
        result = operation => println!("Got result: {:?}", result),
        _ = timeout => println!("Operation timed out"),
    }
}
How it works:
  1. Future creation: Store deadline, register with timer wheel
  2. First poll: Check if already expired, otherwise store waker and return Pending
  3. Timer wheel: Background thread/runtime checks for expired timers
  4. Waker notification: When deadline passes, waker.wake() is called
  5. Re-poll: Executor polls future again, now returns Ready

---

Real-World Example 2: Network I/O Future (Network Programming)

Problem: Non-blocking TCP read with epoll integration

use std::future::Future;
use std::io::{self, Read};
use std::net::TcpStream;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::sync::{Arc, Mutex};

/// Future for reading from a TCP socket
pub struct TcpReadFuture {
    /// The socket to read from (set to non-blocking mode)
    stream: Arc<Mutex<TcpStream>>,
    /// Buffer to read into
    buffer: Vec<u8>,
    /// State shared with the reactor
    io_state: Arc<Mutex<IoState>>,
}

struct IoState {
    waker: Option<Waker>,
    ready: bool,
}

impl TcpReadFuture {
    pub fn new(stream: Arc<Mutex<TcpStream>>, buffer_size: usize) -> io::Result<Self> {
        // Set socket to non-blocking mode
        {
            let stream_guard = stream.lock().unwrap();
            stream_guard.set_nonblocking(true)?;
        }
        
        let fd = stream.lock().unwrap().as_raw_fd();
        let io_state = Arc::new(Mutex::new(IoState {
            waker: None,
            ready: false,
        }));
        
        // Register with epoll/kqueue reactor
        REACTOR.register(fd, io_state.clone());
        
        Ok(TcpReadFuture {
            stream,
            buffer: vec![0u8; buffer_size],
            io_state,
        })
    }
}

impl Future for TcpReadFuture {
    type Output = io::Result<Vec<u8>>;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // TcpReadFuture is Unpin, so we can take a plain &mut and borrow
        // its fields independently
        let this = self.get_mut();
        let mut io_state = this.io_state.lock().unwrap();
        
        // Try to read (might succeed immediately)
        let mut stream = this.stream.lock().unwrap();
        match stream.read(&mut this.buffer) {
            Ok(0) => {
                // EOF
                Poll::Ready(Ok(Vec::new()))
            }
            Ok(n) => {
                // Successfully read data: hand the filled prefix to the caller
                this.buffer.truncate(n);
                Poll::Ready(Ok(std::mem::take(&mut this.buffer)))
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                // Not ready yet - register waker
                io_state.waker = Some(cx.waker().clone());
                io_state.ready = false;
                Poll::Pending
            }
            Err(e) => {
                // Actual error
                Poll::Ready(Err(e))
            }
        }
    }
}

/// Simplified reactor using epoll (Linux)
struct Reactor {
    // In real implementation: epoll fd, event loop thread, registration map
    registrations: Mutex<std::collections::HashMap<RawFd, Arc<Mutex<IoState>>>>,
}

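// HashMap::new() is not a const fn, so the global is initialized lazily
// (std::sync::LazyLock requires Rust 1.80 or newer)
static REACTOR: std::sync::LazyLock<Reactor> = std::sync::LazyLock::new(|| Reactor {
    registrations: Mutex::new(std::collections::HashMap::new()),
});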

impl Reactor {
    fn register(&self, fd: RawFd, io_state: Arc<Mutex<IoState>>) {
        let mut regs = self.registrations.lock().unwrap();
        regs.insert(fd, io_state);
        
        // In real implementation:
        // - Create epoll_event for fd
        // - Call epoll_ctl(EPOLL_CTL_ADD, fd, ...)
    }
    
    /// Called by runtime event loop
    fn poll_events(&self) {
        // In real implementation:
        // - Call epoll_wait() to get ready file descriptors
        // - For each ready fd, wake its registered future
        
        let regs = self.registrations.lock().unwrap();
        for (_fd, io_state) in regs.iter() {
            let mut state = io_state.lock().unwrap();
            state.ready = true;
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        }
    }
}

// Usage:
async fn http_server_handler(stream: Arc<Mutex<TcpStream>>) -> io::Result<()> {
    // Read HTTP request
    let request_data = TcpReadFuture::new(stream.clone(), 8192).await?;
    let request = parse_http_request(&request_data)?;
    
    // Process request
    let response = handle_request(request).await;
    
    // Write response (similar TcpWriteFuture)
    write_response(stream, response).await?;
    
    Ok(())
}
How it works:
  1. Socket setup: Set to non-blocking mode
  2. Reactor registration: Register fd with epoll/kqueue
  3. First poll: Try read, if WouldBlock, store waker and return Pending
  4. Event loop: epoll_wait() detects socket readiness
  5. Wake notification: Reactor calls waker.wake()
  6. Re-poll: Read succeeds, return Ready(data)
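
Steps 1, 3, and 6 can be observed without any reactor at all. The sketch below uses only `std::net` on a loopback connection; `nonblocking_read_demo` is a hypothetical helper, and the retry loop is a crude stand-in for the epoll wake-up, not something to ship.

```rust
use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

/// Returns the error kind of a read on an idle non-blocking socket,
/// followed by the bytes read once the peer has written.
pub fn nonblocking_read_demo() -> io::Result<(io::ErrorKind, Vec<u8>)> {
    // Loopback pair; port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _peer) = listener.accept()?;
    server.set_nonblocking(true)?;

    let mut buf = [0u8; 16];
    // No data yet: a blocking socket would stall here, this one reports WouldBlock.
    let idle_kind = match server.read(&mut buf) {
        Err(e) => e.kind(),
        Ok(n) => {
            let msg = format!("unexpected read of {n} bytes");
            return Err(io::Error::new(io::ErrorKind::Other, msg));
        }
    };

    client.write_all(b"ping")?;
    // Stand-in for the reactor: retry until the kernel reports data.
    let data = loop {
        match server.read(&mut buf) {
            Ok(n) => break buf[..n].to_vec(),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => std::thread::yield_now(),
            Err(e) => return Err(e),
        }
    };
    Ok((idle_kind, data))
}
```

A real future returns `Poll::Pending` at the point where this sketch spins, and relies on the reactor to call `waker.wake()` when the socket becomes readable.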

---

Real-World Example 3: Join Future (Concurrency)

Problem: Wait for multiple futures to complete

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// A future that waits for two futures to complete
pub struct Join<F1, F2> 
where
    F1: Future,
    F2: Future,
{
    future1: Pin<Box<F1>>,
    future2: Pin<Box<F2>>,
    output1: Option<F1::Output>,
    output2: Option<F2::Output>,
}

impl<F1, F2> Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
    pub fn new(future1: F1, future2: F2) -> Self {
        Join {
            future1: Box::pin(future1),
            future2: Box::pin(future2),
            output1: None,
            output2: None,
        }
    }
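}

// The outputs are never pinned, so Join can be Unpin regardless of F1 and F2.
// Without this impl, assigning to `self.output1` inside `poll` would only
// compile when `F1::Output: Unpin` and `F2::Output: Unpin`.
impl<F1: Future, F2: Future> Unpin for Join<F1, F2> {}
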
impl<F1, F2> Future for Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
    type Output = (F1::Output, F2::Output);
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Poll first future if not complete
        if self.output1.is_none() {
            match self.future1.as_mut().poll(cx) {
                Poll::Ready(output) => {
                    self.output1 = Some(output);
                }
                Poll::Pending => {}
            }
        }
        
        // Poll second future if not complete
        if self.output2.is_none() {
            match self.future2.as_mut().poll(cx) {
                Poll::Ready(output) => {
                    self.output2 = Some(output);
                }
                Poll::Pending => {}
            }
        }
        
        // Check if both complete
        match (self.output1.take(), self.output2.take()) {
            (Some(out1), Some(out2)) => Poll::Ready((out1, out2)),
            (out1, out2) => {
                // Restore outputs for next poll
                self.output1 = out1;
                self.output2 = out2;
                Poll::Pending
            }
        }
    }
}

// Better: zero-allocation join of three futures via pin projection
pin_project_lite::pin_project! {
    pub struct Join3<F1, F2, F3> 
    where
        F1: Future,
        F2: Future,
        F3: Future,
    {
        #[pin]
        future1: F1,
        #[pin]
        future2: F2,
        #[pin]
        future3: F3,
        output1: Option<F1::Output>,
        output2: Option<F2::Output>,
        output3: Option<F3::Output>,
    }
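}

impl<F1: Future, F2: Future, F3: Future> Join3<F1, F2, F3> {
    pub fn new(future1: F1, future2: F2, future3: F3) -> Self {
        Join3 {
            future1,
            future2,
            future3,
            output1: None,
            output2: None,
            output3: None,
        }
    }
}
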
impl<F1, F2, F3> Future for Join3<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    type Output = (F1::Output, F2::Output, F3::Output);
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        
        // Poll all futures
        if this.output1.is_none() {
            if let Poll::Ready(out) = this.future1.poll(cx) {
                *this.output1 = Some(out);
            }
        }
        
        if this.output2.is_none() {
            if let Poll::Ready(out) = this.future2.poll(cx) {
                *this.output2 = Some(out);
            }
        }
        
        if this.output3.is_none() {
            if let Poll::Ready(out) = this.future3.poll(cx) {
                *this.output3 = Some(out);
            }
        }
        
        // Check if all complete
        match (this.output1.take(), this.output2.take(), this.output3.take()) {
            (Some(o1), Some(o2), Some(o3)) => Poll::Ready((o1, o2, o3)),
            (o1, o2, o3) => {
                *this.output1 = o1;
                *this.output2 = o2;
                *this.output3 = o3;
                Poll::Pending
            }
        }
    }
}

// Usage:
async fn parallel_api_calls() {
    let (users, posts, comments) = Join3::new(
        fetch_users(),
        fetch_posts(),
        fetch_comments(),
    ).await;
    
    let dashboard = Dashboard { users, posts, comments };
    render(dashboard);
}
Key insight: Each poll advances every child future that has not finished yet. All children receive the same waker, so a wake-up from any one of them re-polls the whole Join, which in turn polls the remaining incomplete children again.

---

Real-World Example 4: Retry Future (Web/Backend)

Problem: Automatic retry with exponential backoff

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

pin_project_lite::pin_project! {
    /// A future that retries an operation with exponential backoff
    pub struct Retry<F, Fut, T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        /// Factory for creating new attempts
        factory: F,
        /// Current attempt
        #[pin]
        current: Option<Fut>,
        /// Delay between retries
        #[pin]
        delay: Option<Delay>,
        /// Retry configuration
        config: RetryConfig,
        /// Current state
        attempt: usize,
        backoff: Duration,
        /// T and E appear only in the bounds, so tie them to the struct
        _marker: std::marker::PhantomData<fn() -> (T, E)>,
    }
}

pub struct RetryConfig {
    pub max_attempts: usize,
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
    pub multiplier: f64,
}

impl<F, Fut, T, E> Retry<F, Fut, T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    pub fn new(factory: F, config: RetryConfig) -> Self {
        // Read the initial backoff before `config` is moved into the struct
        let backoff = config.initial_backoff;
        Retry {
            factory,
            current: None,
            delay: None,
            config,
            attempt: 0,
            backoff,
            _marker: std::marker::PhantomData,
        }
    }
}

impl<F, Fut, T, E> Future for Retry<F, Fut, T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    type Output = Result<T, E>;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        
        loop {
            // If we're in a delay, poll it first
            if let Some(delay) = this.delay.as_mut().as_pin_mut() {
                match delay.poll(cx) {
                    Poll::Ready(()) => {
                        this.delay.set(None);
                        // Delay complete, start next attempt
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }
            
            // Start new attempt if needed
            if this.current.is_none() {
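                if *this.attempt >= this.config.max_attempts {
                    // Reached only when max_attempts == 0 or when the future
                    // is polled again after returning its final error, so
                    // there is no "last error" left to return here.
                    // (In production, return a custom error type instead)
                    panic!("no retry attempts left");
                }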
                
                *this.attempt += 1;
                this.current.set(Some((this.factory)()));
            }
            
            // Poll current attempt
            match this.current.as_mut().as_pin_mut().unwrap().poll(cx) {
                Poll::Ready(Ok(value)) => {
                    // Success!
                    return Poll::Ready(Ok(value));
                }
                Poll::Ready(Err(err)) => {
                    // Failed - schedule retry
                    this.current.set(None);
                    
                    if *this.attempt >= this.config.max_attempts {
                        return Poll::Ready(Err(err));
                    }
                    
                    // Calculate backoff
                    let delay_duration = *this.backoff;
                    *this.backoff = Duration::from_secs_f64(
                        (this.backoff.as_secs_f64() * this.config.multiplier)
                            .min(this.config.max_backoff.as_secs_f64())
                    );
                    
                    this.delay.set(Some(Delay::new(delay_duration)));
                    // Loop to poll delay immediately
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

// Usage:
async fn fetch_user_with_retry(id: u64) -> Result<User, ApiError> {
    Retry::new(
        || async move {
            reqwest::get(&format!("https://api.example.com/users/{}", id))
                .await?
                .json()
                .await
        },
        RetryConfig {
            max_attempts: 5,
            initial_backoff: Duration::from_millis(100),
            max_backoff: Duration::from_secs(10),
            multiplier: 2.0,
        },
    ).await
}
State transitions:
  1. Initial → Attempt1 (poll operation)
  2. Attempt1 fails → Delay(100ms)
  3. Delay complete → Attempt2 (poll operation)
  4. Attempt2 fails → Delay(200ms)
  5. Continues until success or max attempts
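
The delay sequence in these transitions follows directly from the backoff update in `poll`. As a sketch, the hypothetical helper `backoff_schedule` below replays that arithmetic without running any futures; its parameters mirror the `RetryConfig` fields.

```rust
use std::time::Duration;

/// Delays inserted between attempts for the given settings (hypothetical helper
/// that repeats the backoff update used in `Retry::poll`).
pub fn backoff_schedule(
    max_attempts: usize,
    initial_backoff: Duration,
    max_backoff: Duration,
    multiplier: f64,
) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut backoff = initial_backoff;
    // `max_attempts` attempts leave `max_attempts - 1` gaps to wait in.
    for _ in 1..max_attempts {
        delays.push(backoff);
        // Grow the delay, but never beyond the configured ceiling.
        backoff = Duration::from_secs_f64(
            (backoff.as_secs_f64() * multiplier).min(max_backoff.as_secs_f64()),
        );
    }
    delays
}
```

Note that the cap applies to each computed next delay, not to `initial_backoff` itself, which matches the behavior of the `Retry` code above.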

---

Real-World Example 5: Channel Receive Future (Async Communication)

Problem: Oneshot channel receiver as a future

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::sync::{Arc, Mutex};

/// A oneshot channel: send one value, receive once
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(ChannelState {
        value: None,
        waker: None,
        closed: false,
    }));
    
    let sender = Sender { shared: shared.clone() };
    let receiver = Receiver { shared };
    
    (sender, receiver)
}

struct ChannelState<T> {
    value: Option<T>,
    waker: Option<Waker>,
    closed: bool,
}

pub struct Sender<T> {
    shared: Arc<Mutex<ChannelState<T>>>,
}

impl<T> Sender<T> {
    pub fn send(self, value: T) -> Result<(), T> {
        let mut state = self.shared.lock().unwrap();
        
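        if state.closed {
            // Never true as written: only Sender::drop sets `closed`.
            // A Drop impl for Receiver would set it so that sending to a
            // dropped receiver fails and hands the value back.
            return Err(value);
        }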
        
        state.value = Some(value);
        
        // Wake the receiver if it's waiting
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
        
        Ok(())
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut state = self.shared.lock().unwrap();
        state.closed = true;
        
        // Wake receiver so it can see the channel is closed
        if let Some(waker) = state.waker.take() {
            waker.wake();
        }
    }
}

pub struct Receiver<T> {
    shared: Arc<Mutex<ChannelState<T>>>,
}

#[derive(Debug)]
pub enum RecvError {
    Closed,
}

impl<T> Future for Receiver<T> {
    type Output = Result<T, RecvError>;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap();
        
        if let Some(value) = state.value.take() {
            // Value available!
            return Poll::Ready(Ok(value));
        }
        
        if state.closed {
            // Sender dropped without sending
            return Poll::Ready(Err(RecvError::Closed));
        }
        
        // Store waker for when sender sends
        state.waker = Some(cx.waker().clone());
        
        Poll::Pending
    }
}

// Usage:
async fn request_response_pattern() {
    let (tx, rx) = channel();
    
    // Spawn task to compute result
    tokio::spawn(async move {
        let result = expensive_computation().await;
        let _ = tx.send(result);
    });
    
    // Wait for result
    match rx.await {
        Ok(result) => println!("Got result: {}", result),
        Err(_) => println!("Computation failed"),
    }
}

// Advanced: MPSC channel receiver (multi-producer, single-consumer)
pub struct MpscReceiver<T> {
    shared: Arc<Mutex<MpscState<T>>>,
}

struct MpscState<T> {
    queue: std::collections::VecDeque<T>,
    waker: Option<Waker>,
    sender_count: usize,
}

impl<T> Future for MpscReceiver<T> {
    type Output = Option<T>;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap();
        
        if let Some(value) = state.queue.pop_front() {
            return Poll::Ready(Some(value));
        }
        
        if state.sender_count == 0 {
            // All senders dropped
            return Poll::Ready(None);
        }
        
        // Wait for next message
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}
Key insight: The channel's shared state stores both the value and the waker. When the sender sends, it wakes the receiver's task.
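
This hand-off can be exercised end to end with the standard library alone. The sketch below is a stripped-down variant of the oneshot channel, not the code above: `Slot`, `Recv`, `block_on`, and `send_from_thread` are illustrative names, and the closed-channel handling is omitted for brevity.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Shared slot: the value and the receiver's waker live side by side.
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

pub struct Recv<T>(Arc<Mutex<Slot<T>>>);

impl<T> Future for Recv<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Store the waker under the same lock the sender takes.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        thread::park();
    }
}

/// Sends `value` from a second thread and awaits it on the calling thread.
pub fn send_from_thread(value: u32) -> u32 {
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    let tx = Arc::clone(&slot);
    let producer = thread::spawn(move || {
        let waker = {
            let mut slot = tx.lock().unwrap();
            slot.value = Some(value);
            slot.waker.take()
        };
        // Wake after releasing the lock so the receiver can re-lock at once.
        if let Some(w) = waker {
            w.wake();
        }
    });
    let got = block_on(Recv(slot));
    producer.join().unwrap();
    got
}
```

Because the receiver stores its waker while holding the lock, the sender either sees the waker and wakes it, or the receiver sees the value on its first poll; no wake-up can be lost between the two.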

---

Deep Dive Explanation

The Future Trait in Detail

pub trait Future {
    type Output;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Components:
  1. Pin<&mut Self>: Ensures the future doesn't move in memory
     • Required for self-referential structs
     • Prevents invalidating pointers across await points
  2. Context<'_>: Provides the Waker
     • Waker is a handle to wake up the task
     • Allows async runtime to schedule task execution
  3. Poll: The return type
     • Ready(T): Task complete, here's the result
     • Pending: Task not done, I've registered the waker

The Waker Mechanism

use std::task::{Waker, RawWaker, RawWakerVTable};
use std::sync::Arc;

// Conceptual outline of std::task::Waker, a type-erased handle that can
// wake a task (the real definition lives in the standard library)
pub struct Waker {
    waker: RawWaker,
}

impl Waker {
    pub fn wake(self) {
        // Notify the executor to poll this task again
    }
    
    pub fn wake_by_ref(&self) {
        // Wake without consuming the waker
    }
    
    pub fn clone(&self) -> Waker {
        // Wakers must be cloneable
    }
}

// How executors create wakers:
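struct Task {
    // Mutex: the executor polls through a shared Arc<Task>.
    // Send: wakers may carry the task to other threads.
    future: std::sync::Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    // ... other task state
}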

impl Task {
    fn create_waker(self: &Arc<Self>) -> Waker {
        // Type-erase the Arc<Task> into a RawWaker
        let raw = RawWaker::new(
            Arc::into_raw(self.clone()) as *const (),
            &VTABLE,
        );
        unsafe { Waker::from_raw(raw) }
    }
}

static VTABLE: RawWakerVTable = RawWakerVTable::new(
    |data| {
        // clone
        let task = unsafe { Arc::from_raw(data as *const Task) };
        let clone = task.clone();
        std::mem::forget(task); // Don't drop original
        RawWaker::new(Arc::into_raw(clone) as *const (), &VTABLE)
    },
    |data| {
        // wake
        let task = unsafe { Arc::from_raw(data as *const Task) };
        EXECUTOR.schedule(task);
    },
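    |data| {
        // wake_by_ref: borrow the Arc without taking over its refcount,
        // then schedule a fresh clone of the same task
        let task = unsafe {
            std::mem::ManuallyDrop::new(Arc::from_raw(data as *const Task))
        };
        EXECUTOR.schedule(Arc::clone(&task));
    },
    |data| {
        // drop: release the refcount held by this waker
        drop(unsafe { Arc::from_raw(data as *const Task) });
    },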
);
How wakers work:
  1. Executor creates waker from task handle
  2. Future stores waker when returning Pending
  3. I/O completion, timer, or other event calls waker.wake()
  4. Executor receives notification and re-polls the task

Pin and Self-Referential Structs

The problem:
async fn example() {
    let x = String::from("hello");
    let x_ref = &x;  // Borrow x
    
    some_async_operation().await;  // Suspension point
    
    println!("{}", x_ref);  // Use the reference after resumption
}

When compiled to a state machine:

enum ExampleFuture {
    State0 {
        x: String,
        x_ref: *const String,  // Pointer to x field!
    },
    // ...
}

If this struct moves in memory, x_ref becomes invalid!

Solution: Pin
use std::pin::Pin;
use std::marker::PhantomPinned;

struct SelfReferential {
    data: String,
    pointer: *const String,
    _pin: PhantomPinned,  // Makes this !Unpin
}

impl SelfReferential {
    fn new(data: String) -> Pin<Box<Self>> {
        let mut boxed = Box::pin(SelfReferential {
            data,
            pointer: std::ptr::null(),
            _pin: PhantomPinned,
        });
        
        // Sound: the value is already pinned on the heap; we only write
        // one field through the pin and never move the value
        let self_ptr: *const String = &boxed.data;
        unsafe {
            let mut_ref = Pin::as_mut(&mut boxed);
            Pin::get_unchecked_mut(mut_ref).pointer = self_ptr;
        }
        
        boxed
    }
}
Key rules:
  • Pin<P<T>> ensures T won't move if T: !Unpin
  • Most types are Unpin (safe to move even when pinned)
  • Compiler-generated async futures are !Unpin (they may hold references into their own state across await points)
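
These rules can be checked at compile time. In the sketch below, `assert_unpin` is a hypothetical helper that only compiles for `Unpin` types, and `std::pin::pin!` (Rust 1.68 or newer) pins a compiler-generated future on the stack so that it can be polled without a `Box`.

```rust
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the future below finishes on its first poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Compiles only for types that may be moved after pinning.
fn assert_unpin<T: Unpin>() {}

#[allow(dead_code)]
struct Anchored {
    value: i32,
    _pin: PhantomPinned, // opts out of Unpin
}

pub fn pin_demo() -> Poll<i32> {
    assert_unpin::<i32>();
    assert_unpin::<String>();
    // A pinned Box is itself Unpin, even when its pointee is not.
    assert_unpin::<Pin<Box<Anchored>>>();
    // assert_unpin::<Anchored>(); // would fail to compile: Anchored is !Unpin

    // `pin!` pins the async block in place on the stack.
    let mut fut = pin!(async { 40 + 2 });
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

The async block is polled through `Pin<&mut _>` and is never moved after pinning, which is exactly the guarantee `poll` asks for.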

Executor and Reactor Model

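// Simplified executor (builds on the Task, REACTOR, and EXECUTOR sketches above)
use std::collections::{HashMap, VecDeque};
use std::os::unix::io::RawFd;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
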
struct Executor {
    ready_queue: Mutex<VecDeque<Arc<Task>>>,
}

impl Executor {
    fn run(&self) {
        loop {
            // Get ready tasks
            let task = {
                let mut queue = self.ready_queue.lock().unwrap();
                queue.pop_front()
            };
            
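            if let Some(task) = task {
                // Poll the future
                let waker = task.create_waker();
                let mut context = Context::from_waker(&waker);
                
                // `task` is a shared Arc, so polling needs interior mutability;
                // this assumes Task::future is wrapped in a Mutex.
                let mut future = task.future.lock().unwrap();
                match future.as_mut().poll(&mut context) {
                    Poll::Ready(()) => {
                        // Task complete
                    }
                    Poll::Pending => {
                        // Task parked, will be woken later
                    }
                }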
            } else {
                // No ready tasks, wait for I/O events
                REACTOR.wait_for_events();
            }
        }
    }
    
    fn schedule(&self, task: Arc<Task>) {
        let mut queue = self.ready_queue.lock().unwrap();
        queue.push_back(task);
    }
}

// Simplified reactor (I/O event loop)
struct Reactor {
    epoll_fd: RawFd,
    registrations: Mutex<HashMap<RawFd, Waker>>,
}

impl Reactor {
    fn wait_for_events(&self) {
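        let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1024];
        
        // Assumes the `epoll` crate, whose `wait` takes (epfd, timeout_ms, buf);
        // a timeout of -1 blocks until at least one event arrives.
        let n = epoll::wait(self.epoll_fd, -1, &mut events).unwrap();
        
        for event in &events[..n] {
            // `data` is a public u64 field that holds the fd we registered
            let fd = event.data as RawFd;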
            let regs = self.registrations.lock().unwrap();
            
            if let Some(waker) = regs.get(&fd) {
                waker.wake_by_ref();
            }
        }
    }
}
Architecture:
  • Executor: Manages task queue, polls futures
  • Reactor: Monitors I/O events (epoll/kqueue/IOCP)
  • Waker: Bridge between reactor and executor

---

When to Use Custom Futures

Use When:

  1. Building async primitives
     • Timers, channels, synchronization primitives
     • Custom I/O abstractions
     • Runtime integration
  2. Performance optimization
     • Eliminate allocations (avoid Box)
     • Custom state machines more efficient than generic combinators
     • Zero-cost abstractions
  3. Complex control flow
     • Custom retry logic with state
     • Multi-stage protocols (handshakes, negotiations)
     • Advanced combinators not in standard library
  4. Integration with foreign systems
     • C libraries with async callbacks
     • JavaScript promises (WASM)
     • Hardware event notifications

Don't Use When:

  1. async/await is sufficient
     • Simple async functions
     • Standard control flow
     • Readability matters more than nanoseconds
  2. Existing combinators work
     • tokio::join!, tokio::select!
     • futures::future::join_all
     • Standard library or crate provides what you need
  3. Premature optimization
     • No measured performance bottleneck
     • Maintainability cost not worth gains
     • Team unfamiliar with async internals
  4. Dynamic dispatch is acceptable
     • Box<dyn Future> simplifies code
     • Performance difference negligible
     • Flexibility worth the cost

---

⚠️ Anti-Patterns

1. Blocking in poll()

// WRONG: Blocking destroys async runtime performance
impl Future for BadFuture {
    type Output = String;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        // This blocks the executor thread!
        let result = std::fs::read_to_string("file.txt").unwrap();
        Poll::Ready(result)
    }
}

// CORRECT: Use async I/O or spawn_blocking
impl Future for GoodFuture {
    type Output = String;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        // Delegate to a non-blocking inner future. This assumes GoodFuture
        // and `async_file_read` are Unpin; use pin projection otherwise.
        Pin::new(&mut self.async_file_read).poll(cx)
    }
}

2. Forgetting to wake after Pending

// WRONG: Returns Pending but never wakes
impl Future for BrokenFuture {
    type Output = i32;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.ready {
            Poll::Ready(42)
        } else {
            // FORGOT TO STORE WAKER - task will never wake!
            Poll::Pending
        }
    }
}

// CORRECT: Always store waker before returning Pending
impl Future for WorkingFuture {
    type Output = i32;
    
    // `mut self` is required to assign to a field through the Pin
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.ready {
            Poll::Ready(42)
        } else {
            // Whatever sets `ready` later must call wake() on this waker
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
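
Storing the waker is only half of the contract: the code that makes the future ready also has to call it. Below is a minimal sketch of both halves. `Flag`, `FlagFuture`, `CountingWaker`, and `demo` are hypothetical names, and the counting waker exists only to make the wake-up observable.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Default)]
struct FlagState {
    ready: bool,
    waker: Option<Waker>,
}

/// Hypothetical handle the event source uses to signal completion.
#[derive(Clone, Default)]
pub struct Flag {
    state: Arc<Mutex<FlagState>>,
}

impl Flag {
    pub fn set(&self) {
        let waker = {
            let mut state = self.state.lock().unwrap();
            state.ready = true;
            state.waker.take()
        }; // lock released before waking
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    pub fn wait(&self) -> FlagFuture {
        FlagFuture { flag: self.clone() }
    }
}

pub struct FlagFuture {
    flag: Flag,
}

impl Future for FlagFuture {
    type Output = i32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        let mut state = self.flag.state.lock().unwrap();
        if state.ready {
            return Poll::Ready(42);
        }
        // Clone the waker only if the stored one would wake a different task.
        let same = state
            .waker
            .as_ref()
            .map_or(false, |old| old.will_wake(cx.waker()));
        if !same {
            state.waker = Some(cx.waker().clone());
        }
        Poll::Pending
    }
}

/// Test waker that counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (first poll was pending, wake count after `set`, second poll).
pub fn demo() -> (bool, usize, Poll<i32>) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);

    let flag = Flag::default();
    let mut fut = flag.wait();
    let first_pending = Pin::new(&mut fut).poll(&mut cx).is_pending();
    flag.set();
    let wakes = counter.0.load(Ordering::SeqCst);
    (first_pending, wakes, Pin::new(&mut fut).poll(&mut cx))
}
```

The `will_wake` check is an optional optimization. Unconditionally replacing the stored waker is also correct, just slightly more expensive.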

3. Incorrect Pin usage

// WRONG: Moving out of Pin
impl Future for BadPin {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Unsound if Self is !Unpin: this hands out a plain &mut to pinned data
        let moved = unsafe { Pin::into_inner_unchecked(self) };
        // The swap moves the pinned field to a new address
        std::mem::swap(&mut moved.field, &mut moved.other_field);
        Poll::Pending
    }
}

// CORRECT: Use pin_project or careful unsafe
use pin_project_lite::pin_project;

pin_project! {
    struct GoodPin {
        #[pin]
        future: SomeFuture,
        non_pinned: i32,
    }
}

impl Future for GoodPin {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.project();
        this.future.poll(cx)
    }
}

4. Polling after Ready

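// RISKY: Silently accepts polls after completion
impl Future for BadCompletion {
    type Output = i32;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        // Returns Ready on every poll. The Future contract leaves behavior
        // after completion unspecified, so this hides callers that re-poll.
        Poll::Ready(42)
    }
}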

// CORRECT: Panic or fuse after first Ready
impl Future for GoodCompletion {
    type Output = i32;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.completed {
            panic!("polled after completion");
        }
        
        self.completed = true;
        Poll::Ready(42)
    }
}
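
The "fuse" alternative to panicking can be sketched as a wrapper that drops the inner future on completion and answers every later poll with `Pending`. This is an illustrative sketch in the spirit of fused futures, not the implementation from any crate; `Fused`, `NoopWaker`, and `poll_once` are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: once the inner future has completed, every
/// later poll returns `Pending` without touching the inner future.
pub struct Fused<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> Fused<F> {
    pub fn new(fut: F) -> Self {
        Fused { inner: Some(Box::pin(fut)) }
    }

    pub fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for Fused<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        // `Fused` is `Unpin` because the inner future is boxed,
        // so taking a plain `&mut` is safe.
        let this = self.get_mut();
        let Some(inner) = this.inner.as_mut() else {
            return Poll::Pending; // already completed earlier
        };
        let result = inner.as_mut().poll(cx);
        if result.is_ready() {
            this.inner = None; // drop the finished future
        }
        result
    }
}

// Waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls an `Unpin` future exactly once.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Boxing keeps the sketch free of `unsafe` at the cost of one allocation. A projection-based version would avoid the box.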

5. Not implementing Unpin when possible

// AWKWARD: Unnecessary !Unpin forces every caller to pin the future first
use std::marker::PhantomPinned;

struct SlowFuture {
    value: i32,
    _pin: PhantomPinned,  // Not needed!
}

// BETTER: Most futures can be Unpin
struct FastFuture {
    value: i32,
}

// FastFuture is automatically Unpin, easier to use

---

Performance Characteristics

Zero-Cost Abstraction

// This async function:
async fn add(a: i32, b: i32) -> i32 {
    a + b
}

// Compiles to roughly this (zero overhead):
struct AddFuture {
    a: i32,
    b: i32,
    state: u8,
}

impl Future for AddFuture {
    type Output = i32;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        Poll::Ready(self.a + self.b)
    }
}
Costs:
  • State machine: size is roughly that of the largest state, plus a discriminant
  • No heap allocation (unless boxed)
  • No dynamic dispatch (unless used as a trait object)
  • poll() can usually be inlined when the concrete future type is known

State Machine Size

async fn large_state() {
    let big_array = [0u8; 1024];
    async_op().await;
    use_array(&big_array);
}

// Compiles to:
enum LargeStateFuture {
    State0 { big_array: [u8; 1024], async_op: AsyncOp },
    State1 { big_array: [u8; 1024] },
}

// Size = 1024 bytes + size of AsyncOp + discriminant
Optimization strategies:
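  • Move large data to the heap, e.g. Box<[u8; 1024]>, so the state holds only a pointer
  • Split into smaller functions so large locals are dropped before an await
  • Borrow large data from the caller instead of owning it in the future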
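
The effect of boxing can be measured directly with `std::mem::size_of_val`. The sketch below is one way to check it; `inline_state`, `boxed_state`, `checksum`, and `state_sizes` are hypothetical names, and the exact sizes depend on the compiler version, so only the relative difference is meaningful.

```rust
use std::future::ready;
use std::mem::size_of_val;

fn checksum(bytes: &[u8]) -> u32 {
    bytes.iter().map(|&b| u32::from(b)).sum()
}

// The 1 KiB array is used after the await, so it has to be stored
// inline in the future's state.
async fn inline_state(seed: u8) -> u32 {
    let big_array = [seed; 1024];
    ready(()).await;
    checksum(&big_array)
}

// Only the Box pointer lives across the await; the array is on the heap.
async fn boxed_state(seed: u8) -> u32 {
    let big_array = Box::new([seed; 1024]);
    ready(()).await;
    checksum(&big_array[..])
}

/// Returns (size of the inline future, size of the boxed future) in bytes.
/// Creating a future does not run it, so nothing is allocated here.
pub fn state_sizes() -> (usize, usize) {
    (
        size_of_val(&inline_state(1)),
        size_of_val(&boxed_state(1)),
    )
}
```

Printing the result of `state_sizes()` on a given toolchain shows how much of the future's footprint comes from the array itself.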

Comparison: Async vs Threads

| Aspect | Async Future | OS Thread |
|--------|-------------|-----------|
| Context switch | None at the OS level (a poll is a function call) | Roughly 1-10 microseconds |
| Memory per task | Typically 100-1000 bytes (state machine) | 1-8 MB of reserved stack by default |
| Creation cost | Nanoseconds | Microseconds |
| Max concurrent | Millions | Thousands |
| Overhead | Poll function call | Kernel scheduling |

When async wins:
  • I/O-bound workloads
  • Many concurrent operations
  • Fine-grained tasks
When threads win:
  • CPU-bound workloads
  • Blocking operations
  • Simpler mental model

---

Exercises

Beginner: Implement YieldNow

Create a future that yields control once, then completes:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct YieldNow {
    yielded: bool,
}

impl YieldNow {
    pub fn new() -> Self {
        YieldNow { yielded: false }
    }
}

impl Future for YieldNow {
    type Output = ();
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            // TODO: Return Ready
        } else {
            // TODO: Set yielded = true, wake immediately, return Pending
        }
    }
}

// Test:
#[tokio::main]
async fn main() {
    println!("Before yield");
    YieldNow::new().await;
    println!("After yield");
}
Solution
impl Future for YieldNow {
    type Output = ();
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();  // Schedule re-poll
            Poll::Pending
        }
    }
}
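
To check the solution without pulling in a runtime, the future can be driven by hand. The harness below is a sketch for testing only: `drive` and `CountingWaker` are hypothetical helpers, and `drive` re-polls in a busy loop regardless of wake-ups, which a real executor would not do.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub struct YieldNow {
    yielded: bool,
}

impl YieldNow {
    pub fn new() -> Self {
        YieldNow { yielded: false }
    }
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref(); // schedule a re-poll
            Poll::Pending
        }
    }
}

// Waker that only counts how many times it was woken.
struct CountingWaker {
    wakes: AtomicUsize,
}

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls `fut` until it is ready and returns (number of polls, number of wakes).
pub fn drive<F: Future>(fut: F) -> (usize, usize) {
    let mut fut = Box::pin(fut);
    let counter = Arc::new(CountingWaker { wakes: AtomicUsize::new(0) });
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);

    let mut polls = 0;
    loop {
        polls += 1;
        if fut.as_mut().poll(&mut cx).is_ready() {
            return (polls, counter.wakes.load(Ordering::SeqCst));
        }
    }
}
```

For `YieldNow`, the harness should report two polls and one wake: the first poll yields and requests a re-poll, the second completes.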

Intermediate: Build Select Combinator

Implement select() that returns the first future to complete:

pub struct Select<F1, F2> {
    future1: Pin<Box<F1>>,
    future2: Pin<Box<F2>>,
}

impl<F1, F2> Select<F1, F2>
where
    F1: Future,
    F2: Future,
{
    pub fn new(future1: F1, future2: F2) -> Self {
        // TODO
    }
}

impl<F1, F2> Future for Select<F1, F2>
where
    F1: Future,
    F2: Future,
{
    type Output = Either<F1::Output, F2::Output>;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // TODO: Poll both, return first that's Ready
    }
}

pub enum Either<L, R> {
    Left(L),
    Right(R),
}

// Test:
async fn test_select() {
    let result = Select::new(
        Delay::new(Duration::from_millis(100)),
        Delay::new(Duration::from_millis(200)),
    ).await;
    
    assert!(matches!(result, Either::Left(_)));
}
Solution
impl<F1, F2> Future for Select<F1, F2>
where
    F1: Future,
    F2: Future,
{
    type Output = Either<F1::Output, F2::Output>;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Try first future
        if let Poll::Ready(out) = self.future1.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(out));
        }
        
        // Try second future
        if let Poll::Ready(out) = self.future2.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(out));
        }
        
        // Both pending
        Poll::Pending
    }
}
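
The solution above leaves `Select::new` open. A complete version might look like the following sketch, which boxes and pins both futures and is exercised with `std::future::ready` and `std::future::pending` instead of the `Delay` type used in the test. `NoopWaker`, `poll_once`, and `demo` are hypothetical helpers added for illustration.

```rust
use std::future::{pending, ready, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug, PartialEq)]
pub enum Either<L, R> {
    Left(L),
    Right(R),
}

pub struct Select<F1, F2> {
    future1: Pin<Box<F1>>,
    future2: Pin<Box<F2>>,
}

impl<F1: Future, F2: Future> Select<F1, F2> {
    pub fn new(future1: F1, future2: F2) -> Self {
        // Box::pin gives each future a stable heap address.
        Select {
            future1: Box::pin(future1),
            future2: Box::pin(future2),
        }
    }
}

impl<F1: Future, F2: Future> Future for Select<F1, F2> {
    type Output = Either<F1::Output, F2::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Left-biased: future1 wins if both are ready in the same poll.
        if let Poll::Ready(out) = self.future1.as_mut().poll(cx) {
            return Poll::Ready(Either::Left(out));
        }
        if let Poll::Ready(out) = self.future2.as_mut().poll(cx) {
            return Poll::Ready(Either::Right(out));
        }
        // Both inner futures have registered the waker from `cx`.
        Poll::Pending
    }
}

// Waker that does nothing; enough for polling by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls an `Unpin` future exactly once.
pub fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// The left future never completes, so the right one wins.
pub fn demo() -> Poll<Either<u8, &'static str>> {
    let mut select = Select::new(pending::<u8>(), ready("right"));
    poll_once(&mut select)
}
```

Polling `future1` first makes the combinator left-biased. Alternating the order between polls is one way to make it fairer.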

Advanced: Async File I/O with io_uring

Implement a future for reading a file using io_uring (Linux):

use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

pub struct IoUringRead {
    // File descriptor
    fd: i32,
    // Buffer to read into
    buffer: Vec<u8>,
    // Submission queue entry ID
    sqe_id: Option<u64>,
    // Waker for completion
    waker: Option<Waker>,
}

impl IoUringRead {
    pub fn new(fd: i32, size: usize) -> Self {
        // TODO: Create read future
    }
}

impl Future for IoUringRead {
    type Output = io::Result<Vec<u8>>;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // TODO:
        // 1. If not submitted, submit read to io_uring
        // 2. Check completion queue for result
        // 3. Store waker if not complete
        // 4. Return Ready with data or Pending
    }
}

// Integration with io_uring reactor needed
Hint

You'll need:

  1. Global io_uring instance with submission and completion queues
  2. Map from SQE ID to waker
  3. Background thread processing completion queue
  4. When completion arrives, wake the corresponding future
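
Items 2 and 4 of the hint can be sketched independently of io_uring itself. The registry below maps an operation ID to the waker of the task awaiting it. It does not use any io_uring crate API, and `WakerRegistry`, `CountingWaker`, and `demo` are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical registry mapping an in-flight operation ID to the
/// waker of the task that is waiting for it.
#[derive(Default)]
pub struct WakerRegistry {
    wakers: Mutex<HashMap<u64, Waker>>,
}

impl WakerRegistry {
    /// Called from `poll` before returning `Pending`.
    pub fn register(&self, id: u64, waker: &Waker) {
        self.wakers.lock().unwrap().insert(id, waker.clone());
    }

    /// Called by the thread that processes completions.
    /// Returns whether a waiting task was woken.
    pub fn complete(&self, id: u64) -> bool {
        // The lock guard is dropped at the end of this statement,
        // so the waker runs without holding the lock.
        let waker = self.wakers.lock().unwrap().remove(&id);
        match waker {
            Some(waker) => {
                waker.wake();
                true
            }
            None => false,
        }
    }
}

// Waker that counts wake-ups, to make the behavior observable.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns (first completion woke a task, second did, total wakes).
pub fn demo() -> (bool, bool, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));

    let registry = WakerRegistry::default();
    registry.register(7, &waker);
    let first = registry.complete(7); // wakes the registered task
    let second = registry.complete(7); // nothing is registered any more
    (first, second, counter.0.load(Ordering::SeqCst))
}
```

In a full solution the completion thread would also have to store the read result where the future can find it on its next poll.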

---

Real-World Usage

tokio::time::Sleep

// Simplified illustration of tokio's Sleep (field and method names are not tokio's actual internals)
pub struct Sleep {
    deadline: Instant,
    entry: TimerEntry,
}

impl Future for Sleep {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.deadline {
            return Poll::Ready(());
        }
        
        self.entry.register_waker(cx.waker());
        Poll::Pending
    }
}

// Timer wheel implementation:
// - Hierarchical timing wheels for O(1) insertion
// - The runtime's time driver processes expired timers
// - Wakes futures when deadline reached

futures::future combinators

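// Simplified from the futures crate: the real select also requires both
// futures to be Unpin, and its output carries the unfinished future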
pub fn join<F1, F2>(f1: F1, f2: F2) -> Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
    Join::new(f1, f2)
}

pub fn select<F1, F2>(f1: F1, f2: F2) -> Select<F1, F2>
where
    F1: Future,
    F2: Future,
{
    Select::new(f1, f2)
}

// Implementations follow patterns shown earlier

tokio::sync channels

// Oneshot channel receiver (simplified; type names are illustrative)
impl<T> Future for OneshotReceiver<T> {
    type Output = Result<T, RecvError>;
    
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.inner.poll_recv(cx)
    }
}

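// MPSC channel receiver as a Stream (in tokio, this impl lives on
// tokio_stream's ReceiverStream wrapper, not on the receiver itself)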
impl<T> Stream for MpscReceiver<T> {
    type Item = T;
    
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) 
        -> Poll<Option<T>> 
    {
        self.inner.poll_recv(cx)
    }
}

async-std I/O primitives

// TcpStream read
impl AsyncRead for TcpStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Register with async-std reactor
        // Poll underlying socket
        // Return Ready or Pending with waker registered
    }
}

---

Further Reading

  1. Rust Async Book: https://rust-lang.github.io/async-book/
  • Official guide to async Rust
  • Covers Future trait, async/await, Pin
  2. RFC 2592 - Futures: https://rust-lang.github.io/rfcs/2592-futures.html
  • Original Future trait design
  • Rationale for Pin and Waker
  3. Pin and Suffering: https://fasterthanli.me/articles/pin-and-suffering
  • Deep dive into Pin by fasterthanlime
  • Explains self-referential structs
  4. How Async/Await Works: https://os.phil-opp.com/async-await/
  • State machine transformation explained
  • Low-level implementation details
  5. Tokio Internals: https://tokio.rs/blog/2019-10-scheduler
  • How tokio's executor works
  • Work-stealing scheduler design
  6. Building an Async Runtime: https://www.youtube.com/watch?v=9_3krAQtD2k
  • Carl Lerche's talk on building tokio
  • Reactor/executor patterns
  7. io_uring and Async I/O: https://kernel.dk/io_uring.pdf
  • Modern Linux async I/O
  • Integration with Rust futures

---

Summary

Custom Futures are the foundation of async Rust:

Key Takeaways:
  • Futures are state machines compiled from async/await
  • Poll returns Ready (done) or Pending (wake me later)
  • Wakers bridge I/O events to task scheduling
  • Pin prevents self-referential futures from moving
  • Zero-cost abstraction: no overhead vs hand-written code
When to implement custom futures:
  • Building async primitives (timers, channels, I/O)
  • Performance-critical code needing zero allocations
  • Complex protocols requiring custom state machines
  • Integration with external async systems
Remember: Most code should use async fn - only drop to custom futures when you need precise control over the state machine or are building libraries.

Understanding custom futures makes you a better async Rust programmer, even if you rarely implement them directly. You'll understand performance characteristics, debug issues, and design better APIs.

Now go build some legendary async systems!
