Sök…


Introduktion

Parallellism stöds väl av Rusts standardbibliotek genom olika klasser som std::thread trådmodulen, kanaler och atom. Det här avsnittet guidar dig genom användningen av dessa typer.

Starta en ny tråd

Så här startar du en ny tråd:

use std::thread;

fn main() {
    thread::spawn(move || {
        // The main thread will not wait for this thread to finish. That
        // might mean that the next println isn't even executed before the
        // program exits.
        println!("Hello from spawned thread");
    });

    let join_handle = thread::spawn(move || {
        println!("Hello from second spawned thread");
        // To ensure that the program waits for a thread to finish, we must
        // call `join()` on its join handle. It is even possible to send a
        // value to a different thread through the join handle, like the
        // integer 17 in this case:
        17
    });

    println!("Hello from the main thread");
    
    // The above three printlns can be observed in any order.

    // Block until the second spawned thread has finished.
    match join_handle.join() {
        Ok(x) => println!("Second spawned thread returned {}", x),
        Err(_) => println!("Second spawned thread panicked")
    }
}

Tvär trådad kommunikation med kanaler

Kanaler kan användas för att skicka data från en tråd till en annan. Nedan följer ett exempel på ett enkelt producent-konsumentsystem, där huvudtråden producerar värdena 0, 1, ..., 9, och den spawnade tråden skriver ut dem:

use std::thread;
use std::sync::mpsc::channel;

fn main() {
    // Create a channel with a sending end (tx) and a receiving end (rx).
    let (tx, rx) = channel();

    // Spawn a new thread, and move the receiving end into the thread.
    let join_handle = thread::spawn(move || {
        // Keep receiving in a loop, until tx is dropped!
        while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
            println!("Received {}", n);
        }
    });

    // Note: using `rx` here would be a compile error, as it has been
    // moved into the spawned thread.

    // Send some values to the spawned thread. `unwrap()` crashes only if the
    // receiving end was dropped before it could be buffered.
    for i in 0..10 {
        tx.send(i).unwrap(); // Note: `send()` never blocks
    }

    // Drop `tx` so that `rx.recv()` returns an `Err(_)`.
    drop(tx);

    // Wait for the spawned thread to finish.
    join_handle.join().unwrap();
}

Tvär trådad kommunikation med sessionstyper

Sessiontyper är ett sätt att berätta kompilatorn om det protokoll du vill använda för att kommunicera mellan trådar - inte protokoll som i HTTP eller FTP, men mönstret för informationsflöde mellan trådar. Detta är användbart eftersom kompilatorn nu kommer att hindra dig från att oavsiktligt bryta protokollet och orsaka dödlås eller livlås mellan trådarna - några av de mest notoriskt svåra att felsöka problem och en viktig källa till Heisenbugs. Sessionstyper fungerar på samma sätt som de kanaler som beskrivs ovan, men kan vara mer skrämmande att börja använda. Här är en enkel kommunikation med två trådar:

// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;

// For now, it's easiest to just import everything from the library.
use session_types::*;

// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;

// This function is ordinary code to run the client. Notice that it takes    
// ownership of a channel, just like other forms of interthread communication - 
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}

// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}

fn main() {
    // First, create the channels used for the two threads to talk to each other.
    let (server_channel, client_channel) = session_channel();

    // Start the server on a new thread
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    // Run the client on this thread.
    run_client(client_channel);

    // Wait for the server to finish.
    server_thread.join().unwrap();
}

Du bör märka att huvudmetoden ser mycket lik huvudmetoden för cross-thread-kommunikation definierad ovan, om servern flyttades till sin egen funktion. Om du skulle köra detta skulle du få utdata:

The client just sent the number 42!
The server received some data: 42

i den ordningen.

Varför gå igenom allt besväret med att definiera klient- och servertyper? Och varför omdefinierar vi kanalen i klienten och servern? Dessa frågor har samma svar: kompilatorn kommer att hindra oss från att bryta protokollet! Om klienten försökte ta emot data istället för att skicka den (vilket skulle resultera i en dödlås i vanlig kod) skulle programmet inte kompilera eftersom klientens recv inte har en recv metod på den. Om vi försökte definiera protokollet på ett sätt som kan leda till dödläge (till exempel om både klienten och servern försökte få ett värde), skulle kompilering misslyckas när vi skapar kanalerna. Detta beror på att Send och Recv är "Dualtyper", vilket betyder att om servern gör det ena måste klienten göra det andra - om båda försöker Recv , kommer du att ha problem. Eps är sin egen dubbla typ, eftersom det är bra för både klienten och servern att gå med på att stänga kanalen.

Naturligtvis när vi gör några operationer på kanalen flyttar vi till ett nytt tillstånd i protokollet, och funktionerna som finns tillgängliga kan ändras - så vi måste omdefiniera kanalbindningen. Lyckligtvis tar session_types hand om det för oss och returnerar alltid den nya kanalen (utom close , i vilket fall det inte finns någon ny kanal). Detta innebär också att alla metoder på en kanal också äger kanalen - så om du glömmer att omdefiniera kanalen kommer kompilatorn att ge dig ett fel också. Om du släpper en kanal utan att stänga den är det också ett runtime-fel (tyvärr är det omöjligt att kontrollera vid kompileringstid).

Det finns många fler typer av kommunikation än bara Send och Recv - till exempel Offer den andra sidan av kanalen möjlighet att välja mellan två möjliga grenar av protokollet, och Rec och Var arbetar tillsammans för att tillåta slingor och rekursion i protokollet . Många fler exempel på sessionstyper och andra typer finns tillgängliga i session_types GitHub-arkivet . Bibliotekets dokumentation kan hittas här.

Atomik och minne

Atomtyper är byggstenarna i låsfria datastrukturer och andra samtidiga typer. En minnesordning som representerar styrkan hos minnesbarriären bör specificeras vid åtkomst / modifiering av en atomtyp. Rust ger fem minne som beställer primitiv: Avslappnad (de svagaste), Förvärva (för läser aka belastningar), Release (för skriver aka-butiker), AcqRel (motsvarande "Acquire-for-load och Release-for-store"; användbart när båda är involverade i en enda operation som jämför och byt) och SeqCst (den starkaste). I exemplet nedan demonstrerar vi hur "Avslappnad" beställning skiljer sig från "Förvärva" och "Släpp" beställningar.

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell<usize>,
}

// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

    // Barrier is a counter-like synchronization structure (not to be confused
    // with a memory barrier). It blocks on a `wait` call until a fixed number
    // of `wait` calls are made from various threads (like waiting for all
    // players to get to the starting line before firing the starter pistol).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Read both members `atom` and `norm`, and check whether `atom`
                // contains a newer value than `norm`. See `UsizePair` impl for
                // details.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // If `Acquire`-`Release` ordering is used in `get` and
                    // `set`, then this statement will never be reached.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Update both members `atom` and `norm` to value `v`. See the impl for
        // details.
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // If the above load operation is performed with `Acquire` ordering,
        // then all writes before the corresponding `Release` store is
        // guaranteed to be visible below.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

        // If the below store operation is performed with `Release` ordering,
        // then the write to `norm` above is guaranteed to be visible to all
        // threads that "loads `atom` with `Acquire` ordering and sees the same
        // value that was stored below". However, no guarantees are provided as
        // to when other readers will witness the below store, and consequently
        // the above write. On the other hand, there is also no guarantee that
        // these two values will be in sync for readers. Even if another thread
        // sees the same value that was stored below, it may actually see a
        // "later" value in `norm` than what was written above. That is, there
        // is no restriction on visibility into the future.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

Obs: x86-arkitekturer har en stark minnesmodell. Det här inlägget förklarar det i detalj. Titta också på Wikipedia-sidan för jämförelse av arkitekturer.

Läs-skrivlås

RwLocks tillåter en enda producent att ge valfritt antal läsare data samtidigt som de hindrar läsarna från att se ogiltiga eller inkonsekventa data.

Följande exempel använder RwLock för att visa hur en enda producenttråd periodvis kan öka ett värde medan två konsumenttrådar läser värdet.

use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };

fn main() {
    // Create an u32 with an inital value of 0
    let initial_value = 0u32;

    // Move the initial value into the read-write lock which is wrapped into an atomic reference
    // counter in order to allow safe sharing.
    let rw_lock = Arc::new(RwLock::new(initial_value));

    // Create a clone for each thread
    let producer_lock = rw_lock.clone();
    let consumer_id_lock = rw_lock.clone();
    let consumer_square_lock = rw_lock.clone();

    let producer_thread = thread::spawn(move || {
        loop {
            // write() blocks this thread until write-exclusive access can be acquired and retuns an 
            // RAII guard upon completion
            if let Ok(mut write_guard) = producer_lock.write() {
                // the returned write_guard implements `Deref` giving us easy access to the target value
                *write_guard += 1;

                println!("Updated value: {}", *write_guard);
            }

            // ^
            // |   when the RAII guard goes out of the scope, write access will be dropped, allowing
            // +~  other threads access the lock

            sleep(Duration::from_millis(1000));
        }
    });

    // A reader thread that prints the current value to the screen
    let consumer_id_thread = thread::spawn(move || {
        loop {
            // read() will only block when `producer_thread` is holding a write lock
            if let Ok(read_guard) = consumer_id_lock.read() {
                // the returned read_guard also implements `Deref`
                println!("Read value: {}", *read_guard);
            }

            sleep(Duration::from_millis(500));
        }
    });

    // A second reader thread is printing the squared value to the screen. Note that readers don't
    // block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
    let consumer_square_thread = thread::spawn(move || {
        loop {
            if let Ok(lock) = consumer_square_lock.read() {
                let value = *lock;
                println!("Read value squared: {}", value * value);
            }

            sleep(Duration::from_millis(750));
        }
    });

    let _ = producer_thread.join();
    let _ = consumer_id_thread.join();
    let _ = consumer_square_thread.join();
}

Exempel på utgång:

Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow