F#
मेलबॉक्स प्रोसेसर
खोज…
टिप्पणियों
MailboxProcessor
एक आंतरिक संदेश कतार बनाए रखता है, जहां कई निर्माता विभिन्न Post
विधि वेरिएंट का उपयोग करके संदेश पोस्ट कर सकते हैं। इन संदेशों को तब लिया गया है और एक भी उपभोक्ता द्वारा संसाधित (जब तक आप इसे अन्यथा लागू) प्रयोग कर रहे हैं Retrieve
और Scan
वेरिएंट। डिफ़ॉल्ट रूप से संदेशों का उत्पादन और उपभोग दोनों ही थ्रेड-सुरक्षित है।
डिफ़ॉल्ट रूप से कोई त्रुटि त्रुटि प्रदान की जाती है। यदि प्रोसेसर के बॉडी के अंदर एक अनकैप्ड अपवाद फेंका जाता है, तो बॉडी फंक्शन खत्म हो जाएगा, क्यू में सभी मैसेज खो जाएंगे, कोई और मैसेज पोस्ट नहीं किया जा सकेगा और रिप्लाई चैनल (यदि उपलब्ध हो) को रिस्पॉन्स के बजाय एक अपवाद मिलेगा। यदि आपको यह व्यवहार आपके उपयोग के मामले में नहीं आता है तो आपको स्वयं को संभालने में सभी त्रुटि प्रदान करनी होगी।
बेसिक हैलो वर्ल्ड
आइए सबसे पहले एक सरल "हैलो वर्ल्ड" बनाएं! MailboxProcessor
जो एक प्रकार के संदेश को संसाधित करता है और शुभकामनाएँ देता है।
आपको संदेश प्रकार की आवश्यकता होगी। यह कुछ भी हो सकता है, लेकिन भेदभाव रहित यूनियनें यहां एक स्वाभाविक पसंद हैं क्योंकि वे सभी संभावित मामलों को एक स्थान पर सूचीबद्ध करती हैं और आप उन्हें संसाधित करते समय आसानी से पैटर्न मिलान का उपयोग कर सकते हैं।
// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string
अब प्रोसेसर को ही परिभाषित करें। इसे MailboxProcessor<'message>.Start
साथ किया जा सकता है। स्टैटिक स्टैटिक विधि जो अपना काम करने के लिए तैयार प्रोसेसर को लौटाती है। आप कंस्ट्रक्टर का उपयोग भी कर सकते हैं, लेकिन फिर आपको प्रोसेसर को बाद में शुरू करना सुनिश्चित करना होगा।
let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
let rec innerLoop () = async {
// This way you retrieve message from the mailbox queue
// or await them in case the queue empty.
// You can think of the `inbox` parameter as a reference to self.
let! message = inbox.Receive()
// Now you can process the retrieved message.
match message with
| SayHelloTo name ->
printfn "Hi, %s! This is mailbox processor's inner loop!" name
// After that's done, don't forget to recurse so you can process the next messages!
innerLoop()
}
innerLoop ())
Start
करने के लिए पैरामीटर एक फ़ंक्शन है जो स्वयं MailboxProcessor
संदर्भ लेता है (जो अभी तक मौजूद नहीं है क्योंकि आप इसे बना रहे हैं, लेकिन फ़ंक्शन निष्पादित होने के बाद उपलब्ध होगा)। यह आपको मेलबॉक्स से संदेशों तक पहुंचने के लिए इसके विभिन्न Receive
और Scan
विधियों तक पहुंच प्रदान करता है। इस फ़ंक्शन के अंदर, आपको जो भी प्रसंस्करण की आवश्यकता होती है, आप कर सकते हैं, लेकिन एक सामान्य दृष्टिकोण एक अनंत लूप है जो संदेशों को एक-एक करके पढ़ता है और प्रत्येक के बाद खुद को कॉल करता है।
अब प्रोसेसर तैयार है, लेकिन यह कुछ भी नहीं करता है! क्यों? आपको इसे संसाधित करने के लिए एक संदेश भेजने की आवश्यकता है। यह Post
विधि वेरिएंट के साथ किया जाता है - चलो सबसे बुनियादी, आग और एक भूल का उपयोग करें।
processor.Post(SayHelloTo "Alice")
यह processor
की आंतरिक कतार, मेलबॉक्स और तुरंत रिटर्न के लिए एक संदेश डालता है ताकि कॉलिंग कोड जारी रह सके। एक बार जब प्रोसेसर संदेश को पुनर्प्राप्त करता है, तो यह इसे संसाधित करेगा, लेकिन इसे पोस्ट करने के लिए अतुल्यकालिक रूप से किया जाएगा, और यह एक अलग धागे पर सबसे अधिक संभावना होगा।
बहुत जल्द बाद में आपको संदेश "Hi, Alice! This is mailbox processor's inner loop!"
देखना चाहिए "Hi, Alice! This is mailbox processor's inner loop!"
उत्पादन के लिए मुद्रित और आप अधिक जटिल नमूनों के लिए तैयार हैं।
म्यूटेबल स्टेट मैनेजमेंट
मेलबॉक्स प्रोसेसर का उपयोग पारदर्शी और थ्रेड-सुरक्षित तरीके से उत्परिवर्तनीय स्थिति को प्रबंधित करने के लिए किया जा सकता है। चलो एक साधारण काउंटर का निर्माण करते हैं।
// Increment or decrement by one.
type CounterMessage =
| Increment
| Decrement
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// You can represent the processor's internal mutable state
// as an immutable parameter to the inner loop function
let rec innerLoop state = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
// In each call you use the current state to produce a new
// value, which will be passed to the next call, so that
// next message sees only the new value as its local state
match message with
| Increment ->
let state' = state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop state'
| Decrement ->
let state' = state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop state'
}
// We pass the initialState to the first call to innerLoop
innerLoop initialState)
// Let's pick an initial value and create the processor
let processor = createProcessor 10
अब कुछ ऑपरेशन जनरेट करते हैं
processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)
और आपको निम्न लॉग दिखाई देगा
Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
संगामिति
चूंकि मेलबॉक्स प्रोसेसर संदेशों को एक-एक करके संसाधित करता है और कोई इंटरलेइंग नहीं होता है, आप कई थ्रेड से संदेशों का उत्पादन भी कर सकते हैं और आप खोए हुए या डुप्लिकेट किए गए कार्यों की विशिष्ट समस्याओं को नहीं देखेंगे। जब तक आप विशेष रूप से प्रोसेसर को लागू नहीं करते हैं, तब तक किसी संदेश के लिए पुराने संदेशों का उपयोग करने का कोई तरीका नहीं है।
let processor = createProcessor 0
[ async { processor.Post(Increment) }
async { processor.Post(Increment) }
async { processor.Post(Decrement) }
async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously
सभी संदेश विभिन्न थ्रेड्स से पोस्ट किए जाते हैं। जिस क्रम में मेलबॉक्स में संदेश पोस्ट किए जाते हैं, वह नियतात्मक नहीं होता है, इसलिए उन्हें संसाधित करने का क्रम नियतात्मक नहीं होता है, लेकिन चूंकि समग्र संख्या और वेतन वृद्धि संतुलित है, इसलिए आप देखेंगे कि अंतिम स्थिति 0 है, चाहे वह किस क्रम में हो और जिन थ्रेड्स से संदेश भेजे गए थे।
सत्य परस्पर अवस्था
पिछले उदाहरण में हमने केवल पुनरावर्ती लूप पैरामीटर पास करके उत्परिवर्तनीय स्थिति का अनुकरण किया है, लेकिन मेलबॉक्स प्रोसेसर के पास इन सभी गुणों को यहां तक कि वास्तव में उत्परिवर्तनीय स्थिति के लिए भी है। यह महत्वपूर्ण है जब आप बड़े राज्य बनाए रखते हैं और प्रदर्शन के कारणों के लिए अपरिवर्तनीयता अव्यवहारिक है।
हम निम्नलिखित कार्यान्वयन के लिए अपने काउंटर को फिर से लिख सकते हैं
let createProcessor initialState =
MailboxProcessor<CounterMessage>.Start(fun inbox ->
// In this case we represent the state as a mutable binding
// local to this function. innerLoop will close over it and
// change its value in each iteration instead of passing it around
let mutable state = initialState
let rec innerLoop () = async {
printfn "Waiting for message, the current state is: %i" state
let! message = inbox.Receive()
match message with
| Increment ->
let state <- state + 1
printfn "Counter incremented, the new state is: %i" state'
innerLoop ()
| Decrement ->
let state <- state - 1
printfn "Counter decremented, the new state is: %i" state'
innerLoop ()
}
innerLoop ())
भले ही यह निश्चित रूप से थ्रेड सुरक्षित नहीं होगा यदि काउंटर स्टेट को सीधे कई थ्रेड्स से संशोधित किया गया था, तो आप पिछले अनुभाग से समानांतर संदेश पोस्ट का उपयोग करके देख सकते हैं कि मेलबॉक्स प्रोसेसर एक के बाद एक संदेशों को संसाधित करता है, जिसमें कोई इंटरलेविंग नहीं होता है, इसलिए प्रत्येक संदेश का उपयोग करता है सबसे वर्तमान मूल्य।
वापसी मान
यदि आप एक AsyncReplyChannel<'a>
संदेश के हिस्से के रूप में भेजते हैं, तो आप प्रत्येक संसाधित संदेश के लिए एक मूल्य वापस कर सकते हैं।
type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>
तब मेलबॉक्स प्रोसेसर इस चैनल का उपयोग कर सकता है जब कॉल करने वाले को वापस मान भेजने के लिए संदेश को संसाधित करता है।
let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
// ...process the data
let output = ...
r.Reply(output)
अब एक संदेश बनाने के लिए, आपको AsyncReplyChannel<'a>
- क्या है और आप एक काम करने का उदाहरण कैसे बनाते हैं? सबसे अच्छा तरीका यह है कि MailboxProcessor को आपके लिए प्रदान करने और एक अधिक सामान्य Async<'a>
की प्रतिक्रिया निकालने के लिए। यह उदाहरण के लिए PostAndAsynReply
विधि का उपयोग करके किया जा सकता है, जहां आप पूरा संदेश पोस्ट नहीं करते हैं, लेकिन इसके बजाय प्रकार का एक फ़ंक्शन (हमारे मामले में) AsyncReplyChannel<OutputData> -> MessageWithResponse
:
let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))
यह संदेश को एक कतार में पोस्ट करेगा और उत्तर की प्रतीक्षा करेगा, जो प्रोसेसर द्वारा इस संदेश को प्राप्त करने और चैनल का उपयोग करने के उत्तर देने के बाद आएगा।
एक सिंक्रोनस वेरिएंट PostAndReply
भी है जो प्रोसेसर के जवाब देने तक कॉलिंग थ्रेड को ब्लॉक करता है।
आउट-ऑफ-ऑर्डर संदेश प्रसंस्करण
आप कतार में विशिष्ट संदेशों को देखने के लिए Scan
या TryScan
विधियों का उपयोग कर सकते हैं और चाहे उनके कितने भी संदेश हों, उन्हें संसाधित करें। दोनों विधियाँ कतार में लगे संदेशों को देखती हैं कि वे किस क्रम में आए थे और एक निर्दिष्ट संदेश (वैकल्पिक समय समाप्त होने तक) की तलाश करेंगे। यदि ऐसा कोई संदेश नहीं है, तो TryScan
कोई नहीं लौटेगा, जबकि Scan
इस तरह के संदेश के आने तक या ऑपरेशन के समय तक इंतजार करता रहेगा।
आइए इसे व्यवहार में देखें। हम चाहते हैं कि प्रोसेसर RegularOperations
को प्रोसेस कर सके, लेकिन जब भी PriorityOperation
, तो इसे जल्द से जल्द प्रोसेस किया जाना चाहिए, फिर चाहे कितने ही अन्य RegularOperations
क्यू में क्यों न हों।
type Message =
| RegularOperation of string
| PriorityOperation of string
let processor = MailboxProcessor<Message>.Start(fun inbox ->
let rec innerLoop () = async {
let! priorityData = inbox.TryScan(fun msg ->
// If there is a PriorityOperation, retrieve its data.
match msg with
| PriorityOperation data -> Some data
| _ -> None)
match priorityData with
| Some data ->
// Process the data from PriorityOperation.
| None ->
// No PriorityOperation was in the queue at the time, so
// let's fall back to processing all possible messages
let! message = inbox.Receive()
match message with
| RegularOperation data ->
// We have free time, let's process the RegularOperation.
| PriorityOperation data ->
// We did scan the queue, but it might have been empty
// so it is possible that in the meantime a producer
// posted a new message and it is a PriorityOperation.
// And never forget to process next messages.
innerLoop ()
}
innerLoop())