F#
Mailbox Processor
Sök…
Anmärkningar
MailboxProcessor
upprätthåller en intern meddelandekö, där flera tillverkare kan posta meddelanden med olika Post
metodvarianter. Dessa meddelanden hämtas sedan och behandlas av en enda konsument (om du inte implementerar det på annat sätt) med hjälp av Retrieve
och Scan
varianter. Som standard är både att producera och konsumera meddelanden trådsäker.
Som standard finns det inget felhantering. Om ett okänt undantag kastas in i processorns kropp slutar kroppsfunktionen, alla meddelanden i kön går förlorade, inga fler meddelanden kan läggas ut och svarskanalen (om tillgänglig) får ett undantag istället för ett svar. Du måste tillhandahålla all felhantering själv om detta beteende inte passar ditt användningsfall.
Grundläggande Hello World
Låt oss först skapa en enkel "Hej värld!" MailboxProcessor
som bearbetar en typ av meddelanden och skriver ut hälsningar.
Du behöver meddelandetypen. Det kan vara vad som helst, men diskriminerade fackföreningar är ett naturligt val här eftersom de listar alla möjliga fall på ett ställe och du kan enkelt använda mönstermatchning när du bearbetar dem.
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
Definiera nu själva processorn. Detta kan göras med MailboxProcessor<'message>.Start
statisk metod som returnerar en startad processor som är redo att göra sitt jobb. Du kan också använda konstruktören, men då måste du se till att starta processorn senare.
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
Parametern till Start
är en funktion som tar en referens till själva MailboxProcessor
(som inte finns ännu eftersom du bara skapar den, men kommer att finnas tillgänglig när funktionen körs). Det ger dig tillgång till de olika Receive
och Scan
att få åtkomst till meddelanden från brevlådan. Inuti denna funktion kan du göra den behandling du behöver, men en vanlig metod är en oändlig slinga som läser meddelandena en efter en och kallar sig själv efter var och en.
Nu är processorn klar, men det gör ingenting! Varför? Du måste skicka ett meddelande för att behandla det. Detta görs med Post
metodvarianterna - låt oss använda den mest grundläggande, eld-och-glömma.
processor.Post(SayHelloTo "Alice")
Detta ger ett meddelande till processor
interna kö, postlådan och återgår omedelbart så att samtalskoden kan fortsätta. När processorn har hämtat meddelandet kommer den att behandla det, men det kommer att göras asynkront för att lägga ut det, och det kommer troligtvis att göras på en separat tråd.
Mycket snart efteråt bör du se meddelandet "Hi, Alice! This is mailbox processor's inner loop!"
tryckt på utgången och du är redo för mer komplicerade prover.
Mutable State Management
Mailbox-processorer kan användas för att hantera muterbara tillstånd på ett öppet och tråd-säkert sätt. Låt oss bygga en enkel räknare.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
Låt oss nu generera några operationer
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
Och du kommer att se följande logg
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
samtidighet
Eftersom brevlådeprocessorn bearbetar meddelandena en efter en och det inte finns någon sammanflätning kan du också producera meddelandena från flera trådar och du kommer inte att se de typiska problemen med förlorade eller duplicerade operationer. Det finns inget sätt för ett meddelande att använda det gamla läget för andra meddelanden, såvida du inte specifikt implementerar processorn så.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
Alla meddelanden publiceras från olika trådar. Ordningen i vilken meddelanden skickas till brevlådan är inte deterministisk, så ordningen för att bearbeta dem är inte deterministisk, men eftersom det totala antalet steg och minskningar är balanserat ser du att slutläget är 0, oavsett i vilken ordning och från vilka trådar meddelandena skickades.
Äkta muterbara tillstånd
I det föregående exemplet har vi bara simulerat det muterbara tillståndet genom att passera den rekursiva slingparametern, men postlådeprocessorn har alla dessa egenskaper även för ett verkligt muterbart tillstånd. Detta är viktigt när du upprätthåller stort tillstånd och oföränderlighet är opraktiskt av prestandaskäl.
Vi kan skriva om vår räknare till följande implementering
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
Även om detta definitivt inte skulle vara trådsäkert om räknarläget ändrades direkt från flera trådar, kan du se genom att använda det parallella meddelandet Inlägg från föregående avsnitt att brevlådeprocessorn behandlar meddelanden efter varandra utan sammanflätning, så varje meddelande använder mest aktuella värde.
Returnera värden
Du kan asynkront returnera ett värde för varje behandlat meddelande om du skickar en AsyncReplyChannel<'a>
som en del av meddelandet.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
Sedan kan brevlådeprocessorn använda den här kanalen när han behandlar meddelandet för att skicka ett värde tillbaka till den som ringer.
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
För att skapa ett meddelande behöver du AsyncReplyChannel<'a>
- vad är och hur skapar du en fungerande instans? Det bästa sättet är att låta MailboxProcessor tillhandahålla det åt dig och extrahera svaret på en vanligare Async<'a>
. Detta kan göras genom att till exempel PostAndAsynReply
metoden PostAndAsynReply
, där du inte lägger ut hela meddelandet utan istället en funktion av typen (i vårt fall) AsyncReplyChannel<OutputData> -> MessageWithResponse
:
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
Detta kommer att posta meddelandet i en kö och vänta på svaret, vilket kommer att komma när processorn kommer till detta meddelande och svarar med kanalen.
Det finns också en synkron variant PostAndReply
som blockerar den ringande tråden tills processorn svarar.
Bearbetning av meddelanden utanför beställningen
Du kan använda Scan
eller TryScan
metoder för att leta efter specifika meddelanden i kön och bearbeta dem oavsett hur många meddelanden är före dem. Båda metoderna tittar på meddelandena i kön i den ordning de kom och letar efter ett specificerat meddelande (fram till valfri timeout). Om det inte finns något sådant meddelande kommer TryScan
att returnera Ingen, medan Scan
fortsätter att vänta tills ett sådant meddelande kommer in eller åtgärden går ut.
Låt oss se det i praktiken. Vi vill att processorn ska behandla RegularOperations
när den kan, men när det finns en PriorityOperation
, ska den behandlas så snart som möjligt, oavsett hur många andra RegularOperations
finns i kön.
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())