.NET Framework
Task Parallel Library (TPL)
Sök…
Anmärkningar
Syfte och användningsfall
Syftet med Task Parallel Library är att förenkla processen att skriva och underhålla multitrådad och parallell kod.
Vissa användningsfall *:
- Att hålla ett användargränssnitt användbart genom att köra bakgrundsarbete på en separat uppgift
- Distribuera arbetsbelastningen
- Tillåter en klientapplikation att skicka och ta emot förfrågningar samtidigt (vila, TCP / UDP, ect)
- Läsa och / eller skriva flera filer samtidigt
* Kod bör övervägas från fall till fall för multiträdning. Till exempel, om en slinga bara har några iterationer eller bara gör en liten mängd av arbetet, kan omkostnaderna för parallellitet uppväga fördelarna.
TPL med .Net 3.5
TPL finns också för .Net 3.5 ingår i ett NuGet-paket, det kallas Task Parallel Library.
Grundläggande producent-konsumentslinga (BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
Det är värt att notera att om du inte ringer collection.CompleteAdding();
, kan du fortsätta lägga till samlingen även om din konsumentuppgift körs. Bara ringsamling. Komplett collection.CompleteAdding();
när du är säker på att det inte finns fler tillägg. Den här funktionaliteten kan användas för att göra en flera tillverkare till ett enda konsumentmönster där du har flera källor som matar objekt i BlockingCollection och en enda konsument drar ut saker och gör något med dem. Om din BlockingCollection är tom innan du ringer fullständig läggning kommer Enumerable from collection.GetConsumingEnumerable()
att blockeras tills ett nytt objekt läggs till i samlingen eller BlockingCollection.CompleteAdding (); heter och kön är tom.
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
Uppgift: grundläggande instans och vänta
En uppgift kan skapas genom att direkt instansera Task
...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... eller med hjälp av den statiska Task.Run
metoden:
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
Observera att det endast i det första fallet är nödvändigt att uttryckligen åberopa Start
.
Uppgift: WaitAll och variabel fångst
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
Uppgift: Vänta alla
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
Obs! Den sista WaitAll
är nödvändig eftersom WaitAny
orsakar inte undantag.
Uppgift: hantering av undantag (med vänta)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Uppgift: hantera undantag (utan att använda Vänta)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
Uppgift: avbokning med CancellationToken
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
Som ett alternativ till ThrowIfCancellationRequested
kan avbokningsförfrågan upptäckas med IsCancellationRequested
och en OperationCanceledException
kan kastas manuellt:
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
Observera hur annulleringstoken skickas till uppgiftskonstruktorn i parametern cancellationToken
. Detta behövs så att uppgiften övergår till det Canceled
tillståndet, inte till det Faulted
tillståndet när ThrowIfCancellationRequested
. Av samma anledning tillhandahålls också annulleringstoken uttryckligen i konstruktören av OperationCanceledException
i det andra fallet.
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
Detta exempel använder Parallel.ForEach
att beräkna summan av siffrorna mellan 1 och 10000 genom att använda flera trådar. För att uppnå gängsäkerhet används Interlocked.Add
för att summera siffrorna.
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
Detta exempel använder Parallel.For
att beräkna summan av siffrorna mellan 1 och 10000 med hjälp av flera trådar. För att uppnå gängsäkerhet används Interlocked.Add
för att summera siffrorna.
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Flödande exekveringskontekst med AsyncLocal
När du behöver överföra vissa data från föräldrauppgiften till sina barns uppgifter, så att det logiskt flyter med exekveringen, använd AsyncLocal
klassen :
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
Obs: Som framgår av exemplet ovan har AsynLocal.Value
copy on read
semantiskt, men om du flyter någon referenstyp och ändrar dess egenskaper kommer du att påverka andra uppgifter. Därför är bästa praxis med AsyncLocal
att använda värdetyper eller oföränderliga typer.
Parallell.ForEach i VB.NET
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
Uppgift: Återvända ett värde
Uppgift som returnerar ett värde har Task< TResult >
för Task< TResult >
där TResult är den typ av värde som måste returneras. Du kan fråga om resultatet av en uppgift med dess resultategenskap.
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
Om uppgiften utförs asynkront än att vänta på uppgiften returnerar det resultatet.
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}