.NET Framework
Task Parallel Library (TPL)
Zoeken…
Opmerkingen
Doel en use cases
Het doel van de Task Parallel Library is om het schrijven en onderhouden van multithreaded en parallelle code te vereenvoudigen.
Enkele use cases *:
- Een gebruikersinterface responsief houden door achtergrondwerk op een afzonderlijke taak uit te voeren
- Werklast verdelen
- Toestaan dat een clienttoepassing tegelijkertijd aanvragen verzendt en ontvangt (rest, TCP / UDP, ect)
- Meerdere bestanden tegelijk lezen en / of schrijven
* Code moet per geval worden bekeken voor multithreading. Als een lus bijvoorbeeld slechts enkele iteraties heeft of slechts een klein deel van het werk doet, kan de overhead voor parallellisme opwegen tegen de voordelen.
TPL met .Net 3.5
De TPL is ook beschikbaar voor .Net 3.5 opgenomen in een NuGet-pakket, het heet Task Parallel Library.
Basis producent-consument lus (BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
Het is vermeldenswaard dat als u collection.CompleteAdding();
niet aanroept collection.CompleteAdding();
, kunt u blijven toevoegen aan de verzameling, zelfs als uw consumententaak wordt uitgevoerd. Roep gewoon collection.CompleteAdding();
wanneer u zeker weet dat er geen toevoegingen meer zijn. Deze functionaliteit kan worden gebruikt om van een meervoudige producent een enkel consumentenpatroon te maken waarbij u meerdere bronnen hebt die items in de BlockingCollection invoeren en een enkele consument items eruit haalt en er iets mee doet. Als uw BlockingCollection leeg is voordat u de volledige toevoeging aanroept, wordt de Enumerable from collection.GetConsumingEnumerable()
geblokkeerd totdat een nieuw item aan de collectie wordt toegevoegd of BlockingCollection.CompleteAdding (); wordt aangeroepen en de wachtrij is leeg.
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
Taak: basisinstantie en wachten
Een taak kan worden gecreëerd door direct instantiëren de Task
klasse ...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... of door de statische Task.Run
methode te gebruiken:
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
Merk op dat alleen in het eerste geval expliciet een beroep moet worden gedaan op Start
.
Taak: WaitAll en het vastleggen van variabelen
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
Taak: WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
Opmerking: de laatste WaitAll
is noodzakelijk omdat WaitAny
geen uitzonderingen veroorzaakt.
Taak: uitzonderingen verwerken (met Wacht)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Taak: uitzonderingen verwerken (zonder Wacht te gebruiken)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Taak: annuleren met Annuleringstoken
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
Als alternatief voor ThrowIfCancellationRequested
kan het annuleringsverzoek worden gedetecteerd met IsCancellationRequested
en kan een OperationCanceledException
handmatig worden gegenereerd:
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
Merk op hoe de annulering token wordt doorgegeven aan de taak constructeur in de cancellationToken
parameter. Dit is nodig, zodat de taak overgangen naar de Canceled
staat, niet te vergeten de Faulted
staat, wanneer ThrowIfCancellationRequested
wordt aangeroepen. Om dezelfde reden wordt het annuleringstoken ook expliciet verstrekt in de constructor van OperationCanceledException
in het tweede geval.
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
In dit voorbeeld wordt Parallel.ForEach
gebruikt om de som van de getallen tussen 1 en 10000 te berekenen met behulp van meerdere threads. Om draadveiligheid te bereiken, wordt Interlocked.Add
gebruikt om de getallen op te tellen.
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
In dit voorbeeld wordt Parallel.For
gebruikt om de som van de getallen tussen 1 en 10000 te berekenen met behulp van meerdere threads. Om draadveiligheid te bereiken, wordt Interlocked.Add
gebruikt om de getallen op te tellen.
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Vloeiende uitvoeringscontext met AsyncLocal
Als u gegevens van de bovenliggende taak moet doorgeven aan de onderliggende taken, zodat deze logisch wordt AsyncLocal
de uitvoering, gebruikt u de AsyncLocal
klasse :
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
Opmerking: Zoals te zien is in het bovenstaande AsynLocal.Value
heeft AsynLocal.Value
een copy on read
semantisch copy on read
, maar als u een referentietype stroomt en de eigenschappen ervan wijzigt, heeft dit invloed op andere taken. Daarom is de beste praktijk met AsyncLocal
om waardetypen of onveranderlijke typen te gebruiken.
Parallel.ForEach in VB.NET
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
Taak: een waarde retourneren
Taak die een waarde retourneert, heeft het Task< TResult >
waarbij TResult het type waarde is dat moet worden geretourneerd. U kunt de uitkomst van een taak opvragen aan de hand van de eigenschap Result.
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
Als de taak asynchroon wordt uitgevoerd, wordt het resultaat weergegeven in afwachting van de taak.
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}