サーチ…


備考

MailboxProcessorは、複数のプロデューサがさまざまなPostメソッドのバリエーションを使用してメッセージを投稿できる、内部メッセージキューを維持します。これらのメッセージは、 RetrieveScanバリアントを使用して単一のコンシューマ(別途実装しない限り)によって取得され、処理されます。デフォルトでは、メッセージの生成と使用の両方がスレッドセーフです。

デフォルトでは、エラー処理は提供されていません。キャッチされていない例外がプロセッサの本体の内部で投げられると、本体機能は終了し、キュー内のすべてのメッセージが失われ、それ以上のメッセージは送信されず、返信チャネル(利用可能な場合)は応答ではなく例外が発生します。この動作がユースケースに合わない場合は、すべてのエラー処理を自分で行う必要があります。

基本的なHello World

簡単な「Hello world!」を作成しましょう。 1つのタイプのメッセージを処理し、グリーティングを印刷するMailboxProcessor

メッセージタイプが必要です。それは何でもかまいませんが、 差別化されたユニオンは、可能なすべてのケースを1か所にリストし、それらを処理する際に簡単にパターンマッチングを使用できるため、当然の選択です。

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

次に、プロセッサ自体を定義します。これは、 MailboxProcessor<'message>.Startメソッドを使用して開始することができます。コンストラクタを使用することもできますが、後でプロセッサを起動する必要があります。

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

Startのパラメータは、 MailboxProcessor自身への参照をMailboxProcessorするMailboxProcessorです(まだ作成していないが、関数が実行されると利用可能になります)。これにより、さまざまなReceive and Scanメソッドにアクセスして、メールボックスからのメッセージにアクセスできます。この関数の中では、必要な処理を行うことができますが、通常の方法は、メッセージを1つずつ読み込んでそれぞれを呼び出した無限ループです。

今、プロセッサは準備ができていますが、何もしません!どうして?処理するメッセージを送信する必要があります。これはPostメソッドのバリエーションで行われます。最も基本的な、忘れないようにしましょう。

processor.Post(SayHelloTo "Alice")

これにより、 processorの内部キュー、メールボックスにメッセージが書き込まれ、すぐに呼び出しコードが続行できるように戻ります。プロセッサーがメッセージを取得すると、メッセージは処理されますが、メッセージは非同期でポスティングされ、別のスレッドで処理される可能性が高くなります。

その後すぐに"Hi, Alice! This is mailbox processor's inner loop!"というメッセージが表示され"Hi, Alice! This is mailbox processor's inner loop!"出力に印刷され、より複雑なサンプルの準備が整いました。

可変状態管理

メールボックスプロセッサを使用すると、変更可能な状態を透過的でスレッドセーフな方法で管理できます。簡単なカウンターを作ってみましょう。

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

ここでいくつかの操作を生成しましょう

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

そして、次のログが表示されます

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

並行性

メールボックスプロセッサはメッセージを1つずつ処理し、インターリーブがないので、複数のスレッドからメッセージを生成することもできます。そのため、操作の紛失や重複の典型的な問題はありません。プロセッサーを具体的に実装しない限り、メッセージに古いメッセージの状態を使用する方法はありません。

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

すべてのメッセージは異なるスレッドから送信されます。メッセージがメールボックスに送信される順序は決定的ではないため、処理の順序は確定的ではありませんが、増分と減分の合計数が均衡しているため、最終状態はどの順序であっても0になりますどのスレッドからメッセージが送信されたかを示します。

真の可変状態

前の例では、再帰的なループパラメータを渡すことで変更可能な状態のみをシミュレートしましたが、メールボックスプロセッサは本当に変更可能な状態であってもこれらのプロパティをすべて保持しています。これは、大きな状態を維持し、パフォーマンスの理由から不変性が実用的でない場合に重要です。

カウンタを次の実装に書き換えることができます

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

カウンタの状態が複数のスレッドから直接変更された場合、スレッドセーフではないにもかかわらず、前のセクションのPostsという並列メッセージを使用すると、各メッセージがインターリーブなしでメッセージを1つずつ処理することが分かります。最新の値。

戻り値

AsyncReplyChannel<'a>をメッセージの一部として送信すると、処理された各メッセージの値を非同期的に返すことができます。

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

次に、メールボックスプロセッサは、メッセージを処理するときにこのチャネルを使用して、呼び出し元に値を返すことができます。

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

メッセージを作成するには、 AsyncReplyChannel<'a>が必要です。これは何ですか?そして、動作するインスタンスをどのように作成しますか?最も良い方法はMailboxProcessorにあなたのためにそれを提供させ、より一般的なAsync<'a>への応答を抽出することです。これは、完全なメッセージをポストするのではなく、代わりにタイプの関数(このケースでは) AsyncReplyChannel<OutputData> -> MessageWithResponsePostAndAsynReplyメソッドを使用することによって行うことができAsyncReplyChannel<OutputData> -> MessageWithResponse

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

これは、メッセージをキューにポストし、プロセッサがこのメッセージを取得してチャネルを使用して応答すると到着する応答を待つ。

プロセッサが応答するまで呼び出しスレッドをブロックする同期バリアントPostAndReplyもあります。

アウト・オブ・オーダーのメッセージ処理

ScanまたはTryScan方法を使用してキュー内の特定のメッセージを検索し、それらの前にあるメッセージの数に関係なく処理できます。どちらの方法も、到着した順にキュー内のメッセージを調べ、指定されたメッセージを検索します(オプションのタイムアウトまで)。そのようなメッセージがない場合、 TryScanはNoneを返し、 Scanはそのようなメッセージが到着するまで、または操作がタイムアウトするまで待機します。

それを実際に見てみましょう。 RegularOperationsプロセッサがRegularOperations処理するようにしたいが、 PriorityOperationがあるときは、キューに他のRegularOperationsがいくつあっても、できるだけ早く処理する必要がある。

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow