Sök…


Anmärkningar

Syfte och användningsfall

Syftet med Task Parallel Library är att förenkla processen att skriva och underhålla multitrådad och parallell kod.

Vissa användningsfall *:

  • Att hålla ett användargränssnitt användbart genom att köra bakgrundsarbete på en separat uppgift
  • Distribuera arbetsbelastningen
  • Tillåter en klientapplikation att skicka och ta emot förfrågningar samtidigt (vila, TCP / UDP, ect)
  • Läsa och / eller skriva flera filer samtidigt

* Kod bör övervägas från fall till fall för multiträdning. Till exempel, om en slinga bara har några iterationer eller bara gör en liten mängd av arbetet, kan omkostnaderna för parallellitet uppväga fördelarna.

TPL med .Net 3.5

TPL finns också för .Net 3.5 ingår i ett NuGet-paket, det kallas Task Parallel Library.

Grundläggande producent-konsumentslinga (BlockingCollection)

var collection = new BlockingCollection<int>(5);
var random = new Random();

var producerTask = Task.Run(() => {
    for(int item=1; item<=10; item++) 
    {
        collection.Add(item);
        Console.WriteLine("Produced: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    collection.CompleteAdding();
    Console.WriteLine("Producer completed!");
});

Det är värt att notera att om du inte ringer collection.CompleteAdding(); , kan du fortsätta lägga till samlingen även om din konsumentuppgift körs. Bara ringsamling. Komplett collection.CompleteAdding(); när du är säker på att det inte finns fler tillägg. Den här funktionaliteten kan användas för att göra en flera tillverkare till ett enda konsumentmönster där du har flera källor som matar objekt i BlockingCollection och en enda konsument drar ut saker och gör något med dem. Om din BlockingCollection är tom innan du ringer fullständig läggning kommer Enumerable from collection.GetConsumingEnumerable() att blockeras tills ett nytt objekt läggs till i samlingen eller BlockingCollection.CompleteAdding (); heter och kön är tom.

var consumerTask = Task.Run(() => {
    foreach(var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine("Consumed: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    Console.WriteLine("Consumer completed!");
});
  
Task.WaitAll(producerTask, consumerTask);
       
Console.WriteLine("Everything completed!");

Uppgift: grundläggande instans och vänta

En uppgift kan skapas genom att direkt instansera Task ...

var task = new Task(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});

Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");

... eller med hjälp av den statiska Task.Run metoden:

Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
    Console.WriteLine("Task code starting...");
    Thread.Sleep(2000);
    Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");

Observera att det endast i det första fallet är nödvändigt att uttryckligen åberopa Start .

Uppgift: WaitAll och variabel fångst

var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
})).ToArray();

foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);

foreach(var task in tasks)
    Console.WriteLine(task.Result);

Uppgift: Vänta alla

var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();

foreach(var task in allTasks) task.Start();

while(pendingTasks.Length > 0)
{
    var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
    Console.WriteLine("Task {0} finished", finishedTask.Result);
    pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}

Task.WaitAll(allTasks);

Obs! Den sista WaitAll är nödvändig eftersom WaitAny orsakar inte undantag.

Uppgift: hantering av undantag (med vänta)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

Console.WriteLine("Starting tasks...");
try
{
    Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
    Console.WriteLine("Task(s) failed!");
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine(inner.Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Uppgift: hantera undantag (utan att använda Vänta)

var task1 = Task.Run(() =>
{
    Console.WriteLine("Task 1 code starting...");
    throw new Exception("Oh no, exception from task 1!!");
});

var task2 = Task.Run(() =>
{
    Console.WriteLine("Task 2 code starting...");
    throw new Exception("Oh no, exception from task 2!!");
});

var tasks = new[] {task1, task2};

Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));

foreach(var task in tasks)
{
    if(task.IsFaulted)
        Console.WriteLine("Task failed: " +
            task.Exception.InnerExceptions.First().Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

Uppgift: avbokning med CancellationToken

var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

var task = new Task((state) =>
    {
        int i = 1;
        var myCancellationToken = (CancellationToken)state;
        while(true)
        {
            Console.Write("{0} ", i++);
            Thread.Sleep(1000);
            myCancellationToken.ThrowIfCancellationRequested();
        }
    },
    cancellationToken: cancellationToken,
    state: cancellationToken);

Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();

cancellationTokenSource.Cancel();
try
{
    task.Wait();
}
catch(AggregateException ex)
{
    ex.Handle(inner => inner is OperationCanceledException);
}

Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled

Som ett alternativ till ThrowIfCancellationRequested kan avbokningsförfrågan upptäckas med IsCancellationRequested och en OperationCanceledException kan kastas manuellt:

//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
    Console.Write("{0} ", i++);
    Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);

Observera hur annulleringstoken skickas till uppgiftskonstruktorn i parametern cancellationToken . Detta behövs så att uppgiften övergår till det Canceled tillståndet, inte till det Faulted tillståndet när ThrowIfCancellationRequested . Av samma anledning tillhandahålls också annulleringstoken uttryckligen i konstruktören av OperationCanceledException i det andra fallet.

Task.WhenAny

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
    Console.WriteLine("I'm task " + n);
    await Task.Delay(random.Next(10,1000));
    return n;
}));

Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);

await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");

Task.WhenAll

var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
    Console.WriteLine("I'm task " + n);
    return n;
}));

Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;

Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5

Parallel.Invoke

var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
    Console.WriteLine("I'm task " + n);
    if((n & 1) == 0)
        throw new Exception("Exception from task " + n);
})).ToArray();

try
{
    Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine("Task failed: " + inner.Message);
}

Parallel.ForEach

Detta exempel använder Parallel.ForEach att beräkna summan av siffrorna mellan 1 och 10000 genom att använda flera trådar. För att uppnå gängsäkerhet används Interlocked.Add för att summera siffrorna.

using System.Threading;

int Foo()
{
    int total = 0;
    var numbers = Enumerable.Range(1, 10000).ToList();
    Parallel.ForEach(numbers, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Parallel.For

Detta exempel använder Parallel.For att beräkna summan av siffrorna mellan 1 och 10000 med hjälp av flera trådar. För att uppnå gängsäkerhet används Interlocked.Add för att summera siffrorna.

using System.Threading;

int Foo()
{
    int total = 0;
    Parallel.For(1, 10001, 
        () => 0, // initial value,
        (num, state, localSum) => num + localSum,
        localSum => Interlocked.Add(ref total, localSum));
    return total; // total = 50005000
}

Flödande exekveringskontekst med AsyncLocal

När du behöver överföra vissa data från föräldrauppgiften till sina barns uppgifter, så att det logiskt flyter med exekveringen, använd AsyncLocal klassen :

void Main()
{
    AsyncLocal<string> user = new AsyncLocal<string>();
    user.Value = "initial user";
    
    // this does not affect other tasks - values are local relative to the branches of execution flow
    Task.Run(() => user.Value = "user from another task"); 
    
    var task1 = Task.Run(() =>
    {
        Console.WriteLine(user.Value); // outputs "initial user"
        Task.Run(() =>
        {
            // outputs "initial user" - value has flown from main method to this task without being changed
            Console.WriteLine(user.Value);
        }).Wait();

        user.Value = "user from task1";

        Task.Run(() =>
        {
            // outputs "user from task1" - value has flown from main method to task1
            // than value was changed and flown to this task.
            Console.WriteLine(user.Value);
        }).Wait();
    });
    
    task1.Wait();
    
    // ouputs "initial user" - changes do not propagate back upstream the execution flow    
    Console.WriteLine(user.Value); 
}

Obs: Som framgår av exemplet ovan har AsynLocal.Value copy on read semantiskt, men om du flyter någon referenstyp och ändrar dess egenskaper kommer du att påverka andra uppgifter. Därför är bästa praxis med AsyncLocal att använda värdetyper eller oföränderliga typer.

Parallell.ForEach i VB.NET

For Each row As DataRow In FooDataTable.Rows
    Me.RowsToProcess.Add(row)
Next

Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount

Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
                                               ProcessRowParallel(currentRow, state)
                                           End Sub)

Uppgift: Återvända ett värde

Uppgift som returnerar ett värde har Task< TResult > för Task< TResult > där TResult är den typ av värde som måste returneras. Du kan fråga om resultatet av en uppgift med dess resultategenskap.

Task<int> t = Task.Run(() => 
    {
        int sum = 0;

        for(int i = 0; i < 500; i++)
            sum += i;

        return sum;
    });

Console.WriteLine(t.Result); // Outuput 124750

Om uppgiften utförs asynkront än att vänta på uppgiften returnerar det resultatet.

public async Task DoSomeWork()
{
    WebClient client = new WebClient();
    // Because the task is awaited, result of the task is assigned to response
    string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}


Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow