Event-driven systems with channels
The Observer pattern defines a one-to-many dependency between objects. When one object (the subject) changes state, all its dependents (observers) are notified. In Rust, we typically implement this using channels or callback traits.
Implementing observers in Rust involves:
use std::sync::{Arc, Mutex, Weak};
use std::collections::HashMap;
/// Observer trait - what observers must implement.
///
/// `T` is the event type delivered to the observer. The `Send + Sync`
/// supertraits require every implementor to be shareable across threads.
pub trait Observer<T>: Send + Sync {
/// Handles a single `event`.
///
/// The observer is borrowed immutably (`&self`), so an implementor that
/// needs to record state must use interior mutability.
fn on_notify(&self, event: &T);
}
/// Observable subject using trait objects.
///
/// Observers are stored as [`Weak`] handles, so subscribing never extends an
/// observer's lifetime. Entries whose observer has been dropped are pruned on
/// the next call to [`Subject::notify`].
pub struct Subject<T> {
    observers: Mutex<Vec<Weak<dyn Observer<T>>>>,
}

impl<T> Subject<T> {
    /// Creates a subject with no registered observers.
    pub fn new() -> Self {
        Subject {
            observers: Mutex::new(Vec::new()),
        }
    }

    /// Registers `observer` for future notifications.
    ///
    /// Only a weak handle is kept: the caller must keep its own `Arc` alive
    /// for as long as the observer should keep receiving events.
    pub fn subscribe(&self, observer: &Arc<dyn Observer<T>>) {
        let weak = Arc::downgrade(observer);
        self.observers.lock().unwrap().push(weak);
    }

    /// Delivers `event` to every observer that is still alive, in
    /// subscription order, and forgets observers that have been dropped.
    ///
    /// The internal lock is released *before* any observer runs. This lets an
    /// observer call [`Subject::subscribe`] or [`Subject::notify`] on this same
    /// subject without deadlocking, and a panicking observer no longer poisons
    /// the subject's mutex. Observers subscribed during a notification first
    /// receive the *next* event; observers that were alive when the
    /// notification started are kept alive until it finishes.
    pub fn notify(&self, event: &T) {
        // Phase 1 (under the lock): prune dead entries and take strong
        // handles to the live ones. No user code runs here.
        let live: Vec<Arc<dyn Observer<T>>> = {
            let mut observers = self.observers.lock().unwrap();
            let mut live = Vec::with_capacity(observers.len());
            observers.retain(|weak| match weak.upgrade() {
                Some(observer) => {
                    live.push(observer);
                    true
                }
                // Observer was dropped, remove it.
                None => false,
            });
            live
        };

        // Phase 2 (lock released): run the observers.
        for observer in &live {
            observer.on_notify(event);
        }
    }
}

impl<T> Default for Subject<T> {
    /// Equivalent to [`Subject::new`].
    fn default() -> Self {
        Self::new()
    }
}
use std::sync::mpsc::{self, Sender, Receiver};

/// Channel-based observer (recommended for most cases).
///
/// Every subscriber owns the receiving half of its own `mpsc` channel; the
/// bus keeps the sending halves and fans each published event out to all of
/// them. A subscriber unsubscribes simply by dropping its [`Receiver`].
///
/// The `Clone` requirement lives on the `impl` blocks that actually clone
/// events, not on the type itself.
pub struct EventBus<T> {
    subscribers: Mutex<Vec<Sender<T>>>,
}

impl<T: Clone + Send + 'static> EventBus<T> {
    /// Creates a bus with no subscribers.
    pub fn new() -> Self {
        EventBus {
            subscribers: Mutex::new(Vec::new()),
        }
    }

    /// Registers a new subscriber and returns the receiving end of its
    /// private channel. Only events published after this call are delivered.
    #[must_use = "dropping the receiver immediately cancels the subscription"]
    pub fn subscribe(&self) -> Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Sends a clone of `event` to every subscriber.
    ///
    /// A failed send means the matching receiver was dropped, so that
    /// subscriber is removed from the bus.
    pub fn publish(&self, event: T) {
        let mut subs = self.subscribers.lock().unwrap();
        // Remove disconnected subscribers while delivering to the live ones.
        subs.retain(|tx| tx.send(event.clone()).is_ok());
    }
}

impl<T: Clone + Send + 'static> Default for EventBus<T> {
    /// Equivalent to [`EventBus::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Shared handle to a registered callback. `Arc` lets `notify` take a cheap
/// snapshot of the callbacks and run them without holding the lock.
type SharedCallback<T> = Arc<dyn Fn(&T) + Send + Sync>;

/// Callback-based observer (closure approach).
///
/// Callbacks are kept in subscription order, each tagged with the id that was
/// handed out in its [`Subscription`].
pub struct CallbackSubject<T> {
    callbacks: Mutex<Vec<(usize, SharedCallback<T>)>>,
    next_id: Mutex<usize>,
}

/// Handle returned by [`CallbackSubject::subscribe`]; pass it back to
/// [`CallbackSubject::unsubscribe`] to remove the callback.
#[derive(Debug)]
pub struct Subscription {
    id: usize,
}

impl<T> CallbackSubject<T> {
    /// Creates a subject with no callbacks; ids start at 0.
    pub fn new() -> Self {
        CallbackSubject {
            callbacks: Mutex::new(Vec::new()),
            next_id: Mutex::new(0),
        }
    }

    /// Registers `callback` and returns the handle needed to remove it later.
    pub fn subscribe<F>(&self, callback: F) -> Subscription
    where
        F: Fn(&T) + Send + Sync + 'static,
    {
        let callback: SharedCallback<T> = Arc::new(callback);
        // The id lock is held across the push so that ids and list positions
        // are assigned together; no other method takes both locks.
        let mut next_id = self.next_id.lock().unwrap();
        let id = *next_id;
        *next_id += 1;
        self.callbacks.lock().unwrap().push((id, callback));
        Subscription { id }
    }

    /// Removes the callback identified by `subscription`, if it is still
    /// registered. Consumes the handle so it cannot be reused.
    pub fn unsubscribe(&self, subscription: Subscription) {
        self.callbacks
            .lock()
            .unwrap()
            .retain(|(id, _)| *id != subscription.id);
    }

    /// Invokes every registered callback with `event`, in subscription order.
    ///
    /// The callbacks are snapshotted and the lock is released before any of
    /// them runs, so a callback may call `subscribe`, `unsubscribe` or
    /// `notify` on this same subject without deadlocking. Changes made during
    /// a notification take effect from the next one.
    pub fn notify(&self, event: &T) {
        let snapshot: Vec<SharedCallback<T>> = {
            let callbacks = self.callbacks.lock().unwrap();
            callbacks.iter().map(|(_, cb)| Arc::clone(cb)).collect()
        };
        for callback in snapshot {
            callback(event);
        }
    }
}

impl<T> Default for CallbackSubject<T> {
    /// Equivalent to [`CallbackSubject::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Typed event system: the events a stock feed reports to its observers.
///
/// `PartialEq` is derived so events can be compared directly (for example in
/// `assert_eq!`). `Eq` is not derived because the `f64` fields do not
/// implement it.
#[derive(Debug, Clone, PartialEq)]
pub enum StockEvent {
    /// A new price was recorded for `symbol`.
    PriceChanged { symbol: String, price: f64 },
    /// A new volume figure was reported for `symbol`.
    VolumeChanged { symbol: String, volume: u64 },
    /// A trade of `quantity` units of `symbol` was executed at `price`.
    TradeExecuted { symbol: String, quantity: u32, price: f64 },
}
/// Stock ticker implementing observable.
///
/// Remembers the most recent price per symbol and announces every change on
/// an internal `EventBus<StockEvent>`.
pub struct StockTicker {
    prices: Mutex<HashMap<String, f64>>,
    event_bus: EventBus<StockEvent>,
}

impl StockTicker {
    /// Builds a ticker with an empty price table and no subscribers.
    pub fn new() -> Self {
        Self {
            prices: Mutex::default(),
            event_bus: EventBus::new(),
        }
    }

    /// Opens a new subscription on the ticker's event bus.
    pub fn subscribe(&self) -> Receiver<StockEvent> {
        self.event_bus.subscribe()
    }

    /// Stores `price` as the latest price of `symbol`, then publishes a
    /// `StockEvent::PriceChanged` event.
    pub fn update_price(&self, symbol: &str, price: f64) {
        {
            // The guard is scoped so the price lock is released before publishing.
            let mut table = self.prices.lock().unwrap();
            table.insert(symbol.to_owned(), price);
        }
        let announcement = StockEvent::PriceChanged {
            symbol: symbol.to_owned(),
            price,
        };
        self.event_bus.publish(announcement);
    }

    /// Publishes a `StockEvent::TradeExecuted` event; the price table is not touched.
    pub fn execute_trade(&self, symbol: &str, quantity: u32, price: f64) {
        let announcement = StockEvent::TradeExecuted {
            symbol: symbol.to_owned(),
            quantity,
            price,
        };
        self.event_bus.publish(announcement);
    }
}

impl Default for StockTicker {
    /// Same as [`StockTicker::new`].
    fn default() -> Self {
        StockTicker::new()
    }
}
/// Async observer using tokio (shown as example, won't run without tokio).
///
/// The whole module is compiled only when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
mod async_observer {
    use tokio::sync::broadcast;

    /// Event bus that wraps the sending half of a tokio `broadcast` channel.
    pub struct AsyncEventBus<T: Clone> {
        sender: broadcast::Sender<T>,
    }

    impl<T: Clone + Send + 'static> AsyncEventBus<T> {
        /// Creates the underlying broadcast channel with the given `capacity`.
        pub fn new(capacity: usize) -> Self {
            // Only the sender is kept; the receiver created alongside it is
            // discarded and fresh receivers are handed out by `subscribe`.
            let sender = broadcast::channel(capacity).0;
            Self { sender }
        }

        /// Returns a new receiver attached to the bus's sender.
        pub fn subscribe(&self) -> broadcast::Receiver<T> {
            self.sender.subscribe()
        }

        /// Sends `event` on the broadcast channel and forwards the channel's
        /// own result unchanged.
        pub fn publish(&self, event: T) -> Result<usize, broadcast::error::SendError<T>> {
            self.sender.send(event)
        }
    }
}
/// Example concrete observer: prints price changes and trades to stdout,
/// prefixed with its own `name`.
struct PriceLogger {
    name: String,
}

impl Observer<StockEvent> for PriceLogger {
    /// Prints one line for `PriceChanged` and `TradeExecuted` events; volume
    /// events are deliberately ignored.
    fn on_notify(&self, event: &StockEvent) {
        let tag = &self.name;
        match event {
            StockEvent::TradeExecuted { symbol, quantity, price } => println!(
                "[{}] Trade: {} shares of {} at ${:.2}",
                tag, quantity, symbol, price
            ),
            StockEvent::PriceChanged { symbol, price } => {
                println!("[{}] {} price changed to ${:.2}", tag, symbol, price)
            }
            // Listed explicitly so that adding a new variant forces a decision here.
            StockEvent::VolumeChanged { .. } => {}
        }
    }
}
/// Demo driver: exercises the trait-based, channel-based and callback-based
/// observers in turn, printing what each one receives.
fn main() {
    // 1. Trait objects: the subject holds only a weak handle, so
    //    `price_logger` must stay alive for the notification to arrive.
    println!("=== Trait-based Observer ===");
    let trait_subject = Subject::<StockEvent>::new();
    let price_logger: Arc<dyn Observer<StockEvent>> = Arc::new(PriceLogger {
        name: String::from("Logger"),
    });
    trait_subject.subscribe(&price_logger);
    let aapl_update = StockEvent::PriceChanged {
        symbol: String::from("AAPL"),
        price: 150.0,
    };
    trait_subject.notify(&aapl_update);

    // 2. Channels: publish two events, then drain whatever is queued.
    println!("\n=== Channel-based Observer ===");
    let ticker = StockTicker::new();
    let inbox = ticker.subscribe();
    ticker.update_price("GOOG", 2800.0);
    ticker.execute_trade("GOOG", 100, 2800.0);
    // `try_iter` yields pending events without blocking and stops once the
    // queue is empty.
    for event in inbox.try_iter() {
        println!("Received: {:?}", event);
    }

    // 3. Closures: two callbacks, then the first one is removed.
    println!("\n=== Callback-based Observer ===");
    let callback_subject = CallbackSubject::<i32>::new();
    let first = callback_subject.subscribe(|value| {
        println!("Callback 1 received: {}", value);
    });
    let _second = callback_subject.subscribe(|value| {
        println!("Callback 2 received: {}", value * 2);
    });
    callback_subject.notify(&42);

    // After unsubscribing the first callback only callback 2 fires.
    callback_subject.unsubscribe(first);
    callback_subject.notify(&100);
}
- `Weak`: lets an observer be dropped without the subject keeping it alive
- `mpsc::channel`: built-in thread-safe message passing
- `Mutex`: safe concurrent access to callbacks
- `Clone` bound: events can be sent to multiple observers

| Approach | Thread Safety | Ownership | Use Case |
|----------|--------------|-----------|----------|
| Trait objects + Weak | Send + Sync required | Shared (Arc) | OOP-style, typed observers |
| Channels | Built-in | Owned receivers | Async, decoupled |
| Callbacks | Mutex-protected | Owned closures | Simple notifications |
| tokio broadcast | Async-native | Cloned receivers | High-performance async |
// DON'T: Strong references to observers (memory leak)
// Anti-pattern kept for contrast with `Subject`, which stores `Weak` handles.
// Because each entry here is an `Arc`, the subject itself holds a strong
// reference to every observer for as long as the entry stays in the list.
// `Event` is a placeholder event type that this snippet does not define.
struct BadSubject {
observers: Vec<Arc<dyn Observer<Event>>>, // Keeps observers alive forever!
}
// DON'T: Blocking in notify
// Anti-pattern kept for illustration: `Subject::notify` calls observers one
// after another on the notifying thread, so a slow `on_notify` delays every
// observer that comes after it.
// `Event`, `BlockingObserver` and `Duration` are not defined or imported in
// this snippet.
impl Observer<Event> for BlockingObserver {
fn on_notify(&self, _event: &Event) {
// Sleeps for ten seconds on the caller's thread before returning.
std::thread::sleep(Duration::from_secs(10)); // Blocks all other observers!
}
}
// DO: Use channels or spawn tasks for slow operations
// `on_notify` only spawns a thread and returns, so the slow work does not
// hold up the notifying thread. `Event` and `GoodObserver` are placeholders
// that this snippet does not define.
impl Observer<Event> for GoodObserver {
fn on_notify(&self, event: &Event) {
// Take an owned copy so the background work does not borrow from the
// caller's `event` (requires `Event: Clone`).
let event = event.clone();
// NOTE(review): the placeholder closure body does not use `event` yet;
// real handling code would go where the comment below is.
std::thread::spawn(move || {
// Handle event in background
});
}
}
Run this code in the official Rust Playground