Suche…


Einführung

Parallelität wird von Rusts Standardbibliothek durch verschiedene Klassen wie das Modul std::thread , Channels und Atomics gut unterstützt. Dieser Abschnitt führt Sie durch die Verwendung dieser Typen.

Einen neuen Thread starten

So starten Sie einen neuen Thread:

use std::thread;

fn main() {
    thread::spawn(move || {
        // The main thread will not wait for this thread to finish. That
        // might mean that the next println isn't even executed before the
        // program exits.
        println!("Hello from spawned thread");
    });

    let join_handle = thread::spawn(move || {
        println!("Hello from second spawned thread");
        // To ensure that the program waits for a thread to finish, we must
        // call `join()` on its join handle. It is even possible to send a
        // value to a different thread through the join handle, like the
        // integer 17 in this case:
        17
    });

    println!("Hello from the main thread");
    
    // The above three printlns can be observed in any order.

    // Block until the second spawned thread has finished.
    match join_handle.join() {
        Ok(x) => println!("Second spawned thread returned {}", x),
        Err(_) => println!("Second spawned thread panicked")
    }
}

Cross-Thread-Kommunikation mit Kanälen

Über Kanäle können Daten von einem Thread zu einem anderen gesendet werden. Im Folgenden finden Sie ein Beispiel für ein einfaches Producer-Consumer-System, bei dem der Haupt-Thread die Werte 0, 1, ..., 9 erzeugt und der erzeugte Thread diese druckt:

use std::thread;
use std::sync::mpsc::channel;

fn main() {
    // Create a channel with a sending end (tx) and a receiving end (rx).
    let (tx, rx) = channel();

    // Spawn a new thread, and move the receiving end into the thread.
    let join_handle = thread::spawn(move || {
        // Keep receiving in a loop, until tx is dropped!
        while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
            println!("Received {}", n);
        }
    });

    // Note: using `rx` here would be a compile error, as it has been
    // moved into the spawned thread.

    // Send some values to the spawned thread. `unwrap()` crashes only if the
    // receiving end was dropped before it could be buffered.
    for i in 0..10 {
        tx.send(i).unwrap(); // Note: `send()` never blocks
    }

    // Drop `tx` so that `rx.recv()` returns an `Err(_)`.
    drop(tx);

    // Wait for the spawned thread to finish.
    join_handle.join().unwrap();
}

Cross-Thread-Kommunikation mit Sitzungstypen

Session-Typen sind eine Möglichkeit, dem Compiler das Protokoll mitzuteilen, das Sie für die Kommunikation zwischen Threads verwenden möchten - nicht das Protokoll wie bei HTTP oder FTP, sondern das Muster des Informationsflusses zwischen Threads. Dies ist nützlich, da der Compiler Sie jetzt daran hindern kann, versehentlich Ihr Protokoll zu brechen und Deadlocks oder Livelocks zwischen Threads zu verursachen - einige der bekanntesten Probleme, die Sie beim Debuggen lösen können, und eine wichtige Quelle für Heisenbugs. Sitzungstypen funktionieren ähnlich wie die oben beschriebenen Kanäle, können jedoch einschüchternd wirken. Hier ist eine einfache Zwei-Thread-Kommunikation:

// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;

// For now, it's easiest to just import everything from the library.
use session_types::*;

// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;

// This function is ordinary code to run the client. Notice that it takes    
// ownership of a channel, just like other forms of interthread communication - 
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}

// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}

fn main() {
    // First, create the channels used for the two threads to talk to each other.
    let (server_channel, client_channel) = session_channel();

    // Start the server on a new thread
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    // Run the client on this thread.
    run_client(client_channel);

    // Wait for the server to finish.
    server_thread.join().unwrap();
}

Sie sollten feststellen, dass die Hauptmethode der oben definierten Hauptmethode für die Cross-Thread-Kommunikation sehr ähnlich ist, wenn der Server in eine eigene Funktion verschoben wurde. Wenn Sie dies ausführen würden, würden Sie die Ausgabe erhalten:

The client just sent the number 42!
The server received some data: 42

in dieser Reihenfolge.

Warum müssen Sie den ganzen Aufwand bei der Definition der Client- und Servertypen durchgehen? Und warum definieren wir den Kanal in Client und Server neu? Diese Fragen haben die gleiche Antwort: Der Compiler wird uns daran hindern, das Protokoll zu brechen! Wenn der Client versucht hat, Daten zu empfangen, anstatt sie zu senden (was im normalen Code zu einem Deadlock führen würde), würde das Programm nicht kompilieren, da das recv des Clients keine recv Methode enthält. Wenn wir versucht haben, das Protokoll auf eine Weise zu definieren, die zu einem Deadlock führen könnte (wenn beispielsweise Client und Server versucht haben, einen Wert zu erhalten), schlägt die Kompilierung fehl, wenn wir die Kanäle erstellen. Dies liegt daran, dass Send und Recv "Dual Types" sind, dh wenn der Server einen tut, muss der Client den anderen tun. Wenn beide versuchen, den Recv zu Recv , werden Sie in Schwierigkeiten geraten. Eps ist ein eigener dualer Typ, da sowohl der Client als auch der Server bereit sind, den Kanal zu schließen.

Wenn wir den Kanal bearbeiten, wechseln wir natürlich zu einem neuen Status im Protokoll, und die für uns verfügbaren Funktionen können sich ändern - daher müssen wir die Kanalbindung neu definieren. Glücklicherweise kümmert sich session_types für uns und gibt immer den neuen Kanal zurück (außer close , in diesem Fall gibt es keinen neuen Kanal). Dies bedeutet auch, dass alle Methoden in einem Channel auch den Besitz des Channels übernehmen. Wenn Sie also vergessen, den Channel neu zu definieren, gibt der Compiler auch einen Fehler aus. Wenn Sie einen Kanal löschen, ohne ihn zu schließen, ist dies ebenfalls ein Laufzeitfehler (dies ist leider zur Kompilierzeit nicht überprüfbar).

Es gibt viel mehr Arten der Kommunikation als nur Send und Recv - beispielsweise bietet Offer der anderen Seite des Kanals die Möglichkeit, zwischen zwei möglichen Zweigen des Protokolls zu Recv , und Rec und Var arbeiten zusammen, um Schleifen und Rekursion im Protokoll zu ermöglichen . Viele weitere Beispiele für Sitzungstypen und andere Typen sind im GitHub-Repository session_types verfügbar. Die Dokumentation der Bibliothek finden Sie hier.

Atomik und Speicherordnung

Atomtypen sind die Bausteine ​​für sperrungsfreie Datenstrukturen und andere gleichzeitige Typen. Eine Speicherreihenfolge, die die Stärke der Speicherbarriere darstellt, sollte beim Zugriff auf / Änderung eines atomaren Typs angegeben werden. Rust bietet 5 Grundeinheiten für die Speicherreihenfolge: Entspannt (die schwächste), Acquire (für Reads aka Lades ), Release (für Schreibdatensysteme ), AcqRel (entspricht "Acquire-for-Load" und "Release-for-Store") an einem einzigen Vorgang beteiligt sind (z. B. Compare-and-Swap) und SeqCst (der stärkste). In dem folgenden Beispiel wird gezeigt, wie sich die "entspannte" Reihenfolge von der Reihenfolge "Acquire" und "Release" unterscheidet.

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell<usize>,
}

// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

    // Barrier is a counter-like synchronization structure (not to be confused
    // with a memory barrier). It blocks on a `wait` call until a fixed number
    // of `wait` calls are made from various threads (like waiting for all
    // players to get to the starting line before firing the starter pistol).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Read both members `atom` and `norm`, and check whether `atom`
                // contains a newer value than `norm`. See `UsizePair` impl for
                // details.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // If `Acquire`-`Release` ordering is used in `get` and
                    // `set`, then this statement will never be reached.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Update both members `atom` and `norm` to value `v`. See the impl for
        // details.
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // If the above load operation is performed with `Acquire` ordering,
        // then all writes before the corresponding `Release` store is
        // guaranteed to be visible below.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

        // If the below store operation is performed with `Release` ordering,
        // then the write to `norm` above is guaranteed to be visible to all
        // threads that "loads `atom` with `Acquire` ordering and sees the same
        // value that was stored below". However, no guarantees are provided as
        // to when other readers will witness the below store, and consequently
        // the above write. On the other hand, there is also no guarantee that
        // these two values will be in sync for readers. Even if another thread
        // sees the same value that was stored below, it may actually see a
        // "later" value in `norm` than what was written above. That is, there
        // is no restriction on visibility into the future.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

Hinweis: x86-Architekturen verfügen über ein starkes Speichermodell. Dieser Beitrag erklärt es ausführlich. Schauen Sie sich auch die Wikipedia-Seite an, um Architekturen miteinander zu vergleichen.

Schreib-Lese-Sperren

Mit RwLocks kann ein einziger Produzent einer beliebigen Anzahl von Lesern Daten zur Verfügung stellen und gleichzeitig verhindern, dass Leser ungültige oder inkonsistente Daten sehen.

Im folgenden Beispiel wird mit RwLock gezeigt, wie ein einzelner Produzententhread einen Wert periodisch erhöhen kann, während zwei Konsumententhreads den Wert lesen.

use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };

fn main() {
    // Create an u32 with an inital value of 0
    let initial_value = 0u32;

    // Move the initial value into the read-write lock which is wrapped into an atomic reference
    // counter in order to allow safe sharing.
    let rw_lock = Arc::new(RwLock::new(initial_value));

    // Create a clone for each thread
    let producer_lock = rw_lock.clone();
    let consumer_id_lock = rw_lock.clone();
    let consumer_square_lock = rw_lock.clone();

    let producer_thread = thread::spawn(move || {
        loop {
            // write() blocks this thread until write-exclusive access can be acquired and retuns an 
            // RAII guard upon completion
            if let Ok(mut write_guard) = producer_lock.write() {
                // the returned write_guard implements `Deref` giving us easy access to the target value
                *write_guard += 1;

                println!("Updated value: {}", *write_guard);
            }

            // ^
            // |   when the RAII guard goes out of the scope, write access will be dropped, allowing
            // +~  other threads access the lock

            sleep(Duration::from_millis(1000));
        }
    });

    // A reader thread that prints the current value to the screen
    let consumer_id_thread = thread::spawn(move || {
        loop {
            // read() will only block when `producer_thread` is holding a write lock
            if let Ok(read_guard) = consumer_id_lock.read() {
                // the returned read_guard also implements `Deref`
                println!("Read value: {}", *read_guard);
            }

            sleep(Duration::from_millis(500));
        }
    });

    // A second reader thread is printing the squared value to the screen. Note that readers don't
    // block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
    let consumer_square_thread = thread::spawn(move || {
        loop {
            if let Ok(lock) = consumer_square_lock.read() {
                let value = *lock;
                println!("Read value squared: {}", value * value);
            }

            sleep(Duration::from_millis(750));
        }
    });

    let _ = producer_thread.join();
    let _ = consumer_id_thread.join();
    let _ = consumer_square_thread.join();
}

Beispielausgabe:

Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow