

MailboxProcessorは、複数のプロデューサがさまざまなPostメソッドのバリエーションを使用してメッセージを投稿できる、内部メッセージキューを維持します。これらのメッセージは、 RetrieveScanバリアントを使用して単一のコンシューマ(別途実装しない限り)によって取得され、処理されます。デフォルトでは、メッセージの生成と使用の両方がスレッドセーフです。


基本的なHello World

簡単な「Hello world!」を作成しましょう。 1つのタイプのメッセージを処理し、グリーティングを印刷するMailboxProcessor

メッセージタイプが必要です。それは何でもかまいませんが、 差別化されたユニオンは、可能なすべてのケースを1か所にリストし、それらを処理する際に簡単にパターンマッチングを使用できるため、当然の選択です。

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

次に、プロセッサ自体を定義します。これは、 MailboxProcessor<'message>.Startメソッドを使用して開始することができます。コンストラクタを使用することもできますが、後でプロセッサを起動する必要があります。

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
    innerLoop ())

Startのパラメータは、 MailboxProcessor自身への参照をMailboxProcessorするMailboxProcessorです(まだ作成していないが、関数が実行されると利用可能になります)。これにより、さまざまなReceive and Scanメソッドにアクセスして、メールボックスからのメッセージにアクセスできます。この関数の中では、必要な処理を行うことができますが、通常の方法は、メッセージを1つずつ読み込んでそれぞれを呼び出した無限ループです。


processor.Post(SayHelloTo "Alice")

これにより、 processorの内部キュー、メールボックスにメッセージが書き込まれ、すぐに呼び出しコードが続行できるように戻ります。プロセッサーがメッセージを取得すると、メッセージは処理されますが、メッセージは非同期でポスティングされ、別のスレッドで処理される可能性が高くなります。

その後すぐに"Hi, Alice! This is mailbox processor's inner loop!"というメッセージが表示され"Hi, Alice! This is mailbox processor's inner loop!"出力に印刷され、より複雑なサンプルの準備が整いました。



// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10




Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12



let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously





let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        innerLoop ())




type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>


let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...

メッセージを作成するには、 AsyncReplyChannel<'a>が必要です。これは何ですか?そして、動作するインスタンスをどのように作成しますか?最も良い方法はMailboxProcessorにあなたのためにそれを提供させ、より一般的なAsync<'a>への応答を抽出することです。これは、完全なメッセージをポストするのではなく、代わりにタイプの関数(このケースでは) AsyncReplyChannel<OutputData> -> MessageWithResponsePostAndAsynReplyメソッドを使用することによって行うことができAsyncReplyChannel<OutputData> -> MessageWithResponse

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))




ScanまたはTryScan方法を使用してキュー内の特定のメッセージを検索し、それらの前にあるメッセージの数に関係なく処理できます。どちらの方法も、到着した順にキュー内のメッセージを調べ、指定されたメッセージを検索します(オプションのタイムアウトまで)。そのようなメッセージがない場合、 TryScanはNoneを返し、 Scanはそのようなメッセージが到着するまで、または操作がタイムアウトするまで待機します。

それを実際に見てみましょう。 RegularOperationsプロセッサがRegularOperations処理するようにしたいが、 PriorityOperationがあるときは、キューに他のRegularOperationsがいくつあっても、できるだけ早く処理する必要がある。

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()

Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow