F#
Processore di cassette postali
Ricerca…
Osservazioni
MailboxProcessor
gestisce una coda di messaggi interna, in cui più produttori possono pubblicare messaggi utilizzando varie varianti del metodo Post
. Questi messaggi vengono quindi recuperati ed elaborati da un singolo utente (a meno che non lo si implementi diversamente) utilizzando le varianti Retrieve
e Scan
. Di default sia la produzione che il consumo dei messaggi sono thread-safe.
Di default non esiste una gestione degli errori fornita. Se viene generata un'eccezione non rilevata all'interno del corpo del processore, la funzione body termina, tutti i messaggi nella coda andranno persi, non è più possibile inviare messaggi e il canale di risposta (se disponibile) riceverà un'eccezione anziché una risposta. Devi fornire tutta la gestione degli errori da solo nel caso in cui questo comportamento non si adatti al tuo caso d'uso.
Basic Hello World
Iniziamo con la creazione di un semplice "Hello world!" MailboxProcessor
che elabora un tipo di messaggio e stampa i saluti.
Avrai bisogno del tipo di messaggio. Può essere qualsiasi cosa, ma le Unioni Discriminate sono una scelta naturale in quanto elencano tutti i possibili casi in un unico posto e puoi facilmente utilizzare la corrispondenza dei modelli durante l'elaborazione.
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
Ora definisci il processore stesso. Questo può essere fatto con MailboxProcessor<'message>.Start
metodo statico che restituisce un processore avviato pronto a fare il suo lavoro. È anche possibile utilizzare il costruttore, ma in seguito è necessario assicurarsi di avviare il processore in un secondo momento.
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
Il parametro Start
è una funzione che prende un riferimento al MailboxProcessor
stesso (che non esiste ancora come lo si sta creando, ma sarà disponibile una volta eseguita la funzione). Ciò consente di accedere ai vari metodi di Receive
e Scan
per accedere ai messaggi dalla casella di posta. All'interno di questa funzione, puoi eseguire qualsiasi elaborazione di cui hai bisogno, ma un approccio normale è un ciclo infinito che legge i messaggi uno ad uno e si richiama dopo ognuno di essi.
Ora il processore è pronto, ma non fa niente! Perché? È necessario inviare un messaggio per elaborare. Questo è fatto con le varianti del metodo Post
- usiamo il più elementare, fire-and-forget.
processor.Post(SayHelloTo "Alice")
Questo inserisce un messaggio nella coda interna del processor
, nella cassetta postale, e ritorna immediatamente in modo che il codice chiamante possa continuare. Una volta che il processore ha recuperato il messaggio, lo elaborerà, ma verrà eseguito in modo asincrono per pubblicarlo e molto probabilmente verrà fatto su un thread separato.
Molto presto dovresti vedere il messaggio "Hi, Alice! This is mailbox processor's inner loop!"
stampato sull'output e sei pronto per campioni più complicati.
Gestione statale mutabile
I processori delle cassette postali possono essere utilizzati per gestire lo stato mutabile in modo trasparente e sicuro per i thread. Costruiamo un semplice contatore.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
Ora generiamo alcune operazioni
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
E vedrai il seguente registro
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Concorrenza
Poiché il processore della casella di posta elabora i messaggi uno per uno e non c'è interleaving, è possibile anche produrre i messaggi da più thread e non si vedranno i problemi tipici delle operazioni perse o duplicate. Non è possibile che un messaggio utilizzi lo stato precedente di altri messaggi, a meno che non si implementi in modo specifico il processore.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
Tutti i messaggi sono pubblicati da diversi thread. L'ordine in cui i messaggi vengono inviati alla casella di posta non è deterministico, quindi l'ordine di elaborazione non è deterministico, ma poiché il numero complessivo di incrementi e decrementi è bilanciato, lo stato finale è 0, indipendentemente dall'ordine e da quale thread sono stati inviati i messaggi.
Vero stato mutabile
Nell'esempio precedente abbiamo solo simulato lo stato mutabile passando il parametro del ciclo ricorsivo, ma il processore di cassette postali ha tutte queste proprietà anche per uno stato veramente mutabile. Questo è importante quando si mantiene uno stato ampio e l'immutabilità non è pratica per motivi di prestazioni.
Possiamo riscrivere il nostro contatore alla seguente implementazione
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
Anche se questo sicuramente non sarebbe thread-safe se lo stato del contatore fosse stato modificato direttamente da più thread, è possibile vedere utilizzando i messaggi di messaggi paralleli della sezione precedente che il processore della casella di posta elabora i messaggi uno dopo l'altro senza interleaving, quindi ogni messaggio usa il valore più attuale.
Valori di ritorno
È possibile restituire in modo asincrono un valore per ogni messaggio elaborato se si invia un AsyncReplyChannel<'a>
come parte del messaggio.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
Quindi il processore della casella di posta può utilizzare questo canale durante l'elaborazione del messaggio per inviare un valore al chiamante.
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
Ora per creare un messaggio, hai bisogno di AsyncReplyChannel<'a>
- che cos'è e come crei un'istanza di lavoro? Il modo migliore è lasciare che MailboxProcessor lo fornisca per te ed estrai la risposta a un più comune Async<'a>
. Questo può essere fatto usando, ad esempio, il metodo PostAndAsynReply
, dove non si pubblica il messaggio completo, ma invece una funzione di tipo (nel nostro caso) AsyncReplyChannel<OutputData> -> MessageWithResponse
:
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
Questo invierà il messaggio in coda e attenderà la risposta, che arriverà quando il processore arriva a questo messaggio e risponde utilizzando il canale.
Esiste anche una variante sincrona PostAndReply
che blocca il thread chiamante finché il processore non risponde.
Elaborazione dei messaggi fuori ordine
È possibile utilizzare i metodi di Scan
o TryScan
per cercare messaggi specifici nella coda ed elaborarli indipendentemente dal numero di messaggi che li precedono. Entrambi i metodi esaminano i messaggi nella coda nell'ordine in cui sono arrivati e cercheranno un messaggio specificato (fino al timeout opzionale). Nel caso in cui non vi sia alcun messaggio di questo tipo, TryScan
restituirà None, mentre Scan
continuerà ad attendere fino a quando il messaggio non arriva o l'operazione TryScan
.
Vediamolo in pratica. Vogliamo che il processore elabori RegularOperations
quando può, ma ogni volta che c'è un PriorityOperation
, dovrebbe essere elaborato il prima possibile, indipendentemente da quante altre RegularOperations
sono in coda.
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())