
Stream Processing

Async iterators and backpressure


What is a Stream?

A Stream is Rust's async counterpart to an Iterator: it produces a sequence of values over time. Iterator::next must have the next value (or None) in hand before it returns, so a slow source blocks the calling thread. A Stream instead reports that no value is ready yet, which lets the executor run other tasks until the stream is woken.

The Stream Trait

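use std::task::{Context, Poll};
use std::pin::Pin;

// Simplified from futures_core::Stream (re-exported as futures::stream::Stream
// and tokio_stream::Stream); the real trait also has a provided size_hint()
pub trait Stream {
    type Item;

    // The heart of async iteration
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}

// Comparison with Iterator:
pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;  // Synchronous: returns only once an item (or None) is available
}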
Key differences:
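  • Stream::poll_next: Returns Poll<Option<Item>>, which can be Poll::Pending when no item is ready yet
  • Iterator::next: Returns Option<Item> directly, so the caller waits (blocks) until the item is ready
  • Pinning: poll_next takes Pin<&mut Self>, so implementations may hold self-referential state (such as an in-progress future) that must not move
  • Context: Carries the waker; a stream that returns Pending must arrange for that waker to be called when data arrives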
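The Pending/waker contract is easiest to see with no runtime at all. The sketch below is std-only: it re-declares the simplified Stream trait locally instead of importing futures, and `Countdown` and `collect_blocking` are hypothetical names for illustration. `Countdown` returns Pending once before every item and wakes itself; `collect_blocking` is a toy driver that parks the thread until woken. Real code would use an executor such as tokio.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Local copy of the simplified trait, so this sketch needs no crates.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Counts down from `n`; every item needs one extra poll ("not ready yet").
pub struct Countdown {
    n: u32,
    ready: bool,
}

impl Countdown {
    pub fn new(n: u32) -> Self {
        Countdown { n, ready: false }
    }
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.n == 0 {
            return Poll::Ready(None); // exhausted
        }
        if !self.ready {
            self.ready = true;
            // Pending is only allowed if someone will wake us later;
            // here we request a wake-up ourselves, immediately.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        let item = self.n;
        self.n -= 1;
        Poll::Ready(Some(item))
    }
}

/// Waker that unparks the thread driving the stream.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy single-threaded driver: poll until Ready(None), park while Pending.
/// Returns the items and how many times poll_next answered Pending.
pub fn collect_blocking<S: Stream + Unpin>(mut stream: S) -> (Vec<S::Item>, usize) {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    let mut pending = 0;
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return (items, pending),
            Poll::Pending => {
                pending += 1;
                thread::park(); // returns once wake() has unparked this thread
            }
        }
    }
}
```

Returning Pending without arranging a wake-up (the `wake_by_ref` call above) would leave a real executor waiting forever; that is the main obligation a hand-written `poll_next` carries.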

Stream vs Iterator

// Synchronous iterator: next() hands back each item immediately
// (process and process_async are placeholders)
let mut iter = vec![1, 2, 3].into_iter();
while let Some(item) = iter.next() {
    process(item);  // Runs to completion on the current thread
}

// Asynchronous stream: next().await may suspend until an item is ready
use tokio_stream::StreamExt;

let mut stream = tokio_stream::iter(vec![1, 2, 3]);
while let Some(item) = stream.next().await {
    process_async(item).await;  // Other tasks can run while this is suspended
}

Why Streams?

Use streams for:
  • Network data: HTTP responses, WebSocket messages, gRPC streams
  • Large datasets: Database query results, file processing
  • Event processing: Log streams, sensor data, user events
  • Backpressure: Flow control when consumer is slower than producer
Core benefit: Memory-efficient processing of unbounded sequences without blocking.
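Bounded channels are the usual way to get that flow control. tokio's `mpsc::channel(n)` makes `send().await` wait once `n` messages are buffered; std's synchronous `sync_channel` applies the same capacity rule but blocks the thread instead, which makes the rule easy to show without a runtime. `accepted_before_full` is a hypothetical helper that fills a channel while no consumer is running and reports how many sends succeeded before the producer would have had to wait.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Pushes up to `items` values into a channel with room for `capacity`
/// messages, with no consumer draining it. Returns how many were accepted
/// before the buffer was full, plus the receiver holding them.
pub fn accepted_before_full(capacity: usize, items: u32) -> (usize, Receiver<u32>) {
    let (tx, rx): (SyncSender<u32>, Receiver<u32>) = sync_channel(capacity);
    let mut accepted = 0;
    for i in 0..items {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Full is backpressure: a blocking send() would wait here
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    (accepted, rx)
}
```

Nothing is lost when the buffer fills: the producer simply cannot get ahead of the consumer by more than `capacity` messages.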

---

Real-World Example 1: WebSocket Message Stream (Network Programming)

Problem: Handle continuous WebSocket messages with backpressure

use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;

/// A stream of WebSocket messages fed through a bounded channel.
/// The channel capacity is the backpressure: when it is full, the reader
/// task's send().await suspends and stops pulling frames off the socket.
pub struct WebSocketStream {
    /// Receiver for incoming messages
    receiver: mpsc::Receiver<Message>,
}

impl WebSocketStream {
    /// Maximum number of messages buffered between the socket and the consumer
    const CAPACITY: usize = 100;

    pub fn new(
        ws_read: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
    ) -> Self {
        let (tx, rx) = mpsc::channel(Self::CAPACITY);

        // Spawn task to read from WebSocket (StreamExt is imported above)
        tokio::spawn(async move {
            let mut ws_read = ws_read;
            while let Some(msg_result) = ws_read.next().await {
                match msg_result {
                    Ok(msg @ (Message::Text(_) | Message::Binary(_))) => {
                        // Suspends here while the channel is full
                        if tx.send(msg).await.is_err() {
                            break; // Receiver dropped
                        }
                    }
                    Ok(Message::Close(_)) => break,
                    Err(e) => {
                        eprintln!("WebSocket error: {}", e);
                        break;
                    }
                    // Ping, Pong and raw frames are not forwarded
                    _ => {}
                }
            }
            // Dropping `tx` here closes the channel, which ends the stream
        });

        WebSocketStream { receiver: rx }
    }
}

impl Stream for WebSocketStream {
    type Item = Message;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // Delegate to the channel: Ready(Some) for a message, Ready(None) once
        // the reader task has exited, Pending (with our waker registered) otherwise
        self.receiver.poll_recv(cx)
    }
}

// Usage: Chat server handling multiple clients
async fn handle_websocket_client(
    ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
    client_id: u64,
) {
    let mut message_stream = WebSocketStream::new(ws_stream);
    
    while let Some(message) = message_stream.next().await {
        match message {
            Message::Text(text) => {
                println!("[Client {}] Received: {}", client_id, text);
                
                // Process message (database write, broadcast, etc.)
                process_chat_message(client_id, &text).await;
            }
            Message::Binary(data) => {
                println!("[Client {}] Received {} bytes", client_id, data.len());
                process_binary_data(client_id, data).await;
            }
            _ => {}
        }
    }
    
    println!("[Client {}] Disconnected", client_id);
}

// Backpressure in action: while save_to_database is awaited the stream is not
// polled, the 100-slot channel fills, and the reader task stops reading the socket
async fn handle_websocket_with_backpressure(
    ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
) {
    use tokio::time::{Duration, sleep};

    let mut message_stream = WebSocketStream::new(ws_stream);

    while let Some(message) = message_stream.next().await {
        // Slow processing (e.g., database write)
        if let Message::Text(text) = message {
            match save_to_database(&text).await {
                Ok(_) => println!("Saved: {}", text),
                Err(e) => {
                    eprintln!("Database error: {}", e);
                    // Pause before pulling the next message so a struggling
                    // database is not hammered; the channel absorbs the delay
                    sleep(Duration::from_millis(100)).await;
                }
            }
        }
    }
}

// Combining multiple WebSocket streams
async fn multiplex_websockets(
    streams: Vec<tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>>,
) {
    use futures::stream::{self, StreamExt};
    
    // Merge all WebSocket streams into one
    let mut merged = stream::select_all(
        streams.into_iter().map(WebSocketStream::new)
    );
    
    while let Some(message) = merged.next().await {
        // Handle message from any WebSocket
        broadcast_to_all_clients(message).await;
    }
}
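How it works:
  1. Channel-based: A spawned reader task forwards Text and Binary frames into a bounded channel (capacity 100)
  2. Natural backpressure: When the channel is full, the reader's send().await suspends, so it stops reading from the socket until the consumer catches up
  3. Stream trait: poll_next delegates to the channel receiver, giving a clean async iteration interface
  4. Error handling: The stream ends when the socket errors, a Close frame arrives, or the peer disconnects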
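The channel-based design works because of how a channel receiver implements `poll_next`: return a queued item if there is one, return `Ready(None)` once the sender is gone, and otherwise store the waker and return Pending so the next send can wake the task. The std-only sketch below shows that mechanism; it is unbounded with a single sender for brevity, and `channel`, `Sender` and `ChannelStream` are hypothetical names, not tokio's internals.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

/// State shared by the sending half and the stream half.
struct Shared<T> {
    queue: VecDeque<T>,
    /// Waker of the task that last saw Pending in poll_next
    waker: Option<Waker>,
    /// Set when the sender is dropped
    closed: bool,
}

pub struct Sender<T>(Arc<Mutex<Shared<T>>>);
pub struct ChannelStream<T>(Arc<Mutex<Shared<T>>>);

pub fn channel<T>() -> (Sender<T>, ChannelStream<T>) {
    let shared = Arc::new(Mutex::new(Shared {
        queue: VecDeque::new(),
        waker: None,
        closed: false,
    }));
    (Sender(Arc::clone(&shared)), ChannelStream(shared))
}

impl<T> Sender<T> {
    pub fn send(&self, value: T) {
        let waker = {
            let mut shared = self.0.lock().unwrap();
            shared.queue.push_back(value);
            shared.waker.take()
        }; // lock released here, before waking
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let waker = {
            let mut shared = self.0.lock().unwrap();
            shared.closed = true;
            shared.waker.take()
        };
        if let Some(waker) = waker {
            waker.wake(); // let a waiting consumer observe the end of the stream
        }
    }
}

impl<T> ChannelStream<T> {
    /// Same shape as Stream::poll_next; with the futures crate this body
    /// would go inside `impl Stream for ChannelStream<T>`.
    pub fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let mut shared = self.0.lock().unwrap();
        if let Some(value) = shared.queue.pop_front() {
            return Poll::Ready(Some(value)); // buffered item available
        }
        if shared.closed {
            return Poll::Ready(None); // empty and no sender left: stream ends
        }
        // Empty but still open: remember who to wake, then yield
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}
```

A bounded version adds the mirror image on the sending side: when the queue is at capacity, the sender stores its own waker and returns Pending until the receiver pops an item.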

---

Real-World Example 2: Database Result Stream (Web/Backend)

Problem: Stream large query results without loading everything into memory

use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_postgres::{Client, Row, RowStream};

/// Streams the rows of a single query as they arrive from the server
/// (one query, no pagination; CursorStream below fetches in batches)
pub struct DatabaseResultStream {
    /// Rows from query_raw; RowStream is !Unpin, so it is pinned on the heap
    rows: Pin<Box<RowStream>>,
    /// Total rows streamed so far
    rows_streamed: usize,
}

impl DatabaseResultStream {
    pub async fn new(
        client: &Client,
        query: &str,
    ) -> Result<Self, tokio_postgres::Error> {
        // Executes the query; rows are then read one by one from the RowStream
        let rows = client
            .query_raw(query, std::iter::empty::<String>()) // no parameters
            .await?;

        Ok(DatabaseResultStream {
            rows: Box::pin(rows),
            rows_streamed: 0,
        })
    }

    pub fn rows_streamed(&self) -> usize {
        self.rows_streamed
    }
}

impl Stream for DatabaseResultStream {
    type Item = Result<Row, tokio_postgres::Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // Delegate to the inner RowStream; Ready(None) means the result set is exhausted
        let poll = self.rows.as_mut().poll_next(cx);
        if let Poll::Ready(Some(Ok(_))) = &poll {
            self.rows_streamed += 1;
        }
        poll
    }
}

// Paginated version: one LIMIT/OFFSET query per batch, issued only when the
// consumer has drained the previous batch (fetching is driven by demand)
use futures::stream::{self, TryStreamExt};

pub struct CursorStream {
    /// The page-fetching pipeline built in new(), boxed so it can be stored by value
    inner: Pin<Box<dyn Stream<Item = Result<Row, tokio_postgres::Error>> + Send>>,
}

impl CursorStream {
    /// No query runs until the stream is first polled
    pub async fn new(
        client: Client,
        base_query: String,
        batch_size: i64,
    ) -> Result<Self, tokio_postgres::Error> {
        // State carried between pages: (client, next offset, was the last page short?)
        let pages = stream::try_unfold(
            (client, 0i64, false),
            move |(client, offset, done)| {
                let sql = format!("{} LIMIT {} OFFSET {}", base_query, batch_size, offset);
                async move {
                    if done {
                        return Ok(None); // previous page was short: no more rows
                    }
                    let rows = client.query(sql.as_str(), &[]).await?;
                    if rows.is_empty() {
                        return Ok(None);
                    }
                    let next_offset = offset + rows.len() as i64;
                    let last_page = (rows.len() as i64) < batch_size;
                    Ok::<_, tokio_postgres::Error>(Some((rows, (client, next_offset, last_page))))
                }
            },
        );

        // Flatten each Vec<Row> page into individual rows
        let rows = pages
            .map_ok(|batch| stream::iter(batch.into_iter().map(Ok::<Row, tokio_postgres::Error>)))
            .try_flatten();

        Ok(CursorStream { inner: Box::pin(rows) })
    }
}

impl Stream for CursorStream {
    type Item = Result<Row, tokio_postgres::Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // Pending while a page query is in flight; that query registers the waker
        self.inner.as_mut().poll_next(cx)
    }
}

// Usage: Process large dataset without OOM
async fn process_all_users(db: Client) -> Result<(), tokio_postgres::Error> {
    let query = "SELECT id, name, email FROM users ORDER BY id".to_string();
    let mut stream = CursorStream::new(db, query, 1000).await?;
    
    let mut count = 0;
    while let Some(result) = stream.next().await {
        let row = result?;
        
        // row.get panics if the column's SQL type doesn't match (i64 needs BIGINT)
        let id: i64 = row.get(0);
        let name: String = row.get(1);
        let email: String = row.get(2);
        
        // Process user (send email, generate report, etc.)
        process_user(id, &name, &email).await;
        
        count += 1;
        if count % 10000 == 0 {
            println!("Processed {} users", count);
        }
    }
    
    println!("Total users processed: {}", count);
    Ok(())
}

// Stream with transformation pipeline
async fn export_users_to_csv(db: Client, output_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    use tokio::io::{AsyncWriteExt, BufWriter};

    let query = "SELECT id, name, email FROM users ORDER BY id".to_string();
    let stream = CursorStream::new(db, query, 1000).await?;

    // Buffer writes so each row is not a separate file write
    let mut file = BufWriter::new(tokio::fs::File::create(output_path).await?);

    // Write CSV header
    file.write_all(b"id,name,email\n").await?;

    // Transform each row into a CSV line (no quoting: values containing commas,
    // quotes or newlines would need escaping, e.g. with the csv crate)
    let mut lines = stream.map(|result| {
        result.map(|row| {
            let id: i64 = row.get(0);
            let name: String = row.get(1);
            let email: String = row.get(2);
            format!("{},{},{}\n", id, name, email)
        })
    });

    // A plain loop lets both database and I/O errors propagate with `?`
    while let Some(line) = lines.next().await {
        file.write_all(line?.as_bytes()).await?;
    }

    file.flush().await?;
    Ok(())
}

// Parallel processing with bounded concurrency
async fn process_users_parallel(db: Client) -> Result<(), Box<dyn std::error::Error>> {
    use futures::stream::{StreamExt, TryStreamExt}; // TryStreamExt provides try_collect
    
    let query = "SELECT id, name, email FROM users ORDER BY id".to_string();
    let stream = CursorStream::new(db, query, 1000).await?;
    
    // Process up to 10 users concurrently
    stream
        .map(|result| async move {
            let row = result?;
            let id: i64 = row.get(0);
            
            // Expensive operation (API call, computation, etc.)
            expensive_processing(id).await?;
            
            Ok::<_, Box<dyn std::error::Error>>(())
        })
        .buffer_unordered(10)  // Concurrent processing
        .try_collect::<Vec<_>>()
        .await?;
    
    Ok(())
}
Key features:
  1. Batched pagination: CursorStream issues one LIMIT/OFFSET query per batch instead of loading the whole result
  2. Memory efficient: Only about one batch of rows is held in memory at a time
  3. Backpressure: The next batch is only queried when the consumer polls for more rows
  4. Composable: Works with StreamExt combinators
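OFFSET pagination has a known cost: PostgreSQL still computes and discards every skipped row, so deep pages get slower. Keyset (seek) pagination instead remembers the last key seen and asks for rows after it (`WHERE id > $1 ORDER BY id LIMIT $2`), which an index on `id` can answer directly. The std-only sketch below simulates that loop over an in-memory slice standing in for the table; `User`, `fetch_page` and `scan_all` are hypothetical names.

```rust
/// One row of a hypothetical `users` table.
#[derive(Debug, Clone, PartialEq)]
pub struct User {
    pub id: i64,
    pub name: String,
}

/// Stand-in for `SELECT id, name FROM users WHERE id > $1 ORDER BY id LIMIT $2`.
/// `table` must be sorted by id, as ORDER BY guarantees in SQL.
fn fetch_page(table: &[User], after_id: i64, limit: usize) -> Vec<User> {
    table.iter().filter(|u| u.id > after_id).take(limit).cloned().collect()
}

/// Keyset pagination: carry the last id seen instead of an OFFSET.
/// Returns every row plus the number of "queries" issued.
pub fn scan_all(table: &[User], batch_size: usize) -> (Vec<User>, usize) {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut out = Vec::new();
    let mut last_id = i64::MIN;
    let mut queries = 0;
    loop {
        let page = fetch_page(table, last_id, batch_size);
        queries += 1;
        // Fewer rows than requested means this was the last page
        let short = page.len() < batch_size;
        if let Some(last) = page.last() {
            last_id = last.id;
        }
        out.extend(page);
        if short {
            return (out, queries);
        }
    }
}
```

When the row count is an exact multiple of the batch size, one extra query returns an empty page; that is the price of not counting rows up front.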

---

Real-World Example 3: File Line Reader (Systems Programming)

Problem: Read large files line-by-line without blocking

use futures::stream::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio::fs::File;

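/// Async stream of lines from a file, built directly on AsyncBufRead.
/// Yielded lines keep their trailing '\n' (the last line may lack one).
pub struct FileLineStream {
    reader: BufReader<File>,
    /// Bytes of the line being assembled; survives across Pending
    buffer: Vec<u8>,
    done: bool,
}

impl FileLineStream {
    pub async fn new(path: &str) -> io::Result<Self> {
        let file = File::open(path).await?;
        Ok(FileLineStream {
            reader: BufReader::with_capacity(8192, file),
            buffer: Vec::with_capacity(1024),
            done: false,
        })
    }
}

impl Stream for FileLineStream {
    type Item = io::Result<String>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // FileLineStream is Unpin, so take a plain &mut to borrow fields separately
        let this = &mut *self;
        if this.done {
            return Poll::Ready(None);
        }

        loop {
            // Borrow the BufReader's internal buffer, refilling it from the file if empty
            let available = match Pin::new(&mut this.reader).poll_fill_buf(cx) {
                Poll::Ready(Ok(bytes)) => bytes,
                Poll::Ready(Err(e)) => {
                    this.done = true;
                    return Poll::Ready(Some(Err(e)));
                }
                // A partial line stays in `buffer` until we are polled again
                Poll::Pending => return Poll::Pending,
            };

            if available.is_empty() {
                // EOF: emit a final line without '\n', if any, then finish
                this.done = true;
                if this.buffer.is_empty() {
                    return Poll::Ready(None);
                }
                return Poll::Ready(Some(take_line(&mut this.buffer)));
            }

            // Copy up to and including the next '\n', or everything if there is none
            let (used, complete) = match available.iter().position(|&b| b == b'\n') {
                Some(i) => (i + 1, true),
                None => (available.len(), false),
            };
            this.buffer.extend_from_slice(&available[..used]);
            Pin::new(&mut this.reader).consume(used);

            if complete {
                return Poll::Ready(Some(take_line(&mut this.buffer)));
            }
        }
    }
}

/// Moves the assembled bytes out of `buffer` and checks they are valid UTF-8
fn take_line(buffer: &mut Vec<u8>) -> io::Result<String> {
    String::from_utf8(std::mem::take(buffer))
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}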

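// Simpler: tokio's built-in lines(). tokio::io::Lines is not itself a Stream:
// call next_line(), or wrap it in tokio_stream::wrappers::LinesStream for combinators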
use tokio::io::Lines;

pub async fn read_file_lines(path: &str) -> io::Result<Lines<BufReader<File>>> {
    let file = File::open(path).await?;
    let reader = BufReader::new(file);
    Ok(reader.lines())
}

// Usage: Log file processing
async fn analyze_log_file(path: &str) -> io::Result<()> {
    use futures::stream::StreamExt;
    
    let file = File::open(path).await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    
    let mut error_count = 0;
    let mut warning_count = 0;
    
    while let Some(line) = lines.next_line().await? {
        if line.contains("ERROR") {
            error_count += 1;
            println!("Error: {}", line);
        } else if line.contains("WARN") {
            warning_count += 1;
        }
    }
    
    println!("Errors: {}, Warnings: {}", error_count, warning_count);
    Ok(())
}

// Stream pipeline: filter, map, collect
async fn extract_error_messages(path: &str) -> io::Result<Vec<String>> {
    use futures::stream::TryStreamExt;
    // tokio's Lines is not a Stream; LinesStream (tokio-stream, "io-util" feature) adapts it
    use tokio_stream::wrappers::LinesStream;

    let file = File::open(path).await?;
    let reader = BufReader::new(file);

    LinesStream::new(reader.lines())
        .try_filter(|line| futures::future::ready(line.contains("ERROR")))
        .map_ok(|line| {
            // Drop the first two space-separated fields (e.g. timestamp and level)
            let message = line.splitn(3, ' ').nth(2).map(str::to_string);
            message.unwrap_or(line)
        })
        .try_collect()
        .await
}

// Processing multiple files concurrently
async fn process_log_directory(dir_path: &str) -> io::Result<()> {
    use futures::stream::{self, StreamExt};
    use tokio::fs;
    
    // Read directory entries
    let mut entries = fs::read_dir(dir_path).await?;
    let mut files = Vec::new();
    
    while let Some(entry) = entries.next_entry().await? {
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) == Some("log") {
            files.push(path);
        }
    }
    
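    // Process up to 10 log files concurrently, then report per-file failures
    let results = stream::iter(files)
        .map(|path| async move {
            println!("Processing {:?}", path);
            let outcome = match path.to_str() {
                Some(p) => analyze_log_file(p).await,
                None => Err(io::Error::new(io::ErrorKind::InvalidInput, "non-UTF-8 path")),
            };
            (path, outcome)
        })
        .buffer_unordered(10)
        .collect::<Vec<_>>()
        .await;

    for (path, outcome) in results {
        if let Err(e) = outcome {
            eprintln!("Failed to process {:?}: {}", path, e);
        }
    }

    Ok(())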
}

// Rate-limited file processing
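use tokio::time::{interval, Duration, MissedTickBehavior};

async fn rate_limited_processing(path: &str) -> io::Result<()> {
    let file = File::open(path).await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();

    // At most one line every 100 ms (the first tick completes immediately)
    let mut rate_limiter = interval(Duration::from_millis(100));
    // If a call overruns, wait a full period instead of firing missed ticks in a burst
    rate_limiter.set_missed_tick_behavior(MissedTickBehavior::Delay);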
    
    while let Some(line) = lines.next_line().await? {
        // Wait for rate limit
        rate_limiter.tick().await;
        
        // Process line (e.g., API call)
        send_to_api(&line).await?;
    }
    
    Ok(())
}
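Performance characteristics:
  • Buffering: An 8 KB read buffer (BufReader::with_capacity(8192, ...)) keeps the number of file reads low
  • Memory: Only the current line plus the read buffer are held at a time
  • Async: tokio::fs performs file I/O on a blocking thread pool, so the async worker threads are not blocked on disk
  • Backpressure: Nothing more is read until the consumer asks for the next line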
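The tricky part of a hand-written line stream is that reads return arbitrary chunks: a line, or even a multi-byte UTF-8 character, can straddle two reads. This std-only sketch isolates that bookkeeping; `LineAssembler` is a hypothetical name, and unlike the stream above it strips the '\n' and decodes with `from_utf8_lossy` instead of returning an error on invalid UTF-8.

```rust
/// Splits bytes that arrive in arbitrary chunks into complete lines.
#[derive(Default)]
pub struct LineAssembler {
    /// Bytes of a line whose '\n' has not arrived yet
    partial: Vec<u8>,
}

impl LineAssembler {
    /// Feeds one chunk; returns every line it completed (without '\n').
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        let mut lines = Vec::new();
        for &byte in chunk {
            if byte == b'\n' {
                // Decode only whole lines, so split UTF-8 characters are rejoined first
                let bytes = std::mem::take(&mut self.partial);
                lines.push(String::from_utf8_lossy(&bytes).into_owned());
            } else {
                self.partial.push(byte);
            }
        }
        lines
    }

    /// Call at EOF: a final line without a trailing '\n' is still a line.
    pub fn finish(self) -> Option<String> {
        if self.partial.is_empty() {
            None
        } else {
            Some(String::from_utf8_lossy(&self.partial).into_owned())
        }
    }
}
```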

---

Real-World Example 4: Server-Sent Events Stream (Web/Real-time)

Problem: Stream real-time events to web clients

use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::{interval, Interval};
use tokio_stream::wrappers::BroadcastStream;

/// Server-Sent Events (SSE) stream; axum's Sse type writes the wire format
pub struct SseStream {
    /// Broadcast receiver wrapped as a Stream, so polling it registers the waker
    events: BroadcastStream<ServerEvent>,
    /// Heartbeat interval to keep connection alive
    heartbeat: Interval,
}

#[derive(Clone, Debug)]
pub enum ServerEvent {
    Message { id: u64, data: String },
    Notification { level: String, text: String },
    Update { resource: String, data: String },
}

impl SseStream {
    pub fn new(receiver: broadcast::Receiver<ServerEvent>) -> Self {
        SseStream {
            events: BroadcastStream::new(receiver),
            heartbeat: interval(Duration::from_secs(15)),
        }
    }
}

impl Stream for SseStream {
    // An Err item ends the SSE response; axum accepts errors convertible into BoxError
    type Item = Result<axum::response::sse::Event, String>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        use axum::response::sse::Event;

        // Events take priority over heartbeats
        match self.events.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(event))) => {
                let sse_event = match event {
                    ServerEvent::Message { id, data } => {
                        Event::default().id(id.to_string()).event("message").data(data)
                    }
                    // JSON built by hand for brevity: values are not escaped
                    ServerEvent::Notification { level, text } => Event::default()
                        .event("notification")
                        .data(format!("{{\"level\":\"{}\",\"text\":\"{}\"}}", level, text)),
                    ServerEvent::Update { resource, data } => Event::default()
                        .event("update")
                        .data(format!("{{\"resource\":\"{}\",\"data\":\"{}\"}}", resource, data)),
                };
                return Poll::Ready(Some(Ok(sse_event)));
            }
            Poll::Ready(Some(Err(lagged))) => {
                // Receiver fell behind and missed events: end this client's stream
                return Poll::Ready(Some(Err(format!("Client lagging: {}", lagged))));
            }
            // All senders dropped
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Pending => {}
        }

        // poll_tick registers the waker, so the task wakes when a heartbeat is due
        if self.heartbeat.poll_tick(cx).is_ready() {
            return Poll::Ready(Some(Ok(Event::default().comment("heartbeat"))));
        }

        // Both sources returned Pending and registered the waker: no busy loop
        Poll::Pending
    }
}

// HTTP handler for SSE endpoint
use axum::{
    response::{sse::{Event, KeepAlive, Sse}, IntoResponse},
    extract::State,
};

pub async fn sse_handler(
    State(event_tx): State<broadcast::Sender<ServerEvent>>,
) -> impl IntoResponse {
    let receiver = event_tx.subscribe();
    let stream = SseStream::new(receiver);
    
    Sse::new(stream)
        .keep_alive(KeepAlive::default())
}

// Event broadcaster
pub struct EventBroadcaster {
    sender: broadcast::Sender<ServerEvent>,
}

impl EventBroadcaster {
    pub fn new() -> Self {
        let (sender, _) = broadcast::channel(1000);
        EventBroadcaster { sender }
    }
    
    pub fn broadcast(&self, event: ServerEvent) {
        let _ = self.sender.send(event);
    }
    
    pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
        self.sender.subscribe()
    }
}

// Usage: Real-time dashboard
async fn run_dashboard_server() {
    use axum::{Router, routing::get};
    
    let broadcaster = EventBroadcaster::new();
    let event_tx = broadcaster.sender.clone();
    
    // Spawn task to generate events
    tokio::spawn(async move {
        let mut counter = 0u64;
        loop {
            tokio::time::sleep(Duration::from_secs(1)).await;
            
            broadcaster.broadcast(ServerEvent::Message {
                id: counter,
                data: format!("Update #{}", counter),
            });
            
            counter += 1;
        }
    });
    
    let app = Router::new()
        .route("/events", get(sse_handler))
        .with_state(event_tx);
    
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

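// Client reconnection with exponential backoff
use std::future::Future;

pub struct ReconnectingStream<S> {
    stream: Option<S>,
    reconnect_fn: Box<dyn Fn() -> S>,
    /// Backoff timer, armed when the inner stream ends
    delay: Option<Pin<Box<tokio::time::Sleep>>>,
    backoff: Duration,
    max_backoff: Duration,
}

impl<S> ReconnectingStream<S> {
    pub fn new(reconnect_fn: impl Fn() -> S + 'static, max_backoff: Duration) -> Self {
        ReconnectingStream {
            stream: None,
            reconnect_fn: Box::new(reconnect_fn),
            delay: None,
            backoff: Duration::from_millis(100),
            max_backoff,
        }
    }
}

impl<S: Stream + Unpin> Stream for ReconnectingStream<S> {
    type Item = S::Item;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        let this = &mut *self;
        loop {
            // Wait out the backoff; Sleep registers the waker while pending
            if let Some(delay) = this.delay.as_mut() {
                if delay.as_mut().poll(cx).is_pending() {
                    return Poll::Pending;
                }
                this.delay = None;
            }

            // (Re)connect lazily
            if this.stream.is_none() {
                this.stream = Some((this.reconnect_fn)());
            }
            let stream = this.stream.as_mut().expect("stream was just set");

            match Pin::new(stream).poll_next(cx) {
                Poll::Ready(Some(item)) => {
                    // Reset backoff on success
                    this.backoff = Duration::from_millis(100);
                    return Poll::Ready(Some(item));
                }
                Poll::Ready(None) => {
                    // Stream ended: drop it, arm the timer, double the next delay
                    this.stream = None;
                    this.delay = Some(Box::pin(tokio::time::sleep(this.backoff)));
                    this.backoff = (this.backoff * 2).min(this.max_backoff);
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}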
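Key features:
  1. Heartbeat: Keeps the connection alive with a periodic SSE comment (axum's KeepAlive can do the same)
  2. Lag detection: A broadcast receiver that falls more than the channel capacity behind misses events and is reported as lagging
  3. Broadcast: One source, many clients
  4. Reconnection: Automatic retry with exponential backoff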
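The wire format behind the heartbeat and the arithmetic behind the reconnection delay are both small enough to write out by hand. This std-only sketch uses hypothetical helper names: `sse_frame` frames one event (each line of the data becomes its own `data:` field, and a blank line ends the event), `sse_heartbeat` returns a comment line that clients ignore, and `backoff_delays` lists the capped, doubling delays. It assumes '\n' line breaks in the data.

```rust
use std::time::Duration;

/// Frames one Server-Sent Event.
pub fn sse_frame(id: Option<u64>, event: &str, data: &str) -> String {
    let mut out = String::new();
    if let Some(id) = id {
        out.push_str(&format!("id: {}\n", id));
    }
    out.push_str(&format!("event: {}\n", event));
    // Multi-line data is sent as several data: fields
    for line in data.split('\n') {
        out.push_str(&format!("data: {}\n", line));
    }
    out.push('\n'); // blank line terminates the event
    out
}

/// Lines starting with ':' are comments, a cheap keep-alive.
pub fn sse_heartbeat() -> &'static str {
    ": heartbeat\n\n"
}

/// Delays before successive reconnect attempts: start at `initial`,
/// double each time, never exceed `max`.
pub fn backoff_delays(initial: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(attempts);
    let mut current = initial.min(max);
    for _ in 0..attempts {
        delays.push(current);
        current = current.saturating_mul(2).min(max);
    }
    delays
}
```

With a 100 ms start and a 500 ms cap, the first five delays are 100, 200, 400, 500 and 500 ms.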

---

Real-World Example 5: Log Aggregation Pipeline (Observability)

Problem: Collect, transform, and route logs from multiple sources

use futures::stream::{Stream, StreamExt};
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use serde::{Serialize, Deserialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogEntry {
    pub timestamp: i64,
    pub level: LogLevel,
    pub source: String,
    pub message: String,
    pub metadata: HashMap<String, String>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LogLevel {
    Debug,
    Info,
    Warning,
    Error,
    Critical,
}

/// Multi-source log aggregation stream
pub struct LogAggregator {
    /// Receivers from multiple sources
    sources: Vec<mpsc::Receiver<LogEntry>>,
    /// Current source being polled
    current_index: usize,
}

impl LogAggregator {
    pub fn new(sources: Vec<mpsc::Receiver<LogEntry>>) -> Self {
        LogAggregator {
            sources,
            current_index: 0,
        }
    }
}

impl Stream for LogAggregator {
    type Item = LogEntry;
    
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        // LogAggregator is Unpin; a plain &mut lets us borrow fields separately
        let this = &mut *self;

        if this.sources.is_empty() {
            return Poll::Ready(None);
        }

        // Round-robin polling of sources; each Pending receiver registers our waker
        let mut all_pending = true;

        for _ in 0..this.sources.len() {
            let index = this.current_index;

            match this.sources[index].poll_recv(cx) {
                Poll::Ready(Some(entry)) => {
                    // Start after this source next time (fairness)
                    this.current_index = (index + 1) % this.sources.len();
                    return Poll::Ready(Some(entry));
                }
                Poll::Ready(None) => {
                    // Source exhausted - remove it
                    this.sources.remove(index);
                    if this.sources.is_empty() {
                        return Poll::Ready(None);
                    }
                    this.current_index = index % this.sources.len();
                    all_pending = false;
                }
                Poll::Pending => {
                    // Try next source
                    this.current_index = (index + 1) % this.sources.len();
                }
            }
        }

        if !all_pending {
            // A source was removed mid-scan, so some remaining sources may not
            // have been polled yet: ask to be polled again
            cx.waker().wake_by_ref();
        }
        Poll::Pending
    }
}

// Log processing pipeline
pub struct LogPipeline {
    aggregator: LogAggregator,
    filters: Vec<Box<dyn Fn(&LogEntry) -> bool + Send>>,
    transformers: Vec<Box<dyn Fn(LogEntry) -> LogEntry + Send>>,
}

impl LogPipeline {
    pub fn new(aggregator: LogAggregator) -> Self {
        LogPipeline {
            aggregator,
            filters: Vec::new(),
            transformers: Vec::new(),
        }
    }
    
    pub fn filter<F>(mut self, predicate: F) -> Self 
    where
        F: Fn(&LogEntry) -> bool + Send + 'static,
    {
        self.filters.push(Box::new(predicate));
        self
    }
    
    pub fn transform<F>(mut self, transformer: F) -> Self 
    where
        F: Fn(LogEntry) -> LogEntry + Send + 'static,
    {
        self.transformers.push(Box::new(transformer));
        self
    }
}

impl Stream for LogPipeline {
    type Item = LogEntry;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>> {
        loop {
            match Pin::new(&mut self.aggregator).poll_next(cx) {
                Poll::Ready(Some(mut entry)) => {
                    // Apply filters
                    let mut passes = true;
                    for filter in &self.filters {
                        if !filter(&entry) {
                            passes = false;
                            break;
                        }
                    }
                    
                    if !passes {
                        continue; // Skip this entry
                    }
                    
                    // Apply transformations
                    for transformer in &self.transformers {
                        entry = transformer(entry);
                    }
                    
                    return Poll::Ready(Some(entry));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

// Usage: Complete log aggregation system
async fn run_log_aggregation_system() {
    use tokio::time::{interval, Duration};
    
    // Create log sources
    let (app_tx, app_rx) = mpsc::channel(1000);
    let (web_tx, web_rx) = mpsc::channel(1000);
    let (db_tx, db_rx) = mpsc::channel(1000);
    
    // Simulate application logs
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_millis(100));
        let mut counter = 0;
        
        loop {
            ticker.tick().await;
            
            let entry = LogEntry {
                timestamp: chrono::Utc::now().timestamp(),
                level: LogLevel::Info,
                source: "app".to_string(),
                message: format!("Processing request #{}", counter),
                metadata: HashMap::new(),
            };
            
            if app_tx.send(entry).await.is_err() {
                break;
            }
            
            counter += 1;
        }
    });
    
    // Simulate web server logs
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_millis(50));
        
        loop {
            ticker.tick().await;
            
            let entry = LogEntry {
                timestamp: chrono::Utc::now().timestamp(),
                level: LogLevel::Debug,
                source: "web".to_string(),
                message: "HTTP request received".to_string(),
                metadata: HashMap::from([
                    ("method".to_string(), "GET".to_string()),
                    ("path".to_string(), "/api/users".to_string()),
                ]),
            };
            
            if web_tx.send(entry).await.is_err() {
                break;
            }
        }
    });
    
    // Simulate database logs
    tokio::spawn(async move {
        let mut ticker = interval(Duration::from_millis(200));
        
        loop {
            ticker.tick().await;
            
            let entry = LogEntry {
                timestamp: chrono::Utc::now().timestamp(),
                level: LogLevel::Info,
                source: "database".to_string(),
                message: "Query executed".to_string(),
                metadata: HashMap::from([
                    ("duration_ms".to_string(), "45".to_string()),
                ]),
            };
            
            if db_tx.send(entry).await.is_err() {
                break;
            }
        }
    });
    
    // Create aggregation pipeline
    let aggregator = LogAggregator::new(vec![app_rx, web_rx, db_rx]);
    
    let pipeline = LogPipeline::new(aggregator)
        // Filter: only warnings and errors
        .filter(|entry| matches!(entry.level, LogLevel::Warning | LogLevel::Error))
        // Transform: add environment tag
        .transform(|mut entry| {
            entry.metadata.insert("environment".to_string(), "production".to_string());
            entry
        });
    
    // Process logs with batching
    let mut batch = Vec::new();
    let batch_size = 100;
    
    tokio::pin!(pipeline);
    
    while let Some(entry) = pipeline.next().await {
        batch.push(entry);
        
        if batch.len() >= batch_size {
            // Send batch to storage (Elasticsearch, S3, etc.)
            send_to_storage(&batch).await;
            batch.clear();
        }
    }
    
    // Send remaining batch
    if !batch.is_empty() {
        send_to_storage(&batch).await;
    }
}

// Fan-out to multiple sinks
pub struct LogFanout {
    sinks: Vec<mpsc::Sender<LogEntry>>,
}

impl LogFanout {
    pub fn new(sinks: Vec<mpsc::Sender<LogEntry>>) -> Self {
        LogFanout { sinks }
    }
    
    pub async fn send(&self, entry: LogEntry) {
        // Send to all sinks, ignoring closed ones (best-effort);
        // a full sink still makes this await (backpressure)
        for sink in &self.sinks {
            let _ = sink.send(entry.clone()).await;
        }
    }
}

// Usage: Route logs to multiple destinations
async fn run_fanout_system(aggregator: LogAggregator) {
    use futures::stream::StreamExt;
    
    // Create sinks
    let (elasticsearch_tx, elasticsearch_rx) = mpsc::channel(1000);
    let (s3_tx, s3_rx) = mpsc::channel(1000);
    let (metrics_tx, metrics_rx) = mpsc::channel(1000);
    
    // Spawn sink processors
    tokio::spawn(async move {
        write_to_elasticsearch(elasticsearch_rx).await;
    });
    
    tokio::spawn(async move {
        write_to_s3(s3_rx).await;
    });
    
    tokio::spawn(async move {
        update_metrics(metrics_rx).await;
    });
    
    // Fan out logs
    let fanout = LogFanout::new(vec![elasticsearch_tx, s3_tx, metrics_tx]);
    
    aggregator
        .for_each(|entry| async {
            fanout.send(entry).await;
        })
        .await;
}
Architecture benefits:
  1. Multi-source: Aggregates from multiple producers
  2. Pipeline: Filter and transform in stages
  3. Batching: Efficient bulk operations
  4. Fan-out: Route to multiple destinations
  5. Backpressure: Bounded channels control flow
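The batching loop in the pipeline above (push each entry, flush at `batch_size`, then flush whatever is left) can be checked in isolation. A minimal std-only sketch, assuming a plain iterator in place of the log stream and a closure in place of `send_to_storage`; `flush_in_batches` is a hypothetical name:

```rust
/// Groups items into batches of `batch_size` and hands each batch to `sink`,
/// flushing the final partial batch so no trailing items are lost.
fn flush_in_batches<T, I, F>(items: I, batch_size: usize, mut sink: F)
where
    I: IntoIterator<Item = T>,
    F: FnMut(&[T]),
{
    assert!(batch_size > 0, "batch_size must be non-zero");
    let mut batch = Vec::with_capacity(batch_size);
    for item in items {
        batch.push(item);
        if batch.len() >= batch_size {
            sink(&batch);
            batch.clear(); // keeps the allocation for the next batch
        }
    }
    // Send remaining batch (the step that is easy to forget)
    if !batch.is_empty() {
        sink(&batch);
    }
}
```

In the async pipeline the `for` loop becomes `while let Some(entry) = pipeline.next().await`. If logs can trickle in slowly, a time-based flush such as tokio_stream's `chunks_timeout` keeps a partial batch from waiting indefinitely.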

---

Deep Dive Explanation

Stream Trait Definition

pub trait Stream {
    type Item;
    
    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
    
    // Provided method
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}
Return values:
  • Poll::Ready(Some(item)): Next item is ready
  • Poll::Ready(None): Stream exhausted (like Iterator returning None)
  • Poll::Pending: No item ready yet; the stream must arrange for cx.waker() to be woken when one arrives
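The three return values can be observed by polling a stream by hand. A std-only sketch that declares a local copy of the trait (so it compiles without the futures crate), with a hypothetical `Countdown` stream and a waker that ignores wake-ups:

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local copy of the trait so this sketch needs only std.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical example stream: yields n, n-1, ..., 1, then Ready(None).
struct Countdown {
    remaining: u32,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None); // exhausted
        }
        let value = self.remaining;
        self.remaining -= 1;
        Poll::Ready(Some(value)) // an item is ready right now
    }
}

// Waker that ignores wake-ups; acceptable only because Countdown never returns Pending.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls by hand until Ready(None), recording each Poll result as text.
fn poll_trace<S>(mut stream: S) -> Vec<String>
where
    S: Stream + Unpin,
    S::Item: std::fmt::Debug,
{
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut trace = Vec::new();
    loop {
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        trace.push(format!("{:?}", poll));
        if let Poll::Ready(None) = poll {
            return trace;
        }
        // Pending would mean "wait for a wake-up"; a real executor parks here.
    }
}
```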

Relationship to Future

// A Future returns ONE value
impl Future for MyFuture {
    type Output = T;
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        // Poll::Ready(T) or Poll::Pending
    }
}

// A Stream returns MANY values
impl Stream for MyStream {
    type Item = T;
    
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Poll::Ready(Some(T)), Poll::Ready(None), or Poll::Pending
    }
}

// Roughly: a Stream is a Future<Output = Option<Item>> that may be polled
// again after each Ready(Some(_)), until it finally returns Ready(None)
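The "Future that repeats" view is essentially how `StreamExt::next()` is built: it wraps `&mut stream` in a future whose `poll` forwards to `poll_next`. A std-only sketch of that idea with a local trait, a hypothetical `Counter` stream and a tiny thread-parking `block_on`; `Next`, `next` and `block_on` here are illustrative, not the futures crate's actual code:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Local copy of the trait so the sketch needs only std.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// A future that resolves to the stream's next item: each poll is one poll_next.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

fn next<S: Stream + Unpin>(stream: &mut S) -> Next<'_, S> {
    Next { stream }
}

// Hypothetical stream yielding 1..=limit.
struct Counter {
    n: u32,
    limit: u32,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.n >= self.limit {
            return Poll::Ready(None);
        }
        self.n += 1;
        Poll::Ready(Some(self.n))
    }
}

// Same shape as `while let Some(x) = stream.next().await` with StreamExt.
async fn collect_all<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let mut items = Vec::new();
    while let Some(item) = next(&mut stream).await {
        items.push(item);
    }
    items
}

// Minimal executor: poll, and park the thread until a waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(), // spurious returns just re-poll
        }
    }
}
```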

Pinning Requirements

Streams need pinning for the same reason as futures:

struct SelfReferentialStream {
    buffer: Vec<u8>,
    // This points into buffer!
    current_slice: *const [u8],
}

// If this moves in memory, current_slice becomes dangling
// Pin guarantees it won't move

Waker Interaction

impl Stream for MyStream {
    type Item = i32;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<i32>> {
        if let Some(item) = self.try_get_item() {
            return Poll::Ready(Some(item));
        }
        
        // No item ready - store waker
        self.waker = Some(cx.waker().clone());
        
        // Later, when item arrives:
        // self.waker.take().unwrap().wake();
        
        Poll::Pending
    }
}
Waker lifecycle:
  1. Stream returns Pending, stores waker
  2. External event (I/O, timer, channel) triggers
  3. Event handler calls waker.wake()
  4. Executor re-polls stream
  5. Stream returns Ready with item
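The five steps can be traced in a small std-only sketch: a hypothetical channel-backed stream that stores the waker when empty, a producer thread that wakes it after each push, and a single-stream "executor" that parks its thread until woken. The local trait and all type names are assumptions for illustration, not Tokio's channel implementation:

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// State shared between the producer handle and the stream.
#[derive(Default)]
struct Shared {
    queue: VecDeque<u32>,
    closed: bool,
    waker: Option<Waker>,
}

fn wake_stored(shared: &mut Shared) {
    // Step 3: the event handler calls wake() on the stored waker.
    if let Some(waker) = shared.waker.take() {
        waker.wake();
    }
}

struct Producer(Arc<Mutex<Shared>>);

impl Producer {
    // Step 2: an external event (here, a push from another thread).
    fn push(&self, value: u32) {
        let mut shared = self.0.lock().unwrap();
        shared.queue.push_back(value);
        wake_stored(&mut shared);
    }
}

impl Drop for Producer {
    // Dropping the producer ends the stream, which also needs a wake-up.
    fn drop(&mut self) {
        let mut shared = self.0.lock().unwrap();
        shared.closed = true;
        wake_stored(&mut shared);
    }
}

struct ChannelStream(Arc<Mutex<Shared>>);

impl Stream for ChannelStream {
    type Item = u32;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let mut shared = self.0.lock().unwrap();
        if let Some(value) = shared.queue.pop_front() {
            return Poll::Ready(Some(value)); // Step 5: ready with an item
        }
        if shared.closed {
            return Poll::Ready(None);
        }
        // Step 1: nothing yet, so store the waker before returning Pending.
        shared.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

fn channel() -> (Producer, ChannelStream) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    (Producer(Arc::clone(&shared)), ChannelStream(shared))
}

// Waker that unparks the thread acting as the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Step 4: a one-stream "executor" that parks until woken, then re-polls.
fn drain_blocking<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => thread::park(), // may return spuriously; the loop re-polls
        }
    }
}
```

Holding the mutex while checking the queue and storing the waker is what prevents a lost wake-up: a push either lands before the check (and is seen) or after the waker is stored (and wakes it).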

Backpressure Mechanisms

// Natural backpressure with bounded channels
use tokio::sync::mpsc;

let (tx, mut rx) = mpsc::channel(10); // Capacity: 10

// Producer
tokio::spawn(async move {
    for i in 0..1000 {
        // Waits when the channel is full: the task is suspended,
        // while the worker thread stays free for other tasks
        tx.send(i).await.unwrap();
    }
});

// Consumer (slow)
while let Some(item) = rx.recv().await {
    // Slow processing
    slow_operation(item).await;
}

// Producer automatically slows down to match consumer
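The same flow control can be observed with std alone: `std::sync::mpsc::sync_channel` is a bounded channel whose `send` blocks the calling thread (rather than suspending a task) while the buffer is full. A minimal sketch, with threads standing in for tasks, that measures how far the producer ever gets ahead of a deliberately slow consumer; `run_bounded` is a hypothetical helper:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Sends 0..total through a channel of the given capacity to a slow consumer.
/// Returns (sum of received items, largest number of items the producer had
/// finished sending beyond what the consumer had received).
fn run_bounded(capacity: usize, total: u64) -> (u64, usize) {
    let (tx, rx) = sync_channel::<u64>(capacity);
    let sent = Arc::new(AtomicUsize::new(0));

    let sent_by_producer = Arc::clone(&sent);
    let producer = thread::spawn(move || {
        for i in 0..total {
            tx.send(i).unwrap(); // blocks while `capacity` items are queued
            sent_by_producer.fetch_add(1, Ordering::SeqCst);
        }
        // tx is dropped here, which ends the consumer's loop
    });

    let (mut received, mut sum, mut max_lead) = (0usize, 0u64, 0usize);
    for item in rx {
        received += 1;
        sum += item;
        // saturating_sub: the counter is bumped after send() returns, so it can lag
        let lead = sent.load(Ordering::SeqCst).saturating_sub(received);
        max_lead = max_lead.max(lead);
        thread::sleep(Duration::from_millis(2)); // deliberately slow consumer
    }
    producer.join().unwrap();
    (sum, max_lead)
}
```

Whatever the timing, the producer never has more than `capacity` sent-but-unreceived items, which is the same bound a bounded Tokio channel provides.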
Manual backpressure:
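pub struct BackpressureStream {
    source: mpsc::Receiver<Data>,
    processing_capacity: usize,
    in_flight: usize,
    // Waker saved while throttled, so finishing an item can resume us
    waker: Option<Waker>,
}

impl BackpressureStream {
    // Hypothetical helper: the consumer calls this when it finishes an item
    pub fn mark_done(&mut self) {
        self.in_flight = self.in_flight.saturating_sub(1);
        if let Some(waker) = self.waker.take() {
            waker.wake();
        }
    }
}

impl Stream for BackpressureStream {
    type Item = Data;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Data>> {
        // Check capacity
        if self.in_flight >= self.processing_capacity {
            // Too many items in flight - apply backpressure.
            // Returning Pending without saving the waker would leave
            // the task asleep forever, so store it for mark_done().
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        
        // Try to receive; poll_recv registers cx's waker if the channel is empty
        match self.source.poll_recv(cx) {
            Poll::Ready(Some(data)) => {
                self.in_flight += 1;
                Poll::Ready(Some(data))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}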

Buffering Strategies

Unbounded buffering:
use futures::stream::{self, StreamExt};

// DANGEROUS: Can OOM if producer faster than consumer
let stream = stream::iter(0..1_000_000)
    .map(|i| expensive_operation(i))
    .buffered(usize::MAX); // Unbounded!
Bounded buffering:
// Safe: Limits concurrent operations
let stream = stream::iter(0..1_000_000)
    .map(|i| expensive_operation(i))
    .buffered(10); // Max 10 concurrent operations
Adaptive buffering:
pub struct AdaptiveBuffer<S: Stream> {
    stream: S,
    buffer: VecDeque<S::Item>,
    capacity: usize,
    min_capacity: usize,
    max_capacity: usize,
    // Set once the inner stream returns None
    done: bool,
}

// Unpin bounds allow plain &mut access to the fields through the Pin
impl<S> Stream for AdaptiveBuffer<S>
where
    S: Stream + Unpin,
    S::Item: Unpin,
{
    type Item = S::Item;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<S::Item>> {
        // Emit from buffer if available
        if let Some(item) = self.buffer.pop_front() {
            return Poll::Ready(Some(item));
        }
        
        // Fill buffer until it is full, the source is pending, or it ends
        while !self.done && self.buffer.len() < self.capacity {
            match Pin::new(&mut self.stream).poll_next(cx) {
                Poll::Ready(Some(item)) => {
                    self.buffer.push_back(item);
                }
                Poll::Ready(None) => {
                    // Don't return None yet: buffered items must drain first
                    self.done = true;
                }
                Poll::Pending => break,
            }
        }
        
        // Adjust capacity based on utilization
        if self.buffer.is_empty() {
            // Source had nothing ready - reduce buffer
            self.capacity = (self.capacity / 2).max(self.min_capacity);
        } else if self.buffer.len() == self.capacity {
            // Source is keeping up - increase buffer
            self.capacity = (self.capacity * 2).min(self.max_capacity);
        }
        
        match self.buffer.pop_front() {
            Some(item) => Poll::Ready(Some(item)),
            None if self.done => Poll::Ready(None),
            // Inner stream returned Pending and registered the waker
            None => Poll::Pending,
        }
    }
}
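The resize rule is easiest to reason about (and test) when pulled out of `poll_next`. A sketch of the same halve/double policy as a pure function; `next_capacity` is a hypothetical name and the thresholds mirror the code above:

```rust
/// Resize policy from AdaptiveBuffer, isolated for testing:
/// halve when nothing could be buffered, double when the buffer filled up,
/// always staying within [min, max].
fn next_capacity(buffered: usize, capacity: usize, min: usize, max: usize) -> usize {
    if buffered == 0 {
        (capacity / 2).max(min) // source had nothing ready: read ahead less
    } else if buffered == capacity {
        (capacity * 2).min(max) // source kept up: allow more read-ahead
    } else {
        capacity // partially filled: leave unchanged
    }
}
```

Starting at 4 with bounds 4 and 64, consecutive full fills grow the capacity 4, 8, 16, 32, 64 and then hold it at the maximum.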

Error Handling with TryStream

use futures::stream::TryStream;

// TryStream is Stream<Item = Result<T, E>>
pub trait TryStream: Stream {
    type Ok;
    type Error;
    
    // Automatically implemented for Stream<Item = Result<Ok, Error>>
}

// Example: Database query stream with errors
impl Stream for QueryStream {
    type Item = Result<Row, DbError>;
    
    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Result<Row, DbError>>> {
        // Return Ready(Some(Ok(row))) or Ready(Some(Err(error)))
        todo!("driver-specific")
    }
}

// Using TryStream combinators
async fn process_query_results(stream: QueryStream) -> Result<Vec<User>, DbError> {
    use futures::stream::TryStreamExt;
    
    stream
        // Sees only Ok rows; Err items pass through untouched
        .try_filter(|row| {
            futures::future::ready(row.get::<_, i32>("active") == 1)
        })
        // map_ok transforms Ok values with a plain closure
        .map_ok(|row| User {
            id: row.get("id"),
            name: row.get("name"),
        })
        // Stops at the first Err and returns it
        .try_collect()
        .await
}

// Error recovery: log each error and continue with a fallback row
async fn process_with_fallback(stream: QueryStream) -> Result<(), DbError> {
    use futures::stream::TryStreamExt;
    
    stream
        .or_else(|error| async move {
            // Log error and return a fallback value
            // (assumes the row type implements Default; illustrative only)
            eprintln!("Error: {}", error);
            Ok(Row::default())
        })
        // Items are still Result<Row, DbError>, so use try_for_each
        .try_for_each(|row| async move {
            process_row(row).await;
            Ok(())
        })
        .await
}
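`try_collect` follows the same rule as collecting an iterator of `Result`s in std: the first `Err` is returned and no further items are pulled. A std-only sketch of the `try_filter` / `map_ok` / `try_collect` chain over a plain iterator, where an `(id, name, active)` tuple stands in for a database row (an assumption, not tokio_postgres's `Row`) and a counter records how many items were consumed:

```rust
#[derive(Debug, PartialEq)]
struct User {
    id: i32,
    name: String,
}

/// Sync analogue of `try_filter(active).map_ok(to_user).try_collect()`.
fn active_users(
    rows: Vec<Result<(i32, &str, bool), String>>,
    pulled: &mut usize,
) -> Result<Vec<User>, String> {
    rows.into_iter()
        .inspect(|_| *pulled += 1) // count items actually consumed
        // like try_filter: drop inactive Ok rows, let Err items through
        .filter(|row| !matches!(row, Ok((_, _, false))))
        // like map_ok: transform only the Ok values
        .map(|row| row.map(|(id, name, _)| User { id, name: name.to_string() }))
        // like try_collect: stop at the first Err and return it
        .collect()
}
```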

StreamExt Combinators

use futures::stream::{self, StreamExt};

async fn demonstrate_combinators() {
    // Each adapter consumes its input, so build a fresh stream per example
    let numbers = || stream::iter(1..=20);

    // Map: Transform each item
    let doubled: Vec<i32> = numbers().map(|x| x * 2).collect().await;

    // Filter: Keep only matching items (the predicate returns a future)
    let large: Vec<i32> = numbers()
        .filter(|x| futures::future::ready(*x > 10))
        .collect()
        .await;

    // FilterMap: Filter and transform
    let halves: Vec<i32> = numbers()
        .filter_map(|x| {
            if x % 2 == 0 {
                futures::future::ready(Some(x / 2))
            } else {
                futures::future::ready(None)
            }
        })
        .collect()
        .await;

    // Take: Limit number of items
    let first_ten: Vec<i32> = numbers().take(10).collect().await;

    // Skip: Skip first N items
    let rest: Vec<i32> = numbers().skip(5).collect().await;

    // Fold: Reduce to single value
    let sum = numbers()
        .fold(0, |acc, x| futures::future::ready(acc + x))
        .await;

    // ForEach: Process each item
    numbers()
        .for_each(|x| async move {
            println!("{}", x);
        })
        .await;
}

// Concurrent processing
async fn concurrent_combinators(ids: Vec<i32>) {
    // Buffered: Run up to 10 futures concurrently, results in input order
    let ordered: Vec<String> = stream::iter(ids.clone())
        .map(|x| async move { fetch_data(x).await })
        .buffered(10)
        .collect()
        .await;

    // BufferUnordered: Results in completion order, so one slow
    // request no longer holds back the ones that already finished
    let unordered: Vec<String> = stream::iter(ids)
        .map(|x| async move { fetch_data(x).await })
        .buffer_unordered(10)
        .collect()
        .await;
}

// Combining streams
async fn combine_streams() {
    use futures::stream;
    
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec![4, 5, 6]);
    
    // Chain: Concatenate streams
    stream1
        .chain(stream2)
        .collect::<Vec<_>>()
        .await;
    
    // Zip: Pair items from two streams
    let stream1 = stream::iter(vec![1, 2, 3]);
    let stream2 = stream::iter(vec!["a", "b", "c"]);
    
    stream1
        .zip(stream2)
        .collect::<Vec<_>>()
        .await; // [(1, "a"), (2, "b"), (3, "c")]
}

---

When to Use / When NOT to Use

Use Streams When:

  1. Large or unbounded datasets
  • Database query results with thousands of rows
  • Log files too large to fit in memory
  • Real-time event streams (no end)
  2. Network communication
  • HTTP streaming responses
  • WebSocket message streams
  • gRPC server streaming
  • SSE (Server-Sent Events)
  3. Asynchronous iteration
  • Need to yield between items
  • I/O-bound processing
  • Want to process items as they arrive
  4. Backpressure required
  • Consumer slower than producer
  • Need flow control
  • Memory constraints
  5. Pipeline processing
  • Filter, map, reduce operations
  • Multi-stage transformations
  • Fan-out to multiple consumers

Don't Use Streams When:

  1. Small, bounded collections

// OVERKILL: Using stream for small Vec
let stream = stream::iter(vec![1, 2, 3]);

// BETTER: Just use iterator
for item in vec![1, 2, 3] {
    process(item);
}

  2. Synchronous iteration is sufficient

// UNNECESSARY: No async needed
stream::iter(users)
    .for_each(|user| async move {
        // Synchronous operation!
        println!("{}", user.name);
    })
    .await;

// BETTER: Standard loop
for user in users {
    println!("{}", user.name);
}

  3. All data available immediately

// POINTLESS: Data already in memory
let data = vec![1, 2, 3, 4, 5];
let stream = stream::iter(data);

// BETTER: Direct iteration
for item in data {
    process(item);
}

  4. Performance-critical tight loops

// SLOW: Async overhead for hot path
stream.for_each(|x| async move {
    hot_path_computation(x); // Pure CPU, no I/O
}).await;

// BETTER: Synchronous iterator
for x in data {
    hot_path_computation(x);
}

  5. Complex error handling needed
  • Stream error handling can be awkward
  • Consider collecting to Vec first, then process
  • Or use Result accumulation patterns

---

⚠️ Anti-Patterns

1. Unbounded Buffers Causing Memory Leaks

// WRONG: Unbounded buffer
let stream = source_stream
    .map(|item| expensive_operation(item))
    .buffered(usize::MAX); // OOM if source is fast!

// CORRECT: Bounded buffer
let stream = source_stream
    .map(|item| expensive_operation(item))
    .buffered(10); // At most 10 concurrent operations

2. Blocking in poll_next

// WRONG: Blocking I/O in poll_next
impl Stream for BadStream {
    type Item = String;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<String>> {
        // This blocks the executor!
        let data = std::fs::read_to_string("file.txt").unwrap();
        Poll::Ready(Some(data))
    }
}

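// CORRECT: Async I/O
// Assumes: read_future: Option<Pin<Box<dyn Future<Output = io::Result<String>> + Send>>>,
// e.g. Some(Box::pin(tokio::fs::read_to_string("file.txt")))
impl Stream for GoodStream {
    type Item = String;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<String>> {
        // Already yielded the file (or failed): the stream is finished
        let Some(fut) = self.read_future.as_mut() else {
            return Poll::Ready(None);
        };
        // Non-blocking async I/O: Pending registers cx's waker
        match fut.as_mut().poll(cx) {
            Poll::Ready(result) => {
                // Never poll a completed future again
                self.read_future = None;
                Poll::Ready(result.ok())
            }
            Poll::Pending => Poll::Pending,
        }
    }
}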

3. Ignoring Backpressure Signals

// WRONG: Ignoring slow consumer
let (tx, rx) = mpsc::unbounded_channel(); // Unbounded!

tokio::spawn(async move {
    loop {
        let data = generate_data();
        tx.send(data).unwrap(); // Never waits: memory grows without bound
    }
});

// CORRECT: Bounded channel with backpressure
let (tx, rx) = mpsc::channel(100); // Bounded

tokio::spawn(async move {
    loop {
        let data = generate_data();
        // Waits while the channel is full: backpressure!
        if tx.send(data).await.is_err() {
            break;
        }
    }
});

4. Not Handling Stream Termination

// WRONG: Silently stops at the first error (items are Result<T, E>)
while let Some(Ok(item)) = stream.next().await {
    process(item).await;
}
// An Err ends the loop just like None would, and the error is lost

// CORRECT: Handle both success and error termination
loop {
    match stream.next().await {
        Some(Ok(item)) => process(item).await,
        Some(Err(e)) => {
            eprintln!("Stream error: {}", e);
            break;
        }
        None => {
            println!("Stream complete");
            break;
        }
    }
}

5. Forgetting to Fuse Streams

// WRONG: Polling after None
let mut stream = source_stream();

while let Some(item) = stream.next().await {
    process(item).await;
}

// Polling again after None: the Stream contract leaves this unspecified
if let Some(item) = stream.next().await {
    // Not memory-unsafe, but it might panic, hang, or yield more items
}

// CORRECT: Use fuse() so polling after completion is well-defined
let mut stream = source_stream().fuse();

while let Some(item) = stream.next().await {
    process(item).await;
}

// Safe: a fused stream returns None forever after the first None
if let Some(item) = stream.next().await {
    // Never reached
}
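What `fuse()` adds is essentially a single flag. A std-only sketch with a local trait, a hypothetical `Scripted` stream that misbehaves by yielding again after `None`, and a minimal `Fuse` wrapper in the spirit of `StreamExt::fuse()` (not its actual source):

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical, deliberately ill-behaved stream: replays a script, even after None.
struct Scripted {
    script: Vec<Option<i32>>,
}

impl Stream for Scripted {
    type Item = i32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        if self.script.is_empty() {
            return Poll::Ready(None);
        }
        Poll::Ready(self.script.remove(0))
    }
}

// Once the inner stream reports Ready(None), it is never polled again.
struct Fuse<S> {
    inner: S,
    done: bool,
}

impl<S: Stream + Unpin> Stream for Fuse<S> {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        let poll = Pin::new(&mut self.inner).poll_next(cx);
        if let Poll::Ready(None) = poll {
            self.done = true;
        }
        poll
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls `stream` exactly `n` times (these sketch streams never return Pending).
fn poll_n<S: Stream + Unpin>(stream: &mut S, n: usize) -> Vec<Option<S::Item>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut results = Vec::new();
    for _ in 0..n {
        match Pin::new(&mut *stream).poll_next(&mut cx) {
            Poll::Ready(item) => results.push(item),
            Poll::Pending => panic!("unexpected Pending in this sketch"),
        }
    }
    results
}
```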

6. Collecting Entire Stream Unnecessarily

// WRONG: Defeats purpose of streaming (every row is held in memory at once)
let all_rows: Vec<Row> = db_stream.try_collect().await?;
for row in all_rows {
    process_row(row).await;
}

// CORRECT: Process rows as they arrive (TryStreamExt::try_for_each)
db_stream
    .try_for_each(|row| async move {
        process_row(row).await;
        Ok(())
    })
    .await?;

---

Performance Characteristics

Memory Usage

// Iterator: O(1) memory
let sum: i32 = (0..1_000_000).sum();

// Stream: O(1) memory (if not buffered)
let sum = stream::iter(0..1_000_000)
    .fold(0, |acc, x| futures::future::ready(acc + x))
    .await;

// Buffered stream: at most 10 futures in flight
// (collecting the results into a Vec is still O(n))
let results = stream::iter(0..1_000_000)
    .map(|x| fetch(x))
    .buffered(10) // roughly 10 * size_of::<Fut>() for the pending futures
    .collect::<Vec<_>>()
    .await;

CPU Overhead

// Benchmark: Sync iterator
fn sync_sum(data: &[i32]) -> i32 {
    data.iter().sum() // ~1ns per item (rough; can be inlined and vectorized)
}

// Benchmark: Async stream
async fn async_sum(data: &[i32]) -> i32 {
    stream::iter(data)
        .fold(0, |acc, &x| futures::future::ready(acc + x))
        .await
    // ~10-50ns per item (poll overhead; rough estimate, measure your own workload)
}

// Lesson: Don't use streams for CPU-bound hot paths!

Comparison with Sync Iterators

| Aspect | Iterator | Stream |
|--------|----------|--------|
| Overhead per item (rough) | ~0ns (inlined) | ~10-50ns (poll overhead) |
| Memory | O(1) | O(1) + waker storage |
| Concurrency | No | Yes (with buffered) |
| Backpressure | No | Yes |
| Async I/O | Blocks | Non-blocking |
| Use case | CPU-bound | I/O-bound |

Zero-Copy Optimization Opportunities

// BAD: Copying on every item
impl Stream for CopyStream {
    type Item = Vec<u8>;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Vec<u8>>> {
        // Clones data!
        Poll::Ready(Some(self.buffer.clone()))
    }
}

// GOOD: Zero-copy with Arc
impl Stream for ZeroCopyStream {
    type Item = Arc<Vec<u8>>;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Arc<Vec<u8>>>> {
        // Shared ownership, no copy
        Poll::Ready(Some(self.buffer.clone())) // Just increments refcount
    }
}

// BEST: Zero-copy with bytes crate
use bytes::Bytes;

impl Stream for BytesStream {
    type Item = Bytes;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Bytes>> {
        // Bytes uses Arc internally, cheap clone
        Poll::Ready(Some(self.buffer.clone()))
    }
}
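The claim that cloning an `Arc` copies no bytes is easy to check with std alone: both handles point at the same allocation and only the strong count changes, while cloning the `Vec` allocates a fresh buffer. `share_buffer` and `copy_buffer` are hypothetical helpers:

```rust
use std::sync::Arc;

/// Clones the Arc handle and reports whether both handles see the same bytes,
/// plus the strong count while the clone is still alive.
fn share_buffer(buffer: &Arc<Vec<u8>>) -> (bool, usize) {
    let handle = Arc::clone(buffer); // bumps the reference count, copies no bytes
    let same_bytes = Arc::ptr_eq(buffer, &handle) && buffer.as_ptr() == handle.as_ptr();
    (same_bytes, Arc::strong_count(buffer))
}

/// Copying the contents allocates a new buffer and copies every byte.
fn copy_buffer(buffer: &[u8]) -> Vec<u8> {
    buffer.to_vec()
}
```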

---

Exercises

Beginner: Implement Range Stream

Create a stream that yields numbers from start to end:

use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

struct RangeStream {
    current: i32,
    end: i32,
}

impl RangeStream {
    pub fn new(start: i32, end: i32) -> Self {
        RangeStream {
            current: start,
            end,
        }
    }
}

impl Stream for RangeStream {
    type Item = i32;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<i32>> {
        // TODO: Implement
        todo!()
    }
}

// Test:
#[tokio::main]
async fn main() {
    use futures::stream::StreamExt;
    
    let mut stream = RangeStream::new(1, 5);
    
    while let Some(value) = stream.next().await {
        println!("{}", value);
    }
    // Should print: 1, 2, 3, 4
}
Solution
impl Stream for RangeStream {
    type Item = i32;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<i32>> {
        if self.current < self.end {
            let value = self.current;
            self.current += 1;
            Poll::Ready(Some(value))
        } else {
            Poll::Ready(None)
        }
    }
}

Intermediate: Build Throttled Stream

Implement a stream that rate-limits item emission:

use futures::stream::Stream;
use std::future::Future; // needed to poll the Sleep future in the solution
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{Duration, Instant, Sleep};

struct ThrottledStream<S> {
    stream: S,
    delay: Duration,
    next_allowed: Instant,
    sleep: Option<Pin<Box<Sleep>>>,
}

impl<S: Stream> ThrottledStream<S> {
    pub fn new(stream: S, delay: Duration) -> Self {
        ThrottledStream {
            stream,
            delay,
            next_allowed: Instant::now(),
            sleep: None,
        }
    }
}

impl<S: Stream + Unpin> Stream for ThrottledStream<S> {
    type Item = S::Item;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<S::Item>> {
        // TODO: Implement rate limiting
        todo!()
    }
}

// Test:
#[tokio::main]
async fn main() {
    use futures::stream::{self, StreamExt};
    use tokio::time::Instant;
    
    let start = Instant::now();
    let stream = stream::iter(vec![1, 2, 3, 4, 5]);
    let throttled = ThrottledStream::new(stream, Duration::from_millis(100));
    
    throttled.for_each(|x| async move {
        println!("{}: {}", start.elapsed().as_millis(), x);
    }).await;
    
    // Should print items ~100ms apart
}
Solution
impl<S: Stream + Unpin> Stream for ThrottledStream<S> {
    type Item = S::Item;
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<S::Item>> {
        let now = Instant::now();
        
        // Check if we need to delay
        if now < self.next_allowed {
            // Create sleep if needed
            if self.sleep.is_none() {
                self.sleep = Some(Box::pin(tokio::time::sleep_until(self.next_allowed)));
            }
            
            // Poll the sleep
            match self.sleep.as_mut().unwrap().as_mut().poll(cx) {
                Poll::Ready(()) => {
                    self.sleep = None;
                    // Fall through to poll stream
                }
                Poll::Pending => return Poll::Pending,
            }
        }
        
        // Poll underlying stream
        match Pin::new(&mut self.stream).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                // Schedule next allowed time
                self.next_allowed = Instant::now() + self.delay;
                Poll::Ready(Some(item))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

Advanced: Fan-out Stream with Backpressure

Create a stream that distributes items to multiple consumers with backpressure:

use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;

struct FanoutStream<T> {
    // + Send so the fan-out can run inside tokio::spawn
    source: Pin<Box<dyn Stream<Item = T> + Send>>,
    sinks: Vec<mpsc::Sender<T>>,
}

impl<T: Clone> FanoutStream<T> {
    pub fn new(
        source: impl Stream<Item = T> + Send + 'static,
        sink_count: usize,
        buffer_size: usize,
    ) -> (Self, Vec<mpsc::Receiver<T>>) {
        let mut sinks = Vec::new();
        let mut receivers = Vec::new();
        
        for _ in 0..sink_count {
            let (tx, rx) = mpsc::channel(buffer_size);
            sinks.push(tx);
            receivers.push(rx);
        }
        
        let fanout = FanoutStream {
            source: Box::pin(source),
            sinks,
        };
        
        (fanout, receivers)
    }
}

impl<T: Clone> Stream for FanoutStream<T> {
    type Item = ();
    
    fn poll_next(
        mut self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<()>> {
        // TODO: Implement fan-out with backpressure
        // - Poll source for next item
        // - Try to send to all sinks
        // - If any sink is full, apply backpressure
        // - Remove closed sinks
        todo!()
    }
}

// Test:
#[tokio::main]
async fn main() {
    use futures::stream::{self, StreamExt};
    use tokio::time::Duration;
    
    let source = stream::iter(0..100);
    let (fanout, mut receivers) = FanoutStream::new(source, 3, 10);
    
    // Spawn fanout task
    tokio::spawn(async move {
        fanout.for_each(|_| async {}).await;
    });
    
    // Spawn consumers
    for (i, mut rx) in receivers.into_iter().enumerate() {
        tokio::spawn(async move {
            while let Some(item) = rx.recv().await {
                println!("Consumer {}: {}", i, item);
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        });
    }
    
    tokio::time::sleep(Duration::from_secs(5)).await;
}
Hint

You'll need to:

  1. Poll the source stream
  2. For each item, try sending to ALL sinks
  3. Use try_send() to check if sink is full
  4. If any sink is full, store the item and return Pending
  5. Remove sinks that return SendError (closed)
  6. When all sinks are gone, terminate stream
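The hint's core step, offering one item to every sink without blocking, can be sketched with std's bounded `sync_channel` and `try_send`, which reports `Full` and `Disconnected` much like Tokio's `try_send`. This is a synchronous sketch under those assumptions, not a solution to the `FanoutStream` exercise itself; `FanOut`, `Delivery` and their methods are hypothetical. Each sink gets its own pending slot, so a full sink does not cause duplicate deliveries to the others:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Result of one delivery attempt.
#[derive(Debug, PartialEq)]
enum Delivery {
    Done,    // every live sink has the item
    Blocked, // at least one sink is full; call flush() again later
    NoSinks, // all sinks are closed: the fan-out should terminate
}

struct FanOut<T: Clone> {
    sinks: Vec<(SyncSender<T>, Option<T>)>, // (sender, item still owed to it)
}

impl<T: Clone> FanOut<T> {
    /// Mirrors FanoutStream::new: one bounded channel per consumer.
    fn with_sinks(count: usize, capacity: usize) -> (Self, Vec<Receiver<T>>) {
        let (senders, receivers): (Vec<_>, Vec<_>) =
            (0..count).map(|_| sync_channel(capacity)).unzip();
        let sinks = senders.into_iter().map(|tx| (tx, None)).collect();
        (FanOut { sinks }, receivers)
    }

    /// Queues `item` for every sink, then tries to deliver it.
    /// Only call this after the previous item reached Done.
    fn offer(&mut self, item: T) -> Delivery {
        for (_, slot) in &mut self.sinks {
            debug_assert!(slot.is_none(), "previous item still pending");
            *slot = Some(item.clone());
        }
        self.flush()
    }

    /// Retries pending deliveries with try_send and drops closed sinks.
    fn flush(&mut self) -> Delivery {
        self.sinks.retain_mut(|(tx, slot)| match slot.take() {
            None => true,
            Some(item) => match tx.try_send(item) {
                Ok(()) => true,
                Err(TrySendError::Full(item)) => {
                    *slot = Some(item); // keep it for the next flush
                    true
                }
                Err(TrySendError::Disconnected(_)) => false, // receiver gone
            },
        });
        if self.sinks.is_empty() {
            Delivery::NoSinks
        } else if self.sinks.iter().any(|(_, slot)| slot.is_some()) {
            Delivery::Blocked
        } else {
            Delivery::Done
        }
    }

    fn sink_count(&self) -> usize {
        self.sinks.len()
    }
}
```

In `FanoutStream::poll_next`, `Blocked` is the point where you would return `Poll::Pending`. Because `try_send` registers no waker, a complete solution also needs some way to be woken when space frees up, for example by polling a reservation future such as Tokio's `Sender::reserve`.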

---

Real-World Usage

tokio::io::AsyncBufReadExt::lines()

use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::fs::File;

async fn read_lines(path: &str) -> std::io::Result<()> {
    let file = File::open(path).await?;
    let reader = BufReader::new(file);
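    // tokio's Lines is not itself a Stream: call next_line(), or wrap it
    // in tokio_stream::wrappers::LinesStream to get a Stream
    let mut lines = reader.lines();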
    
    while let Some(line) = lines.next_line().await? {
        println!("{}", line);
    }
    
    Ok(())
}

tokio_stream Combinators

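use tokio_stream::{StreamExt, Stream};

async fn process_events<S: Stream<Item = Event>>(stream: S) {
    // tokio_stream's filter and map take plain (non-async) closures
    let alerts = stream
        .filter(|event| event.severity >= Severity::Warning)
        .map(|event| event.message)
        .take(100);
    // tokio_stream's StreamExt has no for_each: pin the chain and loop on next()
    tokio::pin!(alerts);
    while let Some(msg) = alerts.next().await {
        send_alert(&msg).await;
    }
}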

futures::stream::StreamExt

use futures::stream::{self, StreamExt};

async fn parallel_requests(urls: Vec<String>) -> Vec<reqwest::Result<reqwest::Response>> {
    stream::iter(urls)
        .map(|url| async move {
            reqwest::get(&url).await
        })
        .buffered(10) // at most 10 requests in flight, results in input order
        .collect()
        .await
}

async-stream Crate Macros

use async_stream::stream;
use futures::stream::{Stream, StreamExt};

fn fibonacci() -> impl Stream<Item = u64> {
    stream! {
        let mut a = 0u64;
        let mut b = 1u64;
        
        loop {
            yield a;
            let next = a + b;
            a = b;
            b = next;
        }
    }
}

// Usage:
#[tokio::main]
async fn main() {
    // The stream! type is !Unpin, so pin it before calling next()
    let mut fib = Box::pin(fibonacci());
    
    while let Some(n) = fib.next().await {
        if n > 1000 {
            break;
        }
        println!("{}", n);
    }
}

tonic (gRPC) Streaming

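use tonic::{Request, Response, Status};
use futures::Stream;
use async_stream::stream;

// Simplified as a free function; in a tonic service this is a trait method
// whose stream type is declared as an associated type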

// Server-side streaming RPC
async fn list_features(
    request: Request<Rectangle>,
) -> Result<Response<impl Stream<Item = Result<Feature, Status>>>, Status> {
    let rect = request.into_inner();
    
    let stream = stream! {
        for feature in get_features_in_rectangle(&rect) {
            yield Ok(feature);
        }
    };
    
    Ok(Response::new(Box::pin(stream)))
}

---

Further Reading

  1. Asynchronous Programming in Rust - https://rust-lang.github.io/async-book/
  • Official async book
  • Stream chapter covers details
  2. futures-rs Documentation - https://docs.rs/futures/
  • Stream trait reference
  • All combinators explained
  3. tokio_stream Crate - https://docs.rs/tokio-stream/
  • Tokio-specific stream utilities
  • Integration with tokio I/O
  4. async-stream Crate - https://docs.rs/async-stream/
  • Macros for easy stream creation
  • Generator-style syntax
  5. Backpressure in Async Rust - https://tokio.rs/blog/2020-02-announcement
  • Understanding flow control
  • Bounded vs unbounded channels
  6. Stream Processing Patterns - https://www.youtube.com/watch?v=AZht1rkHIxo
  • Jon Gjengset's talk
  • Real-world streaming architectures
  7. RFC 2996 - Stream trait - https://github.com/rust-lang/rfcs/pull/2996
  • Stream trait standardization
  • Design rationale

---

Helper Functions (For Examples)

// Helper functions used in examples above
use tokio_postgres::Row;
use std::io;

// WebSocket helpers
async fn process_chat_message(client_id: u64, text: &str) {
    println!("[Processing] Client {}: {}", client_id, text);
}

async fn process_binary_data(client_id: u64, data: Vec<u8>) {
    println!("[Processing] Client {} sent {} bytes", client_id, data.len());
}

async fn save_to_database(text: &str) -> Result<(), String> {
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    Ok(())
}

async fn broadcast_to_all_clients(message: tokio_tungstenite::tungstenite::Message) {
    println!("[Broadcast] {}", message);
}

// Database helpers
async fn process_user(id: i64, name: &str, email: &str) {
    println!("Processing user: {} - {} ({})", id, name, email);
}

async fn expensive_processing(id: i64) -> Result<(), Box<dyn std::error::Error>> {
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    Ok(())
}

// File processing helpers
async fn send_to_api(line: &str) -> io::Result<()> {
    println!("[API] Sending: {}", line);
    Ok(())
}

// Log aggregation helpers
async fn send_to_storage(batch: &[LogEntry]) {
    println!("[Storage] Sending batch of {} entries", batch.len());
}

async fn write_to_elasticsearch(mut rx: tokio::sync::mpsc::Receiver<LogEntry>) {
    while let Some(entry) = rx.recv().await {
        println!("[Elasticsearch] {:?}", entry);
    }
}

async fn write_to_s3(mut rx: tokio::sync::mpsc::Receiver<LogEntry>) {
    while let Some(entry) = rx.recv().await {
        println!("[S3] {:?}", entry);
    }
}

async fn update_metrics(mut rx: tokio::sync::mpsc::Receiver<LogEntry>) {
    while let Some(entry) = rx.recv().await {
        println!("[Metrics] {:?}", entry);
    }
}

// Generic helpers
async fn expensive_operation(x: i32) -> i32 {
    tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    x * 2
}

fn hot_path_computation(x: i32) -> i32 {
    x * x
}

async fn fetch_data(id: i32) -> String {
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    format!("Data for {}", id)
}

async fn fetch(x: i32) -> Result<String, String> {
    Ok(format!("Fetched {}", x))
}

async fn send_alert(msg: &str) {
    println!("[ALERT] {}", msg);
}

async fn process(item: i32) {
    println!("Processing {}", item);
}

async fn process_row(row: Row) {
    println!("Processing row: {:?}", row);
}

---

Summary

Streams are Rust's async iterators:

Key Takeaways:
  • Stream trait for async iteration over sequences
  • poll_next returns Poll<Option<Item>> and can be Pending
  • Natural backpressure with bounded channels
  • Combinators for filtering, mapping, collecting
  • Memory-efficient for large/unbounded datasets
When to use streams:
  • Network communication (WebSocket, HTTP, gRPC)
  • Database result streaming
  • File processing line-by-line
  • Real-time event handling
  • Any I/O-bound iteration
Performance:
  • Some per-item poll overhead vs sync iterators (roughly 10-50ns, workload-dependent)
  • O(1) memory (unless buffered)
  • Bounded buffers prevent OOM
  • Zero-copy optimizations with Arc/Bytes
Remember: Streams excel at I/O-bound workloads. For CPU-bound iteration over small collections, stick with synchronous iterators. Use backpressure to prevent memory exhaustion when producers are faster than consumers.

Understanding streams makes you proficient at building scalable, memory-efficient async systems in Rust!

Now go stream some legendary data pipelines!
