Szukaj…


Wprowadzenie

Równoległość jest dobrze obsługiwana przez standardową bibliotekę Rust poprzez różne klasy, takie jak moduł std::thread , kanały i atomika. Ta sekcja poprowadzi Cię przez korzystanie z tych typów.

Rozpoczęcie nowego wątku

Aby rozpocząć nowy wątek:

use std::thread;

fn main() {
    thread::spawn(move || {
        // The main thread will not wait for this thread to finish. That
        // might mean that the next println isn't even executed before the
        // program exits.
        println!("Hello from spawned thread");
    });

    let join_handle = thread::spawn(move || {
        println!("Hello from second spawned thread");
        // To ensure that the program waits for a thread to finish, we must
        // call `join()` on its join handle. It is even possible to send a
        // value to a different thread through the join handle, like the
        // integer 17 in this case:
        17
    });

    println!("Hello from the main thread");
    
    // The above three printlns can be observed in any order.

    // Block until the second spawned thread has finished.
    match join_handle.join() {
        Ok(x) => println!("Second spawned thread returned {}", x),
        Err(_) => println!("Second spawned thread panicked")
    }
}

Komunikacja między wątkami z kanałami

Kanały mogą służyć do przesyłania danych z jednego wątku do drugiego. Poniżej znajduje się przykład prostego systemu producent-konsument, w którym główny wątek generuje wartości 0, 1, ..., 9, a spawnowany wątek je drukuje:

use std::thread;
use std::sync::mpsc::channel;

fn main() {
    // Create a channel with a sending end (tx) and a receiving end (rx).
    let (tx, rx) = channel();

    // Spawn a new thread, and move the receiving end into the thread.
    let join_handle = thread::spawn(move || {
        // Keep receiving in a loop, until tx is dropped!
        while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
            println!("Received {}", n);
        }
    });

    // Note: using `rx` here would be a compile error, as it has been
    // moved into the spawned thread.

    // Send some values to the spawned thread. `unwrap()` crashes only if the
    // receiving end was dropped before it could be buffered.
    for i in 0..10 {
        tx.send(i).unwrap(); // Note: `send()` never blocks
    }

    // Drop `tx` so that `rx.recv()` returns an `Err(_)`.
    drop(tx);

    // Wait for the spawned thread to finish.
    join_handle.join().unwrap();
}

Komunikacja między wątkami z typami sesji

Typy sesji są sposobem na poinformowanie kompilatora o protokole, którego chcesz używać do komunikacji między wątkami - nie protokole jak w HTTP lub FTP, ale wzorzec przepływu informacji między wątkami. Jest to przydatne, ponieważ kompilator zapobiega teraz przypadkowemu zerwaniu protokołu i spowodowaniu zakleszczeń lub blokad między wątkami - niektóre z najbardziej notorycznie trudnych do debugowania problemów i główne źródło Heisenbugs. Typy sesji działają podobnie do kanałów opisanych powyżej, ale ich użycie może być bardziej zastraszające. Oto prosta dwuwątkowa komunikacja:

// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;

// For now, it's easiest to just import everything from the library.
use session_types::*;

// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;

// This function is ordinary code to run the client. Notice that it takes    
// ownership of a channel, just like other forms of interthread communication - 
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}

// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}

fn main() {
    // First, create the channels used for the two threads to talk to each other.
    let (server_channel, client_channel) = session_channel();

    // Start the server on a new thread
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    // Run the client on this thread.
    run_client(client_channel);

    // Wait for the server to finish.
    server_thread.join().unwrap();
}

Należy zauważyć, że główna metoda wygląda bardzo podobnie do głównej metody komunikacji krzyżowej zdefiniowanej powyżej, jeśli serwer został przeniesiony do swojej własnej funkcji. Jeśli miałbyś to uruchomić, uzyskałbyś wynik:

The client just sent the number 42!
The server received some data: 42

w tej kolejności.

Po co męczyć się z definiowaniem typów klientów i serwerów? A dlaczego redefiniujemy kanał w kliencie i serwerze? Te pytania mają tę samą odpowiedź: kompilator powstrzyma nas przed złamaniem protokołu! Gdyby klient próbował odebrać dane zamiast je wysłać (co spowodowałoby zakleszczenie w zwykłym kodzie), program nie skompilowałby się, ponieważ obiekt kanału klienta nie ma metody recv . Ponadto, jeśli spróbujemy zdefiniować protokół w sposób, który może doprowadzić do impasu (na przykład, jeśli zarówno klient, jak i serwer spróbują otrzymać wartość), kompilacja zakończy się niepowodzeniem podczas tworzenia kanałów. Wynika to z faktu, że Send i Recv to „Dual Types”, co oznacza, że jeśli serwer to robi, klient musi robić to samo - jeśli oboje spróbują Recv , będziesz mieć kłopoty. Eps jest własnym podwójnym typem, ponieważ zarówno Klient, jak i Serwer mogą zgodzić się na zamknięcie kanału.

Oczywiście, kiedy wykonujemy jakąś operację na kanale, przechodzimy do nowego stanu w protokole, a dostępne nam funkcje mogą się zmienić - więc musimy przedefiniować powiązanie kanału. Na szczęście session_types dba o to za nas i zawsze zwraca nowy kanał (z wyjątkiem close , w którym to przypadku nie ma nowego kanału). Oznacza to również, że wszystkie metody na kanale również przejmują na własność kanał - więc jeśli zapomnisz ponownie zdefiniować kanał, kompilator również wyświetli błąd. Jeśli upuścisz kanał bez jego zamknięcia, będzie to również błąd czasu wykonywania (niestety nie da się tego sprawdzić w czasie kompilacji).

Istnieje wiele innych rodzajów komunikacji niż tylko Send i Recv - na przykład Offer daje drugiej stronie kanału możliwość wyboru między dwoma możliwymi gałęziami protokołu, a Rec i Var współpracują ze sobą, aby umożliwić pętle i rekurencję w protokole . Wiele innych przykładów typów sesji i innych typów dostępnych jest w repozytorium GitHub session_types . Dokumentację biblioteki można znaleźć tutaj.

Kolejność atomów i pamięci

Typy atomowe to elementy składowe struktur danych bez blokad i inne typy współbieżne. Podczas uzyskiwania dostępu / modyfikowania typu atomowego należy określić kolejność pamięci, reprezentującą siłę bariery pamięci. Rust zapewnia 5 podstawowych operacji porządkowania pamięci: Relaxed (najsłabszy), Acquire (dla odczytów aka load ), Release (dla zapisów aka store ), AcqRel (odpowiednik „Acquire-for-load i Release-for-store”; przydatne, gdy oba biorą udział w jednej operacji, takiej jak porównywanie i zamiana) oraz SeqCst (najsilniejszy). W poniższym przykładzie zademonstrujemy, w jaki sposób kolejność „Spokojna” różni się od zamówień „Zdobądź” i „Zwolnij”.

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell<usize>,
}

// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

    // Barrier is a counter-like synchronization structure (not to be confused
    // with a memory barrier). It blocks on a `wait` call until a fixed number
    // of `wait` calls are made from various threads (like waiting for all
    // players to get to the starting line before firing the starter pistol).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Read both members `atom` and `norm`, and check whether `atom`
                // contains a newer value than `norm`. See `UsizePair` impl for
                // details.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // If `Acquire`-`Release` ordering is used in `get` and
                    // `set`, then this statement will never be reached.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Update both members `atom` and `norm` to value `v`. See the impl for
        // details.
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // If the above load operation is performed with `Acquire` ordering,
        // then all writes before the corresponding `Release` store is
        // guaranteed to be visible below.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

        // If the below store operation is performed with `Release` ordering,
        // then the write to `norm` above is guaranteed to be visible to all
        // threads that "loads `atom` with `Acquire` ordering and sees the same
        // value that was stored below". However, no guarantees are provided as
        // to when other readers will witness the below store, and consequently
        // the above write. On the other hand, there is also no guarantee that
        // these two values will be in sync for readers. Even if another thread
        // sees the same value that was stored below, it may actually see a
        // "later" value in `norm` than what was written above. That is, there
        // is no restriction on visibility into the future.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

Uwaga: architektury x86 mają silny model pamięci. Ten post szczegółowo to wyjaśnia. Zobacz także stronę Wikipedii, aby porównać architektury.

Blokady odczytu i zapisu

RwLocks pozwala jednemu producentowi na dostarczenie dowolnej liczbie czytników danych, jednocześnie zapobiegając wyświetlaniu nieprawidłowych lub niespójnych danych.

W poniższym przykładzie użyto RwLock, aby pokazać, jak pojedynczy wątek producenta może okresowo zwiększać wartość, podczas gdy dwa wątki konsumentów odczytują wartość.

use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };

fn main() {
    // Create an u32 with an inital value of 0
    let initial_value = 0u32;

    // Move the initial value into the read-write lock which is wrapped into an atomic reference
    // counter in order to allow safe sharing.
    let rw_lock = Arc::new(RwLock::new(initial_value));

    // Create a clone for each thread
    let producer_lock = rw_lock.clone();
    let consumer_id_lock = rw_lock.clone();
    let consumer_square_lock = rw_lock.clone();

    let producer_thread = thread::spawn(move || {
        loop {
            // write() blocks this thread until write-exclusive access can be acquired and retuns an 
            // RAII guard upon completion
            if let Ok(mut write_guard) = producer_lock.write() {
                // the returned write_guard implements `Deref` giving us easy access to the target value
                *write_guard += 1;

                println!("Updated value: {}", *write_guard);
            }

            // ^
            // |   when the RAII guard goes out of the scope, write access will be dropped, allowing
            // +~  other threads access the lock

            sleep(Duration::from_millis(1000));
        }
    });

    // A reader thread that prints the current value to the screen
    let consumer_id_thread = thread::spawn(move || {
        loop {
            // read() will only block when `producer_thread` is holding a write lock
            if let Ok(read_guard) = consumer_id_lock.read() {
                // the returned read_guard also implements `Deref`
                println!("Read value: {}", *read_guard);
            }

            sleep(Duration::from_millis(500));
        }
    });

    // A second reader thread is printing the squared value to the screen. Note that readers don't
    // block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
    let consumer_square_thread = thread::spawn(move || {
        loop {
            if let Ok(lock) = consumer_square_lock.read() {
                let value = *lock;
                println!("Read value squared: {}", value * value);
            }

            sleep(Duration::from_millis(750));
        }
    });

    let _ = producer_thread.join();
    let _ = consumer_id_thread.join();
    let _ = consumer_square_thread.join();
}

Przykładowe dane wyjściowe:

Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow