Home/Concurrency Patterns/Channel Patterns

Channel Patterns

mpsc, oneshot, broadcast patterns

intermediate
channelmpsccommunication
🎮 Interactive Playground

What are Channels?

Channels are a powerful concurrency primitive in Rust that allow threads to communicate by sending messages. They follow the "share memory by communicating" philosophy, providing a safe and ergonomic way to transfer data between threads without shared state.

Rust provides several channel types:

  • mpsc (multiple producer, single consumer): Standard library channels
  • oneshot: Single-use channels for one-time communication
  • broadcast: Multiple consumers receive the same messages
  • bounded/unbounded: With or without capacity limits

The Problem

Concurrent programs need to:

  1. Transfer data between threads safely
  2. Coordinate work without shared mutable state
  3. Handle backpressure and flow control
  4. Avoid deadlocks and race conditions

Traditional shared-memory approaches using mutexes can be error-prone. Channels provide a safer alternative.

Example Code

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Example 1: Basic mpsc channel
fn basic_channel() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let messages = vec!["hello", "from", "the", "thread"];
        for msg in messages {
            tx.send(msg).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Receive messages
    for received in rx {
        println!("Got: {}", received);
    }
}

// Example 2: Multiple producers
fn multiple_producers() {
    let (tx, rx) = mpsc::channel();

    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            tx_clone.send(format!("Message from thread {}", i)).unwrap();
        });
    }

    // Drop original sender to close channel
    drop(tx);

    // Collect all messages
    for received in rx {
        println!("Got: {}", received);
    }
}

// Example 3: Bounded channel (sync_channel)
fn bounded_channel() {
    let (tx, rx) = mpsc::sync_channel(2); // Buffer size of 2

    let sender = thread::spawn(move || {
        for i in 0..5 {
            println!("Sending: {}", i);
            tx.send(i).unwrap(); // Blocks when buffer is full
            println!("Sent: {}", i);
        }
    });

    thread::sleep(Duration::from_millis(500));

    for received in rx {
        println!("Received: {}", received);
        thread::sleep(Duration::from_millis(200));
    }

    sender.join().unwrap();
}

// Example 4: Select pattern (using crossbeam)
use crossbeam::channel;
use crossbeam::select;

fn select_pattern() {
    let (tx1, rx1) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        tx1.send("from channel 1").unwrap();
    });

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx2.send("from channel 2").unwrap();
    });

    // Wait for first message from either channel
    select! {
        recv(rx1) -> msg => println!("Received: {:?}", msg),
        recv(rx2) -> msg => println!("Received: {:?}", msg),
    }
}

fn main() {
    println!("=== Basic Channel ===");
    basic_channel();

    println!("\n=== Multiple Producers ===");
    multiple_producers();

    println!("\n=== Bounded Channel ===");
    bounded_channel();

    println!("\n=== Select Pattern ===");
    select_pattern();
}

Why It Works

Message Passing

  • Channels transfer ownership of messages
  • Sender and receiver are separate endpoints
  • Type safety ensures correct message types
  • Automatic cleanup when channels are dropped

Backpressure

  • Bounded channels block when full
  • Prevents unbounded memory growth
  • Natural flow control mechanism

Disconnection

  • Channel closes when all senders are dropped
  • Receivers can detect disconnection
  • Clean shutdown semantics

When to Use

Use mpsc when:

  • Multiple threads produce data for one consumer
  • Implementing work queues
  • Event processing systems
  • Actor model implementations

Use bounded channels when:

  • Need backpressure control
  • Want to limit memory usage
  • Coordinating producer/consumer speeds

Use broadcast when:

  • Multiple consumers need same data
  • Implementing pub/sub patterns
  • Broadcasting events to multiple listeners

Use oneshot when:

  • Single request/response pattern
  • Future-like behavior needed
  • RPC implementations

⚠️ Anti-patterns

⚠️ Mistake #1: Not Handling Disconnection

use std::sync::mpsc;

// ❌ DON'T: Ignore send errors
let (tx, rx) = mpsc::channel();
drop(rx); // Receiver dropped

// This will panic!
// tx.send(42).unwrap();

// ✅ DO: Handle disconnection gracefully
if let Err(e) = tx.send(42) {
    println!("Send failed: {}", e);
}

⚠️ Mistake #2: Creating Deadlocks

// ❌ DON'T: Block on bounded channel without consumer
let (tx, rx) = mpsc::sync_channel(1);

tx.send(1).unwrap();
tx.send(2).unwrap(); // Blocks forever if no receiver!

// ✅ DO: Ensure receiver is running
thread::spawn(move || {
    for msg in rx {
        println!("Got: {}", msg);
    }
});

tx.send(1).unwrap();
tx.send(2).unwrap();

⚠️ Mistake #3: Forgetting to Drop Senders

// ❌ DON'T: Keep sender alive unnecessarily
let (tx, rx) = mpsc::channel();

let tx_clone = tx.clone();
thread::spawn(move || {
    tx_clone.send(42).unwrap();
});

// rx.iter() will block forever because tx is still alive!

// ✅ DO: Drop unused senders
drop(tx);
for msg in rx {
    println!("{}", msg);
}

Advanced Example: Work Stealing Queue

use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
use std::sync::{Arc, Mutex};

/// Work-stealing thread pool using channels
pub struct WorkPool {
    workers: Vec<Worker>,
    sender: Sender<Job>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv();

            match job {
                Ok(job) => {
                    println!("Worker {} executing job", id);
                    job();
                }
                Err(_) => {
                    println!("Worker {} shutting down", id);
                    break;
                }
            }
        });

        Worker { id, thread }
    }
}

impl WorkPool {
    pub fn new(size: usize) -> WorkPool {
        let (sender, receiver) = channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        WorkPool { workers, sender }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }
}

impl Drop for WorkPool {
    fn drop(&mut self) {
        drop(self.sender.clone());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            // Join will wait for thread to finish
        }
    }
}

// Usage
fn work_pool_example() {
    let pool = WorkPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Task {} running", i);
            thread::sleep(std::time::Duration::from_millis(100));
            println!("Task {} done", i);
        });
    }

    thread::sleep(std::time::Duration::from_secs(2));
}

Advanced Example: Actor System

use std::sync::mpsc::{channel, Sender};
use std::thread;

// Actor message types
enum ActorMessage {
    Increment,
    GetValue(Sender<i32>),
    Stop,
}

// Actor with internal state
struct CounterActor {
    receiver: std::sync::mpsc::Receiver<ActorMessage>,
    value: i32,
}

impl CounterActor {
    fn new() -> (Self, Sender<ActorMessage>) {
        let (tx, rx) = channel();
        let actor = CounterActor {
            receiver: rx,
            value: 0,
        };
        (actor, tx)
    }

    fn run(mut self) {
        loop {
            match self.receiver.recv() {
                Ok(ActorMessage::Increment) => {
                    self.value += 1;
                    println!("Counter incremented to {}", self.value);
                }
                Ok(ActorMessage::GetValue(reply)) => {
                    reply.send(self.value).unwrap();
                }
                Ok(ActorMessage::Stop) => {
                    println!("Actor stopping");
                    break;
                }
                Err(_) => break,
            }
        }
    }
}

fn actor_example() {
    let (actor, handle) = CounterActor::new();

    // Spawn actor thread
    thread::spawn(move || {
        actor.run();
    });

    // Send messages to actor
    handle.send(ActorMessage::Increment).unwrap();
    handle.send(ActorMessage::Increment).unwrap();

    // Get current value
    let (reply_tx, reply_rx) = channel();
    handle.send(ActorMessage::GetValue(reply_tx)).unwrap();
    let value = reply_rx.recv().unwrap();
    println!("Current value: {}", value);

    // Stop actor
    handle.send(ActorMessage::Stop).unwrap();
}

Performance Characteristics

mpsc Channel

  • Unbounded: Fast sends, potential memory issues
  • Bounded: Backpressure but may block senders
  • Lock-free: std implementation uses atomic operations
  • Overhead: Minimal for small messages, consider batching for high throughput

crossbeam Channel

  • Faster: Often outperforms std::mpsc
  • More features: Select, multiple receivers
  • Better scalability: Less contention under high load

Trade-offs

  • Unbounded: Fast but unbounded memory
  • Bounded: Controlled memory but potential blocking
  • Batch sends: Amortize overhead for small messages

Exercises

Beginner

  1. Create a channel that sends 10 numbers from one thread to another
  2. Implement a fan-out pattern (one sender, multiple receivers using cloning)
  3. Write a program that uses a bounded channel to limit concurrent tasks

Intermediate

  1. Build a simple task queue using channels
  2. Implement a pipeline pattern (thread A → thread B → thread C)
  3. Create a request/response pattern using two channels

Advanced

  1. Implement a work-stealing scheduler with multiple queues
  2. Build an actor system with typed messages
  3. Create a rate limiter using bounded channels and timers

Real-World Usage

Tokio mpsc

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);

    tokio::spawn(async move {
        tx.send("hello").await.unwrap();
    });

    while let Some(msg) = rx.recv().await {
        println!("Got: {}", msg);
    }
}

crossbeam Channel

use crossbeam::channel::unbounded;

let (tx, rx) = unbounded();

// High-performance message passing
tx.send(42).unwrap();
let msg = rx.recv().unwrap();

Actix Actor Framework

// Actix uses channels internally for message passing
use actix::prelude::*;

// Messages are sent through channels to actors
// addr.send(MyMessage).await;

Further Reading

🎮 Try it Yourself

🎮

Channel Patterns - Playground

Run this code in the official Rust Playground