F#
Postfachprozessor
Suche…
Bemerkungen
MailboxProcessor
verwaltet eine interne Nachrichtenwarteschlange, in der mehrere Produzenten Nachrichten mit verschiedenen Post
Methodenvarianten MailboxProcessor
können. Diese Nachrichten werden dann von einem einzelnen Consumer abgerufen und verarbeitet (sofern Sie ihn nicht anderweitig implementieren), und zwar mithilfe der Varianten " Retrieve
und " Scan
. Standardmäßig ist das Erstellen und Verwenden der Nachrichten Thread-sicher.
Standardmäßig ist keine Fehlerbehandlung vorgesehen. Wenn eine nicht erfasste Ausnahme im Rumpf des Prozessors ausgelöst wird, wird die Rumpffunktion beendet, alle Nachrichten in der Warteschlange gehen verloren, es können keine weiteren Nachrichten mehr gesendet werden, und der Antwortkanal (falls verfügbar) erhält eine Ausnahme anstelle einer Antwort. Sie müssen die Fehlerbehandlung selbst vornehmen, falls dieses Verhalten nicht zu Ihrem Anwendungsfall passt.
Grundlegende Hallo Welt
Lassen Sie uns zunächst eine einfache "Hallo Welt!" MailboxProcessor
der einen Nachrichtentyp verarbeitet und Begrüßungen MailboxProcessor
.
Sie benötigen den Nachrichtentyp. Es kann alles sein, aber diskriminierte Vereinigungen sind eine natürliche Wahl, da sie alle möglichen Fälle an einem Ort auflisten und Sie beim Zuordnen von Mustern problemlos den Musterabgleich verwenden können.
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
Definieren Sie nun den Prozessor selbst. Dies kann mit MailboxProcessor<'message>.Start
Statische Methode MailboxProcessor<'message>.Start
, die einen gestarteten Prozessor zurückgibt, der bereit ist, seinen Job auszuführen. Sie können auch den Konstruktor verwenden, müssen jedoch sicherstellen, dass der Prozessor später gestartet wird.
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
Der Parameter zu Start
ist eine Funktion , die einen Verweis auf das nimmt MailboxProcessor
selbst (die noch nicht existiert , wie Sie es nur schaffen, wird aber verfügbar sein , sobald die Funktion ausführt). Dadurch haben Sie Zugriff auf die verschiedenen Receive
und Scan
Methoden, um auf die Nachrichten aus dem Postfach zuzugreifen. Innerhalb dieser Funktion können Sie die Verarbeitung ausführen, die Sie benötigen. Normalerweise wird jedoch eine Endlosschleife verwendet, die die Nachrichten nacheinander liest und sich nach jeder einzelnen Nachricht aufruft.
Jetzt ist der Prozessor fertig, aber es geht nichts! Warum? Sie müssen eine Nachricht senden, um sie zu bearbeiten. Dies geschieht mit den Post
Methodenvarianten - verwenden wir die einfachsten, feuervergessenden Methoden.
processor.Post(SayHelloTo "Alice")
Dadurch wird eine Nachricht in die interne Warteschlange des processor
, das Postfach, eingefügt und sofort zurückgegeben, sodass der aufrufende Code fortgesetzt werden kann. Sobald der Prozessor die Nachricht abgerufen hat, wird er sie verarbeiten. Dies geschieht jedoch asynchron zum Posten der Nachricht und wird höchstwahrscheinlich in einem separaten Thread durchgeführt.
Sehr bald danach sollte die Meldung "Hi, Alice! This is mailbox processor's inner loop!"
zur Ausgabe gedruckt und Sie sind bereit für kompliziertere Proben.
Mutable State Management
Postfachprozessoren können verwendet werden, um den veränderbaren Status auf transparente und Thread-sichere Weise zu verwalten. Lass uns einen einfachen Zähler bauen.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
Lassen Sie uns nun einige Operationen generieren
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
Das folgende Protokoll wird angezeigt
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Parallelität
Da der Postfachprozessor die Nachrichten einzeln verarbeitet und es keine Verschachtelung gibt, können Sie die Nachrichten auch aus mehreren Threads erstellen und die typischen Probleme verlorener oder duplizierter Vorgänge nicht sehen. Es ist nicht möglich, dass eine Nachricht den alten Status anderer Nachrichten verwendet, es sei denn, Sie implementieren den Prozessor ausdrücklich.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
Alle Nachrichten werden aus verschiedenen Threads gepostet. Die Reihenfolge, in der Nachrichten an das Postfach gesendet werden, ist nicht deterministisch. Daher ist die Reihenfolge der Verarbeitung nicht deterministisch. Da jedoch die Gesamtanzahl der Inkremente und Dekremente ausgeglichen ist, wird der Endstatus unabhängig von der Reihenfolge 0 angezeigt und von welchen Threads die Nachrichten gesendet wurden.
Wirklich veränderlicher Zustand
Im vorherigen Beispiel haben wir nur den veränderlichen Status durch Übergeben des rekursiven Schleifenparameters simuliert. Der Postfachprozessor verfügt jedoch über alle diese Eigenschaften, selbst für einen wirklich veränderbaren Status. Dies ist wichtig, wenn Sie einen großen Zustand beibehalten und die Unveränderlichkeit aus Leistungsgründen nicht praktikabel ist.
Wir können unseren Gegencode für die folgende Implementierung umschreiben
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
Auch wenn dies definitiv nicht Thread-sicher wäre, wenn der Zählerstand direkt von mehreren Threads geändert wurde, können Sie anhand der parallelen Nachricht Posts aus dem vorherigen Abschnitt sehen, dass der Postfachprozessor die Nachrichten ohne Interleaving nacheinander verarbeitet aktuellster Wert.
Rückgabewerte
Sie können einen Wert für jede verarbeitete Nachricht asynchron zurückgeben, wenn Sie einen AsyncReplyChannel<'a>
als Teil der Nachricht senden.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
Dann kann der Mailbox-Prozessor diesen Kanal bei der Verarbeitung der Nachricht verwenden, um einen Wert an den Anrufer zurückzusenden.
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
AsyncReplyChannel<'a>
jetzt eine Nachricht zu erstellen, benötigen Sie den AsyncReplyChannel<'a>
Was ist das und wie erstellen Sie eine Arbeitsinstanz? Der beste Weg ist, MailboxProcessor es für Sie zur Verfügung stellen zu lassen und die Antwort an ein häufigeres Async<'a>
extrahieren. Dies kann beispielsweise mit der PostAndAsynReply
Methode erfolgen, bei der Sie nicht die vollständige Nachricht veröffentlichen, sondern stattdessen eine Funktion vom Typ (in unserem Fall) AsyncReplyChannel<OutputData> -> MessageWithResponse
:
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
Dadurch wird die Nachricht in eine Warteschlange gestellt und auf die Antwort gewartet, die eintreffen wird, wenn der Prozessor diese Nachricht erhält und über den Kanal antwortet.
Es gibt auch eine synchrone Variante PostAndReply
die den aufrufenden Thread blockiert, bis der Prozessor antwortet.
Out-of-Order-Nachrichtenverarbeitung
Sie können mit Scan
oder TryScan
Methoden für bestimmte Nachrichten in der Warteschlange suchen und verarbeiten sie , unabhängig davon , wie viele Nachrichten sind vor ihnen. Beide Methoden betrachten die Nachrichten in der Warteschlange in der Reihenfolge, in der sie eingetroffen sind, und suchen nach einer angegebenen Nachricht (bis zu einem optionalen Timeout). Falls keine solche Meldung TryScan
wird, gibt TryScan
None zurück, während Scan
wartet, bis die Meldung eingeht oder der Vorgang TryScan
.
Lassen Sie uns es in der Praxis sehen. Wir möchten, dass der Prozessor RegularOperations
wenn dies möglich ist. Wenn es jedoch eine PriorityOperation
, sollte diese so schnell wie möglich verarbeitet werden, unabhängig davon, wie viele andere RegularOperations
in der Warteschlange sind.
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())