खोज…


टिप्पणियों

MailboxProcessor एक आंतरिक संदेश कतार बनाए रखता है, जहां कई निर्माता विभिन्न Post विधि वेरिएंट का उपयोग करके संदेश पोस्ट कर सकते हैं। इन संदेशों को तब लिया गया है और एक भी उपभोक्ता द्वारा संसाधित (जब तक आप इसे अन्यथा लागू) प्रयोग कर रहे हैं Retrieve और Scan वेरिएंट। डिफ़ॉल्ट रूप से संदेशों का उत्पादन और उपभोग दोनों ही थ्रेड-सुरक्षित है।

डिफ़ॉल्ट रूप से कोई त्रुटि त्रुटि प्रदान की जाती है। यदि प्रोसेसर के बॉडी के अंदर एक अनकैप्ड अपवाद फेंका जाता है, तो बॉडी फंक्शन खत्म हो जाएगा, क्यू में सभी मैसेज खो जाएंगे, कोई और मैसेज पोस्ट नहीं किया जा सकेगा और रिप्लाई चैनल (यदि उपलब्ध हो) को रिस्पॉन्स के बजाय एक अपवाद मिलेगा। यदि आपको यह व्यवहार आपके उपयोग के मामले में नहीं आता है तो आपको स्वयं को संभालने में सभी त्रुटि प्रदान करनी होगी।

बेसिक हैलो वर्ल्ड

आइए सबसे पहले एक सरल "हैलो वर्ल्ड" बनाएं! MailboxProcessor जो एक प्रकार के संदेश को संसाधित करता है और शुभकामनाएँ देता है।

आपको संदेश प्रकार की आवश्यकता होगी। यह कुछ भी हो सकता है, लेकिन भेदभाव रहित यूनियनें यहां एक स्वाभाविक पसंद हैं क्योंकि वे सभी संभावित मामलों को एक स्थान पर सूचीबद्ध करती हैं और आप उन्हें संसाधित करते समय आसानी से पैटर्न मिलान का उपयोग कर सकते हैं।

// In this example there is only a single case, it could technically be just a string
type GreeterMessage = SayHelloTo of string

अब प्रोसेसर को ही परिभाषित करें। इसे MailboxProcessor<'message>.Start साथ किया जा सकता है। स्टैटिक स्टैटिक विधि जो अपना काम करने के लिए तैयार प्रोसेसर को लौटाती है। आप कंस्ट्रक्टर का उपयोग भी कर सकते हैं, लेकिन फिर आपको प्रोसेसर को बाद में शुरू करना सुनिश्चित करना होगा।

let processor = MailboxProcessor<GreeterMessage>.Start(fun inbox ->
    let rec innerLoop () = async {
        // This way you retrieve message from the mailbox queue
        // or await them in case the queue empty.
        // You can think of the `inbox` parameter as a reference to self.
        let! message = inbox.Receive()
        // Now you can process the retrieved message.
        match message with
        | SayHelloTo name ->
            printfn "Hi, %s! This is mailbox processor's inner loop!" name
        // After that's done, don't forget to recurse so you can process the next messages!
        innerLoop()
    }
    innerLoop ())

Start करने के लिए पैरामीटर एक फ़ंक्शन है जो स्वयं MailboxProcessor संदर्भ लेता है (जो अभी तक मौजूद नहीं है क्योंकि आप इसे बना रहे हैं, लेकिन फ़ंक्शन निष्पादित होने के बाद उपलब्ध होगा)। यह आपको मेलबॉक्स से संदेशों तक पहुंचने के लिए इसके विभिन्न Receive और Scan विधियों तक पहुंच प्रदान करता है। इस फ़ंक्शन के अंदर, आपको जो भी प्रसंस्करण की आवश्यकता होती है, आप कर सकते हैं, लेकिन एक सामान्य दृष्टिकोण एक अनंत लूप है जो संदेशों को एक-एक करके पढ़ता है और प्रत्येक के बाद खुद को कॉल करता है।

अब प्रोसेसर तैयार है, लेकिन यह कुछ भी नहीं करता है! क्यों? आपको इसे संसाधित करने के लिए एक संदेश भेजने की आवश्यकता है। यह Post विधि वेरिएंट के साथ किया जाता है - चलो सबसे बुनियादी, आग और एक भूल का उपयोग करें।

processor.Post(SayHelloTo "Alice")

यह processor की आंतरिक कतार, मेलबॉक्स और तुरंत रिटर्न के लिए एक संदेश डालता है ताकि कॉलिंग कोड जारी रह सके। एक बार जब प्रोसेसर संदेश को पुनर्प्राप्त करता है, तो यह इसे संसाधित करेगा, लेकिन इसे पोस्ट करने के लिए अतुल्यकालिक रूप से किया जाएगा, और यह एक अलग धागे पर सबसे अधिक संभावना होगा।

बहुत जल्द बाद में आपको संदेश "Hi, Alice! This is mailbox processor's inner loop!" देखना चाहिए "Hi, Alice! This is mailbox processor's inner loop!" उत्पादन के लिए मुद्रित और आप अधिक जटिल नमूनों के लिए तैयार हैं।

म्यूटेबल स्टेट मैनेजमेंट

मेलबॉक्स प्रोसेसर का उपयोग पारदर्शी और थ्रेड-सुरक्षित तरीके से उत्परिवर्तनीय स्थिति को प्रबंधित करने के लिए किया जा सकता है। चलो एक साधारण काउंटर का निर्माण करते हैं।

// Increment or decrement by one.
type CounterMessage =
    | Increment
    | Decrement

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // You can represent the processor's internal mutable state
        // as an immutable parameter to the inner loop function
        let rec innerLoop state = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            // In each call you use the current state to produce a new
            // value, which will be passed to the next call, so that
            // next message sees only the new value as its local state
            match message with
            | Increment ->
                let state' = state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop state'
            | Decrement ->
                let state' = state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop state'
        }
        // We pass the initialState to the first call to innerLoop
        innerLoop initialState)

// Let's pick an initial value and create the processor
let processor = createProcessor 10

अब कुछ ऑपरेशन जनरेट करते हैं

processor.Post(Increment)
processor.Post(Increment)
processor.Post(Decrement)
processor.Post(Increment)

और आपको निम्न लॉग दिखाई देगा

Waiting for message, the current state is: 10
Counter incremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12
Counter decremented, the new state is: 11
Waiting for message, the current state is: 11
Counter incremented, the new state is: 12
Waiting for message, the current state is: 12

संगामिति

चूंकि मेलबॉक्स प्रोसेसर संदेशों को एक-एक करके संसाधित करता है और कोई इंटरलेइंग नहीं होता है, आप कई थ्रेड से संदेशों का उत्पादन भी कर सकते हैं और आप खोए हुए या डुप्लिकेट किए गए कार्यों की विशिष्ट समस्याओं को नहीं देखेंगे। जब तक आप विशेष रूप से प्रोसेसर को लागू नहीं करते हैं, तब तक किसी संदेश के लिए पुराने संदेशों का उपयोग करने का कोई तरीका नहीं है।

let processor = createProcessor 0

[ async { processor.Post(Increment) }
  async { processor.Post(Increment) }
  async { processor.Post(Decrement) }
  async { processor.Post(Decrement) } ]
|> Async.Parallel
|> Async.RunSynchronously

सभी संदेश विभिन्न थ्रेड्स से पोस्ट किए जाते हैं। जिस क्रम में मेलबॉक्स में संदेश पोस्ट किए जाते हैं, वह नियतात्मक नहीं होता है, इसलिए उन्हें संसाधित करने का क्रम नियतात्मक नहीं होता है, लेकिन चूंकि समग्र संख्या और वेतन वृद्धि संतुलित है, इसलिए आप देखेंगे कि अंतिम स्थिति 0 है, चाहे वह किस क्रम में हो और जिन थ्रेड्स से संदेश भेजे गए थे।

सत्य परस्पर अवस्था

पिछले उदाहरण में हमने केवल पुनरावर्ती लूप पैरामीटर पास करके उत्परिवर्तनीय स्थिति का अनुकरण किया है, लेकिन मेलबॉक्स प्रोसेसर के पास इन सभी गुणों को यहां तक कि वास्तव में उत्परिवर्तनीय स्थिति के लिए भी है। यह महत्वपूर्ण है जब आप बड़े राज्य बनाए रखते हैं और प्रदर्शन के कारणों के लिए अपरिवर्तनीयता अव्यवहारिक है।

हम निम्नलिखित कार्यान्वयन के लिए अपने काउंटर को फिर से लिख सकते हैं

let createProcessor initialState =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        // In this case we represent the state as a mutable binding
        // local to this function. innerLoop will close over it and
        // change its value in each iteration instead of passing it around
        let mutable state = initialState

        let rec innerLoop () = async {
            printfn "Waiting for message, the current state is: %i" state
            let! message = inbox.Receive()
            match message with
            | Increment ->
                let state <- state + 1
                printfn "Counter incremented, the new state is: %i" state'
                innerLoop ()
            | Decrement ->
                let state <- state - 1
                printfn "Counter decremented, the new state is: %i" state'
                innerLoop ()
        }
        innerLoop ())

भले ही यह निश्चित रूप से थ्रेड सुरक्षित नहीं होगा यदि काउंटर स्टेट को सीधे कई थ्रेड्स से संशोधित किया गया था, तो आप पिछले अनुभाग से समानांतर संदेश पोस्ट का उपयोग करके देख सकते हैं कि मेलबॉक्स प्रोसेसर एक के बाद एक संदेशों को संसाधित करता है, जिसमें कोई इंटरलेविंग नहीं होता है, इसलिए प्रत्येक संदेश का उपयोग करता है सबसे वर्तमान मूल्य।

वापसी मान

यदि आप एक AsyncReplyChannel<'a> संदेश के हिस्से के रूप में भेजते हैं, तो आप प्रत्येक संसाधित संदेश के लिए एक मूल्य वापस कर सकते हैं।

type MessageWithResponse = Message of InputData * AsyncReplyChannel<OutputData>

तब मेलबॉक्स प्रोसेसर इस चैनल का उपयोग कर सकता है जब कॉल करने वाले को वापस मान भेजने के लिए संदेश को संसाधित करता है।

let! message = inbox.Receive()
match message with
| MessageWithResponse (data, r) ->
    // ...process the data
    let output = ...
    r.Reply(output)

अब एक संदेश बनाने के लिए, आपको AsyncReplyChannel<'a> - क्या है और आप एक काम करने का उदाहरण कैसे बनाते हैं? सबसे अच्छा तरीका यह है कि MailboxProcessor को आपके लिए प्रदान करने और एक अधिक सामान्य Async<'a> की प्रतिक्रिया निकालने के लिए। यह उदाहरण के लिए PostAndAsynReply विधि का उपयोग करके किया जा सकता है, जहां आप पूरा संदेश पोस्ट नहीं करते हैं, लेकिन इसके बजाय प्रकार का एक फ़ंक्शन (हमारे मामले में) AsyncReplyChannel<OutputData> -> MessageWithResponse :

let! output = processor.PostAndAsyncReply(r -> MessageWithResponse(input, r))

यह संदेश को एक कतार में पोस्ट करेगा और उत्तर की प्रतीक्षा करेगा, जो प्रोसेसर द्वारा इस संदेश को प्राप्त करने और चैनल का उपयोग करने के उत्तर देने के बाद आएगा।

एक सिंक्रोनस वेरिएंट PostAndReply भी है जो प्रोसेसर के जवाब देने तक कॉलिंग थ्रेड को ब्लॉक करता है।

आउट-ऑफ-ऑर्डर संदेश प्रसंस्करण

आप कतार में विशिष्ट संदेशों को देखने के लिए Scan या TryScan विधियों का उपयोग कर सकते हैं और चाहे उनके कितने भी संदेश हों, उन्हें संसाधित करें। दोनों विधियाँ कतार में लगे संदेशों को देखती हैं कि वे किस क्रम में आए थे और एक निर्दिष्ट संदेश (वैकल्पिक समय समाप्त होने तक) की तलाश करेंगे। यदि ऐसा कोई संदेश नहीं है, तो TryScan कोई नहीं लौटेगा, जबकि Scan इस तरह के संदेश के आने तक या ऑपरेशन के समय तक इंतजार करता रहेगा।

आइए इसे व्यवहार में देखें। हम चाहते हैं कि प्रोसेसर RegularOperations को प्रोसेस कर सके, लेकिन जब भी PriorityOperation , तो इसे जल्द से जल्द प्रोसेस किया जाना चाहिए, फिर चाहे कितने ही अन्य RegularOperations क्यू में क्यों न हों।

type Message =
    | RegularOperation of string
    | PriorityOperation of string

let processor = MailboxProcessor<Message>.Start(fun inbox ->
    let rec innerLoop () = async {
        let! priorityData = inbox.TryScan(fun msg ->
            // If there is a PriorityOperation, retrieve its data.
            match msg with
            | PriorityOperation data -> Some data
            | _ -> None)

        match priorityData with
        | Some data ->
            // Process the data from PriorityOperation.
        | None ->
            // No PriorityOperation was in the queue at the time, so
            // let's fall back to processing all possible messages
            let! message = inbox.Receive()
            match message with
            | RegularOperation data ->
                // We have free time, let's process the RegularOperation.
            | PriorityOperation data ->
                // We did scan the queue, but it might have been empty
                // so it is possible that in the meantime a producer
                // posted a new message and it is a PriorityOperation.
        // And never forget to process next messages.
        innerLoop ()
    }
    innerLoop())


Modified text is an extract of the original Stack Overflow Documentation
के तहत लाइसेंस प्राप्त है CC BY-SA 3.0
से संबद्ध नहीं है Stack Overflow