Building futures from scratch
A Custom Future is a manual implementation of Rust's Future trait, giving you complete control over async execution. While async fn and async blocks automatically generate Future implementations, understanding custom futures is essential for:
use std::task::{Context, Poll};
use std::pin::Pin;
/// Reference copy of the `Future` trait: a computation that is driven to
/// completion by repeated calls to `poll`.
pub trait Future {
/// Type of the value handed back once the future completes.
type Output;
// The heart of async: polling for completion.
// Yields `Poll::Ready(output)` when done, `Poll::Pending` when not ready yet.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
/// Outcome of a single `poll` call.
pub enum Poll<T> {
Ready(T), // Task completed; carries the result
Pending, // Task not ready yet; it expects to be woken later
}
Key concepts:
When you write:
/// Requests `/users/{id}` via `http_get` and parses the response into a `User`.
async fn fetch_user(id: u64) -> User {
    let url = format!("/users/{}", id);
    let raw = http_get(&url).await;
    parse_user(raw).await
}
The compiler generates roughly:
/// Hand-written approximation of the state machine generated for `fetch_user`.
/// Each variant holds exactly the data that is live at that suspension point.
enum FetchUserFuture {
// State 0: Initial
Initial { id: u64 },
// State 1: Waiting for HTTP response
WaitingForHttp {
id: u64,
http_future: HttpFuture
},
// State 2: Waiting for parse
WaitingForParse {
parse_future: ParseFuture
},
// State 3: Complete
Complete,
}
/// Drives the `FetchUserFuture` state machine: each loop iteration either
/// advances to the next state or returns when an inner future is pending.
///
/// NOTE(review): this is illustrative pseudo-code. `http_get(...)` is a
/// placeholder, the variant names are written unqualified, and `*self` is
/// reassigned while a borrow from `match &mut *self` is still in use.
/// `Pin::new` additionally requires the inner futures to be `Unpin`.
impl Future for FetchUserFuture {
type Output = User;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<User> {
// Loop so that a state transition is followed immediately by polling the new state.
loop {
match &mut *self {
Initial { id } => {
// Transition to next state
*self = WaitingForHttp {
id: *id,
http_future: http_get(...)
};
}
WaitingForHttp { http_future, .. } => {
match Pin::new(http_future).poll(cx) {
Poll::Ready(response) => {
// HTTP finished: start parsing and loop to poll the parser.
*self = WaitingForParse {
parse_future: parse_user(response)
};
}
Poll::Pending => return Poll::Pending,
}
}
WaitingForParse { parse_future } => {
match Pin::new(parse_future).poll(cx) {
Poll::Ready(user) => {
// Mark as finished before handing the result out.
*self = Complete;
return Poll::Ready(user);
}
Poll::Pending => return Poll::Pending,
}
}
// Polling again after `Ready` is a caller bug.
Complete => panic!("polled after completion"),
}
}
}
}
This state machine transformation is zero-cost - no heap allocations, no dynamic dispatch (unless you box it).
---
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use std::sync::{Arc, Mutex};
use std::collections::BinaryHeap;
use std::cmp::Ordering;
/// A future that completes after a specified duration
/// A future that completes after a specified duration
pub struct Delay {
/// When this delay should complete
deadline: Instant,
/// State shared with the timer wheel (the wheel holds a clone of this `Arc`)
shared: Arc<Mutex<DelayShared>>,
}
/// Mutable state shared between a `Delay` and the timer wheel.
struct DelayShared {
/// Waker to notify when deadline is reached; set by `poll` when it returns `Pending`
waker: Option<Waker>,
/// Has the delay been triggered?
triggered: bool,
}
impl Delay {
    /// Creates a delay whose deadline is `duration` from now and registers its
    /// shared state with the global `TIMER_WHEEL`.
    pub fn new(duration: Duration) -> Self {
        let deadline = Instant::now() + duration;
        let state = DelayShared {
            waker: None,
            triggered: false,
        };
        let shared = Arc::new(Mutex::new(state));
        // The wheel keeps its own handle so it can flag and wake this delay.
        TIMER_WHEEL.register(deadline, Arc::clone(&shared));
        Self { deadline, shared }
    }
}
impl Future for Delay {
    type Output = ();

    /// Ready once the timer wheel has flagged the delay or the deadline has
    /// passed; otherwise stores the caller's waker and stays pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if !state.triggered {
            if Instant::now() < self.deadline {
                // Not due yet: remember who to wake when the wheel fires.
                state.waker = Some(cx.waker().clone());
                return Poll::Pending;
            }
            // Deadline already passed (e.g. executor lag): mark it ourselves.
            state.triggered = true;
        }
        Poll::Ready(())
    }
}
/// Global timer wheel (simplified - production uses hierarchical wheel)
/// Holds all pending timers in a binary heap behind a mutex.
struct TimerWheel {
// Ordered via `TimerEntry`'s `Ord` impl.
heap: Mutex<BinaryHeap<TimerEntry>>,
}
/// One registered timer: its deadline plus the state shared with the `Delay`.
struct TimerEntry {
deadline: Instant,
shared: Arc<Mutex<DelayShared>>,
}
/// Two entries compare equal when their deadlines are equal.
impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool {
        matches!(self.deadline.cmp(&other.deadline), Ordering::Equal)
    }
}

impl Eq for TimerEntry {}

impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Inverted deadline ordering: the earliest deadline is the greatest element,
/// so a max-heap `BinaryHeap` behaves like a min-heap.
impl Ord for TimerEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.deadline.cmp(&other.deadline).reverse()
    }
}
/// Process-wide timer wheel; starts out with an empty heap.
static TIMER_WHEEL: TimerWheel = TimerWheel {
heap: Mutex::new(BinaryHeap::new()),
};
impl TimerWheel {
fn register(&self, deadline: Instant, shared: Arc<Mutex<DelayShared>>) {
let mut heap = self.heap.lock().unwrap();
heap.push(TimerEntry { deadline, shared });
}
/// Called by runtime: process expired timers
fn process_expired(&self) {
let now = Instant::now();
let mut heap = self.heap.lock().unwrap();
while let Some(entry) = heap.peek() {
if entry.deadline > now {
break; // No more expired timers
}
let entry = heap.pop().unwrap();
let mut shared = entry.shared.lock().unwrap();
shared.triggered = true;
// Wake the future
if let Some(waker) = shared.waker.take() {
waker.wake();
}
}
}
}
// Usage in async code:
/// Processes items 0 through 9 one at a time, awaiting a 100 ms `Delay` after each.
async fn rate_limited_operation() {
for i in 0..10 {
process_item(i).await;
// Wait 100ms between operations
Delay::new(Duration::from_millis(100)).await;
}
}
async fn timeout_example() {
use std::future::Future;
let operation = fetch_data();
let timeout = Delay::new(Duration::from_secs(5));
// Race between operation and timeout
tokio::select! {
result = operation => println!("Got result: {:?}", result),
_ = timeout => println!("Operation timed out"),
}
}
How it works:
---
use std::future::Future;
use std::io::{self, Read};
use std::net::TcpStream;
use std::os::unix::io::{AsRawFd, RawFd};
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::sync::{Arc, Mutex};
/// Future for reading from a TCP socket
/// Future for reading from a TCP socket
pub struct TcpReadFuture {
/// The socket to read from (set to non-blocking mode)
stream: Arc<Mutex<TcpStream>>,
/// Buffer to read into
buffer: Vec<u8>,
/// State shared with the reactor
io_state: Arc<Mutex<IoState>>,
}
/// Readiness state shared between a `TcpReadFuture` and the reactor.
struct IoState {
// Waker stored by `poll` when the read would block; taken by the reactor.
waker: Option<Waker>,
// Set to true by the reactor's `poll_events`, reset by `poll` on `WouldBlock`.
ready: bool,
}
impl TcpReadFuture {
    /// Prepares a read of up to `buffer_size` bytes from `stream`.
    ///
    /// Switches the socket to non-blocking mode and registers its file
    /// descriptor with the global `REACTOR`.
    ///
    /// # Errors
    /// Returns the error from `set_nonblocking` if the mode switch fails.
    pub fn new(stream: Arc<Mutex<TcpStream>>, buffer_size: usize) -> io::Result<Self> {
        // One critical section for both socket operations; the guard is
        // released before the reactor is touched.
        let fd = {
            let socket = stream.lock().unwrap();
            socket.set_nonblocking(true)?;
            socket.as_raw_fd()
        };

        let io_state = Arc::new(Mutex::new(IoState {
            waker: None,
            ready: false,
        }));

        // Register with epoll/kqueue reactor
        REACTOR.register(fd, Arc::clone(&io_state));

        Ok(TcpReadFuture {
            stream,
            buffer: vec![0u8; buffer_size],
            io_state,
        })
    }
}
impl Future for TcpReadFuture {
type Output = io::Result<Vec<u8>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut io_state = self.io_state.lock().unwrap();
// Try to read (might succeed immediately)
let mut stream = self.stream.lock().unwrap();
match stream.read(&mut self.buffer) {
Ok(n) if n > 0 => {
// Successfully read data
self.buffer.truncate(n);
return Poll::Ready(Ok(self.buffer.clone()));
}
Ok(0) => {
// EOF
return Poll::Ready(Ok(Vec::new()));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// Not ready yet - register waker
io_state.waker = Some(cx.waker().clone());
io_state.ready = false;
Poll::Pending
}
Err(e) => {
// Actual error
Poll::Ready(Err(e))
}
}
}
}
/// Simplified reactor using epoll (Linux)
/// Registry mapping raw file descriptors to the `IoState` of the future waiting on them.
struct Reactor {
// In real implementation: epoll fd, event loop thread, registration map
registrations: Mutex<std::collections::HashMap<RawFd, Arc<Mutex<IoState>>>>,
}
/// Process-wide reactor; starts with no registrations.
static REACTOR: Reactor = Reactor {
registrations: Mutex::new(std::collections::HashMap::new()),
};
impl Reactor {
    /// Associates `fd` with `io_state`, replacing any earlier registration for
    /// the same descriptor.
    fn register(&self, fd: RawFd, io_state: Arc<Mutex<IoState>>) {
        let mut regs = self.registrations.lock().unwrap();
        regs.insert(fd, io_state);
        // In real implementation:
        // - Create epoll_event for fd
        // - Call epoll_ctl(EPOLL_CTL_ADD, fd, ...)
    }

    /// Called by runtime event loop.
    ///
    /// This simplified version treats every registered descriptor as ready:
    /// it sets `ready` and wakes whichever waker is stored.
    fn poll_events(&self) {
        // In real implementation:
        // - Call epoll_wait() to get ready file descriptors
        // - For each ready fd, wake its registered future

        // Update state and collect wakers under the locks, then release them
        // before waking anything.
        let wakers: Vec<Waker> = {
            let regs = self.registrations.lock().unwrap();
            regs.values()
                .filter_map(|io_state| {
                    let mut state = io_state.lock().unwrap();
                    state.ready = true;
                    state.waker.take()
                })
                .collect()
        };

        // A woken task may immediately poll its future (locking its `IoState`)
        // or register a new descriptor (locking the registry); holding either
        // lock here would deadlock.
        for waker in wakers {
            waker.wake();
        }
    }
}
// Usage:
/// Handles one connection: reads up to 8192 bytes, parses them as a request,
/// runs the handler and writes the response back. Any I/O or parse error is
/// propagated to the caller via `?`.
async fn http_server_handler(stream: Arc<Mutex<TcpStream>>) -> io::Result<()> {
// Read HTTP request
let request_data = TcpReadFuture::new(stream.clone(), 8192).await?;
let request = parse_http_request(&request_data)?;
// Process request
let response = handle_request(request).await;
// Write response (similar TcpWriteFuture)
write_response(stream, response).await?;
Ok(())
}
How it works:
---
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A future that waits for two futures to complete
/// A future that waits for two futures to complete
///
/// Both inner futures are boxed and pinned on the heap; each output is parked
/// in an `Option` until its sibling has finished as well.
pub struct Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
    future1: Pin<Box<F1>>,
    future2: Pin<Box<F2>>,
    output1: Option<F1::Output>,
    output2: Option<F2::Output>,
}

// The inner futures live behind `Pin<Box<_>>` and the stored outputs are never
// pinned, so moving a `Join` can never invalidate anything. Declaring that
// explicitly is what allows `poll` to mutate its fields for *any* output
// type, not only for outputs that happen to be `Unpin`.
impl<F1, F2> Unpin for Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
}

impl<F1, F2> Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
    /// Boxes and pins both futures; nothing is polled until the `Join` is.
    pub fn new(future1: F1, future2: F2) -> Self {
        Join {
            future1: Box::pin(future1),
            future2: Box::pin(future2),
            output1: None,
            output2: None,
        }
    }
}

impl<F1, F2> Future for Join<F1, F2>
where
    F1: Future,
    F2: Future,
{
    type Output = (F1::Output, F2::Output);

    /// Polls every inner future that has not produced its output yet and
    /// resolves to both outputs once both are available.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Poll first future if not complete
        if this.output1.is_none() {
            if let Poll::Ready(output) = this.future1.as_mut().poll(cx) {
                this.output1 = Some(output);
            }
        }

        // Poll second future if not complete
        if this.output2.is_none() {
            if let Poll::Ready(output) = this.future2.as_mut().poll(cx) {
                this.output2 = Some(output);
            }
        }

        // Check if both complete
        match (this.output1.take(), this.output2.take()) {
            (Some(out1), Some(out2)) => Poll::Ready((out1, out2)),
            (out1, out2) => {
                // Restore outputs for next poll so a finished future is not
                // polled again.
                this.output1 = out1;
                this.output2 = out2;
                Poll::Pending
            }
        }
    }
}
// Better: Zero-allocation join for up to 3 futures
pin_project_lite::pin_project! {
    /// Waits for three futures to complete without boxing them: the futures
    /// are stored inline and pinned structurally via `#[pin]`.
    pub struct Join3<F1, F2, F3>
    where
        F1: Future,
        F2: Future,
        F3: Future,
    {
        #[pin]
        future1: F1,
        #[pin]
        future2: F2,
        #[pin]
        future3: F3,
        output1: Option<F1::Output>,
        output2: Option<F2::Output>,
        output3: Option<F3::Output>,
    }
}

impl<F1, F2, F3> Join3<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    /// Wraps the three futures; nothing is polled until the `Join3` is.
    pub fn new(future1: F1, future2: F2, future3: F3) -> Self {
        Self {
            future1,
            future2,
            future3,
            output1: None,
            output2: None,
            output3: None,
        }
    }
}

impl<F1, F2, F3> Future for Join3<F1, F2, F3>
where
    F1: Future,
    F2: Future,
    F3: Future,
{
    type Output = (F1::Output, F2::Output, F3::Output);

    /// Polls every future that has not finished yet and resolves to all three
    /// outputs once they are all available.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // Poll all futures that have not produced an output yet
        if this.output1.is_none() {
            if let Poll::Ready(out) = this.future1.poll(cx) {
                *this.output1 = Some(out);
            }
        }
        if this.output2.is_none() {
            if let Poll::Ready(out) = this.future2.poll(cx) {
                *this.output2 = Some(out);
            }
        }
        if this.output3.is_none() {
            if let Poll::Ready(out) = this.future3.poll(cx) {
                *this.output3 = Some(out);
            }
        }

        // Check if all complete
        match (this.output1.take(), this.output2.take(), this.output3.take()) {
            (Some(o1), Some(o2), Some(o3)) => Poll::Ready((o1, o2, o3)),
            (o1, o2, o3) => {
                // Put back whatever has been collected so far.
                *this.output1 = o1;
                *this.output2 = o2;
                *this.output3 = o3;
                Poll::Pending
            }
        }
    }
}
// Usage:
/// Awaits the three fetches through a single `Join3`, then renders a
/// dashboard built from the three results.
async fn parallel_api_calls() {
let (users, posts, comments) = Join3::new(
fetch_users(),
fetch_posts(),
fetch_comments(),
).await;
let dashboard = Dashboard { users, posts, comments };
render(dashboard);
}
Key insight: Each poll attempts to advance all incomplete futures. The waker is shared - waking one future causes the entire Join to be re-polled, which advances all others.
---
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pin_project_lite::pin_project! {
    /// A future that retries an operation with exponential backoff
    pub struct Retry<F, Fut, T, E>
    where
        F: FnMut() -> Fut,
        Fut: Future<Output = Result<T, E>>,
    {
        /// Factory for creating new attempts
        factory: F,
        /// Current attempt
        #[pin]
        current: Option<Fut>,
        /// Delay between retries
        #[pin]
        delay: Option<Delay>,
        /// Retry configuration
        config: RetryConfig,
        /// Number of attempts started so far
        attempt: usize,
        /// Delay to wait before the next retry
        backoff: Duration,
        // `T` and `E` occur only in the where clause above; a type parameter
        // must also appear in a field or the struct is rejected (E0392).
        // `fn() -> _` keeps auto traits independent of `T` and `E`.
        _outcome: std::marker::PhantomData<fn() -> (T, E)>,
    }
}

/// Tuning knobs for [`Retry`].
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfig {
    /// Upper bound on the number of attempts
    pub max_attempts: usize,
    /// Delay before the first retry
    pub initial_backoff: Duration,
    /// Cap applied to the growing delay
    pub max_backoff: Duration,
    /// Factor by which the delay grows after each failed attempt
    pub multiplier: f64,
}

impl<F, Fut, T, E> Retry<F, Fut, T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    /// Creates a retry future. `factory` is not called until the first poll.
    pub fn new(factory: F, config: RetryConfig) -> Self {
        // Read the initial backoff before `config` is moved into the struct.
        let backoff = config.initial_backoff;
        Retry {
            factory,
            current: None,
            delay: None,
            config,
            attempt: 0,
            backoff,
            _outcome: std::marker::PhantomData,
        }
    }
}
impl<F, Fut, T, E> Future for Retry<F, Fut, T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    type Output = Result<T, E>;

    /// Drives the retry loop: wait out any pending delay, start an attempt if
    /// none is in flight, then poll it.
    ///
    /// Resolves to the first `Ok`, or to the error of the last permitted
    /// attempt. A `max_attempts` of zero is treated as one, because without
    /// any attempt there is no error value to return.
    ///
    /// # Panics
    /// Panics if polled again after it has resolved with an error.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // Always allow at least one attempt (see above).
        let max_attempts = this.config.max_attempts.max(1);

        loop {
            // If we're in a delay, poll it first
            if let Some(delay) = this.delay.as_mut().as_pin_mut() {
                match delay.poll(cx) {
                    // Delay complete, start next attempt
                    Poll::Ready(()) => this.delay.set(None),
                    Poll::Pending => return Poll::Pending,
                }
            }

            // Start new attempt if needed
            if this.current.is_none() {
                if *this.attempt >= max_attempts {
                    // Only reachable when polled after the final error was
                    // already returned.
                    panic!("All retry attempts exhausted");
                }
                *this.attempt += 1;
                this.current.set(Some((this.factory)()));
            }

            // Poll current attempt
            let attempt = this
                .current
                .as_mut()
                .as_pin_mut()
                .expect("an attempt is always in flight at this point");
            match attempt.poll(cx) {
                // Success!
                Poll::Ready(Ok(value)) => return Poll::Ready(Ok(value)),
                Poll::Ready(Err(err)) => {
                    // Failed - schedule retry
                    this.current.set(None);
                    if *this.attempt >= max_attempts {
                        return Poll::Ready(Err(err));
                    }

                    // Calculate backoff. A multiplier that yields a negative,
                    // NaN or overflowing value falls back to the cap instead
                    // of panicking inside the conversion.
                    let delay_duration = *this.backoff;
                    let cap = this.config.max_backoff;
                    let scaled = this.backoff.as_secs_f64() * this.config.multiplier;
                    *this.backoff =
                        Duration::try_from_secs_f64(scaled).map_or(cap, |next| next.min(cap));

                    this.delay.set(Some(Delay::new(delay_duration)));
                    // Loop to poll delay immediately
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
// Usage:
/// Fetches a user, retrying up to 5 attempts with a backoff that starts at
/// 100 ms, doubles each time and is capped at 10 s.
// NOTE(review): the closure's error type comes from `reqwest`; presumably it is
// meant to convert into `ApiError` - confirm against the real definitions.
async fn fetch_user_with_retry(id: u64) -> Result<User, ApiError> {
Retry::new(
|| async move {
reqwest::get(&format!("https://api.example.com/users/{}", id))
.await?
.json()
.await
},
RetryConfig {
max_attempts: 5,
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(10),
multiplier: 2.0,
},
).await
}
State transitions:
---
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::sync::{Arc, Mutex};
/// A oneshot channel: send one value, receive once
/// A oneshot channel: send one value, receive once
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Arc::new(Mutex::new(ChannelState {
        value: None,
        waker: None,
        closed: false,
    }));
    let sender = Sender { shared: Arc::clone(&shared) };
    let receiver = Receiver { shared };
    (sender, receiver)
}

/// State shared by the two halves of the channel.
struct ChannelState<T> {
    /// The value in flight, if it has been sent but not yet received
    value: Option<T>,
    /// Waker of the task currently awaiting the receiver
    waker: Option<Waker>,
    /// Set once either half has been dropped
    closed: bool,
}

/// Locks the channel state, recovering the guard if another thread panicked
/// while holding it. Used from `Drop`, where a second panic would abort.
fn lock_ignoring_poison<T>(
    shared: &Mutex<ChannelState<T>>,
) -> std::sync::MutexGuard<'_, ChannelState<T>> {
    shared.lock().unwrap_or_else(std::sync::PoisonError::into_inner)
}

/// Sending half of the channel.
pub struct Sender<T> {
    shared: Arc<Mutex<ChannelState<T>>>,
}

impl<T> Sender<T> {
    /// Sends `value` to the receiver, consuming the sender.
    ///
    /// # Errors
    /// Gives `value` back if the receiver has already been dropped.
    pub fn send(self, value: T) -> Result<(), T> {
        let waker = {
            let mut state = self.shared.lock().unwrap();
            if state.closed {
                return Err(value);
            }
            state.value = Some(value);
            state.waker.take()
        };
        // Wake the receiver if it's waiting - after the lock is released, so
        // a waker that polls inline cannot deadlock on it.
        if let Some(waker) = waker {
            waker.wake();
        }
        Ok(())
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let waker = {
            let mut state = lock_ignoring_poison(&self.shared);
            state.closed = true;
            state.waker.take()
        };
        // Wake receiver so it can see the channel is closed
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

/// Receiving half of the channel; await it to obtain the value.
pub struct Receiver<T> {
    shared: Arc<Mutex<ChannelState<T>>>,
}

impl<T> Drop for Receiver<T> {
    fn drop(&mut self) {
        // Mark the channel closed so a later `send` hands its value back
        // instead of storing it where nobody will ever read it.
        let stale_waker = {
            let mut state = lock_ignoring_poison(&self.shared);
            state.closed = true;
            state.waker.take()
        };
        // Dropped outside the lock; nobody is left to wake.
        drop(stale_waker);
    }
}

/// Error returned when the sender was dropped without sending a value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecvError {
    Closed,
}

impl<T> Future for Receiver<T> {
    type Output = Result<T, RecvError>;

    /// Resolves to the sent value, or to `RecvError::Closed` if the sender
    /// went away without sending one.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap();

        // Check the value first: a sender that sent and was then dropped has
        // also set `closed`, and the value must still be delivered.
        if let Some(value) = state.value.take() {
            return Poll::Ready(Ok(value));
        }
        if state.closed {
            // Sender dropped without sending
            return Poll::Ready(Err(RecvError::Closed));
        }

        // Store waker for when sender sends
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}
// Usage:
/// Spawns a task that sends the result of `expensive_computation()` through a
/// oneshot channel, then awaits the receiver and prints the outcome.
async fn request_response_pattern() {
let (tx, rx) = channel();
// Spawn task to compute result
tokio::spawn(async move {
let result = expensive_computation().await;
// A failed send is ignored on purpose.
let _ = tx.send(result);
});
// Wait for result
match rx.await {
Ok(result) => println!("Got result: {}", result),
Err(_) => println!("Computation failed"),
}
}
// Advanced: MPSC channel receiver (multi-producer, single-consumer)
/// Receiving side of a multi-producer, single-consumer queue.
pub struct MpscReceiver<T> {
    shared: Arc<Mutex<MpscState<T>>>,
}

/// Queue contents and bookkeeping shared with the senders.
struct MpscState<T> {
    /// Messages waiting to be received, oldest first
    queue: std::collections::VecDeque<T>,
    /// Waker of the task waiting for the next message
    waker: Option<Waker>,
    /// Number of senders still alive
    sender_count: usize,
}

impl<T> Future for MpscReceiver<T> {
    type Output = Option<T>;

    /// Resolves to the oldest queued message, to `None` once the queue is
    /// empty and no sender is left, and otherwise parks the task's waker.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut guard = self.shared.lock().unwrap();
        let next = guard.queue.pop_front();
        match next {
            Some(message) => Poll::Ready(Some(message)),
            // All senders dropped
            None if guard.sender_count == 0 => Poll::Ready(None),
            None => {
                // Wait for next message
                guard.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
Key insight: The channel's shared state stores both the value and the waker. When the sender sends, it wakes the receiver's task.
---
/// The `Future` trait once more, as the starting point for the component breakdown.
pub trait Future {
/// Type of the value produced on completion.
type Output;
// Takes the future pinned (`Pin<&mut Self>`) plus a `Context` for the current task.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
Components:
use std::task::{Waker, RawWaker, RawWakerVTable};
use std::sync::Arc;
// Waker is a type-erased handle that can wake a task
/// Sketch of `Waker`'s shape. The method bodies are intentionally left as
/// comments, so this is an outline rather than compilable code.
pub struct Waker {
// The underlying raw waker this handle wraps.
waker: RawWaker,
}
impl Waker {
// Consumes the waker.
pub fn wake(self) {
// Notify the executor to poll this task again
}
// Same notification, but leaves the waker usable afterwards.
pub fn wake_by_ref(&self) {
// Wake without consuming the waker
}
pub fn clone(&self) -> Waker {
// Wakers must be cloneable
}
}
// How executors create wakers:
/// A unit of work owned by the executor: a boxed, pinned future with no output.
struct Task {
future: Pin<Box<dyn Future<Output = ()>>>,
// ... other task state
}
impl Task {
/// Builds a `Waker` whose data pointer is a strong reference to this task.
fn create_waker(self: &Arc<Self>) -> Waker {
// Type-erase the Arc<Task> into a RawWaker.
// `Arc::into_raw` on a clone hands one strong reference over to the waker.
let raw = RawWaker::new(
Arc::into_raw(self.clone()) as *const (),
&VTABLE,
);
// SAFETY (assumed): relies on every `VTABLE` entry treating the data pointer
// as an `Arc<Task>` raw pointer and keeping the refcount balanced - confirm
// against `VTABLE`.
unsafe { Waker::from_raw(raw) }
}
}
static VTABLE: RawWakerVTable = RawWakerVTable::new(
|data| {
// clone
let task = unsafe { Arc::from_raw(data as *const Task) };
let clone = task.clone();
std::mem::forget(task); // Don't drop original
RawWaker::new(Arc::into_raw(clone) as *const (), &VTABLE)
},
|data| {
// wake
let task = unsafe { Arc::from_raw(data as *const Task) };
EXECUTOR.schedule(task);
},
|data| {
// wake_by_ref
let task = unsafe { &*(data as *const Task) };
EXECUTOR.schedule(Arc::new(task.clone()));
},
|data| {
// drop
unsafe { Arc::from_raw(data as *const Task) };
},
);
How wakers work:
/// Keeps a reference to a local alive across an `.await`, which is what makes
/// the generated future self-referential.
async fn example() {
    let x = String::from("hello");
    let x_ref = &x; // Borrow x
    some_async_operation().await; // Suspension point
    println!("{}", x_ref); // Use the reference after resumption
}
When compiled to a state machine:
/// Conceptual layout of the state machine for `example`: the state stores both
/// `x` and a pointer into itself.
enum ExampleFuture {
State0 {
x: String,
x_ref: *const String, // Pointer to x field!
},
// ...
}
If this struct moves in memory, x_ref becomes invalid!
use std::pin::Pin;
use std::marker::PhantomPinned;
/// A value holding a raw pointer to one of its own fields.
struct SelfReferential {
    data: String,
    /// Points at `data` once construction has finished
    pointer: *const String,
    /// Opts the type out of `Unpin`
    _pin: PhantomPinned,
}

impl SelfReferential {
    /// Allocates the value pinned on the heap and points `pointer` at `data`.
    fn new(data: String) -> Pin<Box<Self>> {
        let mut pinned = Box::pin(Self {
            data,
            pointer: std::ptr::null(),
            _pin: PhantomPinned,
        });
        // The box is already pinned, so this address stays valid for as long
        // as the allocation lives.
        let data_addr = &pinned.data as *const String;
        // SAFETY: only a field is overwritten; the value itself is not moved
        // out of its pinned location.
        unsafe {
            pinned.as_mut().get_unchecked_mut().pointer = data_addr;
        }
        pinned
    }
}
Key rules:
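`Pin<P>` ensures the pointee `T` won't move if `T: !Unpin`. Most types are `Unpin` (safe to move even when pinned); self-referential types such as compiler-generated futures are `!Unpin` (contain self-references).
// Simplified executor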
/// Minimal single-queue executor: tasks that are ready to run wait in a FIFO queue.
struct Executor {
ready_queue: Mutex<VecDeque<Arc<Task>>>,
}
impl Executor {
/// Runs forever: polls queued tasks one at a time and blocks in the reactor
/// when the queue is empty.
fn run(&self) {
loop {
// Get ready tasks
// The queue lock is released at the end of this block, before polling.
let task = {
let mut queue = self.ready_queue.lock().unwrap();
queue.pop_front()
};
if let Some(task) = task {
// Poll the future
let waker = task.create_waker();
let mut context = Context::from_waker(&waker);
// NOTE(review): `as_mut()` needs mutable access to `task.future`, but `task`
// is an `Arc<Task>`; as written this presumably needs interior mutability
// (e.g. a `Mutex` around the future) - confirm.
match task.future.as_mut().poll(&mut context) {
Poll::Ready(()) => {
// Task complete
}
Poll::Pending => {
// Task parked, will be woken later
}
}
} else {
// No ready tasks, wait for I/O events
REACTOR.wait_for_events();
}
}
}
/// Appends `task` to the back of the ready queue.
fn schedule(&self, task: Arc<Task>) {
let mut queue = self.ready_queue.lock().unwrap();
queue.push_back(task);
}
}
// Simplified reactor (I/O event loop)
/// I/O event loop state: an epoll descriptor plus the waker registered per file descriptor.
struct Reactor {
epoll_fd: RawFd,
registrations: Mutex<HashMap<RawFd, Waker>>,
}
impl Reactor {
/// Waits for epoll events (timeout argument -1) and wakes the waker
/// registered for each reported descriptor.
fn wait_for_events(&self) {
// Room for up to 1024 events per call.
let mut events = [epoll::Event::new(epoll::Events::empty(), 0); 1024];
let n = epoll::wait(self.epoll_fd, &mut events, -1).unwrap();
for event in &events[..n] {
// The event's data field is interpreted as the file descriptor.
let fd = event.data() as RawFd;
// NOTE(review): the registry lock is re-acquired for every event and is
// held while the waker runs.
let regs = self.registrations.lock().unwrap();
if let Some(waker) = regs.get(&fd) {
waker.wake_by_ref();
}
}
}
}
Architecture:
---
`Box`) · `tokio::join!`, `tokio::select!` · `futures::future::join_all` · `Box` simplifies code
---
// WRONG: Blocking destroys async runtime performance
// Anti-pattern, kept wrong on purpose: synchronous file I/O inside `poll`.
impl Future for BadFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
// This blocks the executor thread!
// `read_to_string` does not return until the whole file has been read.
let result = std::fs::read_to_string("file.txt").unwrap();
Poll::Ready(result)
}
}
// CORRECT: Use async I/O or spawn_blocking
impl Future for GoodFuture {
    type Output = String;

    /// Delegates to the inner asynchronous read, so `poll` itself never blocks.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        // Non-blocking async I/O: the inner poll result is forwarded unchanged.
        self.async_file_read.poll(cx)
    }
}
// WRONG: Returns Pending but never wakes
// Anti-pattern, kept wrong on purpose: returns `Pending` without arranging a wake-up.
impl Future for BrokenFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
if self.ready {
Poll::Ready(42)
} else {
// FORGOT TO STORE WAKER - task will never wake!
// `cx` is never used, so nothing can wake this task again.
Poll::Pending
}
}
}
// CORRECT: Always store waker before returning Pending
impl Future for WorkingFuture {
    type Output = i32;

    /// Resolves to 42 once `ready` is set; until then it stores the caller's
    /// waker so whoever flips `ready` can wake the task.
    // `mut self` is required: writing `self.waker` goes through `DerefMut`
    // (which in turn assumes `WorkingFuture: Unpin`).
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        if self.ready {
            Poll::Ready(42)
        } else {
            self.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}
// WRONG: Moving out of Pin
// Anti-pattern, kept wrong on purpose: discards the pinning guarantee and then
// moves data out of a field.
impl Future for BadPin {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// This is UB if self is !Unpin!
// `into_inner_unchecked` yields a plain `&mut Self`; the swap below then moves `field`.
let moved = unsafe { Pin::into_inner_unchecked(self) };
std::mem::swap(&mut moved.field, &mut other_field);
Poll::Pending
}
}
// CORRECT: Use pin_project or careful unsafe
use pin_project_lite::pin_project;
pin_project! {
// `future` is projected as a pinned reference; `non_pinned` is not marked `#[pin]`.
struct GoodPin {
#[pin]
future: SomeFuture,
non_pinned: i32,
}
}
impl Future for GoodPin {
type Output = ();
// Forwards to the inner future through the projection generated by `pin_project!`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let this = self.project();
this.future.poll(cx)
}
}
// WRONG: Allows polling after completion
// Anti-pattern, kept wrong on purpose: no record of having completed.
impl Future for BadCompletion {
type Output = i32;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
// Returns Ready multiple times!
Poll::Ready(42)
}
}
// CORRECT: Panic or fuse after first Ready
impl Future for GoodCompletion {
    type Output = i32;

    /// Resolves to 42 exactly once.
    ///
    /// # Panics
    /// Panics if polled again after it has returned `Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i32> {
        assert!(!self.completed, "polled after completion");
        // Record completion before handing out the result.
        self.completed = true;
        Poll::Ready(42)
    }
}
// INEFFICIENT: Unnecessary !Unpin
// Holds only an `i32`, yet opts out of `Unpin` through the marker field.
struct SlowFuture {
value: i32,
_pin: PhantomPinned, // Not needed!
}
// BETTER: Most futures can be Unpin
// Same data without the marker field.
struct FastFuture {
value: i32,
}
// FastFuture is automatically Unpin, easier to use
---
// This async function:
/// Returns `a + b`; contains no `.await`, so there is no suspension point.
async fn add(a: i32, b: i32) -> i32 {
a + b
}
// Compiles to roughly this (zero overhead):
/// Hand-written stand-in for the future generated from `add`.
struct AddFuture {
    a: i32,
    b: i32,
    /// State tag; not consulted by `poll`
    state: u8,
}

impl Future for AddFuture {
    type Output = i32;

    /// Completes immediately with the sum of the two captured operands.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<i32> {
        let sum = self.a + self.b;
        Poll::Ready(sum)
    }
}
Costs:
/// Keeps a 1 KiB array alive across an `.await`, so the array has to be stored
/// in the future's state.
async fn large_state() {
let big_array = [0u8; 1024];
async_op().await;
// `big_array` is used after the suspension point.
use_array(&big_array);
}
// Compiles to:
// Conceptual state machine for `large_state`: both states carry the array.
enum LargeStateFuture {
State0 { big_array: [u8; 1024], async_op: AsyncOp },
State1 { big_array: [u8; 1024] },
}
// Size = 1024 bytes + size of AsyncOp + discriminant
Optimization strategies:
`Box<[u8; 1024]>`

| Aspect | Async Future | OS Thread |
|--------|-------------|-----------|
| Context switch | 0ns (no switch, just function call) | 1-10 microseconds |
| Memory per task | 100-1000 bytes (state machine) | 1-8 MB (stack) |
| Creation cost | Nanoseconds | Microseconds |
| Max concurrent | Millions | Thousands |
| Overhead | Poll function call | Kernel scheduling |
When async wins:
---
Create a future that yields control once, then completes:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Exercise skeleton: a future that returns `Pending` once and then completes.
struct YieldNow {
// Whether the future has already yielded once.
yielded: bool,
}
impl YieldNow {
/// Creates a future that has not yielded yet.
pub fn new() -> Self {
YieldNow { yielded: false }
}
}
// The TODOs below are the exercise; the branches are intentionally left empty.
impl Future for YieldNow {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if self.yielded {
// TODO: Return Ready
} else {
// TODO: Set yielded = true, wake immediately, return Pending
}
}
}
// Test:
// Prints one line before and one line after awaiting a `YieldNow`.
#[tokio::main]
async fn main() {
println!("Before yield");
YieldNow::new().await;
println!("After yield");
}
impl Future for YieldNow {
    type Output = ();

    /// First poll: records the yield, requests an immediate re-poll and returns
    /// `Pending`. Every later poll returns `Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if !self.yielded {
            self.yielded = true;
            // Schedule re-poll so the task is not parked forever.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    }
}
Implement select() that returns the first future to complete:
/// Exercise skeleton: resolves with the output of whichever future finishes first.
pub struct Select<F1, F2> {
// Both futures are boxed and pinned.
future1: Pin<Box<F1>>,
future2: Pin<Box<F2>>,
}
impl<F1, F2> Select<F1, F2>
where
F1: Future,
F2: Future,
{
// Constructor body is part of the exercise.
pub fn new(future1: F1, future2: F2) -> Self {
// TODO
}
}
impl<F1, F2> Future for Select<F1, F2>
where
F1: Future,
F2: Future,
{
// `Either` records which of the two futures produced the value.
type Output = Either<F1::Output, F2::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO: Poll both, return first that's Ready
}
}
/// A value of one of two types; used as the output of `Select`.
pub enum Either<L, R> {
// Produced by the first future
Left(L),
// Produced by the second future
Right(R),
}
// Test:
// Races a 100 ms delay against a 200 ms delay and asserts the result is `Left`.
async fn test_select() {
let result = Select::new(
Delay::new(Duration::from_millis(100)),
Delay::new(Duration::from_millis(200)),
).await;
assert!(matches!(result, Either::Left(_)));
}
impl<F1, F2> Future for Select<F1, F2>
where
    F1: Future,
    F2: Future,
{
    type Output = Either<F1::Output, F2::Output>;

    /// Polls the first future, then the second, and resolves with whichever is
    /// ready. When both are ready in the same poll the first one wins.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Try first future
        let first = self.future1.as_mut().poll(cx);
        if let Poll::Ready(left) = first {
            return Poll::Ready(Either::Left(left));
        }

        // Try second future; if it is pending too, so is the select.
        match self.future2.as_mut().poll(cx) {
            Poll::Ready(right) => Poll::Ready(Either::Right(right)),
            Poll::Pending => Poll::Pending,
        }
    }
}
Implement a future for reading a file using io_uring (Linux):
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Exercise skeleton: a future for reading from a file descriptor via io_uring.
// NOTE(review): `Waker` and `io` are used below but are not among the `use`
// lines shown directly above this snippet - confirm the imports.
pub struct IoUringRead {
// File descriptor
fd: i32,
// Buffer to read into
buffer: Vec<u8>,
// Submission queue entry ID
sqe_id: Option<u64>,
// Waker for completion
waker: Option<Waker>,
}
impl IoUringRead {
// Constructor body is part of the exercise.
pub fn new(fd: i32, size: usize) -> Self {
// TODO: Create read future
}
}
impl Future for IoUringRead {
type Output = io::Result<Vec<u8>>;
// Poll body is part of the exercise; the steps to implement are listed below.
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// TODO:
// 1. If not submitted, submit read to io_uring
// 2. Check completion queue for result
// 3. Store waker if not complete
// 4. Return Ready with data or Pending
}
}
// Integration with io_uring reactor needed
You'll need:
---
// Simplified version of tokio's Sleep
/// Timer future: ready once `deadline` has passed.
pub struct Sleep {
deadline: Instant,
// Handle used to register the task's waker with the timer.
entry: TimerEntry,
}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// Ready as soon as the deadline has been reached.
if Instant::now() >= self.deadline {
return Poll::Ready(());
}
// Otherwise hand the waker to the timer entry and wait.
self.entry.register_waker(cx.waker());
Poll::Pending
}
}
// Timer wheel implementation:
// - Hierarchical timing wheels for O(1) insertion
// - Background thread processes expired timers
// - Wakes futures when deadline reached
// From futures crate
/// Free-function constructor for `Join`: wraps `f1` and `f2` via `Join::new`.
pub fn join<F1, F2>(f1: F1, f2: F2) -> Join<F1, F2>
where
F1: Future,
F2: Future,
{
Join::new(f1, f2)
}
/// Free-function constructor for `Select`: wraps `f1` and `f2` via `Select::new`.
pub fn select<F1, F2>(f1: F1, f2: F2) -> Select<F1, F2>
where
F1: Future,
F2: Future,
{
Select::new(f1, f2)
}
// Implementations follow patterns shown earlier
// Oneshot channel receiver
// Awaiting the receiver forwards to `poll_recv` on its `inner` field.
impl<T> Future for OneshotReceiver<T> {
type Output = Result<T, RecvError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.poll_recv(cx)
}
}
// MPSC channel receiver
// Exposes the receiver as a `Stream` of `T` by forwarding to `poll_recv` on `inner`.
impl<T> Stream for MpscReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<T>>
{
self.inner.poll_recv(cx)
}
}
// TcpStream read
// Outline only: the body lists the steps as comments and contains no code.
impl AsyncRead for TcpStream {
// Signature: reads into `buf` and reports the number of bytes read.
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// Register with async-std reactor
// Poll underlying socket
// Return Ready or Pending with waker registered
}
}
---
---
Custom Futures are the foundation of async Rust:
Key Takeaways: prefer `async fn` - only drop to custom futures when you need precise control over the state machine or are building libraries.
Understanding custom futures makes you a better async Rust programmer, even if you rarely implement them directly. You'll understand performance characteristics, debug issues, and design better APIs.
Now go build some legendary async systems!
Run this code in the official Rust Playground