C# Language
टास्क पैरेलल लाइब्रेरी (TPL) डाटाफ्लो कंस्ट्रक्शंस
खोज…
JoinBlock
(2-3 इनपुट एकत्र करता है और उन्हें एक टुपल में जोड़ता है)
बैचब्लॉक की तरह, JoinBlock <T1, T2, ...> कई डेटा स्रोतों से डेटा को ग्रुप करने में सक्षम है। वास्तव में, यह JoinBlock <T1, T2, ... का प्राथमिक उद्देश्य है।
उदाहरण के लिए, JoinBlock <string, double, int> एक ISourceBlock <Tuple <string, double, int >> है।
बैचब्लॉक के साथ, जॉइनब्लॉक <टी 1, टी 2, ...> लालची और गैर-लालची दोनों मोड में काम करने में सक्षम है।
- डिफ़ॉल्ट लालची मोड में, लक्ष्य को दिए गए सभी डेटा स्वीकार किए जाते हैं, भले ही दूसरे लक्ष्य में आवश्यक डेटा न हो, जिसके साथ एक टपल का निर्माण हो।
- गैर-लालची मोड में, ब्लॉक के लक्ष्य डेटा को तब तक स्थगित कर देंगे जब तक कि सभी लक्ष्यों को एक टपल बनाने के लिए आवश्यक डेटा की पेशकश नहीं की जाती है, जिस बिंदु पर ब्लॉक दो-चरण प्रतिबद्ध प्रोटोकॉल में संलग्न होगा स्रोतों से सभी आवश्यक वस्तुओं को अनधिकृत रूप से पुनर्प्राप्त करने के लिए। यह स्थगन किसी अन्य इकाई के लिए इस बीच डेटा का उपभोग करने के लिए संभव बनाता है ताकि समग्र प्रणाली को आगे बढ़ने की अनुमति मिल सके।
संसाधित वस्तुओं की सीमित संख्या के साथ प्रसंस्करण अनुरोध
var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++)
{
requestProcessor.Target1.Post(new ExpensiveObject());
}
var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
var resource = pair.Item1;
var request = pair.Item2;
request.ProcessWith(resource);
return resource;
});
throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
BroadcastBlock
(किसी आइटम को कॉपी करें और हर ब्लॉक को कॉपी भेजें जिसे वह लिंक किया गया है)
बफ़रब्लॉक के विपरीत, जीवन में ब्रॉडकास्टब्लॉक का मिशन ब्लॉक से जुड़े सभी लक्ष्यों को प्रकाशित किए गए प्रत्येक तत्व की एक प्रति प्राप्त करने के लिए सक्षम करना है, जो इसे प्रचारित करने के साथ "वर्तमान" मूल्य को लगातार लिख रहे हैं।
इसके अतिरिक्त, BufferBlock के विपरीत, BroadcastBlock अनावश्यक रूप से डेटा पर पकड़ नहीं रखता है। सभी लक्ष्यों के लिए एक विशेष डेटम की पेशकश की जाने के बाद, वह तत्व जो भी डेटा का टुकड़ा लाइन के बगल में है (सभी डेटाफ़्लो ब्लॉक के साथ, संदेशों को FIFO क्रम में नियंत्रित किया जाता है) द्वारा अधिलेखित कर दिया जाएगा। वह तत्व सभी लक्ष्यों को प्रदान किया जाएगा, और इसी तरह।
असिंक्रोनस प्रोड्यूसर / कंज्यूमर विथ थ्रोटल्ड प्रोड्यूसर
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
item.Image.Save(item.Path)
);
var showInUiBlock = new ActionBlock<ImageData>(item =>
imagePanel.AddImage(item.Image),
new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
एक एजेंट से स्थिति उजागर करना
public class MyAgent
{
public ISourceBlock<string> Status { get; private set; }
public MyAgent()
{
Status = new BroadcastBlock<string>();
Run();
}
private void Run()
{
Status.Post("Starting");
Status.Post("Doing cool stuff");
…
Status.Post("Done");
}
}
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
WriteOnceBlock
(Readonly variable: इसके पहले डेटा आइटम को याद करता है और इसके आउटपुट के रूप में इसकी प्रतियां निकालता है। अन्य सभी डेटा आइटमों की उपेक्षा करता है)
यदि टीपीएल डेटाफ़्लो में बफ़रब्लॉक सबसे बुनियादी ब्लॉक है, तो राइटऑनब्लॉक सबसे सरल है।
यह ज्यादातर एक मूल्य पर संग्रहीत करता है, और एक बार उस मूल्य को सेट कर दिया गया है, इसे कभी भी प्रतिस्थापित या अधिलेखित नहीं किया जाएगा।
आप WriteOnceBlock को C # में एक पठनीय सदस्य चर के समान होने के बारे में सोच सकते हैं, सिवाय इसके कि केवल एक कंस्ट्रक्टर में बसने योग्य और फिर अपरिवर्तनीय होने के अलावा, यह केवल एक बार निपटाने योग्य है और फिर अपरिवर्तनीय है।
टास्क के संभावित आउटपुट को विभाजित करना
public static async void SplitIntoBlocks(this Task<T> task,
out IPropagatorBlock<T> result,
out IPropagatorBlock<Exception> exception)
{
result = new WriteOnceBlock<T>(i => i);
exception = new WriteOnceBlock<Exception>(i => i);
try
{
result.Post(await task);
}
catch(Exception ex)
{
exception.Post(ex);
}
}
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
BatchedJoinBlock
(2-3 इनपुटों से कुल आइटमों की एक निश्चित संख्या एकत्र करता है और उन्हें डेटा आइटमों के संग्रह के एक समूह में शामिल करता है)
BatchedJoinBlock <T1, T2, ...> एक मायने में बैचब्लॉक और JoinBlock <T1, T2, ...> का एक संयोजन है।
जबकि JoinBlock <T1, T2, ...> का उपयोग प्रत्येक लक्ष्य से एक इनपुट को टूपल में एकत्रित करने के लिए किया जाता है, और B बैचब्लॉक का उपयोग N इनपुट को एक संग्रह में एकत्रित करने के लिए किया जाता है, BatchedJoinBlock <T1, T2, ...> का उपयोग एन इनपुट को एकत्रित करने के लिए किया जाता है। संग्रह के tuples में लक्ष्य के सभी।
स्कैटर / इकट्ठा
एक बिखराव / इकट्ठा समस्या पर विचार करें जहां एन ऑपरेशन लॉन्च किए गए हैं, जिनमें से कुछ स्ट्रिंग आउटपुट को सफल और उत्पादन कर सकते हैं, और जिनमें से अन्य विफल हो सकते हैं और अपवाद उत्पन्न कर सकते हैं।
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i=0; i<10; i++)
{
Task.Factory.StartNew(() => {
try { batchedJoin.Target1.Post(DoWork()); }
catch(Exception ex) { batchJoin.Target2.Post(ex); }
});
}
var results = await batchedJoin.ReceiveAsync();
foreach(string s in results.Item1)
{
Console.WriteLine(s);
}
foreach(Exception e in results.Item2)
{
Console.WriteLine(e);
}
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
TransformBlock
(चुनें, एक-से-एक)
एक्शनब्लॉक, ट्रांसफॉर्मब्लॉक <टइनपुट, टुटपुट के साथ, प्रत्येक इनपुट डेटम के लिए कुछ कार्रवाई करने के लिए एक प्रतिनिधि के निष्पादन को सक्षम बनाता है; एक्शनब्लॉक के विपरीत, इस प्रसंस्करण का एक आउटपुट है। यह प्रतिनिधि एक Func <TInput, Toutput> हो सकता है, जिस स्थिति में प्रतिनिधि के वापस आने पर उस तत्व की प्रोसेसिंग पूरी हो जाती है, या यह एक Func <TInput, Task> हो सकती है, जिस स्थिति में उस तत्व का प्रसंस्करण पूरा नहीं माना जाता है। जब प्रतिनिधि लौटता है लेकिन जब लौटाया गया टास्क पूरा होता है। LINQ से परिचित लोगों के लिए, यह कुछ हद तक Select () के समान है जिसमें यह एक इनपुट लेता है, उस इनपुट को किसी तरीके से बदल देता है, और फिर एक आउटपुट तैयार करता है।
डिफ़ॉल्ट रूप से, TransformBlock <TInput, Toutput> अपने डेटा को क्रमिक रूप से MaxDegreeOfParallelism के साथ 1 के बराबर संसाधित करता है। बफर इनपुट प्राप्त करने और इसे संसाधित करने के अलावा, यह ब्लॉक अपने सभी संसाधित आउटपुट और बफर को ले जाएगा, साथ ही साथ (डेटा जो नहीं किया गया है) संसाधित और डेटा जो संसाधित किया गया है)।
इसके 2 कार्य हैं: एक डेटा को संसाधित करने के लिए, और दूसरा डेटा को अगले ब्लॉक पर ले जाने के लिए।
एक समवर्ती पाइपलाइन
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(Encryptor);
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
ActionBlock
(प्रत्येक के लिए)
इस वर्ग को तार्किक रूप से उस डेटा को संसाधित करने के लिए कार्यों के साथ संयुक्त डेटा के लिए एक बफर के रूप में सोचा जा सकता है, "डेटाफ्लो ब्लॉक" दोनों को प्रबंधित करने के साथ। इसके सबसे बुनियादी उपयोग में, हम एक ActionBlock और इसके लिए "पोस्ट" डेटा को त्वरित कर सकते हैं; ActionBlock के निर्माण में प्रदान किए गए प्रतिनिधि को पोस्ट किए गए प्रत्येक डेटा के लिए अतुल्यकालिक रूप से निष्पादित किया जाएगा।
तुल्यकालिक संगणना
var ab = new ActionBlock<TInput>(i =>
{
Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
अतुल्यकालिक डाउनलोड को अधिकतम 5 समकालिक रूप से थ्रॉटलिंग करें
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 });
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
TransformManyBlock
(SelectMany, 1-m: इस मैपिंग के परिणाम LINQ के SelectMany की तरह "चपटा" हैं,
TransformManyBlock <TInput, Toutput> TransformBlock <TInput, Toutput> से काफी मिलता-जुलता है।
मुख्य अंतर यह है कि जबकि एक ट्रांसफ़ॉर्मब्लॉक <टइनपुट, टुटपुट> प्रत्येक इनपुट के लिए एक और केवल एक आउटपुट का उत्पादन करता है, ट्रांसफॉर्मएमब्लॉक <टइनपुट, टुटपुट> प्रत्येक इनपुट के लिए कोई भी संख्या (शून्य या अधिक) आउटपुट उत्पन्न करता है। ActionBlock और TransformBlock <TInput, Toutput> के साथ, इस प्रसंस्करण को समकालिक और अतुल्यकालिक प्रसंस्करण दोनों के लिए प्रतिनिधियों का उपयोग करके निर्दिष्ट किया जा सकता है।
एक फंक <TInput, IEnumerable> का उपयोग सिंक्रोनस के लिए किया जाता है, और एक फंक <TInput, Task <IEnumerable> का उपयोग अतुल्यकालिक के लिए किया जाता है। ActionBlock और TransformBlock <TInput, Toutput>, TransformManyBlock <TInput, Toutput> दोनों के साथ अनुक्रमिक प्रसंस्करण के लिए चूक, लेकिन अन्यथा कॉन्फ़िगर किया जा सकता है।
मैपिंग डेलीगेट आइटम के एक संग्रह को फिर से जमा करता है, जिसे व्यक्तिगत रूप से आउटपुट बफर में डाला जाता है।
अतुल्यकालिक वेब क्रॉलर
var downloader = new TransformManyBlock<string, string>(async url =>
{
Console.WriteLine(“Downloading “ + url);
try
{
return ParseLinks(await DownloadContents(url));
}
catch{}
return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);
अपने संविधान के तत्वों में एक विस्तार का विस्तार करना
var expanded = new TransformManyBlock<T[], T>(array => array);
1 से 0 या 1 तत्वों से जाकर छानना
public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
return new TransformManyBlock<T, T>(item =>
filter(item) ? new [] { item } : Enumerable.Empty<T>());
}
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
BatchBlock
(समूह की एक निश्चित संख्या में अनुक्रमिक डेटा आइटम डेटा संग्रह में)
बैचबॉक एन सिंगल आइटम को एक बैच आइटम में जोड़ता है, तत्वों की एक सरणी के रूप में दर्शाया गया है। एक उदाहरण एक विशिष्ट बैच आकार के साथ बनाया जाता है, और ब्लॉक तब बैच बनाता है जैसे ही यह प्राप्त होता है कि तत्वों की संख्या, अतुल्यकालिक रूप से बैच को आउटपुट बफर में आउटपुट करती है।
बैचब्लॉक लालची और गैर-लालची दोनों तरीकों में निष्पादित करने में सक्षम है।
- डिफ़ॉल्ट लालची मोड में, किसी भी संख्या के स्रोतों से ब्लॉक को दिए गए सभी संदेश स्वीकार किए जाते हैं और बफ़र्स को बैचों में परिवर्तित किया जाता है।
- गैर-लालची मोड में, सभी संदेशों को स्रोतों से स्थगित कर दिया जाता है जब तक कि पर्याप्त स्रोतों ने एक बैच बनाने के लिए ब्लॉक को संदेश की पेशकश नहीं की है। इस प्रकार, एक बैचब्लॉक का उपयोग प्रत्येक एन स्रोतों से 1 तत्व प्राप्त करने के लिए किया जा सकता है, 1 स्रोत से एन तत्व और बीच में विकल्पों का असंख्य।
डेटाबेस सबमिट करने के लिए 100 के समूहों में बैचिंग अनुरोध
var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
एक सेकंड में एक बार बैच बनाना
var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय
BufferBlock
(फीफो कतार: जो डेटा आता है वह डेटा है जो बाहर जाता है)
संक्षेप में, बफ़रब्लॉक टी के संचय के लिए एक अनबाउंड या बाउंडेड बफर प्रदान करता है।
आप ब्लॉक को T के "पोस्ट" इंस्टेंसेस कर सकते हैं, जिसके कारण पोस्ट को ब्लॉक द्वारा पहले-पहले-आउट (FIFO) ऑर्डर में स्टोर किया जा सकता है।
आप ब्लॉक से "प्राप्त" कर सकते हैं, जो आपको भविष्य में (फिर से, फीफो) में पहले से संग्रहीत या उपलब्ध टी के उदाहरणों को समकालिक या असिंक्रोनस रूप से प्राप्त करने की अनुमति देता है।
असिंक्रोनस प्रोड्यूसर / कंज्यूमर विथ थ्रोटल्ड प्रोड्यूसर
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer
private static async void Producer()
{
while(true)
{
await _Buffer.SendAsync(Produce());
}
}
// Consumer
private static async Task Consumer()
{
while(true)
{
Process(await _Buffer.ReceiveAsync());
}
}
// Start the Producer and Consumer
private static async Task Run()
{
await Task.WhenAll(Producer(), Consumer());
}
स्टीफन Toub द्वारा TPL डेटाफ़्लो का परिचय