Suche…


Bemerkungen

MailboxProcessor verwaltet eine interne Nachrichtenwarteschlange, in der mehrere Produzenten Nachrichten mit verschiedenen Post Methodenvarianten MailboxProcessor können. Diese Nachrichten werden dann von einem einzelnen Consumer abgerufen und verarbeitet (sofern Sie ihn nicht anderweitig implementieren), und zwar mithilfe der Varianten " Retrieve und " Scan . Standardmäßig ist das Erstellen und Verwenden der Nachrichten Thread-sicher.

Standardmäßig ist keine Fehlerbehandlung vorgesehen. Wenn eine nicht erfasste Ausnahme im Rumpf des Prozessors ausgelöst wird, wird die Rumpffunktion beendet, alle Nachrichten in der Warteschlange gehen verloren, es können keine weiteren Nachrichten mehr gesendet werden, und der Antwortkanal (falls verfügbar) erhält eine Ausnahme anstelle einer Antwort. Sie müssen die Fehlerbehandlung selbst vornehmen, falls dieses Verhalten nicht zu Ihrem Anwendungsfall passt.

Grundlegende Hallo Welt

Lassen Sie uns zunächst eine einfache "Hallo Welt!" MailboxProcessor der einen Nachrichtentyp verarbeitet und Begrüßungen MailboxProcessor .

Sie benötigen den Nachrichtentyp. Es kann alles sein, aber diskriminierte Vereinigungen sind eine natürliche Wahl, da sie alle möglichen Fälle an einem Ort auflisten und Sie beim Zuordnen von Mustern problemlos den Musterabgleich verwenden können.

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

Definieren Sie nun den Prozessor selbst. Dies kann mit MailboxProcessor<'message>.Start Statische Methode MailboxProcessor<'message>.Start , die einen gestarteten Prozessor zurückgibt, der bereit ist, seinen Job auszuführen. Sie können auch den Konstruktor verwenden, müssen jedoch sicherstellen, dass der Prozessor später gestartet wird.

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

Der Parameter zu Start ist eine Funktion , die einen Verweis auf das nimmt MailboxProcessor selbst (die noch nicht existiert , wie Sie es nur schaffen, wird aber verfügbar sein , sobald die Funktion ausführt). Dadurch haben Sie Zugriff auf die verschiedenen Receive und Scan Methoden, um auf die Nachrichten aus dem Postfach zuzugreifen. Innerhalb dieser Funktion können Sie die Verarbeitung ausführen, die Sie benötigen. Normalerweise wird jedoch eine Endlosschleife verwendet, die die Nachrichten nacheinander liest und sich nach jeder einzelnen Nachricht aufruft.

Jetzt ist der Prozessor fertig, aber es geht nichts! Warum? Sie müssen eine Nachricht senden, um sie zu bearbeiten. Dies geschieht mit den Post Methodenvarianten - verwenden wir die einfachsten, feuervergessenden Methoden.

processor.Post(SayHelloTo "Alice")

Dadurch wird eine Nachricht in die interne Warteschlange des processor , das Postfach, eingefügt und sofort zurückgegeben, sodass der aufrufende Code fortgesetzt werden kann. Sobald der Prozessor die Nachricht abgerufen hat, wird er sie verarbeiten. Dies geschieht jedoch asynchron zum Posten der Nachricht und wird höchstwahrscheinlich in einem separaten Thread durchgeführt.

Sehr bald danach sollte die Meldung "Hi, Alice! This is mailbox processor's inner loop!" zur Ausgabe gedruckt und Sie sind bereit für kompliziertere Proben.

Mutable State Management

Postfachprozessoren können verwendet werden, um den veränderbaren Status auf transparente und Thread-sichere Weise zu verwalten. Lass uns einen einfachen Zähler bauen.

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

Lassen Sie uns nun einige Operationen generieren

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

Das folgende Protokoll wird angezeigt

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

Parallelität

Da der Postfachprozessor die Nachrichten einzeln verarbeitet und es keine Verschachtelung gibt, können Sie die Nachrichten auch aus mehreren Threads erstellen und die typischen Probleme verlorener oder duplizierter Vorgänge nicht sehen. Es ist nicht möglich, dass eine Nachricht den alten Status anderer Nachrichten verwendet, es sei denn, Sie implementieren den Prozessor ausdrücklich.

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

Alle Nachrichten werden aus verschiedenen Threads gepostet. Die Reihenfolge, in der Nachrichten an das Postfach gesendet werden, ist nicht deterministisch. Daher ist die Reihenfolge der Verarbeitung nicht deterministisch. Da jedoch die Gesamtanzahl der Inkremente und Dekremente ausgeglichen ist, wird der Endstatus unabhängig von der Reihenfolge 0 angezeigt und von welchen Threads die Nachrichten gesendet wurden.

Wirklich veränderlicher Zustand

Im vorherigen Beispiel haben wir nur den veränderlichen Status durch Übergeben des rekursiven Schleifenparameters simuliert. Der Postfachprozessor verfügt jedoch über alle diese Eigenschaften, selbst für einen wirklich veränderbaren Status. Dies ist wichtig, wenn Sie einen großen Zustand beibehalten und die Unveränderlichkeit aus Leistungsgründen nicht praktikabel ist.

Wir können unseren Gegencode für die folgende Implementierung umschreiben

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

Auch wenn dies definitiv nicht Thread-sicher wäre, wenn der Zählerstand direkt von mehreren Threads geändert wurde, können Sie anhand der parallelen Nachricht Posts aus dem vorherigen Abschnitt sehen, dass der Postfachprozessor die Nachrichten ohne Interleaving nacheinander verarbeitet aktuellster Wert.

Rückgabewerte

Sie können einen Wert für jede verarbeitete Nachricht asynchron zurückgeben, wenn Sie einen AsyncReplyChannel<'a> als Teil der Nachricht senden.

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

Dann kann der Mailbox-Prozessor diesen Kanal bei der Verarbeitung der Nachricht verwenden, um einen Wert an den Anrufer zurückzusenden.

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

AsyncReplyChannel<'a> jetzt eine Nachricht zu erstellen, benötigen Sie den AsyncReplyChannel<'a> Was ist das und wie erstellen Sie eine Arbeitsinstanz? Der beste Weg ist, MailboxProcessor es für Sie zur Verfügung stellen zu lassen und die Antwort an ein häufigeres Async<'a> extrahieren. Dies kann beispielsweise mit der PostAndAsynReply Methode erfolgen, bei der Sie nicht die vollständige Nachricht veröffentlichen, sondern stattdessen eine Funktion vom Typ (in unserem Fall) AsyncReplyChannel<OutputData> -> MessageWithResponse :

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

Dadurch wird die Nachricht in eine Warteschlange gestellt und auf die Antwort gewartet, die eintreffen wird, wenn der Prozessor diese Nachricht erhält und über den Kanal antwortet.

Es gibt auch eine synchrone Variante PostAndReply die den aufrufenden Thread blockiert, bis der Prozessor antwortet.

Out-of-Order-Nachrichtenverarbeitung

Sie können mit Scan oder TryScan Methoden für bestimmte Nachrichten in der Warteschlange suchen und verarbeiten sie , unabhängig davon , wie viele Nachrichten sind vor ihnen. Beide Methoden betrachten die Nachrichten in der Warteschlange in der Reihenfolge, in der sie eingetroffen sind, und suchen nach einer angegebenen Nachricht (bis zu einem optionalen Timeout). Falls keine solche Meldung TryScan wird, gibt TryScan None zurück, während Scan wartet, bis die Meldung eingeht oder der Vorgang TryScan .

Lassen Sie uns es in der Praxis sehen. Wir möchten, dass der Prozessor RegularOperations wenn dies möglich ist. Wenn es jedoch eine PriorityOperation , sollte diese so schnell wie möglich verarbeitet werden, unabhängig davon, wie viele andere RegularOperations in der Warteschlange sind.

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow