Поиск…


замечания

MailboxProcessor поддерживает внутреннюю очередь сообщений, где несколько производителей могут отправлять сообщения с использованием различных вариантов метода Post . Эти сообщения затем извлекаются и обрабатываются одним потребителем (если вы не реализуете его в противном случае) с использованием вариантов Retrieve и Scan . По умолчанию, как производство, так и потребление сообщений являются потокобезопасными.

По умолчанию обработка ошибок отсутствует. Если в тело процессора выбрано неперехваченное исключение, функция body прекратится, все сообщения в очереди будут потеряны, больше сообщений не будет опубликовано, и канал ответа (если он доступен) получит исключение вместо ответа. Вы должны предоставить всю обработку ошибок самостоятельно, если это поведение не подходит для вашего использования.

Основной Hello World

Давайте сначала создадим простой «Hello world!». MailboxProcessor который обрабатывает сообщения одного типа и печатает приветствия.

Вам понадобится тип сообщения. Это может быть что угодно, но дискриминированные союзы являются естественным выбором здесь, поскольку они перечисляют все возможные случаи в одном месте, и вы можете легко использовать сопоставление образцов при их обработке.

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

Теперь определите сам процессор. Это можно сделать с помощью MailboxProcessor<'message>.Start Статический метод, который возвращает запущенный процессор, готовый выполнить свою работу. Вы также можете использовать конструктор, но тогда вам нужно обязательно запустить процессор позже.

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

Параметр Start - это функция, которая ссылается на сам MailboxProcessor (который еще не существует, поскольку вы только создаете его, но будет доступен после выполнения функции). Это дает вам доступ к различным методам Receive и Scan для доступа к сообщениям из почтового ящика. Внутри этой функции вы можете делать любую необходимую обработку, но обычный подход - это бесконечный цикл, который читает сообщения один за другим и вызывает себя после каждого.

Теперь процессор готов, но это ни к чему! Зачем? Вам нужно отправить сообщение для обработки. Это делается с вариантами метода Post - давайте используем самый простой, с легкостью и забытый.

processor.Post(SayHelloTo "Alice")

Это помещает сообщение во внутреннюю очередь processor , почтовый ящик и немедленно возвращается, чтобы код вызова мог продолжаться. Как только процессор получит сообщение, он обработает его, но это будет сделано асинхронно для его публикации и, скорее всего, будет выполнено в отдельном потоке.

Очень скоро после этого вы увидите сообщение "Hi, Alice! This is mailbox processor's inner loop!" напечатаны на выходе, и вы готовы к более сложным образцам.

Управление взаимным управлением

Процессоры почтовых ящиков могут использоваться для управления изменяемым состоянием прозрачным и потокобезопасным способом. Давайте построим простой счетчик.

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

Теперь давайте сгенерируем некоторые операции

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

И вы увидите следующий журнал

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

совпадение

Поскольку процессор почтовых ящиков обрабатывает сообщения один за другим, и нет чередования, вы также можете создавать сообщения из нескольких потоков, и вы не увидите типичных проблем с потерянными или дублируемыми операциями. Невозможно, чтобы сообщение использовало старое состояние других сообщений, если вы специально не применяете процессор.

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

Все сообщения публикуются из разных тем. Порядок, в котором сообщения отправляются в почтовый ящик, не является детерминированным, поэтому порядок их обработки не является детерминированным, но поскольку общее количество приращений и декрементов сбалансировано, вы увидите, что конечное состояние равно 0, независимо от того, в каком порядке и из каких потоков были отправлены сообщения.

Истинное изменяемое состояние

В предыдущем примере мы только моделировали изменяемое состояние, передавая параметр рекурсивного цикла, но процессор почтовых ящиков обладает всеми этими свойствами даже для действительно изменяемого состояния. Это важно, когда вы поддерживаете большое состояние и неизменность, непрактично по соображениям производительности.

Мы можем переписать наш счетчик на следующую реализацию

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

Хотя это определенно не было бы потокобезопасным, если состояние счетчика было изменено непосредственно из нескольких потоков, вы можете увидеть, используя параллельное сообщение Сообщения из предыдущего раздела, что процессор почтовых ящиков обрабатывает сообщения один за другим без перемежения, поэтому каждое сообщение использует самое текущее значение.

Возвращаемые значения

Вы можете асинхронно возвращать значение для каждого обработанного сообщения, если вы отправляете AsyncReplyChannel<'a> как часть сообщения.

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

Затем процессор почтовых ящиков может использовать этот канал при обработке сообщения для отправки значения обратно вызывающему абоненту.

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

Теперь, чтобы создать сообщение, вам нужен AsyncReplyChannel<'a> - что есть и как вы создаете рабочий экземпляр? Лучший способ - позволить MailboxProcessor предоставить его для вас и извлечь ответ на более распространенный Async<'a> . Это можно сделать, используя, например, метод PostAndAsynReply , в котором вы не PostAndAsynReply полное сообщение, а вместо этого - функцию типа (в нашем случае) AsyncReplyChannel<OutputData> -> MessageWithResponse :

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

Это отправит сообщение в очередь и ждет ответа, который поступит после того, как процессор получит это сообщение и ответит с использованием канала.

Существует также синхронный вариант PostAndReply который блокирует вызывающий поток, пока процессор не ответит.

Обработка сообщений вне ордера

Вы можете использовать методы Scan или TryScan для поиска определенных сообщений в очереди и обработки их независимо от количества сообщений перед ними. Оба метода просматривают сообщения в очереди в том порядке, в котором они прибыли, и будут искать указанное сообщение (вплоть до дополнительного тайм-аута). Если такого сообщения нет, TryScan вернет None, в то время как Scan продолжит ждать, пока не поступит такое сообщение или не TryScan время работы.

Давайте посмотрим это на практике. Мы хотим, чтобы процессор обрабатывал RegularOperations когда это возможно, но всякий раз, когда есть PriorityOperation , его следует обрабатывать как можно скорее, независимо от того, сколько других RegularOperations находится в очереди.

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow