Concurrency and Async
Fork-Join Parallelism
[Diagram: Main forks into three Child threads, which later join back into Main]
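The fork-join shape above can be sketched with plain std::thread calls. This is a minimal illustration, not a tuned implementation: the function name fork_join_sum and the chunking strategy are assumptions made for the example.

```rust
use std::thread;

/// Fork: split `data` into chunks and sum each chunk on its own child thread.
/// Join: wait for every child and add up the partial sums.
/// (`fork_join_sum` is a hypothetical helper, not a library function.)
fn fork_join_sum(data: Vec<u64>, children: usize) -> u64 {
    let children = children.max(1); // avoid dividing by zero
    let chunk_len = ((data.len() + children - 1) / children).max(1);

    // Fork: one child thread per chunk.
    let handles: Vec<thread::JoinHandle<u64>> = data
        .chunks(chunk_len)
        .map(|chunk| {
            let chunk = chunk.to_vec(); // each child owns its share of the work
            thread::spawn(move || chunk.iter().sum::<u64>())
        })
        .collect();

    // Join: block until every child is done, then combine the results.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let total = fork_join_sum((1..=100).collect(), 3);
    println!("total = {total}");
}
```

Each child receives an owned copy of its chunk because thread::spawn requires the closure to be 'static.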
Threads
- Module std::thread
- Crate rayon
- If the join handle of a thread is dropped, the spawned thread will implicitly be detached
- A thread can outlive its spawning thread, unless the spawning thread is main
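As a rough illustration of a thread outliving its spawner, the sketch below has an outer thread spawn an inner one and return immediately. The function name inner_outlives_outer and the 50 ms sleep are assumptions chosen for the example.

```rust
use std::thread;
use std::time::Duration;

/// The outer thread spawns an inner thread and finishes right away.
/// The inner thread keeps running after the outer thread is gone.
/// (`inner_outlives_outer` is a hypothetical name for this demo.)
fn inner_outlives_outer() -> &'static str {
    let outer = thread::spawn(|| {
        // Return the inner JoinHandle instead of dropping it,
        // so the caller can still wait for the inner thread.
        thread::spawn(|| {
            thread::sleep(Duration::from_millis(50));
            "inner finished after outer"
        })
    });

    // Joining the outer thread yields the inner thread's JoinHandle.
    let inner = outer.join().unwrap();
    // The outer thread has ended; the inner thread is still usable.
    inner.join().unwrap()
}

fn main() {
    println!("{}", inner_outlives_outer());
}
```

If the inner handle were dropped instead, the inner thread would be detached, and it would be cut short if main returned first.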
Spawning a thread
thread::spawn function
- Creates a thread using the default parameters of Builder
- Returns a JoinHandle<T>; calling join() on it returns an Err if the thread panicked
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
let handle = thread::spawn(|| {
// thread code
});
handle.join().unwrap(); // blocks here until the spawned thread has finished
// use `move` to transfer ownership of captured variables into the closure
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("{:?}", v);
});
// v has been moved into the closure and can no longer be used here
thread::Builder::spawn method
- Returns an io::Result<JoinHandle<T>>, so the result must be unwrapped or handled
- Create the builder with Builder::new(), then chain .name(String) and .stack_size(usize)
pub fn spawn<F, T>(self, f: F) -> io::Result<JoinHandle<T>>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
let builder = thread::Builder::new()
.name("foo".into())
.stack_size(32 * 1024);
let handle = builder.spawn(|| {
// thread code
}).unwrap(); // notice the .unwrap(): spawning itself can fail
handle.join().unwrap();
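One way to see the builder settings take effect is to read the thread's name back from inside the thread. A minimal sketch, assuming a hypothetical helper run_named and an arbitrary 64 KiB stack size:

```rust
use std::thread;

/// Spawns a named thread through `thread::Builder` and returns the name
/// that the thread itself observes. (`run_named` is a hypothetical helper.)
fn run_named(name: &str) -> String {
    let handle = thread::Builder::new()
        .name(name.to_string())
        .stack_size(64 * 1024) // a request; the platform may round it up
        .spawn(|| {
            // thread::current() gives a handle to the running thread.
            thread::current()
                .name()
                .unwrap_or("<unnamed>")
                .to_string()
        })
        .expect("the OS could not create the thread");

    handle.join().unwrap()
}

fn main() {
    println!("spawned thread reports its name as {}", run_named("worker-1"));
}
```

Using expect (or matching on the io::Result) makes the failure case explicit, which thread::spawn hides by panicking.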
JoinHandle<T> is created by the spawn function
// thread
fn thread(&self) -> &Thread
let thread = join_handle.thread();
println!("thread id: {:?}", thread.id());
// join
// waits for the associated thread to finish,
// or returns immediately if the thread has already finished
// if the thread panics, `Err` is returned with the parameter given to `panic!`
fn join(self) -> Result<T>
join_handle.join().unwrap();
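The two outcomes of join() can be sketched as follows: Ok carries the closure's return value, and Err carries the panic payload. The function join_outcome and the message "boom" are made up for the example.

```rust
use std::thread;

/// Runs a thread that either returns 42 or panics, and turns the result of
/// `join()` into a plain `Result<i32, String>`.
/// (`join_outcome` is a hypothetical helper for this demo.)
fn join_outcome(should_panic: bool) -> Result<i32, String> {
    let handle = thread::spawn(move || {
        if should_panic {
            panic!("boom");
        }
        42
    });

    handle.join().map_err(|payload| {
        // The payload is a Box<dyn Any + Send>. A panic with a message
        // usually carries either a &'static str or a String.
        if let Some(msg) = payload.downcast_ref::<&str>() {
            (*msg).to_string()
        } else if let Some(msg) = payload.downcast_ref::<String>() {
            msg.clone()
        } else {
            "non-string panic payload".to_string()
        }
    })
}

fn main() {
    println!("{:?}", join_outcome(false));
    println!("{:?}", join_outcome(true));
}
```

The panicking thread still prints its panic message to stderr; join() only stops the panic from taking down the joining thread.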
Channel
- A channel is a one-way conduit for sending values from one thread to another
- sender.send(item) puts a value into the channel
- receiver.recv() removes the next value from the channel and returns it
- Ownership is transferred from the sending thread to the receiving thread
- If the channel is empty, receiver.recv() blocks until a value is sent, or returns an Err once every sender has been dropped
// multiple producer, single consumer
use std::sync::mpsc;
use std::thread;
let (tx, rx) = mpsc::channel();
// tx can be cloned, but there can only be one rx
let tx1 = tx.clone(); // a second sender that could be moved into another thread
let string_val = String::from("hi");
thread::spawn(move || {
// once the value is sent down the channel it has been moved and cannot be used again
tx.send(string_val).unwrap();
});
let received = rx.recv().unwrap(); // received now holds the value of `string_val`
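The "multiple producer" part can be sketched by giving each thread its own clone of the sender. The helper name collect_from_producers is an assumption. Results are sorted only because arrival order across threads is not deterministic.

```rust
use std::sync::mpsc;
use std::thread;

/// Starts `producers` threads that each send their id once, then collects
/// everything the single receiver gets.
/// (`collect_from_producers` is a hypothetical helper.)
fn collect_from_producers(producers: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    for id in 0..producers {
        let tx = tx.clone(); // each producer owns its own sender
        thread::spawn(move || {
            tx.send(id).unwrap();
        });
    }

    // Drop the original sender. The channel closes once all clones are
    // dropped too, which ends the iteration below.
    drop(tx);

    let mut received: Vec<u32> = rx.iter().collect();
    received.sort();
    received
}

fn main() {
    println!("{:?}", collect_from_producers(4));
}
```

Without the drop(tx) line the receiver would wait forever, because one sender would still be alive.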
Arc and Mutex
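// Arc: Atomically Reference Counted
// Shares ownership of a value between threads; the thread-safe counterpart of Rc
use std::sync::Arc;
use std::thread;
let a = Arc::new("apple");
let mut handles = Vec::new();
for _ in 0..10 {
// Arc::clone copies the pointer and increments the reference count;
// the data itself is not copied
let a = Arc::clone(&a);
handles.push(thread::spawn(move || {
// each thread owns its own Arc pointing to the same heap allocation
println!("{:?}", a);
}));
}
// wait for all threads, otherwise the program may exit before they print
for handle in handles {
handle.join().unwrap();
}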
// Mutex: mutate the inner value through a shared reference, so the Mutex itself need not be declared `mut`
// Mutual exclusion: only one thread can hold the lock at a time
use std::sync::Mutex;
fn main() {
let my_mutex = Mutex::new(5); // A new Mutex<i32>. We don't need to say mut
let mut mutex_changer = my_mutex.lock().unwrap(); // mutex_changer is a MutexGuard
// It has to be mut because we will change it
// Now it has access to the Mutex
// Let's print my_mutex to see:
println!("{:?}", my_mutex); // This prints something like "Mutex { data: <locked>, .. }" (exact fields vary by Rust version)
// So we can't access the data with my_mutex now,
// only with mutex_changer
println!("{:?}", mutex_changer); // This prints 5. Let's change it to 6.
*mutex_changer = 6; // mutex_changer is a MutexGuard<i32> so we use * to change the i32
println!("{:?}", mutex_changer); // Now it says 6
std::mem::drop(mutex_changer); // drop mutex_changer - it is gone now
// and my_mutex is unlocked
println!("{:?}", my_mutex); // Now the data is visible again, e.g. "Mutex { data: 6, .. }"
}
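Instead of calling drop explicitly, the guard can be limited to an inner scope, since the lock is released when the MutexGuard goes out of scope. A small sketch, using try_lock to probe whether the lock is held; the function name guard_scope_demo is an assumption.

```rust
use std::sync::Mutex;

/// Returns (locked while the guard is alive, free after the scope, final value).
/// (`guard_scope_demo` is a hypothetical helper for this demo.)
fn guard_scope_demo() -> (bool, bool, i32) {
    let m = Mutex::new(5);
    let locked_while_guard_alive;

    {
        let mut guard = m.lock().unwrap();
        *guard += 1;
        // The guard still exists, so a non-blocking attempt to lock fails.
        locked_while_guard_alive = m.try_lock().is_err();
    } // guard is dropped here, which unlocks the mutex

    // The temporary guard from try_lock is dropped at the end of the statement.
    let free_after_scope = m.try_lock().is_ok();
    let value = *m.lock().unwrap();

    (locked_while_guard_alive, free_after_scope, value)
}

fn main() {
    println!("{:?}", guard_scope_demo());
}
```

try_lock is used here rather than lock because calling lock again on the same thread while the guard is alive would deadlock or panic.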
Arc and Mutex
use std::sync::{Arc, Mutex};
fn main() {
let my_number = Arc::new(Mutex::new(0));
let my_number1 = Arc::clone(&my_number);
let my_number2 = Arc::clone(&my_number);
let thread1 = std::thread::spawn(move || { // Only the clone goes into Thread 1
for _ in 0..10 {
*my_number1.lock().unwrap() +=1; // Lock the Mutex, change the value
}
});
let thread2 = std::thread::spawn(move || { // Only the clone goes into Thread 2
for _ in 0..10 {
*my_number2.lock().unwrap() += 1;
}
});
thread1.join().unwrap();
thread2.join().unwrap();
println!("Value is: {:?}", my_number);
println!("Exiting the program");
}
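The two-thread example generalizes to any number of threads by cloning the Arc in a loop. A minimal sketch, with the function name count_with_threads chosen for illustration:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `threads` threads that each add 1 to a shared counter
/// `increments` times, then returns the final count.
/// (`count_with_threads` is a hypothetical helper.)
fn count_with_threads(threads: usize, increments: usize) -> usize {
    let counter = Arc::new(Mutex::new(0usize));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let counter = Arc::clone(&counter); // one Arc clone per thread
            thread::spawn(move || {
                for _ in 0..increments {
                    // Lock, increment, and unlock at the end of the statement.
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Read the value out through the lock rather than printing the Mutex.
    let total = *counter.lock().unwrap();
    total
}

fn main() {
    println!("Value is: {}", count_with_threads(2, 10));
}
```

Because every increment happens under the lock, no updates are lost and the result is always threads * increments.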
Send and Sync