Suche…


Bemerkungen

Zweck und Anwendungsfälle

Zweck der Task Parallel Library ist die Vereinfachung des Schreibens und Verwaltens von Multithread-Code und parallelem Code.

Einige Anwendungsfälle *:

  • Beibehalten einer Benutzeroberfläche, indem Hintergrundarbeit für separate Aufgaben ausgeführt wird
  • Verteilung der Arbeitslast
  • Zulassen, dass eine Clientanwendung gleichzeitig Anforderungen sendet und empfängt (rest, TCP / UDP, ect)
  • Mehrere Dateien gleichzeitig lesen und / oder schreiben

* Code sollte von Fall zu Fall für Multithreading betrachtet werden. Wenn zum Beispiel eine Schleife nur wenige Iterationen durchführt oder nur einen kleinen Teil der Arbeit erledigt, kann der Mehraufwand für Parallelität die Vorteile überwiegen.

TPL mit .Net 3.5

Die TPL ist auch für .Net 3.5 in einem NuGet-Paket verfügbar. Sie wird Task Parallel Library genannt.

Grundlegende Producer-Consumer-Schleife (BlockingCollection)

var collection = new BlockingCollection<int>(5);
var random = new Random();

var producerTask = Task.Run(() => {
    for(int item=1; item<=10; item++) 
    {
        collection.Add(item);
        Console.WriteLine("Produced: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    collection.CompleteAdding();
    Console.WriteLine("Producer completed!");
});

Es ist erwähnenswert, wenn Sie collection.CompleteAdding(); nicht aufrufen.CompleteAdding collection.CompleteAdding(); Sie können der Sammlung auch dann weiter hinzufügen, wenn Ihre Consumer-Aufgabe ausgeführt wird. Rufen Sie einfach collection.CompleteAdding(); Wenn Sie sicher sind, gibt es keine weiteren Ergänzungen. Diese Funktion kann verwendet werden, um einen Multiple Producer zu einem einzigen Consumer-Muster zu machen, bei dem mehrere Quellen Elemente in die BlockingCollection einspeisen und ein einzelner Consumer Elemente herauszieht und mit diesen etwas tut. Wenn Ihre BlockingCollection leer ist, bevor Sie das vollständige Hinzufügen aufrufen, wird die Enumerable from collection.GetConsumingEnumerable() so lange blockiert, bis der Collection ein neues Element hinzugefügt wird. wird aufgerufen und die Warteschlange ist leer.

var consumerTask = Task.Run(() => {
    foreach(var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine("Consumed: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    Console.WriteLine("Consumer completed!");
});
  
Task.WaitAll(producerTask, consumerTask);
       
Console.WriteLine("Everything completed!");

Aufgabe: Grundinstanziierung und Warten

Eine Aufgabe kann erstellt werden, indem die Task Klasse direkt instanziiert wird ...

var task = new Task(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});

Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");

... oder mit der statischen Task.Run Methode:

Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");

Beachten Sie, dass nur im ersten Fall Start explizit aufgerufen werden muss.

Aufgabe: WaitAll und Variablenerfassung

var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
})).ToArray();

foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);

foreach(var task in tasks)
    Console.WriteLine(task.Result);

Aufgabe: WaitAny

var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();

foreach(var task in allTasks) task.Start();

while(pendingTasks.Length > 0)
{
    var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
    Console.WriteLine("Task {0} finished", finishedTask.Result);
    pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}

Task.WaitAll(allTasks);

Hinweis: Die letzte WaitAll ist notwendig, da WaitAny keine Ausnahmen verursacht.

Task: Ausnahmen behandeln (mit Wait)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

Console.WriteLine("Starting tasks...");
try
{
    Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
    Console.WriteLine("Task(s) failed!");
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine(inner.Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Aufgabe: Behandlung von Ausnahmen (ohne zu warten)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

var tasks = new[] {task1, task2};

Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));

foreach(var task in tasks)
{
    if(task.IsFaulted)
        Console.WriteLine("Task failed: " +
            task.Exception.InnerExceptions.First().Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Aufgabe: Abbrechen mit Annullierungstoken

var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

var task = new Task((state) =>
    {
        int i = 1;
        var myCancellationToken = (CancellationToken)state;
        while(true)
        {
            Console.Write("{0} ", i++);
            Thread.Sleep(1000);
            myCancellationToken.ThrowIfCancellationRequested();
        }
    },
    cancellationToken: cancellationToken,
    state: cancellationToken);

Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();

cancellationTokenSource.Cancel();
try
{
    task.Wait();
}
catch(AggregateException ex)
{
    ex.Handle(inner => inner is OperationCanceledException);
}

Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled

Als Alternative zu ThrowIfCancellationRequested kann die Stornierungsanforderung mit IsCancellationRequested erkannt und eine OperationCanceledException manuell ausgelöst werden:

//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
    Console.Write("{0} ", i++);
    Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);

Beachten Sie, wie das Annullierungstoken im Parameter cancellationToken an den cancellationToken wird. Dies ist erforderlich, damit die Task in den Status Canceled Faulted , nicht in den Faulted , wenn ThrowIfCancellationRequested aufgerufen wird. Aus demselben Grund wird das Annullierungstoken im zweiten Fall explizit im Konstruktor von OperationCanceledException .

Task.WhenAny

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
    Console.WriteLine("I'm task " + n);
    await Task.Delay(random.Next(10,1000));
    return n;
}));

Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);

await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");

Task.WhenAll

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
}));

Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;

Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5

Parallel.Einruf

var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
    Console.WriteLine("I'm task " + n);
    if((n & 1) == 0)
        throw new Exception("Exception from task " + n);
})).ToArray();

try
{
    Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine("Task failed: " + inner.Message);
}

Parallel.ForEach

In diesem Beispiel wird Parallel.ForEach , um die Summe der Zahlen zwischen 1 und 10000 mithilfe mehrerer Threads zu berechnen. Um Thread-Sicherheit zu erreichen, wird Interlocked.Add verwendet, um die Zahlen zu summieren.

using System.Threading;

int Foo()
{
    int total = 0;
    var numbers = Enumerable.Range(1, 10000).ToList();
    Parallel.ForEach(numbers, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Parallel.für

In diesem Beispiel wird Parallel.For , um die Summe der Zahlen zwischen 1 und 10000 mithilfe mehrerer Threads zu berechnen. Um Thread-Sicherheit zu erreichen, wird Interlocked.Add verwendet, um die Zahlen zu summieren.

using System.Threading;

int Foo()
{
    int total = 0;
    Parallel.For(1, 10001, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Fließender Ausführungskontext mit AsyncLocal

Wenn Sie einige Daten von der übergeordneten Aufgabe an ihre AsyncLocal Aufgaben übergeben müssen, damit sie bei der Ausführung logisch übertragen werden, verwenden Sie die AsyncLocal Klasse :

void Main()
{
    AsyncLocal<string> user = new AsyncLocal<string>();
    user.Value = "initial user";
    
    // this does not affect other tasks - values are local relative to the branches of execution flow
    Task.Run(() => user.Value = "user from another task"); 
    
    var task1 = Task.Run(() =>
    {
        Console.WriteLine(user.Value); // outputs "initial user"
        Task.Run(() =>
        {
            // outputs "initial user" - value has flown from main method to this task without being changed
            Console.WriteLine(user.Value);
        }).Wait();

        user.Value = "user from task1";

        Task.Run(() =>
        {
            // outputs "user from task1" - value has flown from main method to task1
            // than value was changed and flown to this task.
            Console.WriteLine(user.Value);
        }).Wait();
    });
    
    task1.Wait();
    
    // ouputs "initial user" - changes do not propagate back upstream the execution flow    
    Console.WriteLine(user.Value); 
}

Anmerkung: Wie aus dem obigen Beispiel AsynLocal.Value hat AsynLocal.Value eine copy on read Semantik. Wenn Sie jedoch einen Referenztyp übergeben und seine Eigenschaften ändern, wirken sich dies auf andere Aufgaben aus. Daher AsyncLocal es sich bei AsyncLocal oder unveränderliche Typen zu verwenden.

Parallel.ForEach in VB.NET

For Each row As DataRow In FooDataTable.Rows
    Me.RowsToProcess.Add(row)
Next

Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount

Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
                                               ProcessRowParallel(currentRow, state)
                                           End Sub)

Aufgabe: Rückgabe eines Wertes

Eine Task, die einen Wert Task< TResult > hat den Rückgabetyp Task< TResult > wobei TResult der Wertetyp ist, der zurückgegeben werden muss. Sie können das Ergebnis einer Aufgabe über ihre Result-Eigenschaft abfragen.

Task<int> t = Task.Run(() => 
    {
        int sum = 0;

        for(int i = 0; i < 500; i++)
            sum += i;

        return sum;
    });

Console.WriteLine(t.Result); // Outuput 124750

Wenn die Task asynchron ausgeführt wird und die Task wartet, wird das Ergebnis zurückgegeben.

public async Task DoSomeWork()
{
    WebClient client = new WebClient();
    // Because the task is awaited, result of the task is assigned to response
    string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}


Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow