Szukaj…


Uwagi

MailboxProcessor utrzymuje wewnętrzną kolejkę komunikatów, w której wielu producentów może wysyłać wiadomości przy użyciu różnych wariantów metody Post . Komunikaty te są następnie pobierane i przetwarzane przez pojedynczego konsumenta (chyba że wdrożyć go w inny sposób), używając Retrieve i Scan warianty. Domyślnie zarówno wytwarzanie, jak i używanie wiadomości jest bezpieczne dla wątków.

Domyślnie nie jest dostępna obsługa błędów. Jeśli nieprzechwycony wyjątek zostanie zgłoszony w ciele procesora, funkcja treści zakończy się, wszystkie wiadomości w kolejce zostaną utracone, nie będzie można więcej wysyłać wiadomości, a kanał odpowiedzi (jeśli jest dostępny) otrzyma wyjątek zamiast odpowiedzi. Musisz sam zapewnić obsługę wszystkich błędów, na wypadek gdyby to zachowanie nie pasowało do Twojego przypadku użycia.

Basic Hello World

Najpierw stwórzmy prosty „Witaj świecie!” MailboxProcessor który przetwarza jeden typ wiadomości i drukuje pozdrowienia.

Będziesz potrzebował typu wiadomości. Może to być wszystko, ale związki dyskryminacyjne są tutaj naturalnym wyborem, ponieważ zawierają listę wszystkich możliwych przypadków w jednym miejscu i można łatwo użyć dopasowania wzorca podczas ich przetwarzania.

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

Teraz zdefiniuj sam procesor. Można tego dokonać za pomocą MailboxProcessor<'message>.Start metoda statyczna, która zwraca uruchomiony procesor gotowy do wykonania zadania. Możesz także użyć konstruktora, ale musisz później uruchomić procesor.

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

Parametr Start to funkcja, która odwołuje się do samego MailboxProcessor (który jeszcze nie istnieje, ponieważ właśnie go tworzysz, ale będzie dostępny po uruchomieniu funkcji). To daje dostęp do różnych metod Receive i Scan celu uzyskania dostępu do wiadomości ze skrzynki pocztowej. Wewnątrz tej funkcji możesz wykonać dowolne przetwarzanie, ale zwykle stosuje się nieskończoną pętlę, która odczytuje wiadomości jeden po drugim i wywołuje się po każdej z nich.

Teraz procesor jest gotowy, ale do niczego! Dlaczego? Musisz wysłać wiadomość do przetworzenia. Odbywa się to za pomocą wariantów metody Post - zastosujmy najbardziej podstawowy, odpal i zapomnij.

processor.Post(SayHelloTo "Alice")

To umieszcza komunikat w wewnętrznej kolejce processor , skrzynce pocztowej, i natychmiast wraca, aby kod wywołujący mógł być kontynuowany. Gdy procesor pobierze wiadomość, przetworzy ją, ale zostanie to zrobione asynchronicznie do opublikowania i najprawdopodobniej zostanie wykonane w osobnym wątku.

Wkrótce potem powinieneś zobaczyć komunikat "Hi, Alice! This is mailbox processor's inner loop!" wydrukowane na wyjściu i jesteś gotowy na bardziej skomplikowane próbki.

Zmienne zarządzanie stanem

Procesory skrzynek pocztowych mogą służyć do zarządzania stanem zmiennych w przejrzysty i bezpieczny dla wątków sposób. Zbudujmy prosty licznik.

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

Teraz wygenerujmy niektóre operacje

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

I zobaczysz następujący dziennik

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

Konkurencja

Ponieważ procesor skrzynki pocztowej przetwarza wiadomości jeden po drugim i nie ma przeplotu, możesz również wytwarzać wiadomości z wielu wątków i nie zobaczysz typowych problemów z zagubionymi lub zduplikowanymi operacjami. Nie ma sposobu, aby wiadomość wykorzystała stary stan innych wiadomości, chyba że procesor tak zaimplementuje.

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

Wszystkie wiadomości są publikowane z różnych wątków. Kolejność wysyłania wiadomości do skrzynki pocztowej nie jest deterministyczna, więc kolejność ich przetwarzania nie jest deterministyczna, ale ponieważ ogólna liczba przyrostów i dekrecji jest zrównoważona, zobaczysz stan końcowy równy 0, bez względu na to, w jakiej kolejności i z których wątków wiadomości zostały wysłane.

Prawdziwy stan zmienny

W poprzednim przykładzie symulowaliśmy tylko stan zmienny, przekazując parametr pętli rekurencyjnej, ale procesor skrzynki pocztowej ma wszystkie te właściwości, nawet dla stanu naprawdę zmiennego. Jest to ważne, gdy utrzymujesz duży stan, a niezmienność jest niepraktyczna ze względu na wydajność.

Możemy przepisać nasz licznik na następującą implementację

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

Chociaż nie byłoby to z pewnością bezpieczne dla wątków, gdyby stan licznika został zmodyfikowany bezpośrednio z wielu wątków, można zobaczyć, używając równoległego komunikatu Posty z poprzedniej sekcji, że procesor skrzynki pocztowej przetwarza wiadomości jeden po drugim bez przeplotu, więc każda wiadomość używa najbardziej aktualna wartość.

Zwracane wartości

Możesz asynchronicznie zwrócić wartość dla każdej przetworzonej wiadomości, jeśli wysyłasz AsyncReplyChannel<'a> jako część wiadomości.

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

Następnie procesor skrzynki pocztowej może korzystać z tego kanału podczas przetwarzania wiadomości w celu wysłania wartości z powrotem do dzwoniącego.

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

Teraz, aby utworzyć wiadomość, potrzebujesz AsyncReplyChannel<'a> - co to jest i jak utworzyć działającą instancję? Najlepszym sposobem jest udostępnienie go przez MailboxProcessor i wyodrębnienie odpowiedzi na bardziej powszechny Async<'a> . Można to zrobić na przykład za PostAndAsynReply metody PostAndAsynReply , w której nie publikuje się pełnej wiadomości, ale zamiast tego funkcja typu (w naszym przypadku) AsyncReplyChannel<OutputData> -> MessageWithResponse :

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

Spowoduje to opublikowanie wiadomości w kolejce i oczekiwanie na odpowiedź, która dotrze, gdy procesor dotrze do tej wiadomości i odpowie za pomocą kanału.

Istnieje również wariant synchroniczny PostAndReply który blokuje wątek wywołujący, dopóki procesor nie odpowie.

Przetwarzanie komunikatu o braku zamówienia

Za pomocą metod Scan lub TryScan można wyszukiwać określone wiadomości w kolejce i przetwarzać je bez względu na to, ile wiadomości jest przed nimi. Obie metody sprawdzają wiadomości w kolejce w kolejności, w jakiej przybyły, i szukają określonej wiadomości (aż do opcjonalnego przekroczenia limitu czasu). W przypadku braku takiej wiadomości TryScan zwróci Brak, a Scan będzie czekał do momentu otrzymania takiej wiadomości lub TryScan czasu operacji.

Zobaczmy to w praktyce. Chcemy, aby procesor przetwarzał RegularOperations gdy tylko jest to możliwe, ale ilekroć istnieje RegularOperations PriorityOperation , powinna być przetwarzana tak szybko, jak to możliwe, bez względu na to, ile innych RegularOperations znajduje się w kolejce.

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow