F#
Processeur de boîtes aux lettres
Recherche…
Remarques
MailboxProcessor
gère une file d'attente de messages interne, dans laquelle plusieurs producteurs peuvent publier des messages à l'aide de différentes variantes de la méthode Post
. Ces messages sont ensuite récupérés et traités par un seul consommateur (sauf si vous les implémentez autrement) à l'aide des variantes Retrieve
et Scan
. Par défaut, la production et la consommation des messages sont sécurisées pour les threads.
Par défaut, aucune gestion des erreurs n'est fournie. Si une exception non capturée est lancée dans le corps du processeur, la fonction body se terminera, tous les messages de la file d'attente seront perdus, aucun message ne pourra plus être envoyé et le canal de réponse (si disponible) recevra une exception au lieu d'une réponse. Vous devez fournir toutes les erreurs vous-même si ce comportement ne vous convient pas.
Basic Hello World
Créons d'abord un simple "Bonjour tout le monde!" MailboxProcessor
qui traite un type de message et imprime les messages d'accueil.
Vous aurez besoin du type de message Cela peut être n'importe quoi, mais les Unions Discriminées sont un choix naturel ici car elles listent tous les cas possibles sur un même endroit et vous pouvez facilement utiliser le filtrage lors du traitement.
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
Définissez maintenant le processeur lui-même. Cela peut être fait avec la méthode statique MailboxProcessor<'message>.Start
qui renvoie un processeur démarré prêt à faire son travail. Vous pouvez également utiliser le constructeur, mais vous devez ensuite vous assurer de démarrer le processeur ultérieurement.
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
Le paramètre à Start
est une fonction qui fait référence au MailboxProcessor
lui-même (qui n'existe pas encore car vous le créez, mais sera disponible une fois la fonction exécutée). Cela vous donne accès à ses différentes méthodes de Receive
et d' Scan
pour accéder aux messages de la boîte aux lettres. A l'intérieur de cette fonction, vous pouvez faire tout le traitement dont vous avez besoin, mais une approche habituelle est une boucle infinie qui lit les messages un par un et s'appelle après chacun.
Maintenant, le processeur est prêt, mais rien à faire! Pourquoi? Vous devez lui envoyer un message à traiter. Ceci est fait avec les variantes de la méthode Post
- utilisons le plus simple, celui du feu et de l’oubli.
processor.Post(SayHelloTo "Alice")
Cela place un message dans la file d'attente interne du processor
, la boîte aux lettres, et retourne immédiatement afin que le code appelant puisse continuer. Une fois que le processeur aura récupéré le message, il le traitera, mais cela sera fait de manière asynchrone lors de son envoi, et cela se fera probablement sur un thread séparé.
Peu après, vous devriez voir le message "Hi, Alice! This is mailbox processor's inner loop!"
imprimé à la sortie et vous êtes prêt pour des échantillons plus compliqués.
Mutable State Management
Les processeurs de boîtes aux lettres peuvent être utilisés pour gérer l’état mutable de manière transparente et sécurisée. Construisons un compteur simple.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
Maintenant, générons des opérations
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
Et vous verrez le journal suivant
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Concurrence
Comme le processeur de boîte aux lettres traite les messages un par un et qu'il n'y a pas d'entrelacement, vous pouvez également produire les messages à partir de plusieurs threads et vous ne verrez pas les problèmes typiques des opérations perdues ou dupliquées. Un message ne permet en aucun cas d'utiliser l'ancien état des autres messages, à moins que vous ne l'implémentiez spécifiquement.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
Tous les messages sont publiés à partir de différents threads. L'ordre dans lequel les messages sont publiés dans la boîte aux lettres n'est pas déterministe. Par conséquent, l'ordre de traitement n'est pas déterministe, mais comme le nombre total d'incréments et de décréments est équilibré, l'état final sera 0, peu importe dans quel ordre et à partir de quels threads les messages ont été envoyés.
Véritable état mutable
Dans l'exemple précédent, nous n'avons simulé qu'un état mutable en transmettant le paramètre de boucle récursif, mais le processeur de boîte aux lettres possède toutes ces propriétés même pour un état véritablement mutable. Ceci est important lorsque vous maintenez un état élevé et que l'immutabilité est impraticable pour des raisons de performance.
Nous pouvons réécrire notre compteur à l'implémentation suivante
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
Même si cela ne serait certainement pas thread-safe si l'état du compteur était modifié directement à partir de plusieurs threads, vous pouvez voir en utilisant le message parallèle Posts from previous section que le processeur de boîte aux lettres traite les messages les uns après les autres sans entrelacement. valeur la plus courante.
Valeurs de retour
Vous pouvez retourner une valeur asynchrone pour chaque message traité si vous envoyez un AsyncReplyChannel<'a>
dans le message.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
Ensuite, le processeur de boîte aux lettres peut utiliser ce canal lors du traitement du message pour renvoyer une valeur à l'appelant.
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
Maintenant, pour créer un message, vous avez besoin d' AsyncReplyChannel<'a>
- qu'est-ce que c'est et comment créez-vous une instance de travail? Le meilleur moyen est de laisser MailboxProcessor le fournir pour vous et d'extraire la réponse à un Async<'a>
plus commun Async<'a>
. Cela peut être fait en utilisant par exemple la méthode PostAndAsynReply
, où vous ne publiez pas le message complet, mais plutôt une fonction de type (dans notre cas) AsyncReplyChannel<OutputData> -> MessageWithResponse
:
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
Cela affichera le message dans une file d'attente et attendra la réponse, qui arrivera une fois que le processeur aura atteint ce message et aura répondu en utilisant le canal.
Il existe également une variante synchrone PostAndReply
qui bloque le thread appelant jusqu'à ce que le processeur réponde.
Traitement des messages hors service
Vous pouvez utiliser les méthodes Scan
ou TryScan
pour rechercher des messages spécifiques dans la file d'attente et les traiter, quel que soit le nombre de messages devant eux. Les deux méthodes examinent les messages dans la file d'attente dans l'ordre où elles sont arrivées et recherchent un message spécifié (jusqu'à la temporisation facultative). Au cas où il n'y aurait pas de message, TryScan
renverrait None, tandis que Scan
continuerait d'attendre que ce message arrive ou que l'opération arrive à TryScan
.
Voyons cela en pratique. Nous voulons que le processeur traite RegularOperations
quand c'est possible, mais chaque fois qu'il y a un PriorityOperation
, il doit être traité dès que possible, peu importe le nombre d'autres RegularOperations
dans la file d'attente.
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())