Recherche…


Introduction

Le parallélisme est bien supporté par la bibliothèque standard de Rust à travers diverses classes telles que le module std::thread , les canaux et les atomiques. Cette section vous guidera à travers l'utilisation de ces types.

Commencer un nouveau fil

Pour démarrer un nouveau sujet:

use std::thread;

fn main() {
    thread::spawn(move || {
        // The main thread will not wait for this thread to finish. That
        // might mean that the next println isn't even executed before the
        // program exits.
        println!("Hello from spawned thread");
    });

    let join_handle = thread::spawn(move || {
        println!("Hello from second spawned thread");
        // To ensure that the program waits for a thread to finish, we must
        // call `join()` on its join handle. It is even possible to send a
        // value to a different thread through the join handle, like the
        // integer 17 in this case:
        17
    });

    println!("Hello from the main thread");
    
    // The above three printlns can be observed in any order.

    // Block until the second spawned thread has finished.
    match join_handle.join() {
        Ok(x) => println!("Second spawned thread returned {}", x),
        Err(_) => println!("Second spawned thread panicked")
    }
}

Communication croisée avec les canaux

Les canaux peuvent être utilisés pour envoyer des données d'un thread à un autre. Vous trouverez ci-dessous un exemple de système producteur-consommateur simple, dans lequel le thread principal produit les valeurs 0, 1, ..., 9 et le thread généré les imprime:

use std::thread;
use std::sync::mpsc::channel;

fn main() {
    // Create a channel with a sending end (tx) and a receiving end (rx).
    let (tx, rx) = channel();

    // Spawn a new thread, and move the receiving end into the thread.
    let join_handle = thread::spawn(move || {
        // Keep receiving in a loop, until tx is dropped!
        while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
            println!("Received {}", n);
        }
    });

    // Note: using `rx` here would be a compile error, as it has been
    // moved into the spawned thread.

    // Send some values to the spawned thread. `unwrap()` crashes only if the
    // receiving end was dropped before it could be buffered.
    for i in 0..10 {
        tx.send(i).unwrap(); // Note: `send()` never blocks
    }

    // Drop `tx` so that `rx.recv()` returns an `Err(_)`.
    drop(tx);

    // Wait for the spawned thread to finish.
    join_handle.join().unwrap();
}

Communication croisée avec les types de session

Les types de session permettent au compilateur de connaître le protocole que vous souhaitez utiliser pour communiquer entre les threads, et non le protocole comme HTTP ou FTP, mais le modèle de flux d'informations entre les threads. Ceci est utile car le compilateur va maintenant vous empêcher de violer accidentellement votre protocole et de provoquer des blocages ou des "vivelocks" entre les threads - certains des problèmes les plus difficiles à déboguer et une source majeure de Heisenbugs. Les types de session fonctionnent de manière similaire aux canaux décrits ci-dessus, mais peuvent être plus intimidants à utiliser. Voici une communication simple à deux fils:

// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;

// For now, it's easiest to just import everything from the library.
use session_types::*;

// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;

// This function is ordinary code to run the client. Notice that it takes    
// ownership of a channel, just like other forms of interthread communication - 
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}

// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}

fn main() {
    // First, create the channels used for the two threads to talk to each other.
    let (server_channel, client_channel) = session_channel();

    // Start the server on a new thread
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    // Run the client on this thread.
    run_client(client_channel);

    // Wait for the server to finish.
    server_thread.join().unwrap();
}

Vous devriez noter que la méthode principale ressemble beaucoup à la méthode principale de communication inter-thread définie ci-dessus, si le serveur a été déplacé vers sa propre fonction. Si vous deviez lancer ceci, vous obtiendriez la sortie:

The client just sent the number 42!
The server received some data: 42

dans cet ordre.

Pourquoi ne pas avoir à définir les types de client et de serveur? Et pourquoi redéfinir le canal dans le client et le serveur? Ces questions ont la même réponse: le compilateur nous empêchera de briser le protocole! Si le client essayait de recevoir des données au lieu de les envoyer (ce qui entraînerait un blocage dans le code ordinaire), le programme ne compilerait pas, car l'objet channel du client n'a pas de méthode recv . En outre, si nous essayions de définir le protocole de manière à provoquer un blocage (par exemple, si le client et le serveur essayaient de recevoir une valeur), la compilation échouerait lorsque nous créerions les canaux. En effet, Send et Recv sont des "Dual Types", ce qui signifie que si le serveur en fait un, le client doit faire l’autre - si les deux essaient de Recv , vous allez avoir des problèmes. Eps est son propre type, car le client et le serveur acceptent de fermer le canal.

Bien sûr, lorsque nous effectuons des opérations sur le canal, nous passons à un nouvel état dans le protocole, et les fonctions disponibles peuvent changer - nous devons donc redéfinir la liaison du canal. Heureusement, session_types s'en charge pour nous et renvoie toujours le nouveau canal (sauf close , auquel cas il n'y a pas de nouveau canal). Cela signifie également que toutes les méthodes sur un canal prennent également en charge le canal - donc si vous oubliez de redéfinir le canal, le compilateur vous donnera également une erreur à ce sujet. Si vous laissez tomber un canal sans le fermer, c'est une erreur d'exécution (malheureusement, il est impossible de vérifier au moment de la compilation).

Il y a beaucoup plus de types de communication que Send et Recv - par exemple, Offer offre à l’autre côté du canal la possibilité de choisir entre deux branches possibles du protocole, et Rec et Var fonctionnent ensemble pour autoriser les boucles et la récursivité dans le protocole. . De nombreux autres exemples de types de session et d'autres types sont disponibles dans le référentiel GitHub session_types . La documentation de la bibliothèque peut être trouvée ici.

Commande atomique et mémoire

Les types atomiques sont les blocs de construction des structures de données sans verrouillage et des autres types concurrents. Un ordre de mémoire, représentant la force de la barrière de mémoire, devrait être spécifié lors de l'accès / modification d'un type atomique. Rust fournit 5 primitives d’ordonnancement de mémoire: assouplies (les plus faibles), Acquire (pour les lectures des charges alias), Release (pour les écritures alias les magasins), AcqRel (équivalent à «Acquire-for-load et Release-for-store») sont impliqués dans une seule opération telle que compare-and-swap), et SeqCst (le plus fort). Dans l'exemple ci-dessous, nous montrerons en quoi l'ordre "Détendu" diffère des ordres "Acquérir" et "Libérer".

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell<usize>,
}

// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

    // Barrier is a counter-like synchronization structure (not to be confused
    // with a memory barrier). It blocks on a `wait` call until a fixed number
    // of `wait` calls are made from various threads (like waiting for all
    // players to get to the starting line before firing the starter pistol).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Read both members `atom` and `norm`, and check whether `atom`
                // contains a newer value than `norm`. See `UsizePair` impl for
                // details.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // If `Acquire`-`Release` ordering is used in `get` and
                    // `set`, then this statement will never be reached.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Update both members `atom` and `norm` to value `v`. See the impl for
        // details.
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // If the above load operation is performed with `Acquire` ordering,
        // then all writes before the corresponding `Release` store is
        // guaranteed to be visible below.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

        // If the below store operation is performed with `Release` ordering,
        // then the write to `norm` above is guaranteed to be visible to all
        // threads that "loads `atom` with `Acquire` ordering and sees the same
        // value that was stored below". However, no guarantees are provided as
        // to when other readers will witness the below store, and consequently
        // the above write. On the other hand, there is also no guarantee that
        // these two values will be in sync for readers. Even if another thread
        // sees the same value that was stored below, it may actually see a
        // "later" value in `norm` than what was written above. That is, there
        // is no restriction on visibility into the future.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

Remarque: les architectures x86 ont un modèle de mémoire robuste. Cet article l' explique en détail. Jetez également un coup d'oeil à la page Wikipedia pour la comparaison des architectures.

Verrous de lecture-écriture

RwLocks permet à un seul producteur de fournir à un nombre quelconque de lecteurs des données tout en empêchant les lecteurs de voir des données invalides ou incohérentes.

L'exemple suivant utilise RwLock pour montrer comment un thread producteur unique peut augmenter périodiquement une valeur tandis que deux threads consommateurs lisent la valeur.

use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };

fn main() {
    // Create an u32 with an inital value of 0
    let initial_value = 0u32;

    // Move the initial value into the read-write lock which is wrapped into an atomic reference
    // counter in order to allow safe sharing.
    let rw_lock = Arc::new(RwLock::new(initial_value));

    // Create a clone for each thread
    let producer_lock = rw_lock.clone();
    let consumer_id_lock = rw_lock.clone();
    let consumer_square_lock = rw_lock.clone();

    let producer_thread = thread::spawn(move || {
        loop {
            // write() blocks this thread until write-exclusive access can be acquired and retuns an 
            // RAII guard upon completion
            if let Ok(mut write_guard) = producer_lock.write() {
                // the returned write_guard implements `Deref` giving us easy access to the target value
                *write_guard += 1;

                println!("Updated value: {}", *write_guard);
            }

            // ^
            // |   when the RAII guard goes out of the scope, write access will be dropped, allowing
            // +~  other threads access the lock

            sleep(Duration::from_millis(1000));
        }
    });

    // A reader thread that prints the current value to the screen
    let consumer_id_thread = thread::spawn(move || {
        loop {
            // read() will only block when `producer_thread` is holding a write lock
            if let Ok(read_guard) = consumer_id_lock.read() {
                // the returned read_guard also implements `Deref`
                println!("Read value: {}", *read_guard);
            }

            sleep(Duration::from_millis(500));
        }
    });

    // A second reader thread is printing the squared value to the screen. Note that readers don't
    // block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
    let consumer_square_thread = thread::spawn(move || {
        loop {
            if let Ok(lock) = consumer_square_lock.read() {
                let value = *lock;
                println!("Read value squared: {}", value * value);
            }

            sleep(Duration::from_millis(750));
        }
    });

    let _ = producer_thread.join();
    let _ = consumer_id_thread.join();
    let _ = consumer_square_thread.join();
}

Exemple de sortie:

Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...


Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow