.NET Framework
タスク並列ライブラリ(TPL)
サーチ…
備考
目的と用途
タスク並列ライブラリの目的は、マルチスレッドおよび並列コードの作成および保守プロセスを簡素化することです。
いくつかのユースケース*:
- 別のタスクでバックグラウンド作業を実行してUIを反応的に保つ
- ワークロードの分散
- クライアントアプリケーションがリクエストを同時に送受信できるようにする(休憩、TCP / UDPなど)
- 一度に複数のファイルを読み書きする
*コードは、マルチスレッドのためにケースバイケースで検討する必要があります。たとえば、ループの繰り返し回数がわずかである場合や、作業量が少ない場合は、並列処理のオーバーヘッドがメリットを上回る可能性があります。
TPL with .Net 3.5
TPLは、NuGetパッケージに含まれている.Net 3.5用にも利用可能で、Task Parallel Libraryと呼ばれています。
基本的なプロデューサ - コンシューマループ(BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
あなたがcollection.CompleteAdding();
呼び出さなければ、それは注目に値するcollection.CompleteAdding();
コンシューマタスクが実行中であっても、コレクションに追加し続けることができます。単にcollection.CompleteAdding();
呼び出しcollection.CompleteAdding();
あなたが確信しているときは、それ以上の追加はありません。この機能を使用して複数のプロデューサを単一のコンシューマパターンにすることができます。ここでは、複数のソースからBlockingCollectionにアイテムを供給し、単一の消費者がアイテムを取り出して何かを行うことができます。完全な追加を呼び出す前にBlockingCollectionが空の場合、 collection.GetConsumingEnumerable()
Enumerableは、新しいアイテムがコレクションに追加されるか、BlockingCollection.CompleteAdding();キューが空です。
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
タスク:基本的なインスタンス化と待機
タスクは、 Task
クラスを直接インスタンス化することで作成できます...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
...または静的なTask.Run
メソッドを使用して:
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
最初のケースでのみ、明示的にStart
呼び出す必要があることに注意してください。
タスク:WaitAllと可変キャプチャ
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
タスク:WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
注意:最終WaitAll
becasue必要があるWaitAny
、例外が観測されることはありません。
タスク:例外処理(Waitを使用)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
タスク:例外処理(Waitを使用しない)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
タスク:CancellationTokenを使用してキャンセルする
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
ThrowIfCancellationRequested
の代わりに、キャンセル要求をIsCancellationRequested
で検出でき、 OperationCanceledException
を手動でスローすることができます。
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
キャンセルトークンがcancellationToken
パラメータのタスクコンストラクタにどのように渡されるかに注意してください。これが必要とされているようにタスク遷移Canceled
状態、ではないにFaulted
状態、 ThrowIfCancellationRequested
呼び出されます。また、同じ理由で、キャンセルトークンは、2番目のケースでOperationCanceledException
のコンストラクターで明示的に指定されます。
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
この例では、 Parallel.ForEach
を使用して、複数のスレッドを使用して1〜10000の数値の合計を計算します。スレッドセーフティを実現するために、 Interlocked.Add
を使用して数値を合計します。
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
この例では、 Parallel.For
を使用して、複数のスレッドを使用して1〜10000の数値の合計を計算します。スレッドセーフティを実現するために、 Interlocked.Add
を使用して数値を合計します。
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
AsyncLocalを使用した実行コンテキストのフロー
親タスクAsyncLocal
タスクにいくつかのデータを渡す必要がある場合、それは論理的に実行とともに流れますので、 AsyncLocal
クラスを使用してAsyncLocal
:
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
注:上の例からわかるように、 AsynLocal.Value
はcopy on read
セマンティックがありますが、参照型をいくつか流してそのプロパティを変更すると、他のタスクに影響します。したがって、 AsyncLocal
ベストプラクティスは、値型または不変型を使用することです。
VB.NETのParallel.ForEach
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
タスク:値を返す
値を返すTask< TResult >
戻り値の型はTask< TResult >
ここで、TResultは返される必要がある値の型です。タスクの結果をResultプロパティで照会することができます。
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
タスクが非同期的に実行されると、タスクがタスクを待つよりも結果が返されます。
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}