.NET Framework
टास्क पैरेलल लाइब्रेरी (TPL)
खोज…
टिप्पणियों
उद्देश्य और उपयोग के मामले
टास्क पैरेलल लाइब्रेरी का उद्देश्य लेखन की प्रक्रिया को सरल बनाना और बहुस्तरीय और समानांतर कोड को बनाए रखना है।
कुछ उपयोग मामले *:
- अलग कार्य पर पृष्ठभूमि का काम चलाकर UI को उत्तरदायी बनाए रखना
- वर्कलोड का वितरण
- एक ही समय में अनुरोध भेजने के लिए और प्राप्त करने के लिए एक ग्राहक आवेदन की अनुमति देना (बाकी, टीसीपी / यूडीपी, ect)
- एक साथ कई फाइलें पढ़ना और / या लिखना
* कोड को मल्टीथ्रेडिंग के लिए केस के आधार पर विचार किया जाना चाहिए। उदाहरण के लिए, यदि एक लूप में केवल कुछ पुनरावृत्तियां होती हैं या केवल थोड़ी मात्रा में काम करता है, तो समानता के लिए ओवरहेड लाभ से आगे निकल सकता है।
TPL .net 3.5 के साथ
TPL .Net 3.5 के लिए भी उपलब्ध है। एक NuGet पैकेज में शामिल है। इसे टास्क पैरेलल लाइब्रेरी कहा जाता है।
मूल उत्पादक-उपभोक्ता लूप (ब्लॉकिंगकॉलिनेशन)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
यह ध्यान देने योग्य है कि यदि आप collection.CompleteAdding();
कॉल नहीं करते हैं। पूर्णविराम collection.CompleteAdding();
, यदि आपका उपभोक्ता कार्य चल रहा है तो भी आप संग्रह में शामिल कर सकते हैं। बस collection.CompleteAdding();
बुलाओ collection.CompleteAdding();
जब आप सुनिश्चित हों कि कुछ और जोड़ नहीं हैं। इस कार्यक्षमता का उपयोग एक एकल उपभोक्ता पैटर्न के लिए एक मल्टीपल प्रोड्यूसर बनाने के लिए किया जा सकता है, जहाँ आपके पास कई स्रोत हैं जो ब्लॉकिंगकॉलक्शन में आइटम खिलाते हैं और एक एकल उपभोक्ता वस्तुओं को बाहर निकालते हैं और उनके साथ कुछ करते हैं। यदि आपका पूर्ण जोड़ने से पहले आपका ब्लॉकिंगकॉलेक्शन खाली है, तो collection.GetConsumingEnumerable()
से Enumerable.GetConsumingEnumerable collection.GetConsumingEnumerable()
तब तक ब्लॉक रहेगा जब तक कि कोई नया आइटम संग्रह या ब्लॉकिंग में जोड़ा नहीं जाता है। कहा जाता है और कतार खाली है।
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
कार्य: मूल तात्कालिकता और प्रतीक्षा करें
Task
क्लास को तुरंत शुरू करके एक कार्य बनाया जा सकता है ...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... या स्थैतिक Task.Run
का उपयोग करके।
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
ध्यान दें कि केवल पहले मामले में Start
को स्पष्ट रूप से लागू करना आवश्यक है।
कार्य: WaitAll और चर कैप्चरिंग
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
कार्य: WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
नोट: अंतिम WaitAll
आवश्यक है क्योंकि WaitAny
अपवादों को नहीं WaitAny
है।
कार्य: अपवादों को संभालना (प्रतीक्षा का उपयोग करना)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
कार्य: अपवादों को संभालना (प्रतीक्षा का उपयोग किए बिना)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
कार्य: रद्दीकरण का उपयोग कर रद्द कर रहा है
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
ThrowIfCancellationRequested
विकल्प के रूप में, रद्द करने का अनुरोध IsCancellationRequested
साथ पता लगाया जा सकता है और एक OperationCanceledException
मैन्युअल रूप से फेंका जा सकता है:
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
ध्यान दें कि रद्द करने का टोकन कैसे कार्य cancellationToken
को cancellationToken
है। यह आवश्यक है ताकि Canceled
स्थिति में कार्य संक्रमण, न कि Faulted
स्थिति में, जब ThrowIfCancellationRequested
को लागू किया जाता है। इसके अलावा, इसी कारण से, कैंसेलेशन टोकन को स्पष्ट रूप से दूसरे मामले में OperationCanceledException
के निर्माता में आपूर्ति की जाती है।
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
यह उदाहरण कई धागे का उपयोग करके 1 और 10000 के बीच की संख्या की गणना करने के लिए Parallel.ForEach
का उपयोग करता है। थ्रेड-सुरक्षा प्राप्त करने के लिए, Interlocked.Add
किए गए। संख्या का योग करने के लिए प्रयोग किया जाता है।
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
यह उदाहरण कई धागों का उपयोग करके 1 और 10000 के बीच की संख्या की गणना करने के लिए Parallel.For
का उपयोग करता है। थ्रेड-सुरक्षा प्राप्त करने के लिए, Interlocked.Add
किए गए। संख्या का योग करने के लिए प्रयोग किया जाता है।
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
AsyncLocal के साथ बह निष्पादन निष्पादन
जब आपको पेरेंट टास्क से कुछ डेटा अपने बच्चों के कामों में पास करने की आवश्यकता होती है, तो यह तार्किक रूप से निष्पादन के साथ बहता है, AsyncLocal
क्लास का उपयोग करें:
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
नोट: जैसा कि AsynLocal.Value
ऊपर के उदाहरण से देखा जा सकता है। AsynLocal.Value
में copy on read
सिमेंटिक copy on read
, लेकिन यदि आप कुछ संदर्भ प्रकार प्रवाहित करते हैं और इसके गुणों को बदलते हैं तो आप अन्य कार्यों को प्रभावित करेंगे। इसलिए, AsyncLocal
साथ सबसे अच्छा अभ्यास मूल्य प्रकार या अपरिवर्तनीय प्रकारों का उपयोग करना है।
VB.NET में Parallel.ForEach
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
कार्य: एक मान लौटाते हुए
मान वापस करने वाले Task< TResult >
में वापसी प्रकार का Task< TResult >
जहाँ TResult मान का प्रकार है जिसे वापस करने की आवश्यकता होती है। आप किसी परिणाम के परिणाम के आधार पर क्वेरी कर सकते हैं।
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
यदि कार्य परिणाम के टास्क रिटर्न की प्रतीक्षा करने की तुलना में असिंक्रोनस रूप से निष्पादित करता है।
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}