F#
Procesador de buzones
Buscar..
Observaciones
MailboxProcessor
mantiene una cola de mensajes interna, donde varios productores pueden publicar mensajes utilizando diversas variantes del método Post
. Luego, un solo consumidor recupera y procesa estos mensajes (a menos que lo implemente de otra manera) utilizando las variantes de Retrieve
y Scan
. Por defecto, tanto producir como consumir los mensajes es seguro para subprocesos.
Por defecto, no hay un manejo de errores proporcionado. Si se lanza una excepción no detectada dentro del cuerpo del procesador, la función del cuerpo finalizará, todos los mensajes en la cola se perderán, no se podrán publicar más mensajes y el canal de respuesta (si está disponible) obtendrá una excepción en lugar de una respuesta. Debe proporcionar todo el manejo de errores en caso de que este comportamiento no se adapte a su caso de uso.
Hola mundo básico
Primero creamos un simple "¡Hola mundo!" MailboxProcessor
que procesa un tipo de mensaje e imprime saludos.
Necesitarás el tipo de mensaje. Puede ser cualquier cosa, pero las Uniones Discriminadas son una elección natural aquí, ya que enumeran todos los casos posibles en un solo lugar y puede usar fácilmente la comparación de patrones cuando los procesa.
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
Ahora define el propio procesador. Esto se puede hacer con MailboxProcessor<'message>.Start
el método estático que devuelve un procesador iniciado listo para hacer su trabajo. También puede usar el constructor, pero luego debe asegurarse de iniciar el procesador más tarde.
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
El parámetro para Start
es una función que toma una referencia al propio MailboxProcessor
(que aún no existe, solo lo está creando, pero estará disponible una vez que se ejecute la función). Eso le da acceso a sus diversos métodos de Receive
y Scan
para acceder a los mensajes del buzón. Dentro de esta función, puede hacer cualquier procesamiento que necesite, pero un enfoque habitual es un bucle infinito que lee los mensajes uno por uno y se llama a sí mismo después de cada uno.
Ahora el procesador está listo, ¡pero no sirve para nada! ¿Por qué? Necesitas enviar un mensaje para procesarlo. Esto se hace con las variantes del método de Post
: usemos la más básica, el de disparar y olvidar.
processor.Post(SayHelloTo "Alice")
Esto coloca un mensaje en la cola interna del processor
, el buzón, y se devuelve inmediatamente para que el código de llamada pueda continuar. Una vez que el procesador recupera el mensaje, lo procesará, pero eso se hará de forma asíncrona para publicarlo, y lo más probable es que se realice en un subproceso separado.
Muy pronto verá el mensaje "Hi, Alice! This is mailbox processor's inner loop!"
impreso en la salida y ya está listo para muestras más complicadas.
Gestión del estado mutable
Los procesadores de buzones se pueden utilizar para administrar el estado mutable de forma transparente y segura para subprocesos. Vamos a construir un contador simple.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
Ahora vamos a generar algunas operaciones.
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
Y verá el siguiente registro
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Concurrencia
Dado que el procesador de buzones procesa los mensajes uno por uno y no hay intercalado, también puede producir los mensajes de varios subprocesos y no verá los problemas típicos de las operaciones perdidas o duplicadas. No hay forma de que un mensaje use el estado anterior de otros mensajes, a menos que implemente específicamente el procesador.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
Todos los mensajes son publicados desde diferentes hilos. El orden en el que se publican los mensajes en el buzón no es determinista, por lo que el orden de procesarlos no es determinista, pero como el número total de incrementos y decrementos está equilibrado, verá que el estado final es 0, sin importar el orden y desde qué hilos se enviaron los mensajes.
Verdadero estado mutable
En el ejemplo anterior, solo hemos simulado el estado mutable pasando el parámetro de bucle recursivo, pero el procesador del buzón tiene todas estas propiedades incluso para un estado verdaderamente mutable. Esto es importante cuando se mantiene un estado grande y la inmutabilidad no es práctica por razones de rendimiento.
Podemos reescribir nuestro contador a la siguiente implementación.
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
Aunque esto definitivamente no sería seguro para subprocesos si el estado del contador se modificara directamente desde varios subprocesos, puede ver mediante el uso de mensajes paralelos en la sección anterior que el procesador del buzón procesa los mensajes uno tras otro sin intercalar, por lo que cada mensaje usa la Valor más actual.
Valores de retorno
Puede devolver de forma asíncrona un valor para cada mensaje procesado si envía un AsyncReplyChannel<'a>
como parte del mensaje.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
Luego, el procesador del buzón puede usar este canal cuando procesa el mensaje para enviar un valor al llamante.
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
Ahora para crear un mensaje, necesita el AsyncReplyChannel<'a>
. ¿Qué es y cómo crea una instancia de trabajo? La mejor manera es dejar que MailboxProcessor se lo proporcione y extraer la respuesta a un Async<'a>
más común Async<'a>
. Esto se puede hacer usando, por ejemplo, el método PostAndAsynReply
, donde no se publica el mensaje completo, sino una función de tipo (en nuestro caso) AsyncReplyChannel<OutputData> -> MessageWithResponse
:
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
Esto publicará el mensaje en una cola y esperará la respuesta, que llegará una vez que el procesador llegue a este mensaje y responda utilizando el canal.
También hay una variante síncrona PostAndReply
que bloquea el subproceso de llamada hasta que el procesador responde.
Procesamiento de mensajes fuera de orden
Puede usar los métodos Scan
o TryScan
para buscar mensajes específicos en la cola y procesarlos independientemente de cuántos mensajes haya antes de ellos. Ambos métodos miran los mensajes en la cola en el orden en que llegaron y buscarán un mensaje específico (hasta el tiempo de espera opcional). En caso de que no exista tal mensaje, TryScan
devolverá Ninguno, mientras que la Scan
seguirá esperando hasta que llegue ese mensaje o la operación se agote.
Vamos a verlo en la práctica. Queremos que el procesador procese las RegularOperations
cuando sea posible, pero siempre que haya una PriorityOperation
, debe procesarse lo antes posible, sin importar cuántas RegularOperations
en la cola.
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())