サーチ…
前書き
std::thread
モジュール、チャネル、アトミックなどのさまざまなクラスを通じて、Rustの標準ライブラリによってよくサポートされています。このセクションでは、これらのタイプの使用法について説明します。
新しいスレッドを開始する
新しいスレッドを開始するには:
use std::thread;
fn main() {
thread::spawn(move || {
// The main thread will not wait for this thread to finish. That
// might mean that the next println isn't even executed before the
// program exits.
println!("Hello from spawned thread");
});
let join_handle = thread::spawn(move || {
println!("Hello from second spawned thread");
// To ensure that the program waits for a thread to finish, we must
// call `join()` on its join handle. It is even possible to send a
// value to a different thread through the join handle, like the
// integer 17 in this case:
17
});
println!("Hello from the main thread");
// The above three printlns can be observed in any order.
// Block until the second spawned thread has finished.
match join_handle.join() {
Ok(x) => println!("Second spawned thread returned {}", x),
Err(_) => println!("Second spawned thread panicked")
}
}
チャネルとのクロススレッド通信
チャネルを使用して、あるスレッドから別のスレッドにデータを送信できます。以下は、メインスレッドが0,1、...、9の値を生成するシンプルなプロデューサ - コンシューマシステムの例です。生成されたスレッドはそれらを出力します。
use std::thread;
use std::sync::mpsc::channel;
fn main() {
// Create a channel with a sending end (tx) and a receiving end (rx).
let (tx, rx) = channel();
// Spawn a new thread, and move the receiving end into the thread.
let join_handle = thread::spawn(move || {
// Keep receiving in a loop, until tx is dropped!
while let Ok(n) = rx.recv() { // Note: `recv()` always blocks
println!("Received {}", n);
}
});
// Note: using `rx` here would be a compile error, as it has been
// moved into the spawned thread.
// Send some values to the spawned thread. `unwrap()` crashes only if the
// receiving end was dropped before it could be buffered.
for i in 0..10 {
tx.send(i).unwrap(); // Note: `send()` never blocks
}
// Drop `tx` so that `rx.recv()` returns an `Err(_)`.
drop(tx);
// Wait for the spawned thread to finish.
join_handle.join().unwrap();
}
セッションタイプとのクロススレッド通信
セッションタイプは、HTTPやFTPのようなプロトコルではなく、スレッド間の情報フローのパターンで、スレッド間の通信に使用するプロトコルについてコンパイラに指示する方法です。これは、コンパイラが誤ってプロトコルを破棄してスレッド間でデッドロックやライブロックを引き起こすのを防ぐことができます。これは、最もよく知られているデバッグの難しい問題のいくつかとHeisenbugsの主要なソースです。セッションタイプは上記のチャンネルと同様に機能しますが、使用を開始するのがさらに難しくなります。ここでは簡単な2スレッド通信を示します:
// Session Types aren't part of the standard library, but are part of this crate.
// You'll need to add session_types to your Cargo.toml file.
extern crate session_types;
// For now, it's easiest to just import everything from the library.
use session_types::*;
// First, we describe what our client thread will do. Note that there's no reason
// you have to use a client/server model - it's just convenient for this example.
// This type says that a client will first send a u32, then quit. `Eps` is
// shorthand for "end communication".
// Session Types use two generic parameters to describe the protocol - the first
// for the current communication, and the second for what will happen next.
type Client = Send<u32, Eps>;
// Now, we define what the server will do: it will receive as u32, then quit.
type Server = Recv<u32, Eps>;
// This function is ordinary code to run the client. Notice that it takes
// ownership of a channel, just like other forms of interthread communication -
// but this one about the protocol we just defined.
fn run_client(channel: Chan<(), Client>) {
let channel = channel.send(42);
println!("The client just sent the number 42!");
channel.close();
}
// Now we define some code to run the server. It just accepts a value and prints
// it.
fn run_server(channel: Chan<(), Server>) {
let (channel, data) = channel.recv();
println!("The server received some data: {}", data);
channel.close();
}
fn main() {
// First, create the channels used for the two threads to talk to each other.
let (server_channel, client_channel) = session_channel();
// Start the server on a new thread
let server_thread = std::thread::spawn(move || {
run_server(server_channel);
});
// Run the client on this thread.
run_client(client_channel);
// Wait for the server to finish.
server_thread.join().unwrap();
}
メインメソッドは、サーバーがそれ自身の関数に移動された場合に、上記で定義したクロススレッド通信のメインメソッドと非常によく似ていることに注意してください。これを実行すると、出力が得られます:
The client just sent the number 42!
The server received some data: 42
その順序で。
クライアントとサーバーの種類を定義するすべての面倒をなぜ避けていますか?クライアントとサーバーでチャネルを再定義するのはなぜですか?これらの質問にも同じ答えがあります:コンパイラはプロトコルを壊すのを止めます!クライアントがデータを送信する代わりにデータを受信しようとすると(通常のコードではデッドロックになる)、クライアントのチャネルオブジェクトにはrecv
メソッドがないため、プログラムはコンパイルされません。また、デッドロックを引き起こす可能性のあるプロトコルを定義しようとすると(たとえば、クライアントとサーバーの両方が値を受け取ろうとした場合など)、チャネルを作成するとコンパイルが失敗します。これは、 Send
とRecv
が "Dual Type"であることを意味します。つまり、サーバーがRecv
場合、クライアントはもう一方を実行する必要があります。両方がRecv
場合、問題が発生します。 Eps
は独自のデュアルタイプです。これは、クライアントとサーバーの両方がチャネルを閉じることに合意しているからです。
もちろん、チャンネルで何らかの操作を行うと、プロトコルの新しい状態に移行し、使用できる機能が変更される可能性があります。そのため、チャンネルバインディングを再定義する必要があります。幸いにも、 session_types
私たちのためにそれの世話をすると、必ず(除いて、新たなチャネルを返しclose
、その場合には新たなチャネルが存在しません、)。これは、チャンネルのすべてのメソッドもチャンネルの所有権を取得することを意味します。チャンネルを再定義するのを忘れた場合、コンパイラはそのことについてもエラーを表示します。チャンネルを閉じずにチャンネルをドロップすると、それもランタイムエラーです(残念ながら、コンパイル時にチェックすることは不可能です)。
より多くのだけよりも、通信の種類がありますSend
し、 Recv
例えば、 - Offer
、プロトコルの2つの可能性のある枝の間で選択したためにチャネルの反対側を与える能力、およびRec
とVar
プロトコルでループや再帰を許可する一緒に仕事が。 session_types
GitHubリポジトリには、Session Typesやその他のタイプのサンプルがさらにたくさんあります 。図書館の資料はここにあります。
原子およびメモリの順序
原子タイプは、ロックフリーのデータ構造とその他の並行タイプのビルディングブロックです。アトミックタイプにアクセス/変更するときは、メモリーバリアーの強さを表すメモリーの順序付けを指定する必要があります。 Rustは5つのメモリ順序付けプリミティブを提供します: Relaxed (最も弱い)、 Acquire (読み込み用)、 Release (別名ストア用)、 AcqRel (「Acquire-for-load」および「 Release-コンペア・アンド・スワップなどの単一の操作に関与しています)、 SeqCst (最も強いもの)です。以下の例では、「リラックス」オーダーと「取得」オーダーとの違いを示します。
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier};
use std::thread;
struct UsizePair {
atom: AtomicUsize,
norm: UnsafeCell<usize>,
}
// UnsafeCell is not thread-safe. So manually mark our UsizePair to be Sync.
// (Effectively telling the compiler "I'll take care of it!")
unsafe impl Sync for UsizePair {}
static NTHREADS: usize = 8;
static NITERS: usize = 1000000;
fn main() {
let upair = Arc::new(UsizePair::new(0));
// Barrier is a counter-like synchronization structure (not to be confused
// with a memory barrier). It blocks on a `wait` call until a fixed number
// of `wait` calls are made from various threads (like waiting for all
// players to get to the starting line before firing the starter pistol).
let barrier = Arc::new(Barrier::new(NTHREADS + 1));
let mut children = vec![];
for _ in 0..NTHREADS {
let upair = upair.clone();
let barrier = barrier.clone();
children.push(thread::spawn(move || {
barrier.wait();
let mut v = 0;
while v < NITERS - 1 {
// Read both members `atom` and `norm`, and check whether `atom`
// contains a newer value than `norm`. See `UsizePair` impl for
// details.
let (atom, norm) = upair.get();
if atom > norm {
// If `Acquire`-`Release` ordering is used in `get` and
// `set`, then this statement will never be reached.
println!("Reordered! {} > {}", atom, norm);
}
v = atom;
}
}));
}
barrier.wait();
for v in 1..NITERS {
// Update both members `atom` and `norm` to value `v`. See the impl for
// details.
upair.set(v);
}
for child in children {
let _ = child.join();
}
}
impl UsizePair {
pub fn new(v: usize) -> UsizePair {
UsizePair {
atom: AtomicUsize::new(v),
norm: UnsafeCell::new(v),
}
}
pub fn get(&self) -> (usize, usize) {
let atom = self.atom.load(Ordering::Relaxed); //Ordering::Acquire
// If the above load operation is performed with `Acquire` ordering,
// then all writes before the corresponding `Release` store is
// guaranteed to be visible below.
let norm = unsafe { *self.norm.get() };
(atom, norm)
}
pub fn set(&self, v: usize) {
unsafe { *self.norm.get() = v };
// If the below store operation is performed with `Release` ordering,
// then the write to `norm` above is guaranteed to be visible to all
// threads that "loads `atom` with `Acquire` ordering and sees the same
// value that was stored below". However, no guarantees are provided as
// to when other readers will witness the below store, and consequently
// the above write. On the other hand, there is also no guarantee that
// these two values will be in sync for readers. Even if another thread
// sees the same value that was stored below, it may actually see a
// "later" value in `norm` than what was written above. That is, there
// is no restriction on visibility into the future.
self.atom.store(v, Ordering::Relaxed); //Ordering::Release
}
}
注:x86アーキテクチャは強力なメモリモデルを持っています。 この記事で詳しく説明しています。また、アーキテクチャの比較については、Wikipediaのページをご覧ください 。
読み書きロック
RwLockを使用すると、単一のプロデューサが任意の数の読者にデータを提供し、読者は無効または不一致のデータを見ることができなくなります。
次の例では、RwLockを使用して、2つのコンシューマスレッドが値を読み取っている間に、1つのプロデューサスレッドが定期的に値を増やす方法を示しています。
use std::time::Duration;
use std::thread;
use std::thread::sleep;
use std::sync::{Arc, RwLock };
fn main() {
// Create an u32 with an inital value of 0
let initial_value = 0u32;
// Move the initial value into the read-write lock which is wrapped into an atomic reference
// counter in order to allow safe sharing.
let rw_lock = Arc::new(RwLock::new(initial_value));
// Create a clone for each thread
let producer_lock = rw_lock.clone();
let consumer_id_lock = rw_lock.clone();
let consumer_square_lock = rw_lock.clone();
let producer_thread = thread::spawn(move || {
loop {
// write() blocks this thread until write-exclusive access can be acquired and retuns an
// RAII guard upon completion
if let Ok(mut write_guard) = producer_lock.write() {
// the returned write_guard implements `Deref` giving us easy access to the target value
*write_guard += 1;
println!("Updated value: {}", *write_guard);
}
// ^
// | when the RAII guard goes out of the scope, write access will be dropped, allowing
// +~ other threads access the lock
sleep(Duration::from_millis(1000));
}
});
// A reader thread that prints the current value to the screen
let consumer_id_thread = thread::spawn(move || {
loop {
// read() will only block when `producer_thread` is holding a write lock
if let Ok(read_guard) = consumer_id_lock.read() {
// the returned read_guard also implements `Deref`
println!("Read value: {}", *read_guard);
}
sleep(Duration::from_millis(500));
}
});
// A second reader thread is printing the squared value to the screen. Note that readers don't
// block each other so `consumer_square_thread` can run simultaneously with `consumer_id_lock`.
let consumer_square_thread = thread::spawn(move || {
loop {
if let Ok(lock) = consumer_square_lock.read() {
let value = *lock;
println!("Read value squared: {}", value * value);
}
sleep(Duration::from_millis(750));
}
});
let _ = producer_thread.join();
let _ = consumer_id_thread.join();
let _ = consumer_square_thread.join();
}
出力例:
Updated value: 1
Read value: 1
Read value squared: 1
Read value: 1
Read value squared: 1
Updated value: 2
Read value: 2
Read value: 2
Read value squared: 4
Updated value: 3
Read value: 3
Read value squared: 9
Read value: 3
Updated value: 4
Read value: 4
Read value squared: 16
Read value: 4
Read value squared: 16
Updated value: 5
Read value: 5
Read value: 5
Read value squared: 25
...(Interrupted)...