Ricerca…


Osservazioni

MailboxProcessor gestisce una coda di messaggi interna, in cui più produttori possono pubblicare messaggi utilizzando varie varianti del metodo Post . Questi messaggi vengono quindi recuperati ed elaborati da un singolo utente (a meno che non lo si implementi diversamente) utilizzando le varianti Retrieve e Scan . Di default sia la produzione che il consumo dei messaggi sono thread-safe.

Di default non esiste una gestione degli errori fornita. Se viene generata un'eccezione non rilevata all'interno del corpo del processore, la funzione body termina, tutti i messaggi nella coda andranno persi, non è più possibile inviare messaggi e il canale di risposta (se disponibile) riceverà un'eccezione anziché una risposta. Devi fornire tutta la gestione degli errori da solo nel caso in cui questo comportamento non si adatti al tuo caso d'uso.

Basic Hello World

Iniziamo con la creazione di un semplice "Hello world!" MailboxProcessor che elabora un tipo di messaggio e stampa i saluti.

Avrai bisogno del tipo di messaggio. Può essere qualsiasi cosa, ma le Unioni Discriminate sono una scelta naturale in quanto elencano tutti i possibili casi in un unico posto e puoi facilmente utilizzare la corrispondenza dei modelli durante l'elaborazione.

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

Ora definisci il processore stesso. Questo può essere fatto con MailboxProcessor<'message>.Start metodo statico che restituisce un processore avviato pronto a fare il suo lavoro. È anche possibile utilizzare il costruttore, ma in seguito è necessario assicurarsi di avviare il processore in un secondo momento.

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

Il parametro Start è una funzione che prende un riferimento al MailboxProcessor stesso (che non esiste ancora come lo si sta creando, ma sarà disponibile una volta eseguita la funzione). Ciò consente di accedere ai vari metodi di Receive e Scan per accedere ai messaggi dalla casella di posta. All'interno di questa funzione, puoi eseguire qualsiasi elaborazione di cui hai bisogno, ma un approccio normale è un ciclo infinito che legge i messaggi uno ad uno e si richiama dopo ognuno di essi.

Ora il processore è pronto, ma non fa niente! Perché? È necessario inviare un messaggio per elaborare. Questo è fatto con le varianti del metodo Post - usiamo il più elementare, fire-and-forget.

processor.Post(SayHelloTo "Alice")

Questo inserisce un messaggio nella coda interna del processor , nella cassetta postale, e ritorna immediatamente in modo che il codice chiamante possa continuare. Una volta che il processore ha recuperato il messaggio, lo elaborerà, ma verrà eseguito in modo asincrono per pubblicarlo e molto probabilmente verrà fatto su un thread separato.

Molto presto dovresti vedere il messaggio "Hi, Alice! This is mailbox processor's inner loop!" stampato sull'output e sei pronto per campioni più complicati.

Gestione statale mutabile

I processori delle cassette postali possono essere utilizzati per gestire lo stato mutabile in modo trasparente e sicuro per i thread. Costruiamo un semplice contatore.

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

Ora generiamo alcune operazioni

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

E vedrai il seguente registro

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

Concorrenza

Poiché il processore della casella di posta elabora i messaggi uno per uno e non c'è interleaving, è possibile anche produrre i messaggi da più thread e non si vedranno i problemi tipici delle operazioni perse o duplicate. Non è possibile che un messaggio utilizzi lo stato precedente di altri messaggi, a meno che non si implementi in modo specifico il processore.

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

Tutti i messaggi sono pubblicati da diversi thread. L'ordine in cui i messaggi vengono inviati alla casella di posta non è deterministico, quindi l'ordine di elaborazione non è deterministico, ma poiché il numero complessivo di incrementi e decrementi è bilanciato, lo stato finale è 0, indipendentemente dall'ordine e da quale thread sono stati inviati i messaggi.

Vero stato mutabile

Nell'esempio precedente abbiamo solo simulato lo stato mutabile passando il parametro del ciclo ricorsivo, ma il processore di cassette postali ha tutte queste proprietà anche per uno stato veramente mutabile. Questo è importante quando si mantiene uno stato ampio e l'immutabilità non è pratica per motivi di prestazioni.

Possiamo riscrivere il nostro contatore alla seguente implementazione

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

Anche se questo sicuramente non sarebbe thread-safe se lo stato del contatore fosse stato modificato direttamente da più thread, è possibile vedere utilizzando i messaggi di messaggi paralleli della sezione precedente che il processore della casella di posta elabora i messaggi uno dopo l'altro senza interleaving, quindi ogni messaggio usa il valore più attuale.

Valori di ritorno

È possibile restituire in modo asincrono un valore per ogni messaggio elaborato se si invia un AsyncReplyChannel<'a> come parte del messaggio.

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

Quindi il processore della casella di posta può utilizzare questo canale durante l'elaborazione del messaggio per inviare un valore al chiamante.

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

Ora per creare un messaggio, hai bisogno di AsyncReplyChannel<'a> - che cos'è e come crei un'istanza di lavoro? Il modo migliore è lasciare che MailboxProcessor lo fornisca per te ed estrai la risposta a un più comune Async<'a> . Questo può essere fatto usando, ad esempio, il metodo PostAndAsynReply , dove non si pubblica il messaggio completo, ma invece una funzione di tipo (nel nostro caso) AsyncReplyChannel<OutputData> -> MessageWithResponse :

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

Questo invierà il messaggio in coda e attenderà la risposta, che arriverà quando il processore arriva a questo messaggio e risponde utilizzando il canale.

Esiste anche una variante sincrona PostAndReply che blocca il thread chiamante finché il processore non risponde.

Elaborazione dei messaggi fuori ordine

È possibile utilizzare i metodi di Scan o TryScan per cercare messaggi specifici nella coda ed elaborarli indipendentemente dal numero di messaggi che li precedono. Entrambi i metodi esaminano i messaggi nella coda nell'ordine in cui sono arrivati ​​e cercheranno un messaggio specificato (fino al timeout opzionale). Nel caso in cui non vi sia alcun messaggio di questo tipo, TryScan restituirà None, mentre Scan continuerà ad attendere fino a quando il messaggio non arriva o l'operazione TryScan .

Vediamolo in pratica. Vogliamo che il processore elabori RegularOperations quando può, ma ogni volta che c'è un PriorityOperation , dovrebbe essere elaborato il prima possibile, indipendentemente da quante altre RegularOperations sono in coda.

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow