수색…
소개
std::thread
모듈, 채널 및 원자 단위와 같은 다양한 클래스를 통해 Rust의 표준 라이브러리에서 잘 지원됩니다. 이 섹션에서는 이러한 유형의 사용법을 안내합니다.
새 스레드 시작
새 스레드를 시작하려면 :
use std::thread;
fn main() {
thread::spawn(move || {
// The main thread will not wait for this thread to finish. That
// might mean that the next println isn't even executed before the
// program exits.
println!("Hello from spawned thread");
});
let join_handle = thread::spawn(move || {
println!("Hello from second spawned thread");
// To ensure that the program waits for a thread to finish, we must
// call `join()` on its join handle. It is even possible to send a
// value to a different thread through the join handle, like the
// integer 17 in this case:
17
});
println!("Hello from the main thread");
// The above three printlns can be observed in any order.
// Block until the second spawned thread has finished.
match join_handle.join() {
Ok(x) => println!("Second spawned thread returned {}", x),
Err(_) => println!("Second spawned thread panicked")
}
}
채널과의 크로스 스레드 통신
채널을 사용하여 한 스레드에서 다른 스레드로 데이터를 보낼 수 있습니다. 다음은 주 생성자가 0, 1, ..., 9 값을 생성하고 생성 된 스레드가 값을 인쇄하는 간단한 생산자 - 소비자 시스템의 예입니다.
use std::thread;
use std::sync::mpsc::channel;
fn main() {
// Create a channel with a sending end (tx) and a receiving end (rx).
let (tx, rx) = channel();
// Spawn a new thread, and move the receiving end into the thread.
let join_handle = thread::spawn(move || {
// Keep receiving in a loop, until tx is dropped!
while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
println!("Received {}", n);
}
});
// Note: using `rx` here would be a compile error, as it has been
// moved into the spawned thread.
// Send some values to the spawned thread. `unwrap()` crashes only if the
// receiving end was dropped before it could be buffered.
for i in 0..10 {
tx.send(i).unwrap(); // Note: `send()` never blocks
}
// Drop `tx` so that `rx.recv()` returns an `Err(_)`.
drop(tx);
// Wait for the spawned thread to finish.
join_handle.join().unwrap();
}
세션 유형과의 크로스 스레드 통신
세션 유형은 HTTP 또는 FTP와 같은 프로토콜이 아니라 스레드 간의 정보 흐름 패턴과 같이 스레드간에 통신하는 데 사용할 프로토콜에 대해 컴파일러에 알리는 방법입니다. 이것은 컴파일러가 우연히 프로토콜을 깨뜨리고 스레드 사이에서 교착 상태 또는 라이브 록을 일으키는 것을 막을 수 있기 때문에 유용합니다. 가장 악명 높은 디버깅 문제 중 일부와 Heisenbugs의 주요 출처입니다. 세션 유형은 위에서 설명한 채널과 유사하게 작동하지만 사용하기가 더 어려워 질 수 있습니다. 다음은 간단한 2 스레드 통신입니다.
// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;
// For now, it's easiest to just import everything from the library.
use session_types::*;
// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;
// This function is ordinary code to run the client. Notice that it takes
// ownership of a channel, just like other forms of interthread communication -
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
let channel = channel.send(42);
println!("The client just sent the number 42!");
channel.close();
}
// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
let (channel, data) = channel.recv();
println!("The server received some data: {}", data);
channel.close();
}
fn main() {
// First, create the channels used for the two threads to talk to each other.
let (server_channel, client_channel) = session_channel();
// Start the server on a new thread
let server_thread = std::thread::spawn(move || {
run_server(server_channel);
});
// Run the client on this thread.
run_client(client_channel);
// Wait for the server to finish.
server_thread.join().unwrap();
}
서버가 자체 기능으로 이동하면 main 메소드는 위에서 정의한 크로스 스레드 통신의 main 메소드와 매우 유사하게 보입니다. 이것을 실행한다면 결과를 얻을 수 있습니다 :
The client just sent the number 42!
The server received some data: 42
그와 같은 순서로.
클라이언트와 서버 유형을 정의하는 모든 번거로운 과정을 거쳐야하는 이유는 무엇입니까? 그리고 클라이언트와 서버에서 채널을 재정의하는 이유는 무엇입니까? 이 질문에는 동일한 대답이 있습니다. 컴파일러가 프로토콜을 위반하는 것을 막을 것입니다! 클라이언트가 데이터를 보내지 않고 데이터를 수신하려고 시도하면 (보통 코드에서 교착 상태가 발생 함) 클라이언트의 채널 객체 에 recv
메소드가 없으므로 프로그램이 컴파일 되지 않습니다. 또한 교착 상태로 이어질 수있는 방식으로 프로토콜을 정의하려고 시도한 경우 (예 : 클라이언트와 서버가 모두 값을 수신하려고 시도한 경우) 채널을 만들 때 컴파일이 실패합니다. 이것은 Send
와 Recv
가 "Dual Type"이기 때문에 서버가 하나를 수행하면 클라이언트가 다른 서버를 처리해야 함을 의미합니다. 둘 다 Recv
시도하면 문제가 발생할 것입니다. Eps
는 클라이언트와 서버가 채널을 닫는 데 동의하기 때문에 자체의 이중 유형입니다.
물론 채널에서 작업을 수행 할 때 프로토콜에서 새로운 상태로 바뀌며 사용할 수있는 기능이 변경 될 수 있으므로 채널 바인딩을 다시 정의해야합니다. 다행히도 session_types
는 우리를 대신하여 새로운 채널을 반환합니다 ( close
경우 제외, 새 채널이없는 경우 제외). 이것은 또한 채널의 모든 메소드가 채널의 소유권을 갖게됨을 의미합니다. 따라서 채널을 다시 정의하는 것을 잊어 버리면 컴파일러는 그에 대한 오류도 줄 것입니다. 채널을 닫지 않고 드롭하면 런타임 오류도 발생합니다 (불행히도 컴파일 타임에는 확인할 수 없음).
단지보다 의사 소통의 더 많은 종류가 있습니다 Send
및 Recv
예를 들어, - Offer
채널의 다른 팀의 프로토콜의 두 가지 지점 사이에서 선택 할 수있는 기능을 제공하고, Rec
및 Var
함께 작업 프로토콜의 루프와 재귀 수 있도록이 . 세션 유형 및 기타 유형의 많은 예제가 session_types
GitHub 저장소 에서 사용 가능 합니다 . 도서관의 문서는 여기 에서 찾을 수 있습니다.
원자 및 메모리 주문
원자 유형은 잠금없는 데이터 구조 및 기타 동시 유형의 빌딩 블록입니다. 원자 유형에 액세스하거나 수정할 때 메모리 배리어의 강도를 나타내는 메모리 정렬을 지정해야합니다. Rust는 Relaxed (가장 약한), Acquire (읽기 일 경우), Release (일명 스토어 쓰기), AcqRel ( "Acquire-for-load"및 "Release-for-store"와 동일)의 다섯 가지 메모리 순서 지정 프리미티브를 제공합니다. (예 : compare-and-swap), SeqCst (가장 강한 것)와 같은 단일 작업에 관여합니다. 아래 예제에서 "Relaxed"주문이 "Acquire"및 "Release"주문과 다른 점을 보여줍니다.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
struct UsizePair {
atom: AtomicUsize,
norm: UnsafeCell<usize>,
}
// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}
static NTHREADS: usize = 8;
static NITERS: usize = 1000000;
fn main() {
let upair = Arc::new(UsizePair::new(0));
// Barrier is a counter-like synchronization structure (not to be confused
// with a memory barrier). It blocks on a `wait` call until a fixed number
// of `wait` calls are made from various threads (like waiting for all
// players to get to the starting line before firing the starter pistol).
let barrier = Arc::new(Barrier::new(NTHREADS + 1));
let mut children = vec![];
for _ in 0..NTHREADS {
let upair = upair.clone();
let barrier = barrier.clone();
children.push(thread::spawn(move || {
barrier.wait();
let mut v = 0;
while v < NITERS - 1 {
// Read both members `atom` and `norm`, and check whether `atom`
// contains a newer value than `norm`. See `UsizePair` impl for
// details.
let (atom, norm) = upair.get();
if atom > norm {
// If `Acquire`-`Release` ordering is used in `get` and
// `set`, then this statement will never be reached.
println!("Reordered! {} > {}", atom, norm);
}
v = atom;
}
}));
}
barrier.wait();
for v in 1..NITERS {
// Update both members `atom` and `norm` to value `v`. See the impl for
// details.
upair.set(v);
}
for child in children {
let _ = child.join();
}
}
impl UsizePair {
pub fn new(v: usize) -> UsizePair {
UsizePair {
atom: AtomicUsize::new(v),
norm: UnsafeCell::new(v),
}
}
pub fn get(&self) -> (usize, usize) {
let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire
// If the above load operation is performed with `Acquire` ordering,
// then all writes before the corresponding `Release` store is
// guaranteed to be visible below.
let norm = unsafe { *self.norm.get() };
(atom, norm)
}
pub fn set(&self, v: usize) {
unsafe { *self.norm.get() = v };
// If the below store operation is performed with `Release` ordering,
// then the write to `norm` above is guaranteed to be visible to all
// threads that "loads `atom` with `Acquire` ordering and sees the same
// value that was stored below". However, no guarantees are provided as
// to when other readers will witness the below store, and consequently
// the above write. On the other hand, there is also no guarantee that
// these two values will be in sync for readers. Even if another thread
// sees the same value that was stored below, it may actually see a
// "later" value in `norm` than what was written above. That is, there
// is no restriction on visibility into the future.
self.atom.store(v, Ordering::Relaxed); //Ordering::Release
}
}
참고 : x86 아키텍처는 강력한 메모리 모델을 가지고 있습니다. 이 게시물 에 자세히 설명되어 있습니다. 또한 아키텍처 비교를 위해 Wikipedia 페이지 를 살펴보십시오.
읽기 - 쓰기 잠금
RwLocks를 사용하면 단일 제작자가 임의의 수의 독자에게 데이터를 제공 할 수 있으며 독자는 유효하지 않거나 일치하지 않는 데이터를 보지 못하게 할 수 있습니다.
다음 예제에서는 RwLock을 사용하여 두 개의 소비자 스레드가 값을 읽는 동안 단일 제작자 스레드가 값을 주기적으로 늘릴 수있는 방법을 보여줍니다.
use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };
fn main() {
// Create an u32 with an inital value of 0
let initial_value = 0u32;
// Move the initial value into the read-write lock which is wrapped into an atomic reference
// counter in order to allow safe sharing.
let rw_lock = Arc::new(RwLock::new(initial_value));
// Create a clone for each thread
let producer_lock = rw_lock.clone();
let consumer_id_lock = rw_lock.clone();
let consumer_square_lock = rw_lock.clone();
let producer_thread = thread::spawn(move || {
loop {
// write() blocks this thread until write-exclusive access can be acquired and retuns an
// RAII guard upon completion
if let Ok(mut write_guard) = producer_lock.write() {
// the returned write_guard implements `Deref` giving us easy access to the target value
*write_guard += 1;
println!("Updated value: {}", *write_guard);
}
// ^
// | when the RAII guard goes out of the scope, write access will be dropped, allowing
// +~ other threads access the lock
sleep(Duration::from_millis(1000));
}
});
// A reader thread that prints the current value to the screen
let consumer_id_thread = thread::spawn(move || {
loop {
// read() will only block when `producer_thread` is holding a write lock
if let Ok(read_guard) = consumer_id_lock.read() {
// the returned read_guard also implements `Deref`
println!("Read value: {}", *read_guard);
}
sleep(Duration::from_millis(500));
}
});
// A second reader thread is printing the squared value to the screen. Note that readers don't
// block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
let consumer_square_thread = thread::spawn(move || {
loop {
if let Ok(lock) = consumer_square_lock.read() {
let value = *lock;
println!("Read value squared: {}", value * value);
}
sleep(Duration::from_millis(750));
}
});
let _ = producer_thread.join();
let _ = consumer_id_thread.join();
let _ = consumer_square_thread.join();
}
예제 출력 :
Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...