Rust
Parallelism
Zoeken…
Invoering
std::thread
module, kanalen en atomica. In dit gedeelte wordt u door het gebruik van deze typen geleid.
Een nieuwe thread starten
Om een nieuwe thread te starten:
use std::thread;
fn main() {
thread::spawn(move || {
// The main thread will not wait for this thread to finish. That
// might mean that the next println isn't even executed before the
// program exits.
println!("Hello from spawned thread");
});
let join_handle = thread::spawn(move || {
println!("Hello from second spawned thread");
// To ensure that the program waits for a thread to finish, we must
// call `join()` on its join handle. It is even possible to send a
// value to a different thread through the join handle, like the
// integer 17 in this case:
17
});
println!("Hello from the main thread");
// The above three printlns can be observed in any order.
// Block until the second spawned thread has finished.
match join_handle.join() {
Ok(x) => println!("Second spawned thread returned {}", x),
Err(_) => println!("Second spawned thread panicked")
}
}
Cross-thread communicatie met kanalen
Kanalen kunnen worden gebruikt om gegevens van de ene thread naar de andere te verzenden. Hieronder is een voorbeeld van een eenvoudig producent-consumentensysteem, waarbij de hoofdthread de waarden 0, 1, ..., 9 produceert en de spawned thread deze afdrukt:
use std::thread;
use std::sync::mpsc::channel;
fn main() {
// Create a channel with a sending end (tx) and a receiving end (rx).
let (tx, rx) = channel();
// Spawn a new thread, and move the receiving end into the thread.
let join_handle = thread::spawn(move || {
// Keep receiving in a loop, until tx is dropped!
while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
println!("Received {}", n);
}
});
// Note: using `rx` here would be a compile error, as it has been
// moved into the spawned thread.
// Send some values to the spawned thread. `unwrap()` crashes only if the
// receiving end was dropped before it could be buffered.
for i in 0..10 {
tx.send(i).unwrap(); // Note: `send()` never blocks
}
// Drop `tx` so that `rx.recv()` returns an `Err(_)`.
drop(tx);
// Wait for the spawned thread to finish.
join_handle.join().unwrap();
}
Cross-thread communicatie met sessietypen
Sessietypen zijn een manier om de compiler te informeren over het protocol dat u wilt gebruiken om te communiceren tussen threads - niet het protocol zoals in HTTP of FTP, maar het patroon van informatiestroom tussen threads. Dit is handig omdat de compiler nu voorkomt dat u per ongeluk uw protocol breekt en deadlocks of livelocks tussen threads veroorzaakt - enkele van de meest notoir moeilijk te debuggen problemen en een belangrijke bron van Heisenbugs. Sessietypen werken op dezelfde manier als de hierboven beschreven kanalen, maar kunnen intimiderend zijn om te gaan gebruiken. Hier is een eenvoudige tweedraadscommunicatie:
// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;
// For now, it's easiest to just import everything from the library.
use session_types::*;
// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;
// This function is ordinary code to run the client. Notice that it takes
// ownership of a channel, just like other forms of interthread communication -
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
let channel = channel.send(42);
println!("The client just sent the number 42!");
channel.close();
}
// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
let (channel, data) = channel.recv();
println!("The server received some data: {}", data);
channel.close();
}
fn main() {
// First, create the channels used for the two threads to talk to each other.
let (server_channel, client_channel) = session_channel();
// Start the server on a new thread
let server_thread = std::thread::spawn(move || {
run_server(server_channel);
});
// Run the client on this thread.
run_client(client_channel);
// Wait for the server to finish.
server_thread.join().unwrap();
}
U moet opmerken dat de hoofdmethode erg lijkt op de hoofdmethode voor cross-thread-communicatie die hierboven is gedefinieerd, als de server naar zijn eigen functie wordt verplaatst. Als u dit zou uitvoeren, zou u de uitvoer krijgen:
The client just sent the number 42!
The server received some data: 42
in die volgorde.
Waarom zou u alle moeite doen om de client- en servertypen te definiëren? En waarom definiëren we het kanaal in de client en server opnieuw? Deze vragen hebben hetzelfde antwoord: de compiler zal ons stoppen van het breken van het protocol! Als de client gegevens probeert te ontvangen in plaats van deze te verzenden (wat zou resulteren in een impasse in de gewone code), zou het programma niet compileren, omdat het kanaalobject van de client geen recv
methode bevat. Als we het protocol proberen te definiëren op een manier die kan leiden tot een impasse (bijvoorbeeld als zowel de client als de server een waarde proberen te ontvangen), mislukt de compilatie wanneer we de kanalen maken. Dit komt omdat Send
en Recv
"Dual Types" zijn, wat betekent dat als de server het ene doet, de Client het andere moet doen - als beide proberen te Recv
, zit je in de problemen. Eps
is zijn eigen dubbele type, omdat het prima is voor zowel de client als de server om overeen te komen het kanaal te sluiten.
Wanneer we een bewerking op het kanaal uitvoeren, gaan we natuurlijk naar een nieuwe status in het protocol en kunnen de beschikbare functies veranderen - dus moeten we de kanaalbinding opnieuw definiëren. Gelukkig zorgt session_types
daar voor ons voor en retourneert altijd het nieuwe kanaal (behalve close
, in welk geval er geen nieuw kanaal is). Dit betekent ook dat alle methoden op een kanaal ook eigenaar worden van het kanaal - dus als je vergeet het kanaal opnieuw te definiëren, geeft de compiler je daarover ook een foutmelding. Als je een kanaal laat vallen zonder het te sluiten, is dat ook een runtime-fout (helaas is dat niet te controleren tijdens het compileren).
Er zijn veel meer soorten communicatie dan alleen Send
en Recv
- zo biedt Offer
de andere kant van het kanaal de mogelijkheid om te kiezen tussen twee mogelijke takken van het protocol, en Rec
en Var
werken samen om lussen en recursie in het protocol mogelijk te maken . Veel meer voorbeelden van sessietypen en andere typen zijn beschikbaar in de session_types
GitHub-repository . De documentatie van de bibliotheek is hier te vinden .
Atomics en geheugen bestellen
Atoomtypen zijn de bouwstenen van lock-free gegevensstructuren en andere gelijktijdige typen. Een geheugenvolgorde, die de sterkte van de geheugenbarrière weergeeft, moet worden gespecificeerd bij het benaderen / wijzigen van een atoomtype. Rust biedt 5 geheugenvolgorde-primitieven: Relaxed (de zwakste), Acquire (voor het lezen van aka-ladingen), Release (voor het schrijven van aka-winkels), AcqRel (gelijk aan "Acquire-for-load en Release-for-store"; handig wanneer beide zijn betrokken bij een enkele bewerking zoals vergelijken en ruilen) en SeqCst (de sterkste). In het onderstaande voorbeeld laten we zien hoe "Relaxed" bestellen verschilt van "Acquire" en "Release" bestellingen.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
struct UsizePair {
atom: AtomicUsize,
norm: UnsafeCell<usize>,
}
// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}
static NTHREADS: usize = 8;
static NITERS: usize = 1000000;
fn main() {
let upair = Arc::new(UsizePair::new(0));
// Barrier is a counter-like synchronization structure (not to be confused
// with a memory barrier). It blocks on a `wait` call until a fixed number
// of `wait` calls are made from various threads (like waiting for all
// players to get to the starting line before firing the starter pistol).
let barrier = Arc::new(Barrier::new(NTHREADS + 1));
let mut children = vec![];
for _ in 0..NTHREADS {
let upair = upair.clone();
let barrier = barrier.clone();
children.push(thread::spawn(move || {
barrier.wait();
let mut v = 0;
while v < NITERS - 1 {
// Read both members `atom` and `norm`, and check whether `atom`
// contains a newer value than `norm`. See `UsizePair` impl for
// details.
let (atom, norm) = upair.get();
if atom > norm {
// If `Acquire`-`Release` ordering is used in `get` and
// `set`, then this statement will never be reached.
println!("Reordered! {} > {}", atom, norm);
}
v = atom;
}
}));
}
barrier.wait();
for v in 1..NITERS {
// Update both members `atom` and `norm` to value `v`. See the impl for
// details.
upair.set(v);
}
for child in children {
let _ = child.join();
}
}
impl UsizePair {
pub fn new(v: usize) -> UsizePair {
UsizePair {
atom: AtomicUsize::new(v),
norm: UnsafeCell::new(v),
}
}
pub fn get(&self) -> (usize, usize) {
let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire
// If the above load operation is performed with `Acquire` ordering,
// then all writes before the corresponding `Release` store is
// guaranteed to be visible below.
let norm = unsafe { *self.norm.get() };
(atom, norm)
}
pub fn set(&self, v: usize) {
unsafe { *self.norm.get() = v };
// If the below store operation is performed with `Release` ordering,
// then the write to `norm` above is guaranteed to be visible to all
// threads that "loads `atom` with `Acquire` ordering and sees the same
// value that was stored below". However, no guarantees are provided as
// to when other readers will witness the below store, and consequently
// the above write. On the other hand, there is also no guarantee that
// these two values will be in sync for readers. Even if another thread
// sees the same value that was stored below, it may actually see a
// "later" value in `norm` than what was written above. That is, there
// is no restriction on visibility into the future.
self.atom.store(v, Ordering::Relaxed); //Ordering::Release
}
}
Opmerking: x86-architecturen hebben een sterk geheugenmodel. In dit bericht wordt het in detail uitgelegd. Kijk ook op de Wikipedia-pagina voor een vergelijking van architecturen.
Lees-schrijfvergrendelingen
Met RwLocks kan een enkele producent een willekeurig aantal lezers van gegevens voorzien, terwijl wordt voorkomen dat lezers ongeldige of inconsistente gegevens zien.
In het volgende voorbeeld wordt RwLock gebruikt om te laten zien hoe een enkele producentthread een waarde periodiek kan verhogen terwijl twee consumentthreads de waarde lezen.
use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };
fn main() {
// Create an u32 with an inital value of 0
let initial_value = 0u32;
// Move the initial value into the read-write lock which is wrapped into an atomic reference
// counter in order to allow safe sharing.
let rw_lock = Arc::new(RwLock::new(initial_value));
// Create a clone for each thread
let producer_lock = rw_lock.clone();
let consumer_id_lock = rw_lock.clone();
let consumer_square_lock = rw_lock.clone();
let producer_thread = thread::spawn(move || {
loop {
// write() blocks this thread until write-exclusive access can be acquired and retuns an
// RAII guard upon completion
if let Ok(mut write_guard) = producer_lock.write() {
// the returned write_guard implements `Deref` giving us easy access to the target value
*write_guard += 1;
println!("Updated value: {}", *write_guard);
}
// ^
// | when the RAII guard goes out of the scope, write access will be dropped, allowing
// +~ other threads access the lock
sleep(Duration::from_millis(1000));
}
});
// A reader thread that prints the current value to the screen
let consumer_id_thread = thread::spawn(move || {
loop {
// read() will only block when `producer_thread` is holding a write lock
if let Ok(read_guard) = consumer_id_lock.read() {
// the returned read_guard also implements `Deref`
println!("Read value: {}", *read_guard);
}
sleep(Duration::from_millis(500));
}
});
// A second reader thread is printing the squared value to the screen. Note that readers don't
// block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
let consumer_square_thread = thread::spawn(move || {
loop {
if let Ok(lock) = consumer_square_lock.read() {
let value = *lock;
println!("Read value squared: {}", value * value);
}
sleep(Duration::from_millis(750));
}
});
let _ = producer_thread.join();
let _ = consumer_id_thread.join();
let _ = consumer_square_thread.join();
}
Voorbeelduitgang:
Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...