.NET Framework
Task Parallel Library (TPL)
Suche…
Bemerkungen
Zweck und Anwendungsfälle
Zweck der Task Parallel Library ist die Vereinfachung des Schreibens und Verwaltens von Multithread-Code und parallelem Code.
Einige Anwendungsfälle *:
- Beibehalten einer Benutzeroberfläche, indem Hintergrundarbeit für separate Aufgaben ausgeführt wird
- Verteilung der Arbeitslast
- Zulassen, dass eine Clientanwendung gleichzeitig Anforderungen sendet und empfängt (rest, TCP / UDP, ect)
- Mehrere Dateien gleichzeitig lesen und / oder schreiben
* Code sollte von Fall zu Fall für Multithreading betrachtet werden. Wenn zum Beispiel eine Schleife nur wenige Iterationen durchführt oder nur einen kleinen Teil der Arbeit erledigt, kann der Mehraufwand für Parallelität die Vorteile überwiegen.
TPL mit .Net 3.5
Die TPL ist auch für .Net 3.5 in einem NuGet-Paket verfügbar. Sie wird Task Parallel Library genannt.
Grundlegende Producer-Consumer-Schleife (BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
Es ist erwähnenswert, wenn Sie collection.CompleteAdding();
nicht aufrufen.CompleteAdding collection.CompleteAdding();
Sie können der Sammlung auch dann weiter hinzufügen, wenn Ihre Consumer-Aufgabe ausgeführt wird. Rufen Sie einfach collection.CompleteAdding();
Wenn Sie sicher sind, gibt es keine weiteren Ergänzungen. Diese Funktion kann verwendet werden, um einen Multiple Producer zu einem einzigen Consumer-Muster zu machen, bei dem mehrere Quellen Elemente in die BlockingCollection einspeisen und ein einzelner Consumer Elemente herauszieht und mit diesen etwas tut. Wenn Ihre BlockingCollection leer ist, bevor Sie das vollständige Hinzufügen aufrufen, wird die Enumerable from collection.GetConsumingEnumerable()
so lange blockiert, bis der Collection ein neues Element hinzugefügt wird. wird aufgerufen und die Warteschlange ist leer.
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
Aufgabe: Grundinstanziierung und Warten
Eine Aufgabe kann erstellt werden, indem die Task
Klasse direkt instanziiert wird ...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... oder mit der statischen Task.Run
Methode:
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
Beachten Sie, dass nur im ersten Fall Start
explizit aufgerufen werden muss.
Aufgabe: WaitAll und Variablenerfassung
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
Aufgabe: WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
Hinweis: Die letzte WaitAll
ist notwendig, da WaitAny
keine Ausnahmen verursacht.
Task: Ausnahmen behandeln (mit Wait)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Aufgabe: Behandlung von Ausnahmen (ohne zu warten)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Aufgabe: Abbrechen mit Annullierungstoken
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
Als Alternative zu ThrowIfCancellationRequested
kann die Stornierungsanforderung mit IsCancellationRequested
erkannt und eine OperationCanceledException
manuell ausgelöst werden:
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
Beachten Sie, wie das Annullierungstoken im Parameter cancellationToken
an den cancellationToken
wird. Dies ist erforderlich, damit die Task in den Status Canceled
Faulted
, nicht in den Faulted
, wenn ThrowIfCancellationRequested
aufgerufen wird. Aus demselben Grund wird das Annullierungstoken im zweiten Fall explizit im Konstruktor von OperationCanceledException
.
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Einruf
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
In diesem Beispiel wird Parallel.ForEach
, um die Summe der Zahlen zwischen 1 und 10000 mithilfe mehrerer Threads zu berechnen. Um Thread-Sicherheit zu erreichen, wird Interlocked.Add
verwendet, um die Zahlen zu summieren.
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.für
In diesem Beispiel wird Parallel.For
, um die Summe der Zahlen zwischen 1 und 10000 mithilfe mehrerer Threads zu berechnen. Um Thread-Sicherheit zu erreichen, wird Interlocked.Add
verwendet, um die Zahlen zu summieren.
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Fließender Ausführungskontext mit AsyncLocal
Wenn Sie einige Daten von der übergeordneten Aufgabe an ihre AsyncLocal
Aufgaben übergeben müssen, damit sie bei der Ausführung logisch übertragen werden, verwenden Sie die AsyncLocal
Klasse :
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
Anmerkung: Wie aus dem obigen Beispiel AsynLocal.Value
hat AsynLocal.Value
eine copy on read
Semantik. Wenn Sie jedoch einen Referenztyp übergeben und seine Eigenschaften ändern, wirken sich dies auf andere Aufgaben aus. Daher AsyncLocal
es sich bei AsyncLocal
oder unveränderliche Typen zu verwenden.
Parallel.ForEach in VB.NET
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
Aufgabe: Rückgabe eines Wertes
Eine Task, die einen Wert Task< TResult >
hat den Rückgabetyp Task< TResult >
wobei TResult der Wertetyp ist, der zurückgegeben werden muss. Sie können das Ergebnis einer Aufgabe über ihre Result-Eigenschaft abfragen.
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
Wenn die Task asynchron ausgeführt wird und die Task wartet, wird das Ergebnis zurückgegeben.
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}