Zoeken…


Opmerkingen

Doel en use cases

Het doel van de Task Parallel Library is om het schrijven en onderhouden van multithreaded en parallelle code te vereenvoudigen.

Enkele use cases *:

  • Een gebruikersinterface responsief houden door achtergrondwerk op een afzonderlijke taak uit te voeren
  • Werklast verdelen
  • Toestaan dat een clienttoepassing tegelijkertijd aanvragen verzendt en ontvangt (rest, TCP / UDP, ect)
  • Meerdere bestanden tegelijk lezen en / of schrijven

* Code moet per geval worden bekeken voor multithreading. Als een lus bijvoorbeeld slechts enkele iteraties heeft of slechts een klein deel van het werk doet, kan de overhead voor parallellisme opwegen tegen de voordelen.

TPL met .Net 3.5

De TPL is ook beschikbaar voor .Net 3.5 opgenomen in een NuGet-pakket, het heet Task Parallel Library.

Basis producent-consument lus (BlockingCollection)

var collection = new BlockingCollection<int>(5);
var random = new Random();

var producerTask = Task.Run(() => {
    for(int item=1; item<=10; item++) 
    {
        collection.Add(item);
        Console.WriteLine("Produced: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    collection.CompleteAdding();
    Console.WriteLine("Producer completed!");
});

Het is vermeldenswaard dat als u collection.CompleteAdding(); niet aanroept collection.CompleteAdding(); , kunt u blijven toevoegen aan de verzameling, zelfs als uw consumententaak wordt uitgevoerd. Roep gewoon collection.CompleteAdding(); wanneer u zeker weet dat er geen toevoegingen meer zijn. Deze functionaliteit kan worden gebruikt om van een meervoudige producent een enkel consumentenpatroon te maken waarbij u meerdere bronnen hebt die items in de BlockingCollection invoeren en een enkele consument items eruit haalt en er iets mee doet. Als uw BlockingCollection leeg is voordat u de volledige toevoeging aanroept, wordt de Enumerable from collection.GetConsumingEnumerable() geblokkeerd totdat een nieuw item aan de collectie wordt toegevoegd of BlockingCollection.CompleteAdding (); wordt aangeroepen en de wachtrij is leeg.

var consumerTask = Task.Run(() => {
    foreach(var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine("Consumed: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    Console.WriteLine("Consumer completed!");
});
  
Task.WaitAll(producerTask, consumerTask);
       
Console.WriteLine("Everything completed!");

Taak: basisinstantie en wachten

Een taak kan worden gecreëerd door direct instantiëren de Task klasse ...

var task = new Task(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});

Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");

... of door de statische Task.Run methode te gebruiken:

Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");

Merk op dat alleen in het eerste geval expliciet een beroep moet worden gedaan op Start .

Taak: WaitAll en het vastleggen van variabelen

var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
})).ToArray();

foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);

foreach(var task in tasks)
    Console.WriteLine(task.Result);

Taak: WaitAny

var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();

foreach(var task in allTasks) task.Start();

while(pendingTasks.Length > 0)
{
    var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
    Console.WriteLine("Task {0} finished", finishedTask.Result);
    pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}

Task.WaitAll(allTasks);

Opmerking: de laatste WaitAll is noodzakelijk omdat WaitAny geen uitzonderingen veroorzaakt.

Taak: uitzonderingen verwerken (met Wacht)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

Console.WriteLine("Starting tasks...");
try
{
    Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
    Console.WriteLine("Task(s) failed!");
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine(inner.Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Taak: uitzonderingen verwerken (zonder Wacht te gebruiken)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

var tasks = new[] {task1, task2};

Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));

foreach(var task in tasks)
{
    if(task.IsFaulted)
        Console.WriteLine("Task failed: " +
            task.Exception.InnerExceptions.First().Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Taak: annuleren met Annuleringstoken

var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

var task = new Task((state) =>
    {
        int i = 1;
        var myCancellationToken = (CancellationToken)state;
        while(true)
        {
            Console.Write("{0} ", i++);
            Thread.Sleep(1000);
            myCancellationToken.ThrowIfCancellationRequested();
        }
    },
    cancellationToken: cancellationToken,
    state: cancellationToken);

Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();

cancellationTokenSource.Cancel();
try
{
    task.Wait();
}
catch(AggregateException ex)
{
    ex.Handle(inner => inner is OperationCanceledException);
}

Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled

Als alternatief voor ThrowIfCancellationRequested kan het annuleringsverzoek worden gedetecteerd met IsCancellationRequested en kan een OperationCanceledException handmatig worden gegenereerd:

//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
    Console.Write("{0} ", i++);
    Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);

Merk op hoe de annulering token wordt doorgegeven aan de taak constructeur in de cancellationToken parameter. Dit is nodig, zodat de taak overgangen naar de Canceled staat, niet te vergeten de Faulted staat, wanneer ThrowIfCancellationRequested wordt aangeroepen. Om dezelfde reden wordt het annuleringstoken ook expliciet verstrekt in de constructor van OperationCanceledException in het tweede geval.

Task.WhenAny

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
    Console.WriteLine("I'm task " + n);
    await Task.Delay(random.Next(10,1000));
    return n;
}));

Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);

await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");

Task.WhenAll

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
}));

Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;

Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5

Parallel.Invoke

var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
    Console.WriteLine("I'm task " + n);
    if((n & 1) == 0)
        throw new Exception("Exception from task " + n);
})).ToArray();

try
{
    Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine("Task failed: " + inner.Message);
}

Parallel.ForEach

In dit voorbeeld wordt Parallel.ForEach gebruikt om de som van de getallen tussen 1 en 10000 te berekenen met behulp van meerdere threads. Om draadveiligheid te bereiken, wordt Interlocked.Add gebruikt om de getallen op te tellen.

using System.Threading;

int Foo()
{
    int total = 0;
    var numbers = Enumerable.Range(1, 10000).ToList();
    Parallel.ForEach(numbers, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Parallel.For

In dit voorbeeld wordt Parallel.For gebruikt om de som van de getallen tussen 1 en 10000 te berekenen met behulp van meerdere threads. Om draadveiligheid te bereiken, wordt Interlocked.Add gebruikt om de getallen op te tellen.

using System.Threading;

int Foo()
{
    int total = 0;
    Parallel.For(1, 10001, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Vloeiende uitvoeringscontext met AsyncLocal

Als u gegevens van de bovenliggende taak moet doorgeven aan de onderliggende taken, zodat deze logisch wordt AsyncLocal de uitvoering, gebruikt u de AsyncLocal klasse :

void Main()
{
    AsyncLocal<string> user = new AsyncLocal<string>();
    user.Value = "initial user";
    
    // this does not affect other tasks - values are local relative to the branches of execution flow
    Task.Run(() => user.Value = "user from another task"); 
    
    var task1 = Task.Run(() =>
    {
        Console.WriteLine(user.Value); // outputs "initial user"
        Task.Run(() =>
        {
            // outputs "initial user" - value has flown from main method to this task without being changed
            Console.WriteLine(user.Value);
        }).Wait();

        user.Value = "user from task1";

        Task.Run(() =>
        {
            // outputs "user from task1" - value has flown from main method to task1
            // than value was changed and flown to this task.
            Console.WriteLine(user.Value);
        }).Wait();
    });
    
    task1.Wait();
    
    // ouputs "initial user" - changes do not propagate back upstream the execution flow    
    Console.WriteLine(user.Value); 
}

Opmerking: Zoals te zien is in het bovenstaande AsynLocal.Value heeft AsynLocal.Value een copy on read semantisch copy on read , maar als u een referentietype stroomt en de eigenschappen ervan wijzigt, heeft dit invloed op andere taken. Daarom is de beste praktijk met AsyncLocal om waardetypen of onveranderlijke typen te gebruiken.

Parallel.ForEach in VB.NET

For Each row As DataRow In FooDataTable.Rows
    Me.RowsToProcess.Add(row)
Next

Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount

Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
                                               ProcessRowParallel(currentRow, state)
                                           End Sub)

Taak: een waarde retourneren

Taak die een waarde retourneert, heeft het Task< TResult > waarbij TResult het type waarde is dat moet worden geretourneerd. U kunt de uitkomst van een taak opvragen aan de hand van de eigenschap Result.

Task<int> t = Task.Run(() => 
    {
        int sum = 0;

        for(int i = 0; i < 500; i++)
            sum += i;

        return sum;
    });

Console.WriteLine(t.Result); // Outuput 124750

Als de taak asynchroon wordt uitgevoerd, wordt het resultaat weergegeven in afwachting van de taak.

public async Task DoSomeWork()
{
    WebClient client = new WebClient();
    // Because the task is awaited, result of the task is assigned to response
    string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}


Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow