C# Language
タスク並列ライブラリ(TPL)のデータフロー構成
サーチ…
JoinBlock
(2〜3の入力を集めてタプルに結合する)
BatchBlockと同様に、JoinBlock <T1、T2、...>は複数のデータソースからデータをグループ化できます。実際、それはJoinBlock <T1、T2、...>の主な目的です。
たとえば、JoinBlock <string、double、int>はISourceBlock <Tuple <string、double、int >>です。
BatchBlockの場合と同様に、JoinBlock <T1、T2、...>は貪欲モードと非貪欲モードの両方で動作することができます。
- デフォルトの欲張りモードでは、タプルを形成するのに必要なデータが他のターゲットにない場合でも、ターゲットに提供されるすべてのデータが受け入れられます。
- 貪欲でないモードでは、ブロックのターゲットは、すべてのターゲットに必要なデータが提供されてタプルが作成されるまでデータを延期します。この時点で、ブロックは2フェーズコミットプロトコルを使用してソースから必要なすべてのアイテムをアトミックに取得します。この延期により、別のエンティティがその間にデータを消費し、システム全体が前進することが可能になります。
プールされたオブジェクトの数が限られているリクエストの処理
var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++)
{
requestProcessor.Target1.Post(new ExpensiveObject());
}
var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
var resource = pair.Item1;
var request = pair.Item2;
request.ProcessWith(resource);
return resource;
});
throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);
BroadcastBlock
(アイテムをコピーし、それがリンクされているすべてのブロックにコピーを送信する)
BufferBlockとは違って、BroadcastBlockの使命は、ブロックからリンクされたすべてのターゲットが公開されたすべての要素のコピーを取得し、継続的に現在の値を上書きすることです。
さらに、BufferBlockとは異なり、BroadcastBlockはデータを不必要に保持しません。特定のデータムがすべてのターゲットに提供された後、その要素は次の行にあるデータによって上書きされます(すべてのデータフローブロックと同様に、メッセージはFIFO順に処理されます)。その要素はすべてのターゲットに提供されます。
プロデューサを抑制した非同期プロデューサ/コンシューマ
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
item.Image.Save(item.Path)
);
var showInUiBlock = new ActionBlock<ImageData>(item =>
imagePanel.AddImage(item.Image),
new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
エージェントからのステータスの公開
public class MyAgent
{
public ISourceBlock<string> Status { get; private set; }
public MyAgent()
{
Status = new BroadcastBlock<string>();
Run();
}
private void Run()
{
Status.Post("Starting");
Status.Post("Doing cool stuff");
…
Status.Post("Done");
}
}
WriteOnceBlock
(Readonly変数:最初のデータ項目を記憶し、そのコピーを出力として渡します。他のすべてのデータ項目は無視します)
BufferBlockがTPL Dataflowの最も基本的なブロックである場合、WriteOnceBlockは最も簡単です。
多くても1つの値が格納され、その値が設定されると、置換または上書きされることはありません。
WriteOnceBlockは、コンストラクタ内でのみ設定可能で、不変であるのではなく、一度設定可能で不変の場合を除いて、C#のreadonlyメンバ変数と似ていると考えることができます。
タスクの潜在的出力の分割
public static async void SplitIntoBlocks(this Task<T> task,
out IPropagatorBlock<T> result,
out IPropagatorBlock<Exception> exception)
{
result = new WriteOnceBlock<T>(i => i);
exception = new WriteOnceBlock<Exception>(i => i);
try
{
result.Post(await task);
}
catch(Exception ex)
{
exception.Post(ex);
}
}
BatchedJoinBlock
(2〜3個の入力から特定の数の合計アイテムを収集し、それらをデータアイテムの集合のタプルにグループ化する)
BatchedJoinBlock <T1、T2、...>は、ある意味ではBatchBlockとJoinBlock <T1、T2、...>の組み合わせです。
JoinBlock <T1、T2、...>は各ターゲットからの1つの入力をタプルに集約するために使用され、BatchBlockはN個の入力をコレクションに集約するために使用され、BatchedJoinBlock <T1、T2、...>すべてのターゲットをコレクションのタプルに組み込みます。
スキャッタ/ギャザー
N個の操作が開始され、そのうちのいくつかが成功して文字列出力を生成し、その他が失敗して例外を生成する可能性のあるスキャッタ/ギャザーの問題を考えてみましょう。
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i=0; i<10; i++)
{
Task.Factory.StartNew(() => {
try { batchedJoin.Target1.Post(DoWork()); }
catch(Exception ex) { batchJoin.Target2.Post(ex); }
});
}
var results = await batchedJoin.ReceiveAsync();
foreach(string s in results.Item1)
{
Console.WriteLine(s);
}
foreach(Exception e in results.Item2)
{
Console.WriteLine(e);
}
TransformBlock
(選択、1対1)
ActionBlockと同様に、TransformBlock <TInput、TOutput>はデリゲートの実行が各入力データに対して何らかのアクションを実行できるようにします。 ActionBlockとは異なり、この処理には出力があります。このデリゲートは、Func <TInput、TOutput>とすることができます。この場合、デリゲートが返されたときにその要素の処理が完了したと見なされるか、Func <TInput、Task>になります。デリゲートが返されたときに、返されたタスクが完了したとき。 LINQに精通している人にとって、Select()は入力を受け取り、何らかの方法でその入力を変換し、出力を生成する点でSelect()と多少似ています。
デフォルトでは、TransformBlock <TInput、TOutput>は、MaxDegreeOfParallelismが1に等しい順番でデータを処理します。バッファされた入力を受け取って処理することに加えて、このブロックは処理された出力とバッファをすべて取ります処理されたデータ)を表示します。
これには2つのタスクがあります:1つはデータを処理するタスク、もう1つはデータを次のブロックにプッシュするタスクです。
同時パイプライン
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(Encryptor);
ActionBlock
(foreach)
このクラスは、論理的には、データを処理するタスクと両方を管理する「データフローブロック」を組み合わせて、処理するデータのバッファとして考えることができます。最も基本的な使い方では、ActionBlockをインスタンス化してActionBlockに「投稿」することができます。 ActionBlockの構築で提供されたデリゲートは、ポストされたすべてのデータに対して非同期に実行されます。
同期計算
var ab = new ActionBlock<TInput>(i =>
{
Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
非同期ダウンロードを最大で5つまで同時に調整する
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 });
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
TransformManyBlock
(SelectMany、1-m:このマッピングの結果は、LINQのSelectManyと同様に「平坦化」されています)
TransformManyBlock <TInput、TOutput>は、TransformBlock <TInput、TOutput>と非常によく似ています。
主な違いは、TransformBlock <TInput、TOutput>は各入力に対して1つの出力しか生成しませんが、TransformManyBlock <TInput、TOutput>は入力ごとに任意の数(0以上)の出力を生成する点です。 ActionBlockおよびTransformBlock <TInput、TOutput>と同様に、この処理は、同期処理と非同期処理の両方の委譲を使用して指定できます。
Func <TInput、IEnumerable>は同期に使用され、Func <TInput、Task <IEnumerable >>は非同期に使用されます。 ActionBlockとTransformBlock <TInput、TOutput>の両方と同様に、TransformManyBlock <TInput、TOutput>はデフォルトで順次処理になりますが、それ以外の場合は設定されます。
マッピングデリゲートは、出力バッファに個別に挿入されるアイテムのコレクションを返します。
非同期Webクローラー
var downloader = new TransformManyBlock<string, string>(async url =>
{
Console.WriteLine(“Downloading “ + url);
try
{
return ParseLinks(await DownloadContents(url));
}
catch{}
return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);
Enumerableをその構成要素に拡張する
var expanded = new TransformManyBlock<T[], T>(array => array);
1から0または1要素からなるフィルタリング
public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
return new TransformManyBlock<T, T>(item =>
filter(item) ? new [] { item } : Enumerable.Empty<T>());
}
バッチブロック
(一定数の順次データ項目をデータ項目の集合にグループ化する)
BatchBlockは、N個の単一項目を1つのバッチ項目に結合し、要素の配列として表します。特定のバッチサイズでインスタンスが作成され、ブロックはその数の要素を受け取るとすぐにバッチを作成し、バッチを出力バッファに非同期的に出力します。
BatchBlockは、貪欲モードと非貪欲モードの両方で実行することができます。
- デフォルトの貪欲モードでは、任意の数のソースからブロックに提供されたすべてのメッセージが受け入れられ、バッファに入れられてバッチに変換されます。
- 非貪欲モードでは、バッチを作成するのに十分なソースがブロックにメッセージを提供するまで、すべてのメッセージはソースから延期されます。したがって、BatchBlockを使用して、N個のソースのそれぞれから1個の要素、1個のソースからN個の要素、およびそれらの間に無数のオプションを受け取ることができます。
リクエストを100個のグループにまとめてデータベースに送信する
var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
1秒に1回バッチを作成する
var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
BufferBlock
(FIFOキュー:入って来るデータは、出て行くデータです)
要するに、BufferBlockは、Tのインスタンスを格納するための無制限または限定バッファを提供します。
Tのインスタンスをブロックに「ポスト」することができます。これにより、ポストされたデータがブロックによって先入れ先出し(FIFO)順に格納されます。
あなたはブロックから "受信"することができます。これにより、以前に保存された、または将来利用可能なTのインスタンス(つまり、FIFO)を同期的または非同期的に取得できます。
プロデューサを抑制した非同期プロデューサ/コンシューマ
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer
private static async void Producer()
{
while(true)
{
await _Buffer.SendAsync(Produce());
}
}
// Consumer
private static async Task Consumer()
{
while(true)
{
Process(await _Buffer.ReceiveAsync());
}
}
// Start the Producer and Consumer
private static async Task Run()
{
await Task.WhenAll(Producer(), Consumer());
}