Having used async Rust a couple of times, I’ve noticed that my go-to solution for sharing data across threads is to generously wrap existing types in an Arc & Mutex. That is, something like

```rust
ids: HashSet<usize>
```

turns into

```rust
ids: Arc<Mutex<HashSet<usize>>>
```
This allows ‘ids’ to be accessed from multiple threads: acquire the lock, access the value, then release the lock (manually, or automatically when the lock guard goes out of scope). Sounds sensible, right? Admittedly, it’s the easy way out, and since it makes the compiler errors go away, nobody’s complaining.
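To make the acquire, access, release cycle concrete, here is a minimal sketch in which several threads insert into one shared Arc<Mutex<HashSet<usize>>>. The helper name collect_ids and the thread count are made up for illustration; they are not part of the original code.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawns `n` threads, each inserting its own index
/// into a shared HashSet, and returns the final set.
fn collect_ids(n: usize) -> HashSet<usize> {
    let ids: Arc<Mutex<HashSet<usize>>> = Arc::new(Mutex::new(HashSet::new()));

    let handles: Vec<_> = (0..n)
        .map(|i| {
            // Each thread gets its own handle to the same Arc (a pointer copy).
            let ids = Arc::clone(&ids);
            thread::spawn(move || {
                // Acquire the lock; the guard gives mutable access to the set.
                let mut guard = ids.lock().unwrap();
                guard.insert(i);
                // Release the lock manually (it would also be released
                // automatically when `guard` goes out of scope).
                drop(guard);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Lock once more and copy the contents out; binding to a local first
    // ensures the temporary guard is dropped before `ids` is.
    let set = ids.lock().unwrap().clone();
    set
}

fn main() {
    println!("collected {} ids", collect_ids(4).len());
}
```

Every access goes through lock(), so threads take turns on the set instead of touching it concurrently.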
Then again, I remember Rob Pike’s Go proverb:
Don’t communicate by sharing memory, share memory by communicating (through channels).
So how does this work in Rust? According to the Rust documentation, creating a channel is straightforward, and channels are part of the standard library:

```rust
let (tx, rx) = std::sync::mpsc::channel();
```
‘mpsc’ stands for ‘multiple producer, single consumer’: any number of senders, typically living on different threads, can transmit data to a single receiver. Interestingly, the current implementation is based on the crossbeam-channel crate; the port, proposed in February 2022, shipped with Rust 1.67 in January 2023.
- ‘tx’ is short for ‘transmitter’ (a Sender<T>). It’s the end of the channel where you send messages, i.e. the one that produces data. It can be cloned to create additional producers.
- ‘rx’ is short for ‘receiver’ (a Receiver<T>). It’s the end of the channel where you receive messages, i.e. the one that consumes data.
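As a small sketch of both ends working together (the helper gather is a hypothetical name chosen for this example), each producer thread sends one number through its own clone of tx, and the single rx collects everything:

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical helper: `producers` threads each send their index through
/// a cloned Sender; the single Receiver collects all of the values.
fn gather(producers: usize) -> Vec<usize> {
    let (tx, rx): (Sender<usize>, Receiver<usize>) = mpsc::channel();

    for id in 0..producers {
        // Multiple producers: every thread owns its own clone of the transmitter.
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }
    // Drop the original transmitter so the channel closes once every
    // clone has been dropped by its thread.
    drop(tx);

    // Single consumer: iterating blocks for each message and ends once
    // all senders are gone.
    let mut received: Vec<usize> = rx.iter().collect();
    // Arrival order depends on thread scheduling, so sort for a stable result.
    received.sort_unstable();
    received
}

fn main() {
    println!("{:?}", gather(3)); // prints [0, 1, 2]
}
```

The sort is only there to make the output deterministic; the channel itself delivers messages in whatever order the threads happen to send them.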
The Example
In an effort to keep it as simple as possible, let’s consider a counter that is updated by 5 threads, each thread incrementing the counter by 1.
```rust
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self) {
        self.value += 1;
    }
}
```
Using Arc & Mutex
Since we need to share the Counter between threads, we wrap it in an Arc & Mutex:
```rust
let counter = Arc::new(Mutex::new(Counter::new()));
```
Putting it all together:
```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Define a struct containing the data to be shared
#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self) {
        self.value += 1;
    }
}

fn main() {
    // Create a shared counter wrapped in Arc and Mutex
    let counter = Arc::new(Mutex::new(Counter::new()));

    // Create 5 threads that each increment the counter
    let mut handles: Vec<thread::JoinHandle<()>> = vec![];
    for _ in 0..5 {
        // Cloning the Arc copies the pointer, not the Counter
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // The lock is released when `counter` goes out of scope
            let mut counter = counter_clone.lock().unwrap();
            counter.increment();
        });
        handles.push(handle);
    }

    // Wait for all threads to finish
    for handle in handles {
        handle.join().unwrap();
    }

    // Print the final value of the counter
    println!("Final counter value: {:?}", counter.lock().unwrap());
}
```
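One small extension, offered as a sketch rather than as part of the approach above: once every thread has been joined, the main thread holds the only remaining Arc, so Arc::try_unwrap followed by Mutex::into_inner can hand back the plain Counter without locking again. The helper run_shared is a hypothetical name.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self) {
        self.value += 1;
    }
}

/// Hypothetical helper: runs the Arc & Mutex version with `n` threads and
/// returns the unwrapped Counter once every thread has finished.
fn run_shared(n: usize) -> Counter {
    let counter = Arc::new(Mutex::new(Counter::new()));

    let handles: Vec<_> = (0..n)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                counter.lock().unwrap().increment();
            })
        })
        .collect();

    // Joining guarantees every thread, and the Arc clone it owned, is gone...
    for handle in handles {
        handle.join().unwrap();
    }

    // ...so this is the last Arc and try_unwrap succeeds; into_inner then
    // strips the Mutex wrapper.
    Arc::try_unwrap(counter)
        .expect("all other Arc clones were dropped")
        .into_inner()
        .unwrap()
}

fn main() {
    println!("Final counter value: {}", run_shared(5).value);
}
```

If try_unwrap is called while another clone is still alive, it returns the Arc back as an error instead, which is why the join loop has to come first.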
Using ‘mpsc’ Channel
The main point here is that we no longer need Arc & Mutex, because the counter is never shared: it lives in the main thread. Each of the 5 threads sends an increment of 1 through its own clone of the transmitter, and the main thread reads those values from the receiver and applies them to the counter.
```rust
use std::sync::mpsc::{Receiver, Sender};

// Define a struct containing the data to be updated
#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self, increment_by: i32) {
        self.value += increment_by;
    }
}

fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) = std::sync::mpsc::channel();
    // Only the main thread ever touches the counter, so no Arc or Mutex
    let mut counter = Counter::new();

    for _ in 0..5 {
        let tx_clone = tx.clone();
        std::thread::spawn(move || {
            tx_clone.send(1).unwrap();
            // tx_clone is dropped automatically when the closure returns
        });
    }

    // Drop the original transmitter; otherwise the loop below never ends
    drop(tx);

    // Receives until every transmitter has been dropped
    for inc in rx {
        counter.increment(inc);
    }

    // Print the final value of the counter
    println!("Final counter value: {:?}", counter);
}
```
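The for inc in rx loop hides an important detail: iteration only ends once every Sender has been dropped, which is why drop(tx) is required. The sketch below (run_channel is a hypothetical name) spells out the same logic with an explicit recv() loop, where the closed channel shows up as Err(RecvError):

```rust
use std::sync::mpsc::{self, RecvError};
use std::thread;

#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self, increment_by: i32) {
        self.value += increment_by;
    }
}

/// Hypothetical helper: the channel version with `n` producer threads,
/// written with an explicit recv() loop instead of `for inc in rx`.
fn run_channel(n: usize) -> Counter {
    let (tx, rx) = mpsc::channel::<i32>();
    let mut counter = Counter::new();

    for _ in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(1).unwrap();
        });
    }
    // Without this line, recv() would block forever after the last message:
    // the original `tx` would keep the channel open.
    drop(tx);

    loop {
        match rx.recv() {
            Ok(inc) => counter.increment(inc),
            // Every Sender has been dropped and no messages remain.
            Err(RecvError) => break,
        }
    }
    counter
}

fn main() {
    // prints "Final counter value: Counter { value: 5 }"
    println!("Final counter value: {:?}", run_channel(5));
}
```

Removing the drop(tx) line turns this into a hang rather than a compile error, so it is an easy mistake to miss.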