mpsc, oneshot, broadcast patterns
Channels are a powerful concurrency primitive in Rust that allow threads to communicate by sending messages. They follow the "share memory by communicating" philosophy, providing a safe and ergonomic way to transfer data between threads without shared state.
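Rust provides several channel types: the standard library's `std::sync::mpsc` (the unbounded `channel` and the bounded `sync_channel`), plus the channels offered by crates such as crossbeam and tokio, which cover patterns like oneshot and broadcast.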
Concurrent programs need to pass data between threads and coordinate their work.
Traditional shared-memory approaches using mutexes can be error-prone. Channels provide a safer alternative.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
// Example 1: Basic mpsc channel
/// Streams four words from a spawned thread to the caller over an `mpsc` channel.
///
/// The producer pauses 100 ms after each send; the caller prints every word
/// it receives and returns once the producer has finished and dropped its sender.
fn basic_channel() {
    let (sender, receiver) = mpsc::channel();

    // The closure takes ownership of `sender`, so the channel closes when it ends.
    let producer = move || {
        for word in ["hello", "from", "the", "thread"] {
            sender.send(word).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
    };
    thread::spawn(producer);

    // Receive messages until the sending side is gone.
    while let Ok(word) = receiver.recv() {
        println!("Got: {}", word);
    }
}
// Example 2: Multiple producers
/// Spawns three threads that each send one formatted message, then prints
/// every message the single receiver collects.
fn multiple_producers() {
    let (sender, receiver) = mpsc::channel();

    // Each producer thread gets its own clone of the sending half.
    for producer_id in 0..3 {
        let producer_tx = mpsc::Sender::clone(&sender);
        thread::spawn(move || {
            let text = format!("Message from thread {}", producer_id);
            producer_tx.send(text).unwrap();
        });
    }

    // Drop original sender to close channel once the clones are gone too.
    drop(sender);

    // Collect all messages; `recv` errors out when no sender remains.
    while let Ok(text) = receiver.recv() {
        println!("Got: {}", text);
    }
}
// Example 3: Bounded channel (sync_channel)
/// Shows back-pressure on a `sync_channel` with room for two buffered values.
///
/// A producer thread sends `0..5`, printing before and after each send, while
/// the caller starts consuming only after 500 ms and then takes one value
/// every 200 ms. The producer thread is joined before returning.
fn bounded_channel() {
    let (sender, receiver) = mpsc::sync_channel(2); // Buffer size of 2

    let producer = thread::spawn(move || {
        (0..5).for_each(|value| {
            println!("Sending: {}", value);
            sender.send(value).unwrap(); // Blocks when buffer is full
            println!("Sent: {}", value);
        });
    });

    // Give the producer time to fill the buffer before consuming anything.
    thread::sleep(Duration::from_millis(500));

    while let Ok(value) = receiver.recv() {
        println!("Received: {}", value);
        thread::sleep(Duration::from_millis(200));
    }

    producer.join().unwrap();
}
// Example 4: Select pattern (using crossbeam)
use crossbeam::channel;
use crossbeam::select;
/// Waits for the first message to arrive on either of two channels and prints it.
///
/// Two threads each send one message after a different delay (100 ms and
/// 50 ms). `select!` returns as soon as one channel is ready, so the function
/// normally returns before the slower thread has sent anything.
fn select_pattern() {
    let (tx1, rx1) = channel::unbounded();
    let (tx2, rx2) = channel::unbounded();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        // The receiver may already be dropped because the other channel won
        // the race; a failed send is expected here and must not panic.
        let _ = tx1.send("from channel 1");
    });
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        // Same reasoning as above: losing the race is not an error.
        let _ = tx2.send("from channel 2");
    });

    // Wait for first message from either channel
    select! {
        recv(rx1) -> msg => println!("Received: {:?}", msg),
        recv(rx2) -> msg => println!("Received: {:?}", msg),
    }
}
fn main() {
println!("=== Basic Channel ===");
basic_channel();
println!("\n=== Multiple Producers ===");
multiple_producers();
println!("\n=== Bounded Channel ===");
bounded_channel();
println!("\n=== Select Pattern ===");
select_pattern();
}
use std::sync::mpsc;
// ❌ DON'T: Ignore send errors
let (tx, rx) = mpsc::channel();
drop(rx); // Receiver dropped
// With the receiver gone, `send` returns Err, so unwrapping it would panic:
// tx.send(42).unwrap();
// ✅ DO: Handle disconnection gracefully
if let Err(e) = tx.send(42) {
println!("Send failed: {}", e);
}
// ❌ DON'T: Block on bounded channel without consumer
let (tx, rx) = mpsc::sync_channel(1);
tx.send(1).unwrap();
tx.send(2).unwrap(); // Buffer of 1 is already full: blocks forever if nothing is receiving!
// ✅ DO: Ensure receiver is running
// Start the consumer before sending so a full buffer is always drained.
thread::spawn(move || {
for msg in rx {
println!("Got: {}", msg);
}
});
tx.send(1).unwrap();
tx.send(2).unwrap();
// ❌ DON'T: Keep sender alive unnecessarily
let (tx, rx) = mpsc::channel();
let tx_clone = tx.clone();
thread::spawn(move || {
tx_clone.send(42).unwrap();
});
// Iterating `rx` at this point would block forever: the original `tx` is
// still alive, so the channel never reports that all senders are gone.
// ✅ DO: Drop unused senders
drop(tx);
// Now the loop ends once the spawned thread has sent its value and exited.
for msg in rx {
println!("{}", msg);
}
use std::sync::mpsc::{channel, Sender, Receiver};
use std::thread;
use std::sync::{Arc, Mutex};
/// Fixed-size thread pool whose workers pull jobs from one shared channel.
///
/// All workers share a single receiver behind a mutex (this is a shared
/// queue, not a work-stealing scheduler). Dropping the pool closes the
/// channel, lets the workers finish every job that was already queued, and
/// joins their threads.
pub struct WorkPool {
    workers: Vec<Worker>,
    /// `Some` for the pool's whole life; taken in `Drop` so the channel closes.
    sender: Option<Sender<Job>>,
}

/// A boxed closure that a worker runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// One pool thread together with the id used in its log lines.
struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that runs jobs from `receiver` until the channel is
    /// closed and empty.
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // The mutex guard is a temporary of this statement, so the lock
            // is released before the job runs and other workers can receive.
            let job = receiver.lock().unwrap().recv();
            match job {
                Ok(job) => {
                    println!("Worker {} executing job", id);
                    job();
                }
                // `recv` fails only when every sender is gone and the queue
                // has been drained.
                Err(_) => {
                    println!("Worker {} shutting down", id);
                    break;
                }
            }
        });
        Worker { id, thread }
    }
}

impl WorkPool {
    /// Creates a pool with `size` worker threads.
    ///
    /// A pool created with `size == 0` has no workers, so `execute` will
    /// panic because nothing holds the receiving end of the channel.
    pub fn new(size: usize) -> WorkPool {
        let (sender, receiver) = channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        WorkPool {
            workers,
            sender: Some(sender),
        }
    }

    /// Queues `f` to be run by the next free worker.
    ///
    /// # Panics
    ///
    /// Panics if no worker thread is left to receive the job.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job: Job = Box::new(f);
        self.sender
            .as_ref()
            .expect("sender is only taken while the pool is being dropped")
            .send(job)
            .unwrap();
    }
}

impl Drop for WorkPool {
    /// Closes the job channel and waits for every worker to finish.
    fn drop(&mut self) {
        // Dropping the real sender (not a clone of it) disconnects the
        // channel; workers still drain the queued jobs before `recv` fails.
        drop(self.sender.take());

        for worker in self.workers.drain(..) {
            println!("Shutting down worker {}", worker.id);
            // Join will wait for thread to finish. A worker whose job
            // panicked yields Err; report it rather than panic inside drop.
            if worker.thread.join().is_err() {
                eprintln!("Worker {} panicked before shutdown", worker.id);
            }
        }
    }
}
// Usage
/// Submits eight short tasks to a four-worker pool, then sleeps two seconds
/// before the pool goes out of scope.
fn work_pool_example() {
    let pool = WorkPool::new(4);

    (0..8).for_each(|task_id| {
        pool.execute(move || {
            let work_time = std::time::Duration::from_millis(100);
            println!("Task {} running", task_id);
            thread::sleep(work_time);
            println!("Task {} done", task_id);
        });
    });

    // Keep the pool alive long enough for the queued tasks to run.
    let grace_period = std::time::Duration::from_secs(2);
    thread::sleep(grace_period);
}
use std::sync::mpsc::{channel, Sender};
use std::thread;
// Actor message types
/// Requests understood by `CounterActor`.
enum ActorMessage {
    /// Add one to the counter.
    Increment,
    /// Ask for the current value; the actor answers on the enclosed sender.
    GetValue(Sender<i32>),
    /// Ask the actor to leave its message loop.
    Stop,
}

// Actor with internal state
/// Counter that is only ever touched by the thread running `run`.
struct CounterActor {
    receiver: std::sync::mpsc::Receiver<ActorMessage>,
    value: i32,
}

impl CounterActor {
    /// Creates an actor starting at zero together with the sender used to
    /// deliver messages to it.
    fn new() -> (Self, Sender<ActorMessage>) {
        let (tx, rx) = channel();
        let actor = CounterActor {
            receiver: rx,
            value: 0,
        };
        (actor, tx)
    }

    /// Processes messages until `Stop` arrives or every sender is dropped.
    fn run(mut self) {
        // `recv` returns Err once all senders are gone, which ends the loop.
        while let Ok(message) = self.receiver.recv() {
            match message {
                ActorMessage::Increment => {
                    self.value += 1;
                    println!("Counter incremented to {}", self.value);
                }
                ActorMessage::GetValue(reply) => {
                    // The requester may have dropped its reply receiver
                    // already; that is its problem and must not take the
                    // whole actor down, so a failed reply is ignored.
                    let _ = reply.send(self.value);
                }
                ActorMessage::Stop => {
                    println!("Actor stopping");
                    break;
                }
            }
        }
    }
}
/// Drives a `CounterActor` from the calling thread: two increments, one value
/// query, then a stop request, waiting for the actor thread to finish.
fn actor_example() {
    let (actor, handle) = CounterActor::new();

    // Spawn actor thread, keeping the handle so it can be joined below.
    let actor_thread = thread::spawn(move || {
        actor.run();
    });

    // Send messages to actor
    handle.send(ActorMessage::Increment).unwrap();
    handle.send(ActorMessage::Increment).unwrap();

    // Get current value over a dedicated reply channel
    let (reply_tx, reply_rx) = channel();
    handle.send(ActorMessage::GetValue(reply_tx)).unwrap();
    let value = reply_rx.recv().unwrap();
    println!("Current value: {}", value);

    // Stop actor
    handle.send(ActorMessage::Stop).unwrap();

    // Without joining, this function could return (and the process exit)
    // before the actor has handled `Stop`.
    actor_thread.join().unwrap();
}
use tokio::sync::mpsc;
/// Async variant: one spawned task sends a single message over a tokio
/// `mpsc` channel with capacity 32, and `main` prints whatever it receives
/// until the channel reports that no more messages will arrive.
#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel(32);

    // The task owns `sender`, so the channel closes when the task completes.
    let producer = async move {
        sender.send("hello").await.unwrap();
    };
    tokio::spawn(producer);

    loop {
        match receiver.recv().await {
            Some(text) => println!("Got: {}", text),
            None => break,
        }
    }
}
use crossbeam::channel::unbounded;
// Create both halves of an unbounded crossbeam channel.
let (tx, rx) = unbounded();
// High-performance message passing
tx.send(42).unwrap();
// A value was sent just above, so this receive yields that 42.
let msg = rx.recv().unwrap();
// Actix uses channels internally for message passing
use actix::prelude::*;
// Messages are sent through channels to actors
// addr.send(MyMessage).await;
Run this code in the official Rust Playground