수색…


소개

병렬 처리는 std::thread 모듈, 채널 및 원자 단위와 같은 다양한 클래스를 통해 Rust의 표준 라이브러리에서 잘 지원됩니다. 이 섹션에서는 이러한 유형의 사용법을 안내합니다.

새 스레드 시작

새 스레드를 시작하려면 :

use std::thread;

fn main() {
    thread::spawn(move || {
        // The main thread will not wait for this thread to finish. That
        // might mean that the next println isn't even executed before the
        // program exits.
        println!("Hello from spawned thread");
    });

    let join_handle = thread::spawn(move || {
        println!("Hello from second spawned thread");
        // To ensure that the program waits for a thread to finish, we must
        // call `join()` on its join handle. It is even possible to send a
        // value to a different thread through the join handle, like the
        // integer 17 in this case:
        17
    });

    println!("Hello from the main thread");
    
    // The above three printlns can be observed in any order.

    // Block until the second spawned thread has finished.
    match join_handle.join() {
        Ok(x) => println!("Second spawned thread returned {}", x),
        Err(_) => println!("Second spawned thread panicked")
    }
}

채널과의 크로스 스레드 통신

채널을 사용하여 한 스레드에서 다른 스레드로 데이터를 보낼 수 있습니다. 다음은 주 생성자가 0, 1, ..., 9 값을 생성하고 생성 된 스레드가 값을 인쇄하는 간단한 생산자 - 소비자 시스템의 예입니다.

use std::thread;
use std::sync::mpsc::channel;

fn main() {
    // Create a channel with a sending end (tx) and a receiving end (rx).
    let (tx, rx) = channel();

    // Spawn a new thread, and move the receiving end into the thread.
    let join_handle = thread::spawn(move || {
        // Keep receiving in a loop, until tx is dropped!
        while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
            println!("Received {}", n);
        }
    });

    // Note: using `rx` here would be a compile error, as it has been
    // moved into the spawned thread.

    // Send some values to the spawned thread. `unwrap()` crashes only if the
    // receiving end was dropped before it could be buffered.
    for i in 0..10 {
        tx.send(i).unwrap(); // Note: `send()` never blocks
    }

    // Drop `tx` so that `rx.recv()` returns an `Err(_)`.
    drop(tx);

    // Wait for the spawned thread to finish.
    join_handle.join().unwrap();
}

세션 유형과의 크로스 스레드 통신

세션 유형은 HTTP 또는 FTP와 같은 프로토콜이 아니라 스레드 간의 정보 흐름 패턴과 같이 스레드간에 통신하는 데 사용할 프로토콜에 대해 컴파일러에 알리는 방법입니다. 이것은 컴파일러가 우연히 프로토콜을 깨뜨리고 스레드 사이에서 교착 상태 또는 라이브 록을 일으키는 것을 막을 수 있기 때문에 유용합니다. 가장 악명 높은 디버깅 문제 중 일부와 Heisenbugs의 주요 출처입니다. 세션 유형은 위에서 설명한 채널과 유사하게 작동하지만 사용하기가 더 어려워 질 수 있습니다. 다음은 간단한 2 스레드 통신입니다.

// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;

// For now, it's easiest to just import everything from the library.
use session_types::*;

// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;

// This function is ordinary code to run the client. Notice that it takes    
// ownership of a channel, just like other forms of interthread communication - 
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
    let channel = channel.send(42);
    println!("The client just sent the number 42!");
    channel.close();
}

// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
    let (channel, data) = channel.recv();
    println!("The server received some data: {}", data);
    channel.close();
}

fn main() {
    // First, create the channels used for the two threads to talk to each other.
    let (server_channel, client_channel) = session_channel();

    // Start the server on a new thread
    let server_thread = std::thread::spawn(move || {
        run_server(server_channel);
    });

    // Run the client on this thread.
    run_client(client_channel);

    // Wait for the server to finish.
    server_thread.join().unwrap();
}

서버가 자체 기능으로 이동하면 main 메소드는 위에서 정의한 크로스 스레드 통신의 main 메소드와 매우 유사하게 보입니다. 이것을 실행한다면 결과를 얻을 수 있습니다 :

The client just sent the number 42!
The server received some data: 42

그와 같은 순서로.

클라이언트와 서버 유형을 정의하는 모든 번거로운 과정을 거쳐야하는 이유는 무엇입니까? 그리고 클라이언트와 서버에서 채널을 재정의하는 이유는 무엇입니까? 이 질문에는 동일한 대답이 있습니다. 컴파일러가 프로토콜을 위반하는 것을 막을 것입니다! 클라이언트가 데이터를 보내지 않고 데이터를 수신하려고 시도하면 (보통 코드에서 교착 상태가 발생 함) 클라이언트의 채널 객체 recv 메소드가 없으므로 프로그램이 컴파일 되지 않습니다. 또한 교착 상태로 이어질 수있는 방식으로 프로토콜을 정의하려고 시도한 경우 (예 : 클라이언트와 서버가 모두 값을 수신하려고 시도한 경우) 채널을 만들 때 컴파일이 실패합니다. 이것은 SendRecv 가 "Dual Type"이기 때문에 서버가 하나를 수행하면 클라이언트가 다른 서버를 처리해야 함을 의미합니다. 둘 다 Recv 시도하면 문제가 발생할 것입니다. Eps 는 클라이언트와 서버가 채널을 닫는 데 동의하기 때문에 자체의 이중 유형입니다.

물론 채널에서 작업을 수행 할 때 프로토콜에서 새로운 상태로 바뀌며 사용할 수있는 기능이 변경 될 수 있으므로 채널 바인딩을 다시 정의해야합니다. 다행히도 session_types 는 우리를 대신하여 새로운 채널을 반환합니다 ( close 경우 제외, 새 채널이없는 경우 제외). 이것은 또한 채널의 모든 메소드가 채널의 소유권을 갖게됨을 의미합니다. 따라서 채널을 다시 정의하는 것을 잊어 버리면 컴파일러는 그에 대한 오류도 줄 것입니다. 채널을 닫지 않고 드롭하면 런타임 오류도 발생합니다 (불행히도 컴파일 타임에는 확인할 수 없음).

단지보다 의사 소통의 더 많은 종류가 있습니다 SendRecv 예를 들어, - Offer 채널의 다른 팀의 프로토콜의 두 가지 지점 사이에서 선택 할 수있는 기능을 제공하고, RecVar 함께 작업 프로토콜의 루프와 재귀 수 있도록이 . 세션 유형 및 기타 유형의 많은 예제가 session_types GitHub 저장소 에서 사용 가능 합니다 . 도서관의 문서는 여기 에서 찾을 수 있습니다.

원자 및 메모리 주문

원자 유형은 잠금없는 데이터 구조 및 기타 동시 유형의 빌딩 블록입니다. 원자 유형에 액세스하거나 수정할 때 메모리 배리어의 강도를 나타내는 메모리 정렬을 지정해야합니다. Rust는 Relaxed (가장 약한), Acquire (읽기 일 경우), Release (일명 스토어 쓰기), AcqRel ( "Acquire-for-load"및 "Release-for-store"와 동일)의 다섯 가지 메모리 순서 지정 프리미티브를 제공합니다. (예 : compare-and-swap), SeqCst (가장 강한 것)와 같은 단일 작업에 관여합니다. 아래 예제에서 "Relaxed"주문이 "Acquire"및 "Release"주문과 다른 점을 보여줍니다.

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;

struct UsizePair {
    atom: AtomicUsize,
    norm: UnsafeCell<usize>,
}

// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}

static NTHREADS: usize = 8;
static NITERS: usize = 1000000;

fn main() {
    let upair = Arc::new(UsizePair::new(0));

    // Barrier is a counter-like synchronization structure (not to be confused
    // with a memory barrier). It blocks on a `wait` call until a fixed number
    // of `wait` calls are made from various threads (like waiting for all
    // players to get to the starting line before firing the starter pistol).
    let barrier = Arc::new(Barrier::new(NTHREADS + 1));

    let mut children = vec![];

    for _ in 0..NTHREADS {
        let upair = upair.clone();
        let barrier = barrier.clone();
        children.push(thread::spawn(move || {
            barrier.wait();

            let mut v = 0;
            while v < NITERS - 1 {
                // Read both members `atom` and `norm`, and check whether `atom`
                // contains a newer value than `norm`. See `UsizePair` impl for
                // details.
                let (atom, norm) = upair.get();
                if atom > norm {
                    // If `Acquire`-`Release` ordering is used in `get` and
                    // `set`, then this statement will never be reached.
                    println!("Reordered! {} > {}", atom, norm);
                }
                v = atom;
            }
        }));
    }

    barrier.wait();

    for v in 1..NITERS {
        // Update both members `atom` and `norm` to value `v`. See the impl for
        // details.
        upair.set(v);
    }

    for child in children {
        let _ = child.join();
    }
}

impl UsizePair {
    pub fn new(v: usize) -> UsizePair {
        UsizePair {
            atom: AtomicUsize::new(v),
            norm: UnsafeCell::new(v),
        }
    }

    pub fn get(&self) -> (usize, usize) {
        let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire

        // If the above load operation is performed with `Acquire` ordering,
        // then all writes before the corresponding `Release` store is
        // guaranteed to be visible below.

        let norm = unsafe { *self.norm.get() };
        (atom, norm)
    }

    pub fn set(&self, v: usize) {
        unsafe { *self.norm.get() = v };

        // If the below store operation is performed with `Release` ordering,
        // then the write to `norm` above is guaranteed to be visible to all
        // threads that "loads `atom` with `Acquire` ordering and sees the same
        // value that was stored below". However, no guarantees are provided as
        // to when other readers will witness the below store, and consequently
        // the above write. On the other hand, there is also no guarantee that
        // these two values will be in sync for readers. Even if another thread
        // sees the same value that was stored below, it may actually see a
        // "later" value in `norm` than what was written above. That is, there
        // is no restriction on visibility into the future.

        self.atom.store(v, Ordering::Relaxed); //Ordering::Release
    }
}

참고 : x86 아키텍처는 강력한 메모리 모델을 가지고 있습니다. 이 게시물 에 자세히 설명되어 있습니다. 또한 아키텍처 비교를 위해 Wikipedia 페이지 를 살펴보십시오.

읽기 - 쓰기 잠금

RwLocks를 사용하면 단일 제작자가 임의의 수의 독자에게 데이터를 제공 할 수 있으며 독자는 유효하지 않거나 일치하지 않는 데이터를 보지 못하게 할 수 있습니다.

다음 예제에서는 RwLock을 사용하여 두 개의 소비자 스레드가 값을 읽는 동안 단일 제작자 스레드가 값을 주기적으로 늘릴 수있는 방법을 보여줍니다.

use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };

fn main() {
    // Create an u32 with an inital value of 0
    let initial_value = 0u32;

    // Move the initial value into the read-write lock which is wrapped into an atomic reference
    // counter in order to allow safe sharing.
    let rw_lock = Arc::new(RwLock::new(initial_value));

    // Create a clone for each thread
    let producer_lock = rw_lock.clone();
    let consumer_id_lock = rw_lock.clone();
    let consumer_square_lock = rw_lock.clone();

    let producer_thread = thread::spawn(move || {
        loop {
            // write() blocks this thread until write-exclusive access can be acquired and retuns an 
            // RAII guard upon completion
            if let Ok(mut write_guard) = producer_lock.write() {
                // the returned write_guard implements `Deref` giving us easy access to the target value
                *write_guard += 1;

                println!("Updated value: {}", *write_guard);
            }

            // ^
            // |   when the RAII guard goes out of the scope, write access will be dropped, allowing
            // +~  other threads access the lock

            sleep(Duration::from_millis(1000));
        }
    });

    // A reader thread that prints the current value to the screen
    let consumer_id_thread = thread::spawn(move || {
        loop {
            // read() will only block when `producer_thread` is holding a write lock
            if let Ok(read_guard) = consumer_id_lock.read() {
                // the returned read_guard also implements `Deref`
                println!("Read value: {}", *read_guard);
            }

            sleep(Duration::from_millis(500));
        }
    });

    // A second reader thread is printing the squared value to the screen. Note that readers don't
    // block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
    let consumer_square_thread = thread::spawn(move || {
        loop {
            if let Ok(lock) = consumer_square_lock.read() {
                let value = *lock;
                println!("Read value squared: {}", value * value);
            }

            sleep(Duration::from_millis(750));
        }
    });

    let _ = producer_thread.join();
    let _ = consumer_id_thread.join();
    let _ = consumer_square_thread.join();
}

예제 출력 :

Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...


Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow