buildwithalex


Rust Channels vs Arc & Mutex


Having used async Rust a couple of times, I’ve noticed that my go-to solution for sharing data across threads is to generously wrap existing types in an Arc & Mutex. That is, something like

  ids: HashSet<usize> 

turns into

  ids: Arc<Mutex<HashSet<usize>>> 

This allows ‘ids’ to be accessed across threads: acquire the lock, access the value, then release the lock (manually with drop, or automatically when the lock guard goes out of scope). Sounds sensible, right? Admittedly, it’s the easy way out, but it makes the compiler errors go away, so nobody’s complaining.
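A minimal sketch of that pattern, assuming a handful of worker threads that each insert their own index into the shared set. The helper name `collect_ids` and the thread count are hypothetical, chosen only for illustration:

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: spawns `n` threads that each insert their index
/// into a shared set, then returns the collected ids in sorted order.
fn collect_ids(n: usize) -> Vec<usize> {
    // Arc gives shared ownership across threads, Mutex gives exclusive access.
    let ids: Arc<Mutex<HashSet<usize>>> = Arc::new(Mutex::new(HashSet::new()));

    let mut handles = Vec::new();
    for id in 0..n {
        // Each thread gets its own Arc pointing at the same Mutex.
        let ids = Arc::clone(&ids);
        handles.push(thread::spawn(move || {
            // Acquire the lock; `guard` derefs to the HashSet.
            let mut guard = ids.lock().unwrap();
            guard.insert(id);
            // Release the lock manually (it would also be released when
            // `guard` goes out of scope at the end of the closure).
            drop(guard);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }

    // The temporary guard is dropped (lock released) at the end of this statement.
    let mut sorted: Vec<usize> = ids.lock().unwrap().iter().copied().collect();
    sorted.sort();
    sorted
}

fn main() {
    println!("ids: {:?}", collect_ids(4)); // prints "ids: [0, 1, 2, 3]"
}
```

Sorting is only there because a `HashSet` has no defined iteration order.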

Then again, I remember Rob Pike’s well-known Go proverb:

Don’t communicate by sharing memory, share memory by communicating (through channels).

So how does this work in Rust? As the Rust documentation shows, creating a channel is straightforward, and channels are part of the standard library:

  let (tx, rx) = std::sync::mpsc::channel(); 

‘mpsc’ stands for ‘multiple producer, single consumer’: many threads can send data into the channel, but only one destination receives it. Interestingly, this implementation was taken from the crossbeam crate in February 2022.

  • ‘tx’ is short for ‘transmitter’. It’s the end of the channel where you send messages. This is the one that produces data.
  • ‘rx’ is short for ‘receiver’. It’s the end of the channel where you receive messages. This is the one that consumes data.
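A minimal sketch of the two ends in action, assuming three producer threads that each send their own index over one channel. The function name `gather` is hypothetical; `channel`, `Sender::clone`, `send` and `recv` are the standard `std::sync::mpsc` API:

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: each of `producers` threads sends one message over a
/// single channel; returns everything the receiver got, sorted.
fn gather(producers: u32) -> Vec<u32> {
    // tx: transmitting end (can be cloned), rx: receiving end (single owner).
    let (tx, rx) = mpsc::channel::<u32>();

    for id in 0..producers {
        // Multiple producers: every thread gets its own clone of the transmitter.
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }
    // Drop main's transmitter so the channel closes once all clones are gone.
    drop(tx);

    let mut received = Vec::new();
    // recv() blocks until a message arrives and returns Err once every
    // Sender has been dropped and no messages remain.
    while let Ok(msg) = rx.recv() {
        received.push(msg);
    }
    // Arrival order depends on thread scheduling, so sort for a stable result.
    received.sort();
    received
}

fn main() {
    println!("{:?}", gather(3)); // prints "[0, 1, 2]"
}
```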

The Example

To keep things as simple as possible, let’s consider a counter that is updated by 5 threads, each thread incrementing the counter by 1.

  
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self) {
        self.value += 1;
    }
} 

Using Arc & Mutex

Since we need to share the Counter between threads, we wrap it in an Arc & Mutex:

  
let counter = Arc::new(Mutex::new(Counter::new())); 
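Why the Arc matters: cloning an Arc copies the pointer and bumps a reference count, it does not copy the Counter, so every clone locks the same Mutex. A small single-threaded sketch of that, where the helper name `shared_through_clone` is hypothetical:

```rust
use std::sync::{Arc, Mutex};

struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self) {
        self.value += 1;
    }
}

/// Hypothetical helper: increments through a clone, reads through the original.
/// Returns (value seen via the original handle, number of Arc handles alive).
fn shared_through_clone() -> (i32, usize) {
    let counter = Arc::new(Mutex::new(Counter::new()));
    // Copies the pointer only; both handles refer to the same Mutex<Counter>.
    let clone = Arc::clone(&counter);
    // The temporary guard releases the lock at the end of this statement.
    clone.lock().unwrap().increment();

    let value = counter.lock().unwrap().value;
    (value, Arc::strong_count(&counter))
}

fn main() {
    let (value, handles) = shared_through_clone();
    println!("value = {value}, handles = {handles}"); // prints "value = 1, handles = 2"
}
```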

Final result:

  
use std::sync::{Arc, Mutex};
use std::thread;

// Define a struct containing the data to be shared
#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self) {
        self.value += 1;
    }
}

fn main() {
    // Create a shared counter wrapped in Arc and Mutex
    let counter = Arc::new(Mutex::new(Counter::new()));

    // Create 5 threads that each increment the counter
    let mut handles: Vec<thread::JoinHandle<()>> = vec![];
    for _ in 0..5 {
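        // Clone the Arc (not the Counter) so each thread holds a handle to the same Mutex
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // Acquire the lock; the guard releases it when dropped at the end of the closure
            let mut counter = counter_clone.lock().unwrap();
            counter.increment();
        });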
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Print the final value of the counter
    println!("Final counter value: {:?}", counter.lock().unwrap());
} 

Using an ‘mpsc’ Channel

The main point here is that we no longer need Arc & Mutex. We now spawn 5 threads that each send an increment of 1 through the transmitting end (tx) of the channel to the receiving end (rx). The main thread, which is the only owner of the counter, receives the increments and applies them.

  
use std::sync::mpsc::{Receiver, Sender};

// Define a struct holding the counter; only the main thread ever touches it
#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self, increment_by: i32) {
        self.value += increment_by;
    }
}

fn main() {
    let (tx, rx): (Sender<i32>, Receiver<i32>) =
        std::sync::mpsc::channel();
    let mut counter = Counter::new();

    for _ in 0..5 {
        let tx_clone = tx.clone();
        std::thread::spawn(move || {
            tx_clone.send(1).unwrap();
            // tx_clone is automatically dropped when it goes out-of-scope
        });
    }

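    // Drop the original transmitter owned by main. The channel only closes once
    // every Sender is gone; without this, the loop below would never end
    drop(tx);

    // Iterating over rx blocks until a message arrives and stops once the channel closes
    for inc in rx {
        counter.increment(inc);
    }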

    // Print the final value of the counter
    println!("Final counter value: {:?}", counter);
}
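Taking "share memory by communicating" one step further, the counter does not have to live in main at all. A possible variation, sketched under the assumption that a dedicated consumer thread should own the Counter and hand it back when the channel closes (the function name `run` is hypothetical):

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
struct Counter {
    value: i32,
}

impl Counter {
    fn new() -> Self {
        Counter { value: 0 }
    }

    fn increment(&mut self, increment_by: i32) {
        self.value += increment_by;
    }
}

/// Hypothetical helper: `producers` threads each send 1; a consumer thread
/// owns the Counter and returns it once every Sender has been dropped.
fn run(producers: usize) -> Counter {
    let (tx, rx) = mpsc::channel::<i32>();

    // The consumer thread takes ownership of both the receiver and the counter.
    let consumer = thread::spawn(move || {
        let mut counter = Counter::new();
        for inc in rx {
            counter.increment(inc);
        }
        counter // handed back to whoever joins this thread
    });

    let handles: Vec<thread::JoinHandle<()>> = (0..producers)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || tx.send(1).unwrap())
        })
        .collect();
    // Drop main's transmitter so the consumer's loop can finish.
    drop(tx);

    for handle in handles {
        handle.join().unwrap();
    }
    consumer.join().unwrap()
}

fn main() {
    // prints "Final counter value: Counter { value: 5 }"
    println!("Final counter value: {:?}", run(5));
}
```

Here no thread other than the consumer ever has access to the Counter; everyone else only holds a Sender.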