수색…
비고
MailboxProcessor
는 내부 메시지 대기열을 유지 관리하며 여러 생성자가 다양한 Post
메서드 변형을 사용하여 메시지를 게시 할 수 있습니다. 이러한 메시지는 Retrieve
및 Scan
변형을 사용하여 단일 소비자가 검색하고 처리합니다 (달리 구현하지 않는 한). 기본적으로 메시지를 생성하고 소비하는 것은 스레드로부터 안전합니다.
기본적으로 제공된 오류 처리가 없습니다. 포착되지 않은 예외가 프로세서 몸체 내부에 던져지면 본문 기능이 종료되고 대기열의 모든 메시지가 손실되며 더 이상 메시지를 게시 할 수 없으며 응답 채널 (사용 가능한 경우)이 응답 대신 예외를 가져옵니다. 이 동작이 사용 사례에 맞지 않을 경우를 대비하여 모든 오류 처리를 제공해야합니다.
기본 Hello World
먼저 간단한 "Hello world!"를 만들어 보겠습니다. 한 가지 유형의 메시지를 처리하고 인사말을 인쇄하는 MailboxProcessor
입니다.
메시지 유형이 필요합니다. 무엇이든 될 수 있지만 차별화 된 노동 조합 은 모든 가능한 사례를 한 곳에서 나열하고 처리 할 때 쉽게 패턴 일치를 사용할 수 있으므로 자연 선택입니다.
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
이제 프로세서 자체를 정의하십시오. 이 작업은 MailboxProcessor<'message>.Start
하여 수행 할 수 있습니다. 작업을 시작할 준비가 된 시작된 프로세서를 반환하는 정적 메서드를 시작합니다. 생성자를 사용할 수도 있지만 나중에 프로세서를 시작해야합니다.
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
Start
매개 변수는 MailboxProcessor
자체에 대한 참조를 취하는 함수입니다 (아직 작성하지 않았지만 함수가 실행되면 사용할 수 있음). 그러면 다양한 Receive
및 Scan
방법에 액세스 할 수있어 사서함의 메시지에 액세스 할 수 있습니다. 이 함수 안에서는 필요한 처리를 할 수 있지만 일반적인 방법은 메시지를 하나씩 읽고 각 메시지를 호출하는 무한 루프입니다.
이제 프로세서가 준비되었지만 아무 것도하지 않습니다! 왜? 처리 할 메시지를 보내야합니다. 이것은 Post
메소드 변형을 통해 수행됩니다. 가장 기본적인 화재 및 잊어 버림을 사용합시다.
processor.Post(SayHelloTo "Alice")
이렇게하면 processor
의 내부 대기열 인 사서함에 메시지가 저장되고 즉시 호출 코드가 계속 될 수 있도록 반환됩니다. 일단 프로세서가 메시지를 가져 오면 메시지를 처리하지만, 메시지를 게시하는 데 비동기 적으로 완료되며 별도의 스레드에서 처리됩니다.
그 후에 곧 "Hi, Alice! This is mailbox processor's inner loop!"
메시지 "Hi, Alice! This is mailbox processor's inner loop!"
출력물에 인쇄하면 더 복잡한 샘플을 준비 할 수 있습니다.
가변 상태 관리
사서함 프로세서는 투명하고 스레드로부터 안전한 방식으로 변경 가능한 상태를 관리하는 데 사용할 수 있습니다. 간단한 카운터를 만들어 봅시다.
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
이제 몇 가지 작업을 생성 해 보겠습니다.
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
그러면 다음 로그가 표시됩니다.
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
동시성
사서함 프로세서는 메시지를 하나씩 처리하고 인터리빙이 없으므로 여러 스레드에서 메시지를 생성 할 수 있으므로 손실되거나 중복 된 작업의 일반적인 문제는 볼 수 없습니다. 특별히 프로세서를 구현하지 않는 한 메시지가 다른 메시지의 이전 상태를 사용하는 방법은 없습니다.
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
모든 메시지는 다른 스레드에서 게시됩니다. 메시지가 사서함에 게시되는 순서는 결정적이 아니므로 처리 순서가 결정적이지는 않지만 전체 증분 및 감소 수가 균형을 이루기 때문에 최종 상태가 어떤 순서로든 0이됩니다. 메시지가 전송 된 스레드가 표시됩니다.
True mutable 상태
이전 예제에서 우리는 재귀 루프 매개 변수를 전달하여 변경 가능한 상태를 시뮬레이션했지만 사서함 프로세서는 진정으로 변경 가능한 상태에서도 이러한 모든 속성을가집니다. 이것은 큰 상태를 유지할 때 중요하며 성능상의 이유로 인해 불변성이 비현실적입니다.
카운터를 다음 구현으로 다시 작성할 수 있습니다.
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
카운터 상태가 여러 스레드에서 직접 수정 된 경우에도 스레드 안전성이 보장되지는 않지만 이전 섹션의 Posts라는 병렬 메시지를 사용하면 메일 함 처리기가 메시지를 차례로 처리하므로 인터셉트없이 각 메시지에서 가장 최근의 가치.
반환 값
메시지의 AsyncReplyChannel<'a>
을 보내는 경우 처리 된 각 메시지의 값을 비동기 적으로 반환 할 수 있습니다.
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
그런 다음 메시지를 처리 할 때 사서함 프로세서가이 채널을 사용하여 호출자에게 값을 다시 보냅니다.
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
이제 메시지를 만들려면 AsyncReplyChannel<'a>
이 필요합니다. - 무엇이며 어떻게 작동하는 인스턴스를 만드나요? 가장 좋은 방법은 MailboxProcessor에서 제공하고보다 일반적인 Async<'a>
응답을 추출하는 것입니다. 이 작업은 예를 들어 PostAndAsynReply
메서드를 사용하여 수행 할 수 있습니다. PostAndAsynReply
전체 메시지를 게시하지 않고 대신 (이 경우) AsyncReplyChannel<OutputData> -> MessageWithResponse
형식의 함수를 게시합니다.
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
이렇게하면 메시지가 대기열에 게시되고 응답을 기다리게되며, 프로세서가이 메시지를 받고 채널을 사용하여 응답하면 도착합니다.
프로세서가 응답 할 때까지 호출 스레드를 차단하는 동기 변형 PostAndReply
도 있습니다.
순서가 잘못된 메시지 처리
Scan
또는 TryScan
방법을 사용하여 대기열에있는 특정 메시지를 찾고 처리 할 메시지 수에 관계없이 처리 할 수 있습니다. 두 방법 모두 도착한 순서대로 대기열의 메시지를보고 지정된 메시지를 찾습니다 (선택적 시간 제한까지). 그러한 메시지가없는 경우 TryScan
은 None을 반환하고 Scan
은 해당 메시지가 도착하거나 작업 시간이 초과 될 때까지 대기합니다.
실제로 그것을 보자. 가능한 경우 프로세서가 RegularOperations
를 처리하기를 RegularOperations
하지만 PriorityOperation
이있을 때마다 큐에 다른 RegularOperations
가 몇 개 RegularOperations
상관없이 가능한 빨리 처리해야합니다.
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())