Rust :: Concurrency and Async

Concurrency and Async

Fork-Join Parallelism

├── Child ──|
├── Child ──|
├── Child ──|

fork       join
├─         ─|
 ──  Main ──

Threads

Spawning a thread

pub fn spawn<F, T>(f: F) -> JoinHandle<T> 
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static, 

let handler = thread::spawn(|| {
    // thread code
});
handler.join().unwrap(); // code after join() will not run until handle is done

// use `move` to take ownership of env var
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
    println!("{:?}", v);
});
// v is out of scope
pub fn spawn<F, T>(self, f: F) -> Result<JoinHandle<T>>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static, 

let builder = thread::Builder::new()
                              .name("foo".into())
                              .stack_size(32 * 1024);
let handler = builder.spawn(|| {
    // thread code
}).unwrap();    // notice the .unwrap()
handler.join().unwrap();
// thread
fn thread(&self) -> &Thread

let thread = join_handle.thread();
println!("thread id: {:?}", thread.id());

// join
// wait for the associated thread to finish
// or return immediately if the thread has already finsished
// if thread panics, `Err` is returned with the parameter given to `panic!`
fn join(self) -> Result<T>

join_handle.joint().unwrap();

Channel

// multiple producer, single consumer
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
// tx can be cloned, but there can only be one rx
let tx1 = tx.clone();
thread::spawn(move || {
    // once sending the val down the channel, should not use it again
    tx.send(string_val).unwrap();
}):
let received = rx.recv().unwrap(); // received should have `string_val`

Arc and Mutex

Arc

// Atomic Reference Counted
// Share ownership between threads, similar to Rc
use std::sync::Arc;
use std::thread;

let a = Arc::new("apple");
for _ in 0..10 {
    // a pointer to a ref in the memory heap
    let a = Arc::clone(&a);
    thread::spawn(move || {
        // As Arc was used, threads can be spawned using the value allocated
        // in the Arc variable pointer's location.
        println!("{:?}", a);
    });
}

Mutex

// Mutex: change values without declaring `mut`
// Mutual exclusion: only one at a time
use std::sync::Mutex;
fn main() {
    let my_mutex = Mutex::new(5); // A new Mutex<i32>. We don't need to say mut
    let mut mutex_changer = my_mutex.lock().unwrap(); // mutex_changer is a MutexGuard
                                                     // It has to be mut because we will change it
                                                     // Now it has access to the Mutex
                                                     // Let's print my_mutex to see:

    println!("{:?}", my_mutex); // This prints "Mutex { data: <locked> }"
                                // So we can't access the data with my_mutex now,
                                // only with mutex_changer

    println!("{:?}", mutex_changer); // This prints 5. Let's change it to 6.

    *mutex_changer = 6; // mutex_changer is a MutexGuard<i32> so we use * to change the i32

    println!("{:?}", mutex_changer); // Now it says 6

    std::mem::drop(mutex_changer); // drop mutex_changer - it is gone now
                                   // and my_mutex is unlocked

    println!("{:?}", my_mutex); // Now it says: Mutex { data: 6 }
}

Arc and Mutex

use std::sync::{Arc, Mutex};
fn main() {
    let my_number = Arc::new(Mutex::new(0));

    let my_number1 = Arc::clone(&my_number);
    let my_number2 = Arc::clone(&my_number);

    let thread1 = std::thread::spawn(move || { // Only the clone goes into Thread 1
        for _ in 0..10 {
            *my_number1.lock().unwrap() +=1; // Lock the Mutex, change the value
        }
    });

    let thread2 = std::thread::spawn(move || { // Only the clone goes into Thread 2
        for _ in 0..10 {
            *my_number2.lock().unwrap() += 1;
        }
    });

    thread1.join().unwrap();
    thread2.join().unwrap();
    println!("Value is: {:?}", my_number);
    println!("Exiting the program");
}

Send and Sync

TODO