Zoeken…


Opmerkingen

MailboxProcessor onderhoudt een interne berichtenwachtrij, waar meerdere producenten berichten kunnen posten met behulp van verschillende varianten van de Post methode. Deze berichten worden vervolgens opgehaald en verwerkt door een enkele consument (tenzij u het anders implementeert) met behulp van de varianten Retrieve en Scan . Standaard zijn zowel het produceren als het consumeren van de berichten thread-safe.

Standaard is er geen foutafhandeling beschikbaar. Als een niet-opgevangen uitzondering in de body van de processor wordt gegooid, wordt de body-functie beëindigd, gaan alle berichten in de wachtrij verloren, kunnen er geen berichten meer worden geplaatst en krijgt het antwoordkanaal (indien beschikbaar) een uitzondering in plaats van een antwoord. U moet zelf alle foutafhandeling opgeven als dit gedrag niet geschikt is voor uw gebruik.

Basic Hello World

Laten we eerst een eenvoudige "Hallo wereld!" MailboxProcessor die één type bericht verwerkt en begroetingen afdrukt.

U hebt het berichttype nodig. Het kan van alles zijn, maar gediscrimineerde vakbonden zijn hier een natuurlijke keuze, omdat ze alle mogelijke gevallen op één plaats vermelden en u gemakkelijk patroonvergelijking kunt gebruiken bij het verwerken ervan.

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

Definieer nu de processor zelf. Dit kan worden gedaan met MailboxProcessor<'message>.Start statische methode die een gestarte processor retourneert die klaar is om zijn werk te doen. U kunt ook de constructor gebruiken, maar dan moet u ervoor zorgen dat u de processor later start.

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

De parameter voor Start is een functie die verwijst naar de MailboxProcessor zelf (die nog niet bestaat omdat u deze maakt, maar beschikbaar zal zijn zodra de functie wordt uitgevoerd). Dat geeft u toegang tot de verschillende Receive en Scan om toegang te krijgen tot de berichten vanuit de mailbox. Binnen deze functie kun je alles verwerken wat je nodig hebt, maar een gebruikelijke aanpak is een oneindige lus die de berichten een voor een leest en zichzelf na elke oproep aanroept.

Nu is de processor klaar, maar dat doet niets! Waarom? U moet het een bericht sturen om te verwerken. Dit wordt gedaan met de varianten van de Post methode - laten we de meest basale, vuur-en-vergeet-variant gebruiken.

processor.Post(SayHelloTo "Alice")

Dit plaatst een bericht in de interne wachtrij van de processor , de mailbox, en keert onmiddellijk terug zodat de belcode kan worden voortgezet. Zodra de processor het bericht heeft opgehaald, zal het het verwerken, maar dat zal asynchroon worden gedaan om het te posten, en het zal hoogstwaarschijnlijk op een afzonderlijke thread worden gedaan.

Al snel daarna zou je het bericht moeten zien "Hi, Alice! This is mailbox processor's inner loop!" afgedrukt naar de uitvoer en u bent klaar voor meer gecompliceerde voorbeelden.

Beheer van veranderlijke staat

Mailbox-processors kunnen worden gebruikt om de veranderlijke status op een transparante en thread-veilige manier te beheren. Laten we een eenvoudige teller bouwen.

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

Laten we nu enkele bewerkingen genereren

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

En u zult het volgende logboek zien

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

samenloop

Aangezien de mailboxprocessor de berichten één voor één verwerkt en er geen interleaving is, kunt u de berichten ook vanuit meerdere threads produceren en ziet u niet de typische problemen van verloren of gedupliceerde bewerkingen. Een bericht kan op geen enkele manier de oude status van andere berichten gebruiken, tenzij u de processor specifiek zo implementeert.

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

Alle berichten worden vanuit verschillende threads gepost. De volgorde waarin berichten in de mailbox worden gepost, is niet deterministisch, dus de volgorde van verwerking is niet deterministisch, maar omdat het totale aantal stappen en afnames in evenwicht is, ziet u de uiteindelijke status 0, ongeacht in welke volgorde en vanuit welke threads de berichten zijn verzonden.

Echte veranderlijke staat

In het vorige voorbeeld hebben we alleen de veranderlijke status gesimuleerd door de parameter recursieve lus door te geven, maar mailboxprocessor heeft al deze eigenschappen, zelfs voor een echt veranderlijke status. Dit is belangrijk wanneer u een grote toestand handhaaft en onveranderlijkheid om praktische redenen onpraktisch is.

We kunnen onze tegenhanger van de volgende implementatie herschrijven

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

Hoewel dit zeker niet thread-safe zou zijn als de tellerstatus rechtstreeks vanuit meerdere threads werd gewijzigd, kun je met behulp van de parallelle berichtenposten uit de vorige sectie zien dat mailboxprocessor de berichten na elkaar verwerkt zonder interleaving, dus elk bericht gebruikt de meest actuele waarde.

Retourwaarden

U kunt asynchroon een waarde retourneren voor elk verwerkt bericht als u een AsyncReplyChannel<'a> verzendt als onderdeel van het bericht.

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

Vervolgens kan de mailboxprocessor dit kanaal gebruiken bij het verwerken van het bericht om een waarde terug te sturen naar de beller.

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

Om een bericht te maken, hebt u het AsyncReplyChannel<'a> - wat is het en hoe maakt u een werkend exemplaar? De beste manier is om MailboxProcessor het voor u te laten leveren en het antwoord op een meer algemene Async<'a> extraheren. Dit kan bijvoorbeeld met behulp van de methode PostAndAsynReply , waarbij u niet het volledige bericht post, maar in plaats daarvan een functie van het type (in ons geval) AsyncReplyChannel<OutputData> -> MessageWithResponse :

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

Dit zal het bericht in een wachtrij plaatsen en wachten op het antwoord, dat zal aankomen zodra de processor dit bericht ontvangt en antwoordt via het kanaal.

Er is ook een synchrone variant PostAndReply die de aanroepende thread blokkeert totdat de processor antwoordt.

Out-of-Order berichtverwerking

U kunt Scan of TryScan methoden gebruiken om naar specifieke berichten in de wachtrij te zoeken en deze te verwerken, ongeacht hoeveel berichten er vóór staan. Beide methoden bekijken de berichten in de wachtrij in de volgorde waarin ze zijn aangekomen en zoeken naar een specifiek bericht (tot optionele time-out). In het geval dat er geen dergelijk bericht is, zal TryScan Geen retourneren, terwijl Scan blijft wachten tot een dergelijk bericht arriveert of de operatie TryScan .

Laten we het in de praktijk zien. We willen dat de processor RegularOperations wanneer het kan, maar wanneer er een PriorityOperation , moet deze zo snel mogelijk worden verwerkt, ongeacht hoeveel andere RegularOperations in de wachtrij staan.

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow