Async iterators and backpressure
A Stream is Rust's async equivalent of an Iterator - it produces a sequence of values asynchronously. While an Iterator blocks until the next value is ready, a Stream yields control when waiting for values, allowing other tasks to run.
use std::task::{Context, Poll};
use std::pin::Pin;
pub trait Stream {
type Item;
// The heart of async iteration
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
}
// Comparison with Iterator:
pub trait Iterator {
type Item;
fn next(&mut self) -> Option<Self::Item>; // Blocking
}
Key differences:
Poll - can be PendingOption - blocks until readyPin<&mut Self> for self-referential state// Synchronous iterator - blocks on each item
let mut iter = vec![1, 2, 3].into_iter();
while let Some(item) = iter.next() {
process(item); // Blocks until ready
}
// Asynchronous stream - yields on each item
use tokio_stream::StreamExt;
let mut stream = tokio_stream::iter(vec![1, 2, 3]);
while let Some(item) = stream.next().await {
process(item).await; // Yields to other tasks
}
---
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message;
/// A stream of WebSocket messages with backpressure
pub struct WebSocketStream {
/// Receiver for incoming messages
receiver: mpsc::Receiver<Message>,
/// Connection handle for backpressure
flow_control: WebSocketFlowControl,
}
struct WebSocketFlowControl {
/// Current buffer size
buffered: usize,
/// Maximum buffer before applying backpressure
max_buffer: usize,
}
impl WebSocketStream {
pub fn new(
ws_read: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
) -> Self {
let (tx, rx) = mpsc::channel(100);
// Spawn task to read from WebSocket
tokio::spawn(async move {
use futures::StreamExt;
let mut ws_read = ws_read;
while let Some(msg_result) = ws_read.next().await {
match msg_result {
Ok(Message::Text(text)) => {
if tx.send(Message::Text(text)).await.is_err() {
break; // Receiver dropped
}
}
Ok(Message::Binary(data)) => {
if tx.send(Message::Binary(data)).await.is_err() {
break;
}
}
Ok(Message::Close(_)) => break,
Err(e) => {
eprintln!("WebSocket error: {}", e);
break;
}
_ => {}
}
}
});
WebSocketStream {
receiver: rx,
flow_control: WebSocketFlowControl {
buffered: 0,
max_buffer: 100,
},
}
}
}
impl Stream for WebSocketStream {
type Item = Message;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
// Poll the channel for next message
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(msg)) => {
self.flow_control.buffered = self.flow_control.buffered.saturating_sub(1);
Poll::Ready(Some(msg))
}
Poll::Ready(None) => {
// Channel closed - WebSocket disconnected
Poll::Ready(None)
}
Poll::Pending => {
// No messages available
Poll::Pending
}
}
}
}
// Usage: Chat server handling multiple clients
async fn handle_websocket_client(
ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
client_id: u64,
) {
let mut message_stream = WebSocketStream::new(ws_stream);
while let Some(message) = message_stream.next().await {
match message {
Message::Text(text) => {
println!("[Client {}] Received: {}", client_id, text);
// Process message (database write, broadcast, etc.)
process_chat_message(client_id, &text).await;
}
Message::Binary(data) => {
println!("[Client {}] Received {} bytes", client_id, data.len());
process_binary_data(client_id, data).await;
}
_ => {}
}
}
println!("[Client {}] Disconnected", client_id);
}
// With backpressure: slow down reading if processing is slow
async fn handle_websocket_with_backpressure(
ws_stream: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
) {
use tokio::time::{Duration, sleep};
let mut message_stream = WebSocketStream::new(ws_stream);
while let Some(message) = message_stream.next().await {
// Slow processing (e.g., database write)
if let Message::Text(text) = message {
match save_to_database(&text).await {
Ok(_) => println!("Saved: {}", text),
Err(e) => {
eprintln!("Database error: {}", e);
// Backpressure: slow down reading
sleep(Duration::from_millis(100)).await;
}
}
}
}
}
// Combining multiple WebSocket streams
async fn multiplex_websockets(
streams: Vec<tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>>,
) {
use futures::stream::{self, StreamExt};
// Merge all WebSocket streams into one
let mut merged = stream::select_all(
streams.into_iter().map(WebSocketStream::new)
);
while let Some(message) = merged.next().await {
// Handle message from any WebSocket
broadcast_to_all_clients(message).await;
}
}
How it works:
---
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_postgres::{Client, Row, RowStream};
/// Streams database query results with cursor-based pagination
pub struct DatabaseResultStream {
/// Connection to database
client: Client,
/// Current batch of rows
current_batch: RowStream,
/// Cursor position for pagination
cursor: Option<i64>,
/// Query to execute
query: String,
/// Batch size for each fetch
batch_size: i64,
/// Total rows streamed
rows_streamed: usize,
}
impl DatabaseResultStream {
pub async fn new(
client: Client,
query: String,
batch_size: i64,
) -> Result<Self, tokio_postgres::Error> {
// Execute first batch
let current_batch = client
.query_raw(&query, &[])
.await?;
Ok(DatabaseResultStream {
client,
current_batch,
cursor: Some(0),
query,
batch_size,
rows_streamed: 0,
})
}
}
impl Stream for DatabaseResultStream {
type Item = Result<Row, tokio_postgres::Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
// Try to get next row from current batch
match Pin::new(&mut self.current_batch).poll_next(cx) {
Poll::Ready(Some(Ok(row))) => {
self.rows_streamed += 1;
Poll::Ready(Some(Ok(row)))
}
Poll::Ready(Some(Err(e))) => {
Poll::Ready(Some(Err(e)))
}
Poll::Ready(None) => {
// Current batch exhausted - need to fetch next batch
// In real implementation, would spawn fetch task
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
// Better implementation: with explicit cursor pagination
pub struct CursorStream {
client: Client,
query: String,
cursor: i64,
batch_size: i64,
current_batch: Vec<Row>,
batch_index: usize,
exhausted: bool,
}
impl CursorStream {
pub async fn new(
client: Client,
base_query: String,
batch_size: i64,
) -> Result<Self, tokio_postgres::Error> {
let mut stream = CursorStream {
client,
query: base_query,
cursor: 0,
batch_size,
current_batch: Vec::new(),
batch_index: 0,
exhausted: false,
};
// Load first batch
stream.fetch_next_batch().await?;
Ok(stream)
}
async fn fetch_next_batch(&mut self) -> Result<(), tokio_postgres::Error> {
let query = format!(
"{} LIMIT {} OFFSET {}",
self.query, self.batch_size, self.cursor
);
let rows = self.client.query(&query, &[]).await?;
if rows.is_empty() {
self.exhausted = true;
} else {
self.cursor += rows.len() as i64;
self.current_batch = rows;
self.batch_index = 0;
}
Ok(())
}
}
impl Stream for CursorStream {
type Item = Result<Row, tokio_postgres::Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.exhausted {
return Poll::Ready(None);
}
// Return next item from current batch
if self.batch_index < self.current_batch.len() {
let row = self.current_batch[self.batch_index].clone();
self.batch_index += 1;
return Poll::Ready(Some(Ok(row)));
}
// Need to fetch next batch - would use async task in real implementation
// For now, signal that we need more data
Poll::Pending
}
}
// Usage: Process large dataset without OOM
async fn process_all_users(db: Client) -> Result<(), tokio_postgres::Error> {
let query = "SELECT id, name, email FROM users ORDER BY id".to_string();
let mut stream = CursorStream::new(db, query, 1000).await?;
let mut count = 0;
while let Some(result) = stream.next().await {
let row = result?;
let id: i64 = row.get(0);
let name: String = row.get(1);
let email: String = row.get(2);
// Process user (send email, generate report, etc.)
process_user(id, &name, &email).await;
count += 1;
if count % 10000 == 0 {
println!("Processed {} users", count);
}
}
println!("Total users processed: {}", count);
Ok(())
}
// Stream with transformation pipeline
async fn export_users_to_csv(db: Client, output_path: &str) -> Result<(), Box<dyn std::error::Error>> {
use tokio::io::AsyncWriteExt;
let query = "SELECT id, name, email FROM users ORDER BY id".to_string();
let stream = CursorStream::new(db, query, 1000).await?;
let mut file = tokio::fs::File::create(output_path).await?;
// Write CSV header
file.write_all(b"id,name,email\n").await?;
// Stream and transform
stream
.map(|result| {
result.map(|row| {
let id: i64 = row.get(0);
let name: String = row.get(1);
let email: String = row.get(2);
format!("{},{},{}\n", id, name, email)
})
})
.try_for_each(|csv_line| async {
file.write_all(csv_line.as_bytes()).await?;
Ok(())
})
.await?;
file.flush().await?;
Ok(())
}
// Parallel processing with bounded concurrency
async fn process_users_parallel(db: Client) -> Result<(), Box<dyn std::error::Error>> {
use futures::stream::StreamExt;
let query = "SELECT id, name, email FROM users ORDER BY id".to_string();
let stream = CursorStream::new(db, query, 1000).await?;
// Process up to 10 users concurrently
stream
.map(|result| async move {
let row = result?;
let id: i64 = row.get(0);
// Expensive operation (API call, computation, etc.)
expensive_processing(id).await?;
Ok::<_, Box<dyn std::error::Error>>(())
})
.buffer_unordered(10) // Concurrent processing
.try_collect::<Vec<_>>()
.await?;
Ok(())
}
Key features:
---
use futures::stream::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use tokio::fs::File;
/// Async stream of lines from a file
pub struct FileLineStream {
reader: BufReader<File>,
buffer: String,
done: bool,
}
impl FileLineStream {
pub async fn new(path: &str) -> io::Result<Self> {
let file = File::open(path).await?;
Ok(FileLineStream {
reader: BufReader::with_capacity(8192, file),
buffer: String::with_capacity(1024),
done: false,
})
}
}
impl Stream for FileLineStream {
type Item = io::Result<String>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
// Clear buffer for next line
self.buffer.clear();
// Create pinned reader
let mut reader = Pin::new(&mut self.reader);
// Poll for next line
match reader.poll_read_line(cx, &mut self.buffer) {
Poll::Ready(Ok(0)) => {
// EOF
self.done = true;
Poll::Ready(None)
}
Poll::Ready(Ok(_bytes)) => {
// Got a line
let line = self.buffer.clone();
Poll::Ready(Some(Ok(line)))
}
Poll::Ready(Err(e)) => {
self.done = true;
Poll::Ready(Some(Err(e)))
}
Poll::Pending => Poll::Pending,
}
}
}
// Better: Using tokio's built-in lines() stream
use tokio::io::Lines;
pub async fn read_file_lines(path: &str) -> io::Result<Lines<BufReader<File>>> {
let file = File::open(path).await?;
let reader = BufReader::new(file);
Ok(reader.lines())
}
// Usage: Log file processing
async fn analyze_log_file(path: &str) -> io::Result<()> {
use futures::stream::StreamExt;
let file = File::open(path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut error_count = 0;
let mut warning_count = 0;
while let Some(line) = lines.next_line().await? {
if line.contains("ERROR") {
error_count += 1;
println!("Error: {}", line);
} else if line.contains("WARN") {
warning_count += 1;
}
}
println!("Errors: {}, Warnings: {}", error_count, warning_count);
Ok(())
}
// Stream pipeline: filter, map, collect
async fn extract_error_messages(path: &str) -> io::Result<Vec<String>> {
use futures::stream::{StreamExt, TryStreamExt};
let file = File::open(path).await?;
let reader = BufReader::new(file);
reader
.lines()
.try_filter(|line| {
futures::future::ready(line.contains("ERROR"))
})
.try_map(|line| {
// Extract timestamp and message
let parts: Vec<&str> = line.splitn(3, ' ').collect();
if parts.len() >= 3 {
futures::future::ok(parts[2].to_string())
} else {
futures::future::ok(line)
}
})
.try_collect()
.await
}
// Processing multiple files concurrently
async fn process_log_directory(dir_path: &str) -> io::Result<()> {
use futures::stream::{self, StreamExt};
use tokio::fs;
// Read directory entries
let mut entries = fs::read_dir(dir_path).await?;
let mut files = Vec::new();
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("log") {
files.push(path);
}
}
// Process all log files concurrently
stream::iter(files)
.map(|path| async move {
println!("Processing {:?}", path);
analyze_log_file(path.to_str().unwrap()).await
})
.buffer_unordered(10)
.collect::<Vec<_>>()
.await;
Ok(())
}
// Rate-limited file processing
use tokio::time::{interval, Duration};
async fn rate_limited_processing(path: &str) -> io::Result<()> {
use futures::stream::StreamExt;
let file = File::open(path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut rate_limiter = interval(Duration::from_millis(100));
while let Some(line) = lines.next_line().await? {
// Wait for rate limit
rate_limiter.tick().await;
// Process line (e.g., API call)
send_to_api(&line).await?;
}
Ok(())
}
Performance characteristics:
---
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::{interval, Interval};
/// Server-Sent Events (SSE) stream
pub struct SseStream {
/// Broadcast receiver for events
receiver: broadcast::Receiver<ServerEvent>,
/// Heartbeat interval to keep connection alive
heartbeat: Interval,
}
#[derive(Clone, Debug)]
pub enum ServerEvent {
Message { id: u64, data: String },
Notification { level: String, text: String },
Update { resource: String, data: String },
}
impl SseStream {
pub fn new(receiver: broadcast::Receiver<ServerEvent>) -> Self {
SseStream {
receiver,
heartbeat: interval(Duration::from_secs(15)),
}
}
}
impl Stream for SseStream {
type Item = Result<String, String>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
// Try to receive event
match self.receiver.try_recv() {
Ok(event) => {
// Format as SSE
let sse_data = match event {
ServerEvent::Message { id, data } => {
format!("id: {}\nevent: message\ndata: {}\n\n", id, data)
}
ServerEvent::Notification { level, text } => {
format!("event: notification\ndata: {{\"level\":\"{}\",\"text\":\"{}\"}}\n\n", level, text)
}
ServerEvent::Update { resource, data } => {
format!("event: update\ndata: {{\"resource\":\"{}\",\"data\":\"{}\"}}\n\n", resource, data)
}
};
return Poll::Ready(Some(Ok(sse_data)));
}
Err(broadcast::error::TryRecvError::Empty) => {
// No events - check heartbeat
}
Err(broadcast::error::TryRecvError::Lagged(_)) => {
// Receiver too slow - send error
return Poll::Ready(Some(Err("Client lagging".to_string())));
}
Err(broadcast::error::TryRecvError::Closed) => {
// Channel closed
return Poll::Ready(None);
}
}
// Check if heartbeat should fire
if let Poll::Ready(_) = self.heartbeat.poll_tick(cx) {
// Send heartbeat comment
return Poll::Ready(Some(Ok(": heartbeat\n\n".to_string())));
}
// Register waker for next event
cx.waker().wake_by_ref();
Poll::Pending
}
}
// HTTP handler for SSE endpoint
use axum::{
response::{sse::{Event, KeepAlive, Sse}, IntoResponse},
extract::State,
};
pub async fn sse_handler(
State(event_tx): State<broadcast::Sender<ServerEvent>>,
) -> impl IntoResponse {
let receiver = event_tx.subscribe();
let stream = SseStream::new(receiver);
Sse::new(stream)
.keep_alive(KeepAlive::default())
}
// Event broadcaster
pub struct EventBroadcaster {
sender: broadcast::Sender<ServerEvent>,
}
impl EventBroadcaster {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(1000);
EventBroadcaster { sender }
}
pub fn broadcast(&self, event: ServerEvent) {
let _ = self.sender.send(event);
}
pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
self.sender.subscribe()
}
}
// Usage: Real-time dashboard
async fn run_dashboard_server() {
use axum::{Router, routing::get};
let broadcaster = EventBroadcaster::new();
let event_tx = broadcaster.sender.clone();
// Spawn task to generate events
tokio::spawn(async move {
let mut counter = 0u64;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
broadcaster.broadcast(ServerEvent::Message {
id: counter,
data: format!("Update #{}", counter),
});
counter += 1;
}
});
let app = Router::new()
.route("/events", get(sse_handler))
.with_state(event_tx);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
// Client reconnection with exponential backoff
pub struct ReconnectingStream<S> {
stream: Option<S>,
reconnect_fn: Box<dyn Fn() -> S>,
backoff: Duration,
max_backoff: Duration,
}
impl<S: Stream> Stream for ReconnectingStream<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if let Some(stream) = &mut self.stream {
match Pin::new(stream).poll_next(cx) {
Poll::Ready(Some(item)) => {
// Reset backoff on success
self.backoff = Duration::from_millis(100);
Poll::Ready(Some(item))
}
Poll::Ready(None) => {
// Stream ended - reconnect
self.stream = None;
cx.waker().wake_by_ref();
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
} else {
// Reconnect
self.stream = Some((self.reconnect_fn)());
// Increase backoff
self.backoff = (self.backoff * 2).min(self.max_backoff);
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
Key features:
---
use futures::stream::{Stream, StreamExt};
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
use serde::{Serialize, Deserialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LogEntry {
pub timestamp: i64,
pub level: LogLevel,
pub source: String,
pub message: String,
pub metadata: HashMap<String, String>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum LogLevel {
Debug,
Info,
Warning,
Error,
Critical,
}
/// Multi-source log aggregation stream
pub struct LogAggregator {
/// Receivers from multiple sources
sources: Vec<mpsc::Receiver<LogEntry>>,
/// Current source being polled
current_index: usize,
}
impl LogAggregator {
pub fn new(sources: Vec<mpsc::Receiver<LogEntry>>) -> Self {
LogAggregator {
sources,
current_index: 0,
}
}
}
impl Stream for LogAggregator {
type Item = LogEntry;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
if self.sources.is_empty() {
return Poll::Ready(None);
}
// Round-robin polling of sources
let mut all_pending = true;
for _ in 0..self.sources.len() {
let source = &mut self.sources[self.current_index];
match source.poll_recv(cx) {
Poll::Ready(Some(entry)) => {
// Got an entry
self.current_index = (self.current_index + 1) % self.sources.len();
return Poll::Ready(Some(entry));
}
Poll::Ready(None) => {
// Source exhausted - remove it
self.sources.remove(self.current_index);
if self.sources.is_empty() {
return Poll::Ready(None);
}
self.current_index %= self.sources.len();
all_pending = false;
}
Poll::Pending => {
// Try next source
self.current_index = (self.current_index + 1) % self.sources.len();
}
}
}
if all_pending {
Poll::Pending
} else {
// Try again
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
// Log processing pipeline
pub struct LogPipeline {
aggregator: LogAggregator,
filters: Vec<Box<dyn Fn(&LogEntry) -> bool + Send>>,
transformers: Vec<Box<dyn Fn(LogEntry) -> LogEntry + Send>>,
}
impl LogPipeline {
pub fn new(aggregator: LogAggregator) -> Self {
LogPipeline {
aggregator,
filters: Vec::new(),
transformers: Vec::new(),
}
}
pub fn filter<F>(mut self, predicate: F) -> Self
where
F: Fn(&LogEntry) -> bool + Send + 'static,
{
self.filters.push(Box::new(predicate));
self
}
pub fn transform<F>(mut self, transformer: F) -> Self
where
F: Fn(LogEntry) -> LogEntry + Send + 'static,
{
self.transformers.push(Box::new(transformer));
self
}
}
impl Stream for LogPipeline {
type Item = LogEntry;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>> {
loop {
match Pin::new(&mut self.aggregator).poll_next(cx) {
Poll::Ready(Some(mut entry)) => {
// Apply filters
let mut passes = true;
for filter in &self.filters {
if !filter(&entry) {
passes = false;
break;
}
}
if !passes {
continue; // Skip this entry
}
// Apply transformations
for transformer in &self.transformers {
entry = transformer(entry);
}
return Poll::Ready(Some(entry));
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
}
}
}
}
// Usage: Complete log aggregation system
async fn run_log_aggregation_system() {
use tokio::time::{interval, Duration};
// Create log sources
let (app_tx, app_rx) = mpsc::channel(1000);
let (web_tx, web_rx) = mpsc::channel(1000);
let (db_tx, db_rx) = mpsc::channel(1000);
// Simulate application logs
tokio::spawn(async move {
let mut ticker = interval(Duration::from_millis(100));
let mut counter = 0;
loop {
ticker.tick().await;
let entry = LogEntry {
timestamp: chrono::Utc::now().timestamp(),
level: LogLevel::Info,
source: "app".to_string(),
message: format!("Processing request #{}", counter),
metadata: HashMap::new(),
};
if app_tx.send(entry).await.is_err() {
break;
}
counter += 1;
}
});
// Simulate web server logs
tokio::spawn(async move {
let mut ticker = interval(Duration::from_millis(50));
loop {
ticker.tick().await;
let entry = LogEntry {
timestamp: chrono::Utc::now().timestamp(),
level: LogLevel::Debug,
source: "web".to_string(),
message: "HTTP request received".to_string(),
metadata: HashMap::from([
("method".to_string(), "GET".to_string()),
("path".to_string(), "/api/users".to_string()),
]),
};
if web_tx.send(entry).await.is_err() {
break;
}
}
});
// Simulate database logs
tokio::spawn(async move {
let mut ticker = interval(Duration::from_millis(200));
loop {
ticker.tick().await;
let entry = LogEntry {
timestamp: chrono::Utc::now().timestamp(),
level: LogLevel::Info,
source: "database".to_string(),
message: "Query executed".to_string(),
metadata: HashMap::from([
("duration_ms".to_string(), "45".to_string()),
]),
};
if db_tx.send(entry).await.is_err() {
break;
}
}
});
// Create aggregation pipeline
let aggregator = LogAggregator::new(vec![app_rx, web_rx, db_rx]);
let pipeline = LogPipeline::new(aggregator)
// Filter: only warnings and errors
.filter(|entry| matches!(entry.level, LogLevel::Warning | LogLevel::Error))
// Transform: add environment tag
.transform(|mut entry| {
entry.metadata.insert("environment".to_string(), "production".to_string());
entry
});
// Process logs with batching
let mut batch = Vec::new();
let batch_size = 100;
tokio::pin!(pipeline);
while let Some(entry) = pipeline.next().await {
batch.push(entry);
if batch.len() >= batch_size {
// Send batch to storage (Elasticsearch, S3, etc.)
send_to_storage(&batch).await;
batch.clear();
}
}
// Send remaining batch
if !batch.is_empty() {
send_to_storage(&batch).await;
}
}
// Fan-out to multiple sinks
pub struct LogFanout {
sinks: Vec<mpsc::Sender<LogEntry>>,
}
impl LogFanout {
pub fn new(sinks: Vec<mpsc::Sender<LogEntry>>) -> Self {
LogFanout { sinks }
}
pub async fn send(&self, entry: LogEntry) {
// Send to all sinks (best-effort)
for sink in &self.sinks {
let _ = sink.send(entry.clone()).await;
}
}
}
// Usage: Route logs to multiple destinations
async fn run_fanout_system(aggregator: LogAggregator) {
use futures::stream::StreamExt;
// Create sinks
let (elasticsearch_tx, elasticsearch_rx) = mpsc::channel(1000);
let (s3_tx, s3_rx) = mpsc::channel(1000);
let (metrics_tx, metrics_rx) = mpsc::channel(1000);
// Spawn sink processors
tokio::spawn(async move {
write_to_elasticsearch(elasticsearch_rx).await;
});
tokio::spawn(async move {
write_to_s3(s3_rx).await;
});
tokio::spawn(async move {
update_metrics(metrics_rx).await;
});
// Fan out logs
let fanout = LogFanout::new(vec![elasticsearch_tx, s3_tx, metrics_tx]);
aggregator
.for_each(|entry| async {
fanout.send(entry).await;
})
.await;
}
Architecture benefits:
---
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
// Provided method
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
Return values:
// A Future returns ONE value
impl Future for MyFuture {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
// Poll::Ready(T) or Poll::Pending
}
}
// A Stream returns MANY values
impl Stream for MyStream {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
// Poll::Ready(Some(T)), Poll::Ready(None), or Poll::Pending
}
}
// Stream is essentially: Future<Output = Option<Item>> + repeat
Streams need pinning for the same reason as futures:
struct SelfReferentialStream {
buffer: Vec<u8>,
// This points into buffer!
current_slice: *const [u8],
}
// If this moves in memory, current_slice becomes dangling
// Pin guarantees it won't move
impl Stream for MyStream {
type Item = i32;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<i32>> {
if let Some(item) = self.try_get_item() {
return Poll::Ready(Some(item));
}
// No item ready - store waker
self.waker = Some(cx.waker().clone());
// Later, when item arrives:
// self.waker.take().unwrap().wake();
Poll::Pending
}
}
Waker lifecycle:
// Natural backpressure with bounded channels
let (tx, rx) = mpsc::channel(10); // Capacity: 10
// Producer
tokio::spawn(async move {
for i in 0..1000 {
// This blocks when channel is full!
tx.send(i).await.unwrap();
}
});
// Consumer (slow)
while let Some(item) = rx.recv().await {
// Slow processing
slow_operation(item).await;
}
// Producer automatically slows down to match consumer
Manual backpressure:
pub struct BackpressureStream {
source: mpsc::Receiver<Data>,
processing_capacity: usize,
in_flight: usize,
}
impl Stream for BackpressureStream {
type Item = Data;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Data>> {
// Check capacity
if self.in_flight >= self.processing_capacity {
// Too many items in flight - apply backpressure
return Poll::Pending;
}
// Try to receive
match self.source.poll_recv(cx) {
Poll::Ready(Some(data)) => {
self.in_flight += 1;
Poll::Ready(Some(data))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
use futures::stream::{self, StreamExt};
// DANGEROUS: Can OOM if producer faster than consumer
let stream = stream::iter(0..1_000_000)
.map(|i| expensive_operation(i))
.buffered(usize::MAX); // Unbounded!
Bounded buffering:
// Safe: Limits concurrent operations
let stream = stream::iter(0..1_000_000)
.map(|i| expensive_operation(i))
.buffered(10); // Max 10 concurrent operations
Adaptive buffering:
pub struct AdaptiveBuffer<S> {
stream: S,
buffer: VecDeque<S::Item>,
capacity: usize,
min_capacity: usize,
max_capacity: usize,
}
impl<S: Stream> Stream for AdaptiveBuffer<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<S::Item>> {
// Emit from buffer if available
if let Some(item) = self.buffer.pop_front() {
return Poll::Ready(Some(item));
}
// Fill buffer
while self.buffer.len() < self.capacity {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(item)) => {
self.buffer.push_back(item);
}
Poll::Ready(None) => {
return Poll::Ready(None);
}
Poll::Pending => break,
}
}
// Adjust capacity based on utilization
if self.buffer.is_empty() {
// Consumer is fast - reduce buffer
self.capacity = (self.capacity / 2).max(self.min_capacity);
} else if self.buffer.len() == self.capacity {
// Consumer is slow - increase buffer
self.capacity = (self.capacity * 2).min(self.max_capacity);
}
if let Some(item) = self.buffer.pop_front() {
Poll::Ready(Some(item))
} else {
Poll::Pending
}
}
}
use futures::stream::TryStream;
// TryStream is Stream<Item = Result<T, E>>
pub trait TryStream: Stream {
type Ok;
type Error;
// Automatically implemented for Stream<Item = Result<Ok, Error>>
}
// Example: Database query stream with errors
impl Stream for QueryStream {
type Item = Result<Row, DbError>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Result<Row, DbError>>> {
// Return Ready(Some(Ok(row))) or Ready(Some(Err(error)))
}
}
// Using TryStream combinators
async fn process_query_results(stream: QueryStream) -> Result<Vec<User>, DbError> {
use futures::stream::TryStreamExt;
stream
.try_filter(|row| {
futures::future::ready(row.get::<i32, _>("active") == 1)
})
.try_map(|row| {
futures::future::ok(User {
id: row.get("id"),
name: row.get("name"),
})
})
.try_collect()
.await
}
// Error recovery
async fn process_with_retry(stream: QueryStream) {
use futures::stream::TryStreamExt;
stream
.or_else(|error| async move {
// Log error and return fallback
eprintln!("Error: {}", error);
Ok(Row::default()) // Fallback value
})
.for_each(|row| async move {
process_row(row).await;
})
.await;
}
use futures::stream::{Stream, StreamExt};
async fn demonstrate_combinators<S: Stream<Item = i32>>(stream: S) {
// Map: Transform each item
stream
.map(|x| x * 2)
.collect::<Vec<_>>()
.await;
// Filter: Keep only matching items
stream
.filter(|x| futures::future::ready(*x > 10))
.collect::<Vec<_>>()
.await;
// FilterMap: Filter and transform
stream
.filter_map(|x| {
if x % 2 == 0 {
futures::future::ready(Some(x / 2))
} else {
futures::future::ready(None)
}
})
.collect::<Vec<_>>()
.await;
// Take: Limit number of items
stream
.take(10)
.collect::<Vec<_>>()
.await;
// Skip: Skip first N items
stream
.skip(5)
.collect::<Vec<_>>()
.await;
// Fold: Reduce to single value
let sum = stream
.fold(0, |acc, x| futures::future::ready(acc + x))
.await;
// ForEach: Process each item
stream
.for_each(|x| async move {
println!("{}", x);
})
.await;
}
// Concurrent processing
async fn concurrent_combinators<S: Stream<Item = i32>>(stream: S) {
// Buffered: Run N futures concurrently
stream
.map(|x| async move {
fetch_data(x).await
})
.buffered(10) // 10 concurrent requests
.collect::<Vec<_>>()
.await;
// BufferUnordered: Don't preserve order
stream
.map(|x| async move {
fetch_data(x).await
})
.buffer_unordered(10) // Faster!
.collect::<Vec<_>>()
.await;
}
// Combining streams
async fn combine_streams() {
use futures::stream;
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec![4, 5, 6]);
// Chain: Concatenate streams
stream1
.chain(stream2)
.collect::<Vec<_>>()
.await;
// Zip: Pair items from two streams
let stream1 = stream::iter(vec![1, 2, 3]);
let stream2 = stream::iter(vec!["a", "b", "c"]);
stream1
.zip(stream2)
.collect::<Vec<_>>()
.await; // [(1, "a"), (2, "b"), (3, "c")]
}
---
// OVERKILL: Using stream for small Vec
let stream = stream::iter(vec![1, 2, 3]);
// BETTER: Just use iterator
for item in vec![1, 2, 3] {
process(item);
}
// UNNECESSARY: No async needed
stream::iter(users)
.for_each(|user| async move {
// Synchronous operation!
println!("{}", user.name);
})
.await;
// BETTER: Standard loop
for user in users {
println!("{}", user.name);
}
// POINTLESS: Data already in memory
let data = vec![1, 2, 3, 4, 5];
let stream = stream::iter(data);
// BETTER: Direct iteration
for item in data {
process(item);
}
// SLOW: Async overhead for hot path
stream.for_each(|x| async move {
hot_path_computation(x); // Pure CPU, no I/O
}).await;
// BETTER: Synchronous iterator
for x in data {
hot_path_computation(x);
}
---
// WRONG: Unbounded buffer
let stream = source_stream
.map(|item| expensive_operation(item))
.buffered(usize::MAX); // OOM if source is fast!
// CORRECT: Bounded buffer
let stream = source_stream
.map(|item| expensive_operation(item))
.buffered(10); // At most 10 concurrent operations
// WRONG: Blocking I/O in poll_next
impl Stream for BadStream {
type Item = String;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<String>> {
// This blocks the executor!
let data = std::fs::read_to_string("file.txt").unwrap();
Poll::Ready(Some(data))
}
}
// CORRECT: Async I/O
impl Stream for GoodStream {
type Item = String;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<String>> {
// Non-blocking async I/O
match Pin::new(&mut self.read_future).poll(cx) {
Poll::Ready(Ok(data)) => Poll::Ready(Some(data)),
Poll::Ready(Err(_)) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
// WRONG: Ignoring slow consumer
let (tx, rx) = mpsc::unbounded_channel(); // Unbounded!
tokio::spawn(async move {
loop {
let data = generate_data();
tx.send(data).unwrap(); // Never blocks - memory leak!
}
});
// CORRECT: Bounded channel with backpressure
let (tx, rx) = mpsc::channel(100); // Bounded
tokio::spawn(async move {
loop {
let data = generate_data();
// Blocks when channel full - backpressure!
if tx.send(data).await.is_err() {
break;
}
}
});
// WRONG: Doesn't handle None
while let Some(item) = stream.next().await {
process(item).await;
}
// What if stream errors? Lost!
// CORRECT: Handle both success and error termination
loop {
match stream.next().await {
Some(Ok(item)) => process(item).await,
Some(Err(e)) => {
eprintln!("Stream error: {}", e);
break;
}
None => {
println!("Stream complete");
break;
}
}
}
// WRONG: Polling after None
let mut stream = source_stream();
while let Some(item) = stream.next().await {
process(item).await;
}
// Polling again - undefined behavior!
if let Some(item) = stream.next().await {
// This might work, might panic, might return garbage
}
// CORRECT: Use fuse() to make idempotent
let mut stream = source_stream().fuse();
while let Some(item) = stream.next().await {
process(item).await;
}
// Safe: returns None forever after first None
if let Some(item) = stream.next().await {
// Always None
}
// WRONG: Defeats purpose of streaming
let all_rows: Vec<Row> = db_stream.collect().await?;
for row in all_rows {
process(row).await;
}
// CORRECT: Process as stream
db_stream
.for_each(|row| async move {
process(row).await;
})
.await?;
---
// Iterator: O(1) memory
let sum: i32 = (0..1_000_000).sum();
// Stream: O(1) memory (if not buffered)
let sum = stream::iter(0..1_000_000)
.fold(0, |acc, x| futures::future::ready(acc + x))
.await;
// Buffered stream: O(buffer_size) memory
let results = stream::iter(0..1_000_000)
.map(|x| fetch(x))
.buffered(10) // 10 * size_of(Future) in memory
.collect::<Vec<_>>()
.await;
// Benchmark: Sync iterator
fn sync_sum(data: &[i32]) -> i32 {
data.iter().sum() // ~1ns per item
}
// Benchmark: Async stream
async fn async_sum(data: &[i32]) -> i32 {
stream::iter(data)
.fold(0, |acc, &x| futures::future::ready(acc + x))
.await
// ~10-50ns per item (poll overhead)
}
// Lesson: Don't use streams for CPU-bound hot paths!
| Aspect | Iterator | Stream |
|--------|----------|--------|
| Latency per item | 0ns (inline) | 10-50ns (poll overhead) |
| Memory | O(1) | O(1) + waker storage |
| Concurrency | No | Yes (with buffered) |
| Backpressure | No | Yes |
| Async I/O | Blocks | Non-blocking |
| Use case | CPU-bound | I/O-bound |
// BAD: Copying on every item
impl Stream for CopyStream {
type Item = Vec<u8>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Vec<u8>>> {
// Clones data!
Poll::Ready(Some(self.buffer.clone()))
}
}
// GOOD: Zero-copy with Arc
impl Stream for ZeroCopyStream {
type Item = Arc<Vec<u8>>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Arc<Vec<u8>>>> {
// Shared ownership, no copy
Poll::Ready(Some(self.buffer.clone())) // Just increments refcount
}
}
// BEST: Zero-copy with bytes crate
use bytes::Bytes;
impl Stream for BytesStream {
type Item = Bytes;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Bytes>> {
// Bytes uses Arc internally, cheap clone
Poll::Ready(Some(self.buffer.clone()))
}
}
---
Create a stream that yields numbers from start to end:
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
struct RangeStream {
current: i32,
end: i32,
}
impl RangeStream {
pub fn new(start: i32, end: i32) -> Self {
RangeStream {
current: start,
end,
}
}
}
impl Stream for RangeStream {
type Item = i32;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<i32>> {
// TODO: Implement
todo!()
}
}
// Test:
#[tokio::main]
async fn main() {
use futures::stream::StreamExt;
let mut stream = RangeStream::new(1, 5);
while let Some(value) = stream.next().await {
println!("{}", value);
}
// Should print: 1, 2, 3, 4
}
impl Stream for RangeStream {
type Item = i32;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<i32>> {
if self.current < self.end {
let value = self.current;
self.current += 1;
Poll::Ready(Some(value))
} else {
Poll::Ready(None)
}
}
}
Implement a stream that rate-limits item emission:
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{Duration, Instant, Sleep};
struct ThrottledStream<S> {
stream: S,
delay: Duration,
next_allowed: Instant,
sleep: Option<Pin<Box<Sleep>>>,
}
impl<S: Stream> ThrottledStream<S> {
pub fn new(stream: S, delay: Duration) -> Self {
ThrottledStream {
stream,
delay,
next_allowed: Instant::now(),
sleep: None,
}
}
}
impl<S: Stream + Unpin> Stream for ThrottledStream<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<S::Item>> {
// TODO: Implement rate limiting
todo!()
}
}
// Test:
#[tokio::main]
async fn main() {
use futures::stream::{self, StreamExt};
use tokio::time::Instant;
let start = Instant::now();
let stream = stream::iter(vec![1, 2, 3, 4, 5]);
let throttled = ThrottledStream::new(stream, Duration::from_millis(100));
throttled.for_each(|x| async move {
println!("{}: {}", start.elapsed().as_millis(), x);
}).await;
// Should print items ~100ms apart
}
impl<S: Stream + Unpin> Stream for ThrottledStream<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<S::Item>> {
let now = Instant::now();
// Check if we need to delay
if now < self.next_allowed {
// Create sleep if needed
if self.sleep.is_none() {
self.sleep = Some(Box::pin(tokio::time::sleep_until(self.next_allowed)));
}
// Poll the sleep
match self.sleep.as_mut().unwrap().as_mut().poll(cx) {
Poll::Ready(()) => {
self.sleep = None;
// Fall through to poll stream
}
Poll::Pending => return Poll::Pending,
}
}
// Poll underlying stream
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(item)) => {
// Schedule next allowed time
self.next_allowed = Instant::now() + self.delay;
Poll::Ready(Some(item))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
Create a stream that distributes items to multiple consumers with backpressure:
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
struct FanoutStream<T> {
source: Pin<Box<dyn Stream<Item = T>>>,
sinks: Vec<mpsc::Sender<T>>,
}
impl<T: Clone> FanoutStream<T> {
pub fn new(
source: impl Stream<Item = T> + 'static,
sink_count: usize,
buffer_size: usize,
) -> (Self, Vec<mpsc::Receiver<T>>) {
let mut sinks = Vec::new();
let mut receivers = Vec::new();
for _ in 0..sink_count {
let (tx, rx) = mpsc::channel(buffer_size);
sinks.push(tx);
receivers.push(rx);
}
let fanout = FanoutStream {
source: Box::pin(source),
sinks,
};
(fanout, receivers)
}
}
impl<T: Clone> Stream for FanoutStream<T> {
type Item = ();
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<()>> {
// TODO: Implement fan-out with backpressure
// - Poll source for next item
// - Try to send to all sinks
// - If any sink is full, apply backpressure
// - Remove closed sinks
todo!()
}
}
// Test:
#[tokio::main]
async fn main() {
use futures::stream::{self, StreamExt};
let source = stream::iter(0..100);
let (fanout, mut receivers) = FanoutStream::new(source, 3, 10);
// Spawn fanout task
tokio::spawn(async move {
fanout.for_each(|_| async {}).await;
});
// Spawn consumers
for (i, mut rx) in receivers.into_iter().enumerate() {
tokio::spawn(async move {
while let Some(item) = rx.recv().await {
println!("Consumer {}: {}", i, item);
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
You'll need to:
try_send() to check if sink is full---
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::fs::File;
async fn read_lines(path: &str) -> std::io::Result<()> {
let file = File::open(path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines(); // Returns Stream!
while let Some(line) = lines.next_line().await? {
println!("{}", line);
}
Ok(())
}
use tokio_stream::{StreamExt, Stream};
async fn process_events<S: Stream<Item = Event>>(stream: S) {
stream
.filter(|event| event.severity >= Severity::Warning)
.map(|event| event.message)
.take(100)
.for_each(|msg| async move {
send_alert(&msg).await;
})
.await;
}
use futures::stream::{self, StreamExt};
async fn parallel_requests(urls: Vec<String>) -> Vec<Response> {
stream::iter(urls)
.map(|url| async move {
reqwest::get(&url).await
})
.buffered(10) // 10 concurrent requests
.collect()
.await
}
use async_stream::stream;
fn fibonacci() -> impl Stream<Item = u64> {
stream! {
let mut a = 0u64;
let mut b = 1u64;
loop {
yield a;
let next = a + b;
a = b;
b = next;
}
}
}
// Usage:
#[tokio::main]
async fn main() {
let mut fib = fibonacci();
while let Some(n) = fib.next().await {
if n > 1000 {
break;
}
println!("{}", n);
}
}
use tonic::{Request, Response, Status};
use futures::Stream;
// Server-side streaming RPC
async fn list_features(
request: Request<Rectangle>,
) -> Result<Response<impl Stream<Item = Result<Feature, Status>>>, Status> {
let rect = request.into_inner();
let stream = stream! {
for feature in get_features_in_rectangle(&rect) {
yield Ok(feature);
}
};
Ok(Response::new(Box::pin(stream)))
}
---
---
// Helper functions used in examples above
use tokio_postgres::Row;
use std::io;
// WebSocket helpers
async fn process_chat_message(client_id: u64, text: &str) {
println!("[Processing] Client {}: {}", client_id, text);
}
async fn process_binary_data(client_id: u64, data: Vec<u8>) {
println!("[Processing] Client {} sent {} bytes", client_id, data.len());
}
async fn save_to_database(text: &str) -> Result<(), String> {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
Ok(())
}
async fn broadcast_to_all_clients(message: tokio_tungstenite::tungstenite::Message) {
println!("[Broadcast] {}", message);
}
// Database helpers
async fn process_user(id: i64, name: &str, email: &str) {
println!("Processing user: {} - {} ({})", id, name, email);
}
async fn expensive_processing(id: i64) -> Result<(), Box<dyn std::error::Error>> {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
Ok(())
}
// File processing helpers
async fn send_to_api(line: &str) -> io::Result<()> {
println!("[API] Sending: {}", line);
Ok(())
}
// Log aggregation helpers
async fn send_to_storage(batch: &[LogEntry]) {
println!("[Storage] Sending batch of {} entries", batch.len());
}
async fn write_to_elasticsearch(mut rx: tokio::sync::mpsc::Receiver<LogEntry>) {
while let Some(entry) = rx.recv().await {
println!("[Elasticsearch] {:?}", entry);
}
}
async fn write_to_s3(mut rx: tokio::sync::mpsc::Receiver<LogEntry>) {
while let Some(entry) = rx.recv().await {
println!("[S3] {:?}", entry);
}
}
async fn update_metrics(mut rx: tokio::sync::mpsc::Receiver<LogEntry>) {
while let Some(entry) = rx.recv().await {
println!("[Metrics] {:?}", entry);
}
}
// Generic helpers
async fn expensive_operation(x: i32) -> i32 {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
x * 2
}
fn hot_path_computation(x: i32) -> i32 {
x * x
}
async fn fetch_data(id: i32) -> String {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
format!("Data for {}", id)
}
async fn fetch(x: i32) -> Result<String, String> {
Ok(format!("Fetched {}", x))
}
async fn send_alert(msg: &str) {
println!("[ALERT] {}", msg);
}
async fn process(item: i32) {
println!("Processing {}", item);
}
async fn process_row(row: Row) {
println!("Processing row: {:?}", row);
}
---
Streams are Rust's async iterators:
Key Takeaways:Understanding streams makes you proficient at building scalable, memory-efficient async systems in Rust!
Now go stream some legendary data pipelines!
Run this code in the official Rust Playground