.NET Framework
Task Parallel Library (TPL)
Ricerca…
Osservazioni
Scopo e casi d'uso
Lo scopo della Task Parallel Library è di semplificare il processo di scrittura e mantenimento del codice multithreaded e parallelo.
Alcuni casi d'uso *:
- Mantenere un'interfaccia utente reattiva eseguendo il lavoro in background su un'attività separata
- Distribuzione del carico di lavoro
- Consentire a un'applicazione client di inviare e ricevere richieste contemporaneamente (resto, TCP / UDP, ect)
- Lettura e / o scrittura di più file contemporaneamente
* Il codice deve essere considerato caso per caso per il multithreading. Ad esempio, se un ciclo ha solo poche iterazioni o esegue solo una piccola parte del lavoro, il sovraccarico per il parallelismo può superare i benefici.
TPL con .Net 3.5
Il TPL è anche disponibile per .Net 3.5 incluso in un pacchetto NuGet, si chiama Task Parallel Library.
Ciclo base produttore-consumatore (BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
Vale la pena notare che se non si chiama collection.CompleteAdding();
, puoi continuare ad aggiungere alla raccolta anche se l'attività dell'utente è in esecuzione. Basta chiamare collection.CompleteAdding();
quando sei sicuro che non ci siano più aggiunte. Questa funzionalità può essere utilizzata per creare un produttore multiplo su un modello consumer singolo in cui sono presenti più origini che alimentano elementi in BlockingCollection e un singolo utente che estrae elementi e ne fa qualcosa. Se BlockingCollection è vuoto prima di chiamare l'aggiunta completa, l'Enumerable from collection.GetConsumingEnumerable()
bloccherà fino a quando un nuovo elemento non verrà aggiunto alla raccolta o BlockingCollection.CompleteAdding (); viene chiamato e la coda è vuota.
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
Compito: istanziazione di base e attesa
Un'attività può essere creata istanziando direttamente la classe Task
...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... o usando il metodo Task.Run
statico:
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
Si noti che solo nel primo caso è necessario richiamare esplicitamente Start
.
Attività: WaitAll e acquisizione variabile
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
Compito: WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
Nota: l' WaitAll
finale è necessario perché WaitAny
non fa in modo che vengano osservate eccezioni.
Attività: gestione delle eccezioni (utilizzando Attendi)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Attività: gestione delle eccezioni (senza usare Wait)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Compito: cancellare usando CancellationToken
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
In alternativa a ThrowIfCancellationRequested
, la richiesta di annullamento può essere rilevata con IsCancellationRequested
e una OperationCanceledException
può essere generata manualmente:
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
Nota come il token di cancellazione viene passato al costruttore di task nel parametro cancellationToken
. Ciò è necessario in modo che le transizioni compito alle Canceled
stato, non alla Faulted
stato, quando ThrowIfCancellationRequested
viene richiamato. Inoltre, per lo stesso motivo, il token di cancellazione viene esplicitamente fornito nel costruttore di OperationCanceledException
nel secondo caso.
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
Questo esempio utilizza Parallel.ForEach
per calcolare la somma dei numeri tra 1 e 10000 utilizzando più thread. Per raggiungere la sicurezza del thread, Interlocked.Add
viene utilizzato per sommare i numeri.
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
Questo esempio utilizza Parallel.For
calcolare la somma dei numeri tra 1 e 10000 utilizzando più thread. Per raggiungere la sicurezza del thread, Interlocked.Add
viene utilizzato per sommare i numeri.
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Contesto di esecuzione fluente con AsyncLocal
Quando è necessario passare alcuni dati dall'attività padre alle attività figli, in modo tale da scorrere logicamente con l'esecuzione, utilizzare la classe AsyncLocal
:
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
Nota: Come si può vedere dall'esempio sopra, AsynLocal.Value
ha una copy on read
semantica, ma se si scorre qualche tipo di riferimento e si cambiano le sue proprietà si avranno effetti su altre attività. Pertanto, la migliore pratica con AsyncLocal
consiste nell'utilizzare tipi di valore o tipi immutabili.
Parallel.ForEach in VB.NET
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
Attività: restituire un valore
L'attività che restituisce un valore restituisce il tipo di Task< TResult >
dove TResult è il tipo di valore che deve essere restituito. È possibile interrogare il risultato di un'attività tramite la sua proprietà Result.
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
Se l'attività viene eseguita in modo asincrono rispetto all'attesa, l'operazione restituisce il risultato.
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}