Ricerca…


Osservazioni

Scopo e casi d'uso

Lo scopo della Task Parallel Library è di semplificare il processo di scrittura e mantenimento del codice multithreaded e parallelo.

Alcuni casi d'uso *:

  • Mantenere un'interfaccia utente reattiva eseguendo il lavoro in background su un'attività separata
  • Distribuzione del carico di lavoro
  • Consentire a un'applicazione client di inviare e ricevere richieste contemporaneamente (resto, TCP / UDP, ect)
  • Lettura e / o scrittura di più file contemporaneamente

* Il codice deve essere considerato caso per caso per il multithreading. Ad esempio, se un ciclo ha solo poche iterazioni o esegue solo una piccola parte del lavoro, il sovraccarico per il parallelismo può superare i benefici.

TPL con .Net 3.5

Il TPL è anche disponibile per .Net 3.5 incluso in un pacchetto NuGet, si chiama Task Parallel Library.

Ciclo base produttore-consumatore (BlockingCollection)

var collection = new BlockingCollection<int>(5);
var random = new Random();

var producerTask = Task.Run(() => {
    for(int item=1; item<=10; item++) 
    {
        collection.Add(item);
        Console.WriteLine("Produced: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    collection.CompleteAdding();
    Console.WriteLine("Producer completed!");
});

Vale la pena notare che se non si chiama collection.CompleteAdding(); , puoi continuare ad aggiungere alla raccolta anche se l'attività dell'utente è in esecuzione. Basta chiamare collection.CompleteAdding(); quando sei sicuro che non ci siano più aggiunte. Questa funzionalità può essere utilizzata per creare un produttore multiplo su un modello consumer singolo in cui sono presenti più origini che alimentano elementi in BlockingCollection e un singolo utente che estrae elementi e ne fa qualcosa. Se BlockingCollection è vuoto prima di chiamare l'aggiunta completa, l'Enumerable from collection.GetConsumingEnumerable() bloccherà fino a quando un nuovo elemento non verrà aggiunto alla raccolta o BlockingCollection.CompleteAdding (); viene chiamato e la coda è vuota.

var consumerTask = Task.Run(() => {
    foreach(var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine("Consumed: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    Console.WriteLine("Consumer completed!");
});
  
Task.WaitAll(producerTask, consumerTask);
       
Console.WriteLine("Everything completed!");

Compito: istanziazione di base e attesa

Un'attività può essere creata istanziando direttamente la classe Task ...

var task = new Task(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});

Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");

... o usando il metodo Task.Run statico:

Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");

Si noti che solo nel primo caso è necessario richiamare esplicitamente Start .

Attività: WaitAll e acquisizione variabile

var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
})).ToArray();

foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);

foreach(var task in tasks)
    Console.WriteLine(task.Result);

Compito: WaitAny

var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();

foreach(var task in allTasks) task.Start();

while(pendingTasks.Length > 0)
{
    var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
    Console.WriteLine("Task {0} finished", finishedTask.Result);
    pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}

Task.WaitAll(allTasks);

Nota: l' WaitAll finale è necessario perché WaitAny non fa in modo che vengano osservate eccezioni.

Attività: gestione delle eccezioni (utilizzando Attendi)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

Console.WriteLine("Starting tasks...");
try
{
    Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
    Console.WriteLine("Task(s) failed!");
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine(inner.Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Attività: gestione delle eccezioni (senza usare Wait)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

var tasks = new[] {task1, task2};

Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));

foreach(var task in tasks)
{
    if(task.IsFaulted)
        Console.WriteLine("Task failed: " +
            task.Exception.InnerExceptions.First().Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Compito: cancellare usando CancellationToken

var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

var task = new Task((state) =>
    {
        int i = 1;
        var myCancellationToken = (CancellationToken)state;
        while(true)
        {
            Console.Write("{0} ", i++);
            Thread.Sleep(1000);
            myCancellationToken.ThrowIfCancellationRequested();
        }
    },
    cancellationToken: cancellationToken,
    state: cancellationToken);

Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();

cancellationTokenSource.Cancel();
try
{
    task.Wait();
}
catch(AggregateException ex)
{
    ex.Handle(inner => inner is OperationCanceledException);
}

Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled

In alternativa a ThrowIfCancellationRequested , la richiesta di annullamento può essere rilevata con IsCancellationRequested e una OperationCanceledException può essere generata manualmente:

//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
    Console.Write("{0} ", i++);
    Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);

Nota come il token di cancellazione viene passato al costruttore di task nel parametro cancellationToken . Ciò è necessario in modo che le transizioni compito alle Canceled stato, non alla Faulted stato, quando ThrowIfCancellationRequested viene richiamato. Inoltre, per lo stesso motivo, il token di cancellazione viene esplicitamente fornito nel costruttore di OperationCanceledException nel secondo caso.

Task.WhenAny

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
    Console.WriteLine("I'm task " + n);
    await Task.Delay(random.Next(10,1000));
    return n;
}));

Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);

await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");

Task.WhenAll

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
}));

Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;

Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5

Parallel.Invoke

var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
    Console.WriteLine("I'm task " + n);
    if((n & 1) == 0)
        throw new Exception("Exception from task " + n);
})).ToArray();

try
{
    Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine("Task failed: " + inner.Message);
}

Parallel.ForEach

Questo esempio utilizza Parallel.ForEach per calcolare la somma dei numeri tra 1 e 10000 utilizzando più thread. Per raggiungere la sicurezza del thread, Interlocked.Add viene utilizzato per sommare i numeri.

using System.Threading;

int Foo()
{
    int total = 0;
    var numbers = Enumerable.Range(1, 10000).ToList();
    Parallel.ForEach(numbers, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Parallel.For

Questo esempio utilizza Parallel.For calcolare la somma dei numeri tra 1 e 10000 utilizzando più thread. Per raggiungere la sicurezza del thread, Interlocked.Add viene utilizzato per sommare i numeri.

using System.Threading;

int Foo()
{
    int total = 0;
    Parallel.For(1, 10001, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Contesto di esecuzione fluente con AsyncLocal

Quando è necessario passare alcuni dati dall'attività padre alle attività figli, in modo tale da scorrere logicamente con l'esecuzione, utilizzare la classe AsyncLocal :

void Main()
{
    AsyncLocal<string> user = new AsyncLocal<string>();
    user.Value = "initial user";
    
    // this does not affect other tasks - values are local relative to the branches of execution flow
    Task.Run(() => user.Value = "user from another task"); 
    
    var task1 = Task.Run(() =>
    {
        Console.WriteLine(user.Value); // outputs "initial user"
        Task.Run(() =>
        {
            // outputs "initial user" - value has flown from main method to this task without being changed
            Console.WriteLine(user.Value);
        }).Wait();

        user.Value = "user from task1";

        Task.Run(() =>
        {
            // outputs "user from task1" - value has flown from main method to task1
            // than value was changed and flown to this task.
            Console.WriteLine(user.Value);
        }).Wait();
    });
    
    task1.Wait();
    
    // ouputs "initial user" - changes do not propagate back upstream the execution flow    
    Console.WriteLine(user.Value); 
}

Nota: Come si può vedere dall'esempio sopra, AsynLocal.Value ha una copy on read semantica, ma se si scorre qualche tipo di riferimento e si cambiano le sue proprietà si avranno effetti su altre attività. Pertanto, la migliore pratica con AsyncLocal consiste nell'utilizzare tipi di valore o tipi immutabili.

Parallel.ForEach in VB.NET

For Each row As DataRow In FooDataTable.Rows
    Me.RowsToProcess.Add(row)
Next

Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount

Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
                                               ProcessRowParallel(currentRow, state)
                                           End Sub)

Attività: restituire un valore

L'attività che restituisce un valore restituisce il tipo di Task< TResult > dove TResult è il tipo di valore che deve essere restituito. È possibile interrogare il risultato di un'attività tramite la sua proprietà Result.

Task<int> t = Task.Run(() => 
    {
        int sum = 0;

        for(int i = 0; i < 500; i++)
            sum += i;

        return sum;
    });

Console.WriteLine(t.Result); // Outuput 124750

Se l'attività viene eseguita in modo asincrono rispetto all'attesa, l'operazione restituisce il risultato.

public async Task DoSomeWork()
{
    WebClient client = new WebClient();
    // Because the task is awaited, result of the task is assigned to response
    string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}


Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow