Concurrent Programming
Need to write concurrent Rust programs? This guide covers spawning threads, message passing with channels, shared state with Arc and Mutex, and common concurrency patterns.
Problem: Running Code in Parallel
Scenario
You need to execute code simultaneously on multiple threads.
Solution: Spawn Threads
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("Number {} from spawned thread", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("Number {} from main thread", i);
thread::sleep(Duration::from_millis(1));
}
// Wait for spawned thread to finish
handle.join().unwrap();
}
Multiple threads:
use std::thread;
fn main() {
let mut handles = vec![];
for i in 0..10 {
let handle = thread::spawn(move || {
println!("Thread {} reporting", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
Problem: Passing Data to Threads
Scenario
A spawned thread needs data from the main thread.
Solution: Use move Closures
use std::thread;
fn main() {
let data = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Data: {:?}", data);
});
// data is no longer available here (moved into thread)
handle.join().unwrap();
}
Cloning for multiple threads:
use std::thread;
fn main() {
let data = vec![1, 2, 3];
let mut handles = vec![];
for i in 0..3 {
let data_clone = data.clone();
let handle = thread::spawn(move || {
println!("Thread {}: {:?}", i, data_clone);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
}
Problem: Communicating Between Threads
Scenario
Threads need to send data to each other.
Solution: Use Channels
Single producer, single consumer:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hello");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
Multiple messages:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hello"),
String::from("from"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
// Iterate over received values
for received in rx {
println!("Got: {}", received);
}
}
Multiple producers:
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
for i in 0..3 {
let tx_clone = tx.clone();
thread::spawn(move || {
tx_clone.send(format!("Message from thread {}", i)).unwrap();
});
}
drop(tx); // Drop the original sender so the receive loop ends once all clones are dropped
for received in rx {
println!("Got: {}", received);
}
}
Problem: Sharing Mutable State
Scenario
Multiple threads need to read and modify shared data.
Solution: Use Arc<Mutex>
Arc for shared ownership, Mutex for exclusive access:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter_clone.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
// Result: 10
}
How it works:
- Arc (Atomically Reference Counted): shared ownership of the same value across threads
- Mutex: ensures only one thread accesses the data at a time
- lock(): acquires the lock, blocking if another thread holds it; the lock is released when the returned MutexGuard is dropped
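A minimal sketch of that guard behavior, using only the standard library (the explicit drop is only there to illustrate early release):
use std::sync::Mutex;
fn main() {
    let m = Mutex::new(0);
    {
        let mut guard = m.lock().unwrap(); // lock acquired
        *guard += 1;
    } // guard dropped here, lock released
    let guard = m.lock().unwrap(); // would block if another thread held the lock
    drop(guard); // release explicitly instead of waiting for end of scope
    println!("{}", *m.lock().unwrap()); // prints 1
}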
Problem: Read-Heavy Workloads
Scenario
Many threads need to read, few need to write.
Solution: Use RwLock
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3]));
// Spawn readers
let mut handles = vec![];
for i in 0..5 {
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
let read_guard = data_clone.read().unwrap();
println!("Thread {} read: {:?}", i, *read_guard);
});
handles.push(handle);
}
// Spawn writer
let data_clone = Arc::clone(&data);
let writer = thread::spawn(move || {
let mut write_guard = data_clone.write().unwrap();
write_guard.push(4);
println!("Writer added 4");
});
handles.push(writer);
for handle in handles {
handle.join().unwrap();
}
}
Benefits: Multiple concurrent readers, exclusive writer access.
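A single-threaded sketch of those guard rules, illustrative only:
use std::sync::RwLock;
fn main() {
    let data = RwLock::new(vec![1, 2, 3]);
    // Any number of read guards can coexist
    let r1 = data.read().unwrap();
    let r2 = data.read().unwrap();
    println!("readers see {:?} and {:?}", *r1, *r2);
    // All read guards must be dropped before write() can succeed; calling
    // write() while holding a read guard on the same thread may deadlock
    drop(r1);
    drop(r2);
    let mut w = data.write().unwrap(); // exclusive access
    w.push(4);
    println!("after write: {:?}", *w);
}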
Problem: Worker Thread Pool
Scenario
You want to process tasks with a fixed number of threads.
Solution: Create a Thread Pool
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv();
match job {
Ok(job) => {
println!("Worker {} executing job", id);
job();
}
Err(_) => {
println!("Worker {} shutting down", id);
break;
}
}
});
Worker { id, thread }
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Task {} executing", i);
});
}
}
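Note: nothing in the example above ever triggers the workers' shutdown branch, and main can exit before queued tasks run because the workers are never joined. One common fix is a Drop implementation that closes the channel and joins every worker. The sketch below assumes the sender and thread fields are wrapped in Option so Drop can take ownership of them; it is one possible design, not the only one:
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
    workers: Vec<Worker>,
    // Option so Drop can take the sender and drop it, closing the channel
    sender: Option<mpsc::Sender<Job>>,
}
struct Worker {
    // Option so Drop can take the handle and join it
    thread: Option<thread::JoinHandle<()>>,
}
impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| {
                let receiver = Arc::clone(&receiver);
                let thread = thread::spawn(move || loop {
                    // recv() returns Err once every sender has been dropped
                    let job = receiver.lock().unwrap().recv();
                    match job {
                        Ok(job) => {
                            println!("Worker {} executing job", id);
                            job();
                        }
                        Err(_) => {
                            println!("Worker {} shutting down", id);
                            break;
                        }
                    }
                });
                Worker { thread: Some(thread) }
            })
            .collect();
        ThreadPool { workers, sender: Some(sender) }
    }
    fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.sender.as_ref().unwrap().send(Box::new(f)).unwrap();
    }
}
impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Closing the channel ends each worker's loop...
        drop(self.sender.take());
        // ...and joining guarantees every queued job has finished
        for worker in &mut self.workers {
            if let Some(handle) = worker.thread.take() {
                handle.join().unwrap();
            }
        }
    }
}
fn main() {
    let pool = ThreadPool::new(4);
    for i in 0..8 {
        pool.execute(move || println!("Task {} executing", i));
    }
    // The pool is dropped here, which waits for all queued tasks to finish
}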
Problem: Preventing Deadlocks
Scenario
Multiple locks could cause deadlocks.
Solution: Lock Ordering and Try-Lock
Always acquire locks in same order:
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let lock1 = Arc::new(Mutex::new(0));
let lock2 = Arc::new(Mutex::new(0));
let lock1_clone = Arc::clone(&lock1);
let lock2_clone = Arc::clone(&lock2);
let handle1 = thread::spawn(move || {
// Always lock1 then lock2
let _guard1 = lock1_clone.lock().unwrap();
let _guard2 = lock2_clone.lock().unwrap();
println!("Thread 1 acquired both locks");
});
let handle2 = thread::spawn(move || {
// Same order: lock1 then lock2
let _guard1 = lock1.lock().unwrap();
let _guard2 = lock2.lock().unwrap();
println!("Thread 2 acquired both locks");
});
handle1.join().unwrap();
handle2.join().unwrap();
}
Use try_lock to avoid blocking:
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
fn main() {
let lock = Arc::new(Mutex::new(0));
let lock_clone = Arc::clone(&lock);
let handle = thread::spawn(move || {
let _guard = lock_clone.lock().unwrap();
thread::sleep(Duration::from_secs(2));
});
thread::sleep(Duration::from_millis(100));
match lock.try_lock() {
Ok(_guard) => println!("Acquired lock"),
Err(_) => println!("Lock is busy, doing something else"),
}
handle.join().unwrap();
}
Problem: One-Time Initialization
Scenario
You need to initialize something once, safely across threads.
Solution: Use Once or OnceLock
Once for side effects:
use std::sync::Once;
static INIT: Once = Once::new();
fn initialize() {
INIT.call_once(|| {
println!("Initializing... This runs only once");
// Expensive initialization
});
}
fn main() {
let handles: Vec<_> = (0..10)
.map(|i| {
std::thread::spawn(move || {
initialize();
println!("Thread {} running", i);
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
OnceLock for storing a value (Rust 1.70+):
use std::sync::OnceLock;
static CONFIG: OnceLock<String> = OnceLock::new();
fn get_config() -> &'static String {
CONFIG.get_or_init(|| {
println!("Loading config...");
String::from("Configuration data")
})
}
fn main() {
println!("{}", get_config());
println!("{}", get_config()); // Doesn't reinitialize
}
Problem: Atomic Operations
Scenario
You need simple counters or flags shared across threads.
Solution: Use Atomic Types
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..1000 {
counter_clone.fetch_add(1, Ordering::SeqCst);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", counter.load(Ordering::SeqCst));
// Result: 10000
}
Atomic bool for flags:
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn main() {
let running = Arc::new(AtomicBool::new(true));
let running_clone = Arc::clone(&running);
let handle = thread::spawn(move || {
while running_clone.load(Ordering::SeqCst) {
println!("Working...");
thread::sleep(Duration::from_millis(100));
}
println!("Stopped");
});
thread::sleep(Duration::from_secs(1));
running.store(false, Ordering::SeqCst);
handle.join().unwrap();
}
Problem: Parallel Processing with Rayon
Scenario
You want easy data parallelism.
Solution: Use Rayon Crate
[dependencies]
rayon = "1.7"use rayon::prelude::*;
fn main() {
let numbers: Vec<i32> = (1..=1000).collect();
// Parallel map
let squares: Vec<_> = numbers.par_iter()
.map(|&x| x * x)
.collect();
// Parallel filter
let evens: Vec<_> = numbers.par_iter()
.filter(|&&x| x % 2 == 0)
.collect();
// Parallel sum
let sum: i32 = numbers.par_iter().sum();
println!("Sum: {}", sum);
}
Common Pitfalls
Pitfall 1: Not Joining Threads
Problem: Program exits before threads finish.
// Bad
thread::spawn(|| {
println!("This might not print");
});
Solution: Join threads.
let handle = thread::spawn(|| {
println!("This will print");
});
handle.join().unwrap();
Pitfall 2: Holding Locks Too Long
Problem: Lock held during expensive operation.
// Bad
let mut data = mutex.lock().unwrap();
let new_value = expensive_computation(&data);
*data = new_value;
Solution: Release the lock early.
// Clone the data while holding the lock, then release it before the expensive call
let copy = {
let data = mutex.lock().unwrap();
data.clone()
}; // Lock released here
let result = expensive_computation(&copy);
{
let mut data = mutex.lock().unwrap();
*data = result;
}
Pitfall 3: Data Races with Non-Atomic Types
Problem: Using non-thread-safe types.
Solution: Use Arc<Mutex<T>> or atomic types.
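Unlike the first two pitfalls, this one is mostly caught at compile time: types such as Rc<RefCell<T>> are neither Send nor Sync, so moving them into a spawned thread will not compile. A minimal sketch for illustration (the counter is just a placeholder value):
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    // Not thread-safe: uncommenting this is a compile error, not a runtime race
    // use std::rc::Rc;
    // use std::cell::RefCell;
    // let shared = Rc::new(RefCell::new(0));
    // thread::spawn(move || *shared.borrow_mut() += 1); // `Rc<...>` cannot be sent between threads safely
    // Thread-safe equivalent: Arc for shared ownership, Mutex for mutation
    let shared = Arc::new(Mutex::new(0));
    let shared_clone = Arc::clone(&shared);
    let handle = thread::spawn(move || {
        *shared_clone.lock().unwrap() += 1;
    });
    handle.join().unwrap();
    println!("Value: {}", *shared.lock().unwrap());
}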
Related Resources
- Tutorials: Intermediate - Concurrency fundamentals
- Cookbook - Concurrency recipes
- Best Practices - Safe concurrency patterns
- Async/Await Patterns - Async concurrency
Write safe concurrent Rust programs with confidence!