Rust
Równoległość
Szukaj…
Wprowadzenie
std::thread
, kanały i atomika. Ta sekcja poprowadzi Cię przez korzystanie z tych typów.
Rozpoczęcie nowego wątku
Aby rozpocząć nowy wątek:
use std::thread;
fn main() {
thread::spawn(move || {
// The main thread will not wait for this thread to finish. That
// might mean that the next println isn't even executed before the
// program exits.
println!("Hello from spawned thread");
});
let join_handle = thread::spawn(move || {
println!("Hello from second spawned thread");
// To ensure that the program waits for a thread to finish, we must
// call `join()` on its join handle. It is even possible to send a
// value to a different thread through the join handle, like the
// integer 17 in this case:
17
});
println!("Hello from the main thread");
// The above three printlns can be observed in any order.
// Block until the second spawned thread has finished.
match join_handle.join() {
Ok(x) => println!("Second spawned thread returned {}", x),
Err(_) => println!("Second spawned thread panicked")
}
}
Komunikacja między wątkami z kanałami
Kanały mogą służyć do przesyłania danych z jednego wątku do drugiego. Poniżej znajduje się przykład prostego systemu producent-konsument, w którym główny wątek generuje wartości 0, 1, ..., 9, a spawnowany wątek je drukuje:
use std::thread;
use std::sync::mpsc::channel;
fn main() {
// Create a channel with a sending end (tx) and a receiving end (rx).
let (tx, rx) = channel();
// Spawn a new thread, and move the receiving end into the thread.
let join_handle = thread::spawn(move || {
// Keep receiving in a loop, until tx is dropped!
while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
println!("Received {}", n);
}
});
// Note: using `rx` here would be a compile error, as it has been
// moved into the spawned thread.
// Send some values to the spawned thread. `unwrap()` crashes only if the
// receiving end was dropped before it could be buffered.
for i in 0..10 {
tx.send(i).unwrap(); // Note: `send()` never blocks
}
// Drop `tx` so that `rx.recv()` returns an `Err(_)`.
drop(tx);
// Wait for the spawned thread to finish.
join_handle.join().unwrap();
}
Komunikacja między wątkami z typami sesji
Typy sesji są sposobem na poinformowanie kompilatora o protokole, którego chcesz używać do komunikacji między wątkami - nie protokole jak w HTTP lub FTP, ale wzorzec przepływu informacji między wątkami. Jest to przydatne, ponieważ kompilator zapobiega teraz przypadkowemu zerwaniu protokołu i spowodowaniu zakleszczeń lub blokad między wątkami - niektóre z najbardziej notorycznie trudnych do debugowania problemów i główne źródło Heisenbugs. Typy sesji działają podobnie do kanałów opisanych powyżej, ale ich użycie może być bardziej zastraszające. Oto prosta dwuwątkowa komunikacja:
// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;
// For now, it's easiest to just import everything from the library.
use session_types::*;
// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;
// This function is ordinary code to run the client. Notice that it takes
// ownership of a channel, just like other forms of interthread communication -
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
let channel = channel.send(42);
println!("The client just sent the number 42!");
channel.close();
}
// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
let (channel, data) = channel.recv();
println!("The server received some data: {}", data);
channel.close();
}
fn main() {
// First, create the channels used for the two threads to talk to each other.
let (server_channel, client_channel) = session_channel();
// Start the server on a new thread
let server_thread = std::thread::spawn(move || {
run_server(server_channel);
});
// Run the client on this thread.
run_client(client_channel);
// Wait for the server to finish.
server_thread.join().unwrap();
}
Należy zauważyć, że główna metoda wygląda bardzo podobnie do głównej metody komunikacji krzyżowej zdefiniowanej powyżej, jeśli serwer został przeniesiony do swojej własnej funkcji. Jeśli miałbyś to uruchomić, uzyskałbyś wynik:
The client just sent the number 42!
The server received some data: 42
w tej kolejności.
Po co męczyć się z definiowaniem typów klientów i serwerów? A dlaczego redefiniujemy kanał w kliencie i serwerze? Te pytania mają tę samą odpowiedź: kompilator powstrzyma nas przed złamaniem protokołu! Gdyby klient próbował odebrać dane zamiast je wysłać (co spowodowałoby zakleszczenie w zwykłym kodzie), program nie skompilowałby się, ponieważ obiekt kanału klienta nie ma metody recv
. Ponadto, jeśli spróbujemy zdefiniować protokół w sposób, który może doprowadzić do impasu (na przykład, jeśli zarówno klient, jak i serwer spróbują otrzymać wartość), kompilacja zakończy się niepowodzeniem podczas tworzenia kanałów. Wynika to z faktu, że Send
i Recv
to „Dual Types”, co oznacza, że jeśli serwer to robi, klient musi robić to samo - jeśli oboje spróbują Recv
, będziesz mieć kłopoty. Eps
jest własnym podwójnym typem, ponieważ zarówno Klient, jak i Serwer mogą zgodzić się na zamknięcie kanału.
Oczywiście, kiedy wykonujemy jakąś operację na kanale, przechodzimy do nowego stanu w protokole, a dostępne nam funkcje mogą się zmienić - więc musimy przedefiniować powiązanie kanału. Na szczęście session_types
dba o to za nas i zawsze zwraca nowy kanał (z wyjątkiem close
, w którym to przypadku nie ma nowego kanału). Oznacza to również, że wszystkie metody na kanale również przejmują na własność kanał - więc jeśli zapomnisz ponownie zdefiniować kanał, kompilator również wyświetli błąd. Jeśli upuścisz kanał bez jego zamknięcia, będzie to również błąd czasu wykonywania (niestety nie da się tego sprawdzić w czasie kompilacji).
Istnieje wiele innych rodzajów komunikacji niż tylko Send
i Recv
- na przykład Offer
daje drugiej stronie kanału możliwość wyboru między dwoma możliwymi gałęziami protokołu, a Rec
i Var
współpracują ze sobą, aby umożliwić pętle i rekurencję w protokole . Wiele innych przykładów typów sesji i innych typów dostępnych jest w repozytorium GitHub session_types
. Dokumentację biblioteki można znaleźć tutaj.
Kolejność atomów i pamięci
Typy atomowe to elementy składowe struktur danych bez blokad i inne typy współbieżne. Podczas uzyskiwania dostępu / modyfikowania typu atomowego należy określić kolejność pamięci, reprezentującą siłę bariery pamięci. Rust zapewnia 5 podstawowych operacji porządkowania pamięci: Relaxed (najsłabszy), Acquire (dla odczytów aka load ), Release (dla zapisów aka store ), AcqRel (odpowiednik „Acquire-for-load i Release-for-store”; przydatne, gdy oba biorą udział w jednej operacji, takiej jak porównywanie i zamiana) oraz SeqCst (najsilniejszy). W poniższym przykładzie zademonstrujemy, w jaki sposób kolejność „Spokojna” różni się od zamówień „Zdobądź” i „Zwolnij”.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
struct UsizePair {
atom: AtomicUsize,
norm: UnsafeCell<usize>,
}
// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}
static NTHREADS: usize = 8;
static NITERS: usize = 1000000;
fn main() {
let upair = Arc::new(UsizePair::new(0));
// Barrier is a counter-like synchronization structure (not to be confused
// with a memory barrier). It blocks on a `wait` call until a fixed number
// of `wait` calls are made from various threads (like waiting for all
// players to get to the starting line before firing the starter pistol).
let barrier = Arc::new(Barrier::new(NTHREADS + 1));
let mut children = vec![];
for _ in 0..NTHREADS {
let upair = upair.clone();
let barrier = barrier.clone();
children.push(thread::spawn(move || {
barrier.wait();
let mut v = 0;
while v < NITERS - 1 {
// Read both members `atom` and `norm`, and check whether `atom`
// contains a newer value than `norm`. See `UsizePair` impl for
// details.
let (atom, norm) = upair.get();
if atom > norm {
// If `Acquire`-`Release` ordering is used in `get` and
// `set`, then this statement will never be reached.
println!("Reordered! {} > {}", atom, norm);
}
v = atom;
}
}));
}
barrier.wait();
for v in 1..NITERS {
// Update both members `atom` and `norm` to value `v`. See the impl for
// details.
upair.set(v);
}
for child in children {
let _ = child.join();
}
}
impl UsizePair {
pub fn new(v: usize) -> UsizePair {
UsizePair {
atom: AtomicUsize::new(v),
norm: UnsafeCell::new(v),
}
}
pub fn get(&self) -> (usize, usize) {
let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire
// If the above load operation is performed with `Acquire` ordering,
// then all writes before the corresponding `Release` store is
// guaranteed to be visible below.
let norm = unsafe { *self.norm.get() };
(atom, norm)
}
pub fn set(&self, v: usize) {
unsafe { *self.norm.get() = v };
// If the below store operation is performed with `Release` ordering,
// then the write to `norm` above is guaranteed to be visible to all
// threads that "loads `atom` with `Acquire` ordering and sees the same
// value that was stored below". However, no guarantees are provided as
// to when other readers will witness the below store, and consequently
// the above write. On the other hand, there is also no guarantee that
// these two values will be in sync for readers. Even if another thread
// sees the same value that was stored below, it may actually see a
// "later" value in `norm` than what was written above. That is, there
// is no restriction on visibility into the future.
self.atom.store(v, Ordering::Relaxed); //Ordering::Release
}
}
Uwaga: architektury x86 mają silny model pamięci. Ten post szczegółowo to wyjaśnia. Zobacz także stronę Wikipedii, aby porównać architektury.
Blokady odczytu i zapisu
RwLocks pozwala jednemu producentowi na dostarczenie dowolnej liczbie czytników danych, jednocześnie zapobiegając wyświetlaniu nieprawidłowych lub niespójnych danych.
W poniższym przykładzie użyto RwLock, aby pokazać, jak pojedynczy wątek producenta może okresowo zwiększać wartość, podczas gdy dwa wątki konsumentów odczytują wartość.
use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };
fn main() {
// Create an u32 with an inital value of 0
let initial_value = 0u32;
// Move the initial value into the read-write lock which is wrapped into an atomic reference
// counter in order to allow safe sharing.
let rw_lock = Arc::new(RwLock::new(initial_value));
// Create a clone for each thread
let producer_lock = rw_lock.clone();
let consumer_id_lock = rw_lock.clone();
let consumer_square_lock = rw_lock.clone();
let producer_thread = thread::spawn(move || {
loop {
// write() blocks this thread until write-exclusive access can be acquired and retuns an
// RAII guard upon completion
if let Ok(mut write_guard) = producer_lock.write() {
// the returned write_guard implements `Deref` giving us easy access to the target value
*write_guard += 1;
println!("Updated value: {}", *write_guard);
}
// ^
// | when the RAII guard goes out of the scope, write access will be dropped, allowing
// +~ other threads access the lock
sleep(Duration::from_millis(1000));
}
});
// A reader thread that prints the current value to the screen
let consumer_id_thread = thread::spawn(move || {
loop {
// read() will only block when `producer_thread` is holding a write lock
if let Ok(read_guard) = consumer_id_lock.read() {
// the returned read_guard also implements `Deref`
println!("Read value: {}", *read_guard);
}
sleep(Duration::from_millis(500));
}
});
// A second reader thread is printing the squared value to the screen. Note that readers don't
// block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
let consumer_square_thread = thread::spawn(move || {
loop {
if let Ok(lock) = consumer_square_lock.read() {
let value = *lock;
println!("Read value squared: {}", value * value);
}
sleep(Duration::from_millis(750));
}
});
let _ = producer_thread.join();
let _ = consumer_id_thread.join();
let _ = consumer_square_thread.join();
}
Przykładowe dane wyjściowe:
Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...