.NET Framework
Bibliothèque parallèle de tâches (TPL)
Recherche…
Remarques
Buts et cas d'utilisation
Le but de la bibliothèque parallèle de tâches est de simplifier le processus d’écriture et de maintenance du code multithread et parallèle.
Quelques cas d'utilisation *:
- Garder une interface utilisateur réactive en exécutant un travail d'arrière-plan sur une tâche distincte
- Répartition de la charge de travail
- Autoriser une application cliente à envoyer et recevoir des requêtes en même temps (reste, TCP / UDP, ect)
- Lire et / ou écrire plusieurs fichiers à la fois
* Le code doit être considéré au cas par cas pour le multithreading. Par exemple, si une boucle ne comporte que quelques itérations ou ne fait qu'une petite partie du travail, la surcharge du parallélisme peut dépasser les avantages.
TPL avec .Net 3.5
Le TPL est également disponible pour .Net 3.5 inclus dans un package NuGet, il s'appelle Task Parallel Library.
Boucle de base producteur-consommateur (BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
Il est à noter que si vous n'appelez pas collection.CompleteAdding();
, vous pouvez continuer à ajouter à la collection même si votre tâche client est en cours d'exécution. Il suffit d'appeler collection.CompleteAdding();
lorsque vous êtes sûr qu'il n'y a plus d'ajouts. Cette fonctionnalité peut être utilisée pour créer un modèle Producteur multiple vers un consommateur unique dans lequel vous disposez de plusieurs sources alimentant des éléments dans BlockingCollection et un seul consommateur en retirant des éléments et en faisant quelque chose. Si votre BlockingCollection est vide avant que vous appeliez l'ajout complet, le Enumerable de collection.GetConsumingEnumerable()
bloquera jusqu'à ce qu'un nouvel élément soit ajouté à la collection ou à BlockingCollection.CompleteAdding (); est appelé et la file d'attente est vide.
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
Tâche: instanciation de base et attente
Une tâche peut être créée en instanciant directement la classe de Task
...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... ou en utilisant la méthode Task.Run
statique:
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
Notez que seulement dans le premier cas, il est nécessaire d'appeler explicitement Start
.
Tâche: WaitAll et capture de variable
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
Tâche: WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
Note: Le WaitAll
final est nécessaire car WaitAny
ne fait pas observer d’exceptions.
Tâche: gestion des exceptions (à l'aide de l'attente)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Tâche: gérer les exceptions (sans utiliser Wait)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Tâche: annuler en utilisant CancellationToken
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
Comme alternative à ThrowIfCancellationRequested
, la demande d'annulation peut être détectée avec IsCancellationRequested
et une IsCancellationRequested
OperationCanceledException
peut être lancée manuellement:
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
Notez comment le jeton d'annulation est transmis au constructeur de tâches dans le paramètre cancellationToken
. Cela est nécessaire pour que la tâche passe à l'état Canceled
, et non à l'état Faulted
, lorsque ThrowIfCancellationRequested
est ThrowIfCancellationRequested
. De plus, pour la même raison, le jeton d'annulation est explicitement fourni dans le constructeur d' OperationCanceledException
dans le second cas.
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
Cet exemple utilise Parallel.ForEach
pour calculer la somme des nombres entre 1 et 10000 en utilisant plusieurs threads. Pour atteindre la sécurité des threads, Interlocked.Add
est utilisé pour additionner les nombres.
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
Cet exemple utilise Parallel.For
pour calculer la somme des nombres entre 1 et 10000 en utilisant plusieurs threads. Pour atteindre la sécurité des threads, Interlocked.Add
est utilisé pour additionner les nombres.
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Contexte d'exécution fluide avec AsyncLocal
Lorsque vous devez transmettre des données de la tâche parente à ses tâches enfants, de manière à ce AsyncLocal
à l'exécution, utilisez la classe AsyncLocal
:
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
Note: Comme on peut le voir dans l'exemple ci-dessus, AsynLocal.Value
a une copy on read
sémantique de copy on read
, mais si vous AsynLocal.Value
un type de référence et modifiez ses propriétés, vous affecterez d'autres tâches. Par conséquent, la meilleure pratique avec AsyncLocal
consiste à utiliser des types de valeur ou des types immuables.
Parallel.ForEach dans VB.NET
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
Tâche: Renvoyer une valeur
La tâche qui renvoie une valeur a le type de retour de la Task< TResult >
où TResult est le type de valeur à renvoyer. Vous pouvez interroger le résultat d'une tâche par sa propriété Result.
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
Si la tâche est exécutée de manière asynchrone dans l'attente de la tâche, le résultat est renvoyé.
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}