Ricerca…


introduzione

Il parallelismo è supportato bene dalla libreria standard di Rust attraverso varie classi come il modulo std::thread , i canali e l'atomica. Questa sezione ti guiderà attraverso l'uso di questi tipi.

Iniziare una nuova discussione

Per iniziare una nuova discussione:

use std::thread;

fn main() {
    thread::spawn(move || {
        // The main thread will not wait for this thread to finish. That
        // might mean that the next println isn't even executed before the
        // program exits.
        println!("Hello from spawned thread");
    });

    let join_handle = thread::spawn(move || {
        println!("Hello from second spawned thread");
        // To ensure that the program waits for a thread to finish, we must
        // call `join()` on its join handle. It is even possible to send a
        // value to a different thread through the join handle, like the
        // integer 17 in this case:
        17
    });

    println!("Hello from the main thread");
    
    // The above three printlns can be observed in any order.

    // Block until the second spawned thread has finished.
    match join_handle.join() {
        Ok(x) => println!("Second spawned thread returned {}", x),
        Err(_) => println!("Second spawned thread panicked")
    }
}

Comunicazione cross-thread con i canali

I canali possono essere utilizzati per inviare dati da un thread all'altro. Di seguito è riportato un esempio di un semplice sistema produttore-consumatore, in cui il thread principale produce i valori 0, 1, ..., 9 e il thread generato li stampa:

use std::thread;
use std::sync::mpsc::channel;

fn main() {
    // Create a channel with a sending end (tx) and a receiving end (rx).
    let (tx, rx) = channel();

    // Spawn a new thread, and move the receiving end into the thread.
    let join_handle = thread::spawn(move || {
        // Keep receiving in a loop, until tx is dropped!
        while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
            println!("Received {}", n);
        }
    });

    // Note: using `rx` here would be a compile error, as it has been
    // moved into the spawned thread.

    // Send some values to the spawned thread. `unwrap()` crashes only if the
    // receiving end was dropped before it could be buffered.
    for i in 0..10 {
        tx.send(i).unwrap(); // Note: `send()` never blocks
    }

    // Drop `tx` so that `rx.recv()` returns an `Err(_)`.
    drop(tx);

    // Wait for the spawned thread to finish.
    join_handle.join().unwrap();
}

Comunicazione cross-thread con i tipi di sessione

I tipi di sessione sono un modo per comunicare al compilatore il protocollo che si desidera utilizzare per comunicare tra i thread, non il protocollo come in HTTP o FTP, ma lo schema del flusso di informazioni tra i thread. Questo è utile dal momento che il compilatore ora ti impedirà di rompere accidentalmente il tuo protocollo e causare deadlock o livelock tra i thread - alcuni dei problemi notoriamente difficili da debugare e una delle principali fonti di Heisenbugs. I tipi di sessione funzionano in modo simile ai canali descritti sopra, ma possono essere più intimidatori per iniziare a utilizzare. Ecco una semplice comunicazione a due thread:

// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;

// For now, it's easiest to just import everything from the library.
use session_types::*;

// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;

// This function is ordinary code to run the client. Notice that it takes    
// ownership of a channel, just like other forms of interthread communication - 
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}

// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}

fn main() {
    // First, create the channels used for the two threads to talk to each other.
    let (server_channel, client_channel) = session_channel();

    // Start the server on a new thread
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    // Run the client on this thread.
    run_client(client_channel);

    // Wait for the server to finish.
    server_thread.join().unwrap();
}

Si dovrebbe notare che il metodo principale sembra molto simile al metodo principale per le comunicazioni cross-thread definite sopra, se il server è stato spostato nella propria funzione. Se dovessi eseguire ciò, otterresti l'output:

The client just sent the number 42!
The server received some data: 42

in questo ordine.

Perché passare attraverso il fastidio di definire i tipi di client e server? E perché ridefiniamo il canale nel client e nel server? Queste domande hanno la stessa risposta: il compilatore ci impedirà di rompere il protocollo! Se il client tentava di ricevere dati invece di inviarlo (il che risulterebbe in un deadlock nel codice ordinario), il programma non si compilerebbe, poiché l'oggetto canale del client non ha un metodo recv su di esso. Inoltre, se provassimo a definire il protocollo in un modo che potrebbe portare a deadlock (ad esempio, se sia il client che il server tentassero di ricevere un valore), la compilazione fallirebbe quando creiamo i canali. Questo perché Send e Recv sono "Dual Types", cioè se il Server ne fa uno, il Client deve fare l'altro - se entrambi provano a Recv , sarai nei guai. Eps è il suo doppio tipo, dal momento che va bene che sia il Cliente che il Server accettino di chiudere il canale.

Naturalmente, quando eseguiamo qualche operazione sul canale, passiamo a un nuovo stato nel protocollo e le funzioni disponibili potrebbero cambiare, quindi dobbiamo ridefinire il binding del canale. Fortunatamente, session_types si occupa di questo per noi e restituisce sempre il nuovo canale (eccetto close , nel qual caso non c'è un nuovo canale). Ciò significa anche che tutti i metodi su un canale assumono la proprietà del canale, quindi se ti dimentichi di ridefinire il canale, il compilatore ti darà anche un errore. Se si rilascia un canale senza chiuderlo, si tratta anche di un errore di runtime (sfortunatamente è impossibile controllare in fase di compilazione).

Esistono molti più tipi di comunicazione oltre a Send e Recv - ad esempio, Offer offre all'altro lato del canale la possibilità di scegliere tra due possibili rami del protocollo, e Rec e Var lavorano insieme per consentire loop e ricorsività nel protocollo . Molti altri esempi di tipi di sessione e altri tipi sono disponibili nel repository GitHub session_types . La documentazione della biblioteca può essere trovata qui.

Atomica e ordine di memoria

I tipi atomici sono gli elementi costitutivi di strutture di dati senza blocco e altri tipi simultanei. Un ordine di memoria, che rappresenta la forza della barriera di memoria, dovrebbe essere specificato quando si accede / modifica un tipo atomico. Rust fornisce 5 primitive di ordinamento della memoria: Rilassato (il più debole), Acquisisci (per letture aka carichi), Rilascio (per scritture aka negozi), AcqRel (equivalente a "Acquisizione per caricamento e Rilascio per negozio"; utile quando entrambi sono coinvolti in una singola operazione come confrontare-e-swap) e SeqCst (il più forte). Nell'esempio seguente, dimostreremo come l'ordinamento "Rilassato" differisce dagli ordini "Acquisisci" e "Rilascia".

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell<usize>,
}

// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

    // Barrier is a counter-like synchronization structure (not to be confused
    // with a memory barrier). It blocks on a `wait` call until a fixed number
    // of `wait` calls are made from various threads (like waiting for all
    // players to get to the starting line before firing the starter pistol).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Read both members `atom` and `norm`, and check whether `atom`
                // contains a newer value than `norm`. See `UsizePair` impl for
                // details.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // If `Acquire`-`Release` ordering is used in `get` and
                    // `set`, then this statement will never be reached.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Update both members `atom` and `norm` to value `v`. See the impl for
        // details.
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // If the above load operation is performed with `Acquire` ordering,
        // then all writes before the corresponding `Release` store is
        // guaranteed to be visible below.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

        // If the below store operation is performed with `Release` ordering,
        // then the write to `norm` above is guaranteed to be visible to all
        // threads that "loads `atom` with `Acquire` ordering and sees the same
        // value that was stored below". However, no guarantees are provided as
        // to when other readers will witness the below store, and consequently
        // the above write. On the other hand, there is also no guarantee that
        // these two values will be in sync for readers. Even if another thread
        // sees the same value that was stored below, it may actually see a
        // "later" value in `norm` than what was written above. That is, there
        // is no restriction on visibility into the future.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

Nota: le architetture x86 hanno un modello di memoria forte. Questo post lo spiega in dettaglio. Dai anche un'occhiata alla pagina di Wikipedia per il confronto delle architetture.

Serrature di lettura-scrittura

RwLocks consente a un singolo produttore di fornire un numero qualsiasi di lettori di dati impedendo al tempo stesso ai lettori di visualizzare dati non validi o incoerenti.

Nell'esempio seguente viene utilizzato RwLock per mostrare come un singolo thread di produzione può aumentare periodicamente un valore mentre due thread di utenti stanno leggendo il valore.

use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };

fn main() {
    // Create an u32 with an inital value of 0
    let initial_value = 0u32;

    // Move the initial value into the read-write lock which is wrapped into an atomic reference
    // counter in order to allow safe sharing.
    let rw_lock = Arc::new(RwLock::new(initial_value));

    // Create a clone for each thread
    let producer_lock = rw_lock.clone();
    let consumer_id_lock = rw_lock.clone();
    let consumer_square_lock = rw_lock.clone();

    let producer_thread = thread::spawn(move || {
        loop {
            // write() blocks this thread until write-exclusive access can be acquired and retuns an 
            // RAII guard upon completion
            if let Ok(mut write_guard) = producer_lock.write() {
                // the returned write_guard implements `Deref` giving us easy access to the target value
                *write_guard += 1;

                println!("Updated value: {}", *write_guard);
            }

            // ^
            // |   when the RAII guard goes out of the scope, write access will be dropped, allowing
            // +~  other threads access the lock

            sleep(Duration::from_millis(1000));
        }
    });

    // A reader thread that prints the current value to the screen
    let consumer_id_thread = thread::spawn(move || {
        loop {
            // read() will only block when `producer_thread` is holding a write lock
            if let Ok(read_guard) = consumer_id_lock.read() {
                // the returned read_guard also implements `Deref`
                println!("Read value: {}", *read_guard);
            }

            sleep(Duration::from_millis(500));
        }
    });

    // A second reader thread is printing the squared value to the screen. Note that readers don't
    // block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
    let consumer_square_thread = thread::spawn(move || {
        loop {
            if let Ok(lock) = consumer_square_lock.read() {
                let value = *lock;
                println!("Read value squared: {}", value * value);
            }

            sleep(Duration::from_millis(750));
        }
    });

    let _ = producer_thread.join();
    let _ = consumer_id_thread.join();
    let _ = consumer_square_thread.join();
}

Esempio di output:

Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow