Using atomics correctly
Lock-free data structures use atomic operations instead of locks (mutexes) for synchronization. They guarantee that at least one thread makes progress in a finite number of steps, providing better scalability and avoiding common pitfalls like deadlocks and priority inversion.
Rust's std::sync::atomic module provides atomic types that enable lock-free programming with strong memory ordering guarantees.
Traditional lock-based synchronization has several issues: deadlocks when locks are acquired in inconsistent orders, priority inversion, lock convoying, and the fact that a thread holding a lock can block every other thread if it is preempted or crashes.
Lock-free structures eliminate these issues but require careful handling of memory ordering and the ABA problem.
use std::sync::atomic::{AtomicBool, AtomicUsize, AtomicPtr, Ordering};
use std::thread;
use std::sync::Arc;
use std::ptr;
// Example 1: Atomic counter
/// Demonstrates a shared atomic counter: 10 threads each add 1000.
///
/// Returns the final count (always 10_000) so callers/tests can verify it;
/// existing callers that invoke this as a statement are unaffected.
fn atomic_counter() -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                // Relaxed is sufficient: we only need the increments to be
                // atomic, not to order any other memory accesses.
                counter.fetch_add(1, Ordering::Relaxed);
            }
        });
        handles.push(handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // join() establishes happens-before, so this load sees every increment.
    let total = counter.load(Ordering::Relaxed);
    println!("Final count: {}", total);
    // Always 10_000
    total
}
// Example 2: Spinlock using AtomicBool
// Example 2: Spinlock using AtomicBool
struct SpinLock {
    // true while some thread holds the lock.
    locked: AtomicBool,
}
impl SpinLock {
    /// Creates a new, unlocked spinlock.
    fn new() -> Self {
        SpinLock {
            locked: AtomicBool::new(false),
        }
    }
    /// Spins until the lock is acquired.
    ///
    /// Test-and-test-and-set: waiters spin on a plain load and only retry
    /// the CAS once the lock looks free, so contended threads don't bounce
    /// the cache line with failed read-modify-write operations.
    /// `compare_exchange_weak` may fail spuriously but is cheaper on some
    /// architectures — the idiomatic choice inside a retry loop.
    fn lock(&self) {
        loop {
            if self
                .locked
                .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                return;
            }
            // Back off on a read-only spin until the lock is observed free.
            while self.locked.load(Ordering::Relaxed) {
                std::hint::spin_loop();
            }
        }
    }
    /// Releases the lock. The Release store pairs with the Acquire in
    /// `lock`, making the critical section's writes visible to the next owner.
    fn unlock(&self) {
        self.locked.store(false, Ordering::Release);
    }
}
// Example 3: Lock-free stack (Treiber stack)
// Intrusive singly-linked node; `next` is a raw pointer managed manually.
struct Node<T> {
    data: T,
    next: *mut Node<T>,
}
// Treiber stack: a lock-free LIFO whose state is a single atomic head pointer.
pub struct LockFreeStack<T> {
    head: AtomicPtr<Node<T>>,
}
impl<T> LockFreeStack<T> {
    /// Creates an empty stack (null head).
    pub fn new() -> Self {
        LockFreeStack {
            head: AtomicPtr::new(ptr::null_mut()),
        }
    }
    /// Pushes `data` by CAS-ing a freshly boxed node onto the head.
    pub fn push(&self, data: T) {
        let node = Box::into_raw(Box::new(Node {
            data,
            next: ptr::null_mut(),
        }));
        loop {
            let current = self.head.load(Ordering::Acquire);
            // SAFETY: `node` came from Box::into_raw above and is not yet
            // visible to any other thread.
            unsafe { (*node).next = current };
            let swapped = self.head.compare_exchange(
                current,
                node,
                Ordering::Release,
                Ordering::Acquire,
            );
            if swapped.is_ok() {
                break;
            }
            // Lost the race: relink against the new head and retry.
        }
    }
    /// Pops the most recently pushed element, or `None` when empty.
    ///
    /// NOTE(review): like any naive Treiber stack, this dereferences and
    /// frees a node that other threads may still hold a pointer to (ABA /
    /// use-after-free under contention); production code needs hazard
    /// pointers or epoch reclamation (see crossbeam-epoch).
    pub fn pop(&self) -> Option<T> {
        loop {
            let current = self.head.load(Ordering::Acquire);
            if current.is_null() {
                return None;
            }
            let next = unsafe { (*current).next };
            let swapped = self.head.compare_exchange(
                current,
                next,
                Ordering::Release,
                Ordering::Acquire,
            );
            if swapped.is_ok() {
                // The CAS winner owns the node; re-box it to take the payload.
                let node = unsafe { Box::from_raw(current) };
                return Some(node.data);
            }
            // Head moved under us; retry with the fresh value.
        }
    }
}
impl<T> Drop for LockFreeStack<T> {
    /// Drains the stack so every remaining node's Box is freed.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so draining is race-free.
        loop {
            if self.pop().is_none() {
                break;
            }
        }
    }
}
// Example 4: Compare-and-swap retry loop
/// Demonstrates the classic read-modify-CAS retry loop: recompute from the
/// freshest observed value until `compare_exchange` wins. (The standard
/// library packages this same pattern as `AtomicUsize::fetch_update`.)
fn cas_retry_pattern() {
    let value = AtomicUsize::new(0);
    let mut observed = value.load(Ordering::Relaxed);
    // Atomically multiply by 2: on failure the CAS hands back the value it
    // actually saw, which becomes the next expected value.
    while let Err(actual) = value.compare_exchange(
        observed,
        observed * 2,
        Ordering::Relaxed,
        Ordering::Relaxed,
    ) {
        observed = actual;
    }
}
// Example 5: Memory ordering examples
/// Demonstrates Release/Acquire pairing between a producer and a consumer.
///
/// Fixes two defects in the previous version: (1) it did not compile —
/// both `move` closures tried to capture the same `data`/`flag` by value
/// (use after move), and (2) the spawned threads were never joined, so the
/// process could exit before the consumer ran. Sharing via `Arc` and
/// joining both handles resolves both.
fn memory_ordering() {
    let data = Arc::new(AtomicUsize::new(0));
    let flag = Arc::new(AtomicBool::new(false));

    // Thread 1: Producer
    let producer = {
        let data = Arc::clone(&data);
        let flag = Arc::clone(&flag);
        thread::spawn(move || {
            data.store(42, Ordering::Relaxed);
            flag.store(true, Ordering::Release); // Synchronizes with Acquire
        })
    };

    // Thread 2: Consumer
    let consumer = {
        let data = Arc::clone(&data);
        let flag = Arc::clone(&flag);
        thread::spawn(move || {
            while !flag.load(Ordering::Acquire) {
                std::hint::spin_loop();
            }
            // The Acquire load that saw `true` guarantees we see data == 42.
            let value = data.load(Ordering::Relaxed);
            println!("Data: {}", value);
        })
    };

    producer.join().unwrap();
    consumer.join().unwrap();
}
fn main() {
    println!("=== Atomic Counter ===");
    atomic_counter();

    println!("\n=== Lock-Free Stack ===");
    let stack = Arc::new(LockFreeStack::new());
    // Five producer threads push one value each.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let stack = Arc::clone(&stack);
            thread::spawn(move || stack.push(i))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Drain on the main thread; pop order depends on thread scheduling.
    while let Some(value) = stack.pop() {
        println!("Popped: {}", value);
    }
}
// Atomically: if current == expected { current = new; Ok } else { Err }
compare_exchange(expected, new, success_order, failure_order)
use std::sync::atomic::{AtomicBool, Ordering};
// ❌ DON'T: Relaxed ordering for synchronization
let flag = AtomicBool::new(false);
let data = 42;
// Thread 1
// data = 100;
flag.store(true, Ordering::Relaxed); // Wrong!
// Thread 2
if flag.load(Ordering::Relaxed) { // Wrong!
// May not see data == 100!
}
// ✅ DO: Use Release-Acquire for synchronization
flag.store(true, Ordering::Release);
// ...
if flag.load(Ordering::Acquire) {
// Guaranteed to see data == 100
}
// ❌ DON'T: Naive CAS can have ABA problem
// Thread 1 reads A
// Thread 2 changes A to B, then back to A
// Thread 1's CAS succeeds but state has changed!
// ✅ DO: Use tagged pointers or generation counters
// Pointer plus a generation tag: bumping `tag` on every update makes an
// A -> B -> A sequence distinguishable, so a CAS on (ptr, tag) defeats ABA.
struct TaggedPtr<T> {
    ptr: *mut T,
    tag: usize, // Generation counter
}
// ❌ DON'T: Forget to free removed nodes
// Popped nodes might still be accessed by other threads!
// ✅ DO: Use hazard pointers or epoch-based reclamation
// See: crossbeam-epoch crate
use crossbeam_epoch::{self as epoch, Atomic, Owned};
use std::sync::atomic::{AtomicPtr, Ordering};
use std::ptr;
struct QueueNode<T> {
    // `None` only for the dummy node (and nodes whose payload was dequeued).
    data: Option<T>,
    next: AtomicPtr<QueueNode<T>>,
}
// Michael–Scott lock-free FIFO queue. `head` always points at a dummy node;
// the real elements start at head.next.
pub struct LockFreeQueue<T> {
    head: AtomicPtr<QueueNode<T>>,
    tail: AtomicPtr<QueueNode<T>>,
}
impl<T> LockFreeQueue<T> {
    /// Creates an empty queue: head and tail share a single dummy node.
    pub fn new() -> Self {
        let dummy = Box::into_raw(Box::new(QueueNode {
            data: None,
            next: AtomicPtr::new(ptr::null_mut()),
        }));
        LockFreeQueue {
            head: AtomicPtr::new(dummy),
            tail: AtomicPtr::new(dummy),
        }
    }
    /// Appends `data` at the tail.
    pub fn enqueue(&self, data: T) {
        let new_node = Box::into_raw(Box::new(QueueNode {
            data: Some(data),
            next: AtomicPtr::new(ptr::null_mut()),
        }));
        loop {
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*tail).next.load(Ordering::Acquire) };
            // Re-check that tail didn't move between the two loads.
            if tail == self.tail.load(Ordering::Acquire) {
                if next.is_null() {
                    // tail really is the last node: try to link the new node.
                    if unsafe {
                        (*tail).next.compare_exchange(
                            next,
                            new_node,
                            Ordering::Release,
                            Ordering::Acquire,
                        )
                    }
                    .is_ok()
                    {
                        // Linked. Swinging tail is best-effort: if this CAS
                        // fails, some other thread already helped.
                        let _ = self.tail.compare_exchange(
                            tail,
                            new_node,
                            Ordering::Release,
                            Ordering::Acquire,
                        );
                        break;
                    }
                } else {
                    // Tail is lagging: help advance it, then retry.
                    let _ = self.tail.compare_exchange(
                        tail,
                        next,
                        Ordering::Release,
                        Ordering::Acquire,
                    );
                }
            }
        }
    }
    /// Removes and returns the oldest element, or `None` when empty.
    ///
    /// Bug fix: the payload is now taken only AFTER winning the head CAS.
    /// The previous version called `(*next).data.take()` first, so whenever
    /// the CAS then failed (another thread had already swung head) the
    /// element had been replaced with `None` and was silently lost.
    ///
    /// NOTE(review): freeing the old dummy with Box::from_raw while other
    /// threads may still hold that pointer is unsound under contention;
    /// real implementations need hazard pointers or epoch-based
    /// reclamation (see the crossbeam-epoch crate).
    pub fn dequeue(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Acquire);
            let next = unsafe { (*head).next.load(Ordering::Acquire) };
            if head == self.head.load(Ordering::Acquire) {
                if head == tail {
                    if next.is_null() {
                        return None; // Queue is empty
                    }
                    // Tail falling behind, help advance it.
                    let _ = self.tail.compare_exchange(
                        tail,
                        next,
                        Ordering::Release,
                        Ordering::Acquire,
                    );
                } else if self
                    .head
                    .compare_exchange(head, next, Ordering::Release, Ordering::Acquire)
                    .is_ok()
                {
                    // Only the CAS winner reaches here: `next` is now the
                    // dummy, and no other dequeuer will touch its payload.
                    let data = unsafe { (*next).data.take() };
                    // Retire the old dummy node.
                    unsafe { drop(Box::from_raw(head)) };
                    return data;
                }
            }
        }
    }
}
// Note: Proper cleanup requires more sophisticated memory reclamation
impl<T> Drop for LockFreeQueue<T> {
    /// Drains every element, then frees the one remaining dummy node.
    fn drop(&mut self) {
        // Each dequeue frees one retired node and drops one payload.
        while self.dequeue().is_some() {}
        // A single dummy always remains; reclaim it explicitly.
        let dummy = self.head.load(Ordering::Acquire);
        if !dummy.is_null() {
            // SAFETY: `&mut self` means no other thread can reach the queue.
            unsafe { drop(Box::from_raw(dummy)) };
        }
    }
}
use crossbeam_epoch::{self as epoch, Atomic, Owned};
// Safe memory reclamation for lock-free structures
let guard = epoch::pin();
let data = Atomic::new(vec![1, 2, 3]);
// Efficient synchronization primitives
use parking_lot::Mutex;
// Uses atomic operations for fast path
let mutex = Mutex::new(0);
// Lock-free task scheduling
// Tokio's work-stealing scheduler uses lock-free queues
// Arc uses atomic reference counting
use std::sync::Arc;
let data = Arc::new(42);
// Reference count updated atomically
Run this code in the official Rust Playground