C# Language
Конструкции потока данных параллельной библиотеки задач (TPL)
Поиск…
JoinBlock
(Собирает 2-3 входа и объединяет их в Tuple)
Подобно BatchBlock, JoinBlock <T1, T2, ...> способен группировать данные из нескольких источников данных. На самом деле, это основная задача JoinBlock <T1, T2, ...>.
Например, JoinBlock <string, double, int> - это ISourceBlock <Tuple <string, double, int >>.
Как и в случае с BatchBlock, JoinBlock <T1, T2, ...> способен работать как в жадном, так и в не жадном режиме.
- В жадном режиме по умолчанию все данные, предлагаемые для целей, принимаются, даже если у другой цели нет необходимых данных, чтобы сформировать кортеж.
- В нежидком режиме целевые объекты блока будут откладывать данные до тех пор, пока всем целям не будут предложены необходимые данные для создания кортежа, после чего блок будет участвовать в двухфазном протоколе фиксации для атомарного извлечения всех необходимых элементов из источников. Эта отсрочка позволяет другому субъекту потреблять данные тем временем, чтобы позволить общей системе продвигаться вперед.
Запросы обработки с ограниченным количеством объединенных объектов
var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++)
{
requestProcessor.Target1.Post(new ExpensiveObject());
}
var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
var resource = pair.Item1;
var request = pair.Item2;
request.ProcessWith(resource);
return resource;
});
throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);
Введение в поток данных TPL Стивена Туба
BroadcastBlock
(Скопируйте элемент и отправьте копии в каждый блок, к которому он привязан)
В отличие от BufferBlock, миссия BroadcastBlock в жизни - включить все цели, связанные с блоком, чтобы получить копию каждого опубликованного элемента, постоянно переписывая «текущее» значение с теми, которые были распространены на него.
Кроме того, в отличие от BufferBlock, BroadcastBlock не требует лишних данных. После того как конкретная дата была предложена всем целям, этот элемент будет перезаписан любым фрагментом данных в строке (как и во всех блоках потока данных, сообщения обрабатываются в порядке FIFO). Этот элемент будет предлагаться всем целям и т. Д.
Асинхронный производитель / потребитель с дроссельной заслонкой
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
item.Image.Save(item.Path)
);
var showInUiBlock = new ActionBlock<ImageData>(item =>
imagePanel.AddImage(item.Image),
new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
Предоставление статуса от агента
public class MyAgent
{
public ISourceBlock<string> Status { get; private set; }
public MyAgent()
{
Status = new BroadcastBlock<string>();
Run();
}
private void Run()
{
Status.Post("Starting");
Status.Post("Doing cool stuff");
…
Status.Post("Done");
}
}
Введение в поток данных TPL Стивена Туба
WriteOnceBlock
(Readonly variable: запоминает свой первый элемент данных и выдает копии его в качестве вывода. Игнорирует все другие элементы данных)
Если BufferBlock является самым фундаментальным блоком в потоке данных TPL, WriteOnceBlock является самым простым.
Он хранит не более одного значения, и как только это значение будет установлено, оно никогда не будет заменено или перезаписано.
Вы можете думать о WriteOnceBlock как о сходстве с переменной-членом только для чтения в C #, за исключением того, что вместо того, чтобы настраиваться только в конструкторе, а затем быть неизменяемым, он устанавливается только один раз и неизменен.
Разделение потенциальных результатов задачи
public static async void SplitIntoBlocks(this Task<T> task,
out IPropagatorBlock<T> result,
out IPropagatorBlock<Exception> exception)
{
result = new WriteOnceBlock<T>(i => i);
exception = new WriteOnceBlock<Exception>(i => i);
try
{
result.Post(await task);
}
catch(Exception ex)
{
exception.Post(ex);
}
}
Введение в поток данных TPL Стивена Туба
BatchedJoinBlock
(Собирает определенное количество общих элементов из 2-3 входов и группирует их в набор наборов элементов данных)
BatchedJoinBlock <T1, T2, ...> является в некотором смысле комбинацией BatchBlock и JoinBlock <T1, T2, ...>.
В то время как JoinBlock <T1, T2, ...> используется для объединения одного входа от каждой цели в кортеж, а BatchBlock используется для объединения N входов в коллекцию, BatchedJoinBlock <T1, T2, ...> используется для сбора N входов из все цели в кортежей коллекций.
Scatter / Gather
Рассмотрим проблему рассеяния / сбора, в которой запущены операции N, некоторые из которых могут преуспеть и вывести строковые выходы, а другие из них могут завершиться неудачей и создать исключения.
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i=0; i<10; i++)
{
Task.Factory.StartNew(() => {
try { batchedJoin.Target1.Post(DoWork()); }
catch(Exception ex) { batchJoin.Target2.Post(ex); }
});
}
var results = await batchedJoin.ReceiveAsync();
foreach(string s in results.Item1)
{
Console.WriteLine(s);
}
foreach(Exception e in results.Item2)
{
Console.WriteLine(e);
}
Введение в поток данных TPL Стивена Туба
TransformBlock
(Выбрать, один к одному)
Как и в случае с ActionBlock, TransformBlock <TInput, TOutput> позволяет выполнить делегат для выполнения некоторых действий для каждой входной базы данных; в отличие от ActionBlock, эта обработка имеет выход. Этот делегат может быть Func <TInput, TOutput>, и в этом случае обработка этого элемента считается завершенной при возврате делегата или может быть Func <TInput, Task>, в этом случае обработка этого элемента считается завершенной не когда делегат возвращается, но когда возвращенная Задача завершается. Для тех, кто знаком с LINQ, он несколько похож на Select (), поскольку он принимает вход, каким-то образом преобразует этот вход, а затем производит вывод.
По умолчанию TransformBlock <TInput, TOutput> последовательно обрабатывает свои данные с помощью MaxDegreeOfParallelism, равного 1. В дополнение к приему буферизованного ввода и его обработке этот блок будет также обрабатывать все обработанные выходные данные и буфер (данные, которые не были обработаны и обработаны данные).
Он имеет 2 задачи: один для обработки данных, а другой - для передачи данных в следующий блок.
Параллельный трубопровод
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(Encryptor);
Введение в поток данных TPL Стивена Туба
ActionBlock
(для каждого)
Этот класс можно логически мыслить в качестве буфера для обрабатываемых данных в сочетании с задачами обработки этих данных, причем «блок потока данных» управляет обоими. В своем самом основном использовании мы можем создавать экземпляры ActionBlock и «post»; делегат, предоставленный при построении ActionBlock, будет выполняться асинхронно для каждой части данных.
Синхронные вычисления
var ab = new ActionBlock<TInput>(i =>
{
Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
Дросселирование асинхронных загрузок не более 5 одновременно
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 });
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
Введение в поток данных TPL Стивена Туба
TransformManyBlock
(SelectMany, 1-m: результаты этого сопоставления «сплющены», как и LINQ SelectMany)
TransformManyBlock <TInput, TOutput> очень похож на TransformBlock <TInput, TOutput>.
Главное отличие состоит в том, что, если TransformBlock <TInput, TOutput> создает один и только один вывод для каждого входа, TransformManyBlock <TInput, TOutput> производит любое количество (ноль или более) выходов для каждого входа. Как и в ActionBlock и TransformBlock <TInput, TOutput>, эта обработка может быть задана с использованием делегатов, как для синхронной, так и для асинхронной обработки.
Для синхронного используется Func <TInput, IEnumerable>, а для асинхронных используется Func <TInput, Task <IEnumerable >>. Как и для ActionBlock, так и для TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> по умолчанию используется последовательная обработка, но может быть настроена иначе.
Делегат сопоставления перенастраивает набор элементов, которые вставляются индивидуально в выходной буфер.
Асинхронный веб-сканер
var downloader = new TransformManyBlock<string, string>(async url =>
{
Console.WriteLine(“Downloading “ + url);
try
{
return ParseLinks(await DownloadContents(url));
}
catch{}
return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);
Расширение перечислимого в его составные элементы
var expanded = new TransformManyBlock<T[], T>(array => array);
Фильтрация путем перехода от 1 до 0 или 1 элементов
public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
return new TransformManyBlock<T, T>(item =>
filter(item) ? new [] { item } : Enumerable.Empty<T>());
}
Введение в поток данных TPL Стивена Туба
BatchBlock
(Группирует определенное количество последовательных элементов данных в коллекции элементов данных)
BatchBlock объединяет N отдельных элементов в один пакетный элемент, представленный как массив элементов. Экземпляр создается с определенным размером партии, и блок затем создает пакет, как только он получает это количество элементов, асинхронно выводит пакет в выходной буфер.
BatchBlock способен выполнять как жадные, так и неживые режимы.
- В жадном режиме по умолчанию все сообщения, предлагаемые блоку из любого количества источников, принимаются и буферизуются для преобразования в партии.
- В не жадном режиме все сообщения откладываются из источников, пока достаточное количество источников не предложит блок сообщения для создания пакета. Таким образом, BatchBlock может использоваться для приема 1 элемента из каждого из N источников, N элементов из 1 источника и множества параметров между ними.
Группировка запросов в группы по 100 для отправки в базу данных
var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
Создание партии один раз в секунду
var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
Введение в поток данных TPL Стивена Туба
BufferBlock
(Очередь FIFO: данные, которые поступают, - это данные, которые исчезают)
Короче говоря, BufferBlock предоставляет неограниченный или ограниченный буфер для хранения экземпляров T.
Вы можете «отправить» экземпляры T в блок, которые заставляют данные, которые будут размещаться, быть сохранены в порядке первого входа в первый (FIFO) блоком.
Вы можете «получать» из блока, что позволяет синхронно или асинхронно получать экземпляры T, ранее сохраненные или доступные в будущем (опять же, FIFO).
Асинхронный производитель / потребитель с дроссельной заслонкой
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer
private static async void Producer()
{
while(true)
{
await _Buffer.SendAsync(Produce());
}
}
// Consumer
private static async Task Consumer()
{
while(true)
{
Process(await _Buffer.ReceiveAsync());
}
}
// Start the Producer and Consumer
private static async Task Run()
{
await Task.WhenAll(Producer(), Consumer());
}
Введение в поток данных TPL Стивена Туба