Поиск…


JoinBlock

(Собирает 2-3 входа и объединяет их в Tuple)

Подобно BatchBlock, JoinBlock <T1, T2, ...> способен группировать данные из нескольких источников данных. На самом деле, это основная задача JoinBlock <T1, T2, ...>.

Например, JoinBlock <string, double, int> - это ISourceBlock <Tuple <string, double, int >>.

Как и в случае с BatchBlock, JoinBlock <T1, T2, ...> способен работать как в жадном, так и в не жадном режиме.

  • В жадном режиме по умолчанию все данные, предлагаемые для целей, принимаются, даже если у другой цели нет необходимых данных, чтобы сформировать кортеж.
  • В нежидком режиме целевые объекты блока будут откладывать данные до тех пор, пока всем целям не будут предложены необходимые данные для создания кортежа, после чего блок будет участвовать в двухфазном протоколе фиксации для атомарного извлечения всех необходимых элементов из источников. Эта отсрочка позволяет другому субъекту потреблять данные тем временем, чтобы позволить общей системе продвигаться вперед.

введите описание изображения здесь

Запросы обработки с ограниченным количеством объединенных объектов

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Введение в поток данных TPL Стивена Туба

BroadcastBlock

(Скопируйте элемент и отправьте копии в каждый блок, к которому он привязан)

В отличие от BufferBlock, миссия BroadcastBlock в жизни - включить все цели, связанные с блоком, чтобы получить копию каждого опубликованного элемента, постоянно переписывая «текущее» значение с теми, которые были распространены на него.

Кроме того, в отличие от BufferBlock, BroadcastBlock не требует лишних данных. После того как конкретная дата была предложена всем целям, этот элемент будет перезаписан любым фрагментом данных в строке (как и во всех блоках потока данных, сообщения обрабатываются в порядке FIFO). Этот элемент будет предлагаться всем целям и т. Д.

введите описание изображения здесь

Асинхронный производитель / потребитель с дроссельной заслонкой

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Предоставление статуса от агента

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Введение в поток данных TPL Стивена Туба

WriteOnceBlock

(Readonly variable: запоминает свой первый элемент данных и выдает копии его в качестве вывода. Игнорирует все другие элементы данных)

Если BufferBlock является самым фундаментальным блоком в потоке данных TPL, WriteOnceBlock является самым простым.
Он хранит не более одного значения, и как только это значение будет установлено, оно никогда не будет заменено или перезаписано.

Вы можете думать о WriteOnceBlock как о сходстве с переменной-членом только для чтения в C #, за исключением того, что вместо того, чтобы настраиваться только в конструкторе, а затем быть неизменяемым, он устанавливается только один раз и неизменен.

введите описание изображения здесь

Разделение потенциальных результатов задачи

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Введение в поток данных TPL Стивена Туба

BatchedJoinBlock

(Собирает определенное количество общих элементов из 2-3 входов и группирует их в набор наборов элементов данных)

BatchedJoinBlock <T1, T2, ...> является в некотором смысле комбинацией BatchBlock и JoinBlock <T1, T2, ...>.
В то время как JoinBlock <T1, T2, ...> используется для объединения одного входа от каждой цели в кортеж, а BatchBlock используется для объединения N входов в коллекцию, BatchedJoinBlock <T1, T2, ...> используется для сбора N входов из все цели в кортежей коллекций.

введите описание изображения здесь

Scatter / Gather

Рассмотрим проблему рассеяния / сбора, в которой запущены операции N, некоторые из которых могут преуспеть и вывести строковые выходы, а другие из них могут завершиться неудачей и создать исключения.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Введение в поток данных TPL Стивена Туба

TransformBlock

(Выбрать, один к одному)

Как и в случае с ActionBlock, TransformBlock <TInput, TOutput> позволяет выполнить делегат для выполнения некоторых действий для каждой входной базы данных; в отличие от ActionBlock, эта обработка имеет выход. Этот делегат может быть Func <TInput, TOutput>, и в этом случае обработка этого элемента считается завершенной при возврате делегата или может быть Func <TInput, Task>, в этом случае обработка этого элемента считается завершенной не когда делегат возвращается, но когда возвращенная Задача завершается. Для тех, кто знаком с LINQ, он несколько похож на Select (), поскольку он принимает вход, каким-то образом преобразует этот вход, а затем производит вывод.

По умолчанию TransformBlock <TInput, TOutput> последовательно обрабатывает свои данные с помощью MaxDegreeOfParallelism, равного 1. В дополнение к приему буферизованного ввода и его обработке этот блок будет также обрабатывать все обработанные выходные данные и буфер (данные, которые не были обработаны и обработаны данные).

Он имеет 2 задачи: один для обработки данных, а другой - для передачи данных в следующий блок.

введите описание изображения здесь

Параллельный трубопровод

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Введение в поток данных TPL Стивена Туба

ActionBlock

(для каждого)

Этот класс можно логически мыслить в качестве буфера для обрабатываемых данных в сочетании с задачами обработки этих данных, причем «блок потока данных» управляет обоими. В своем самом основном использовании мы можем создавать экземпляры ActionBlock и «post»; делегат, предоставленный при построении ActionBlock, будет выполняться асинхронно для каждой части данных.

введите описание изображения здесь

Синхронные вычисления

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Дросселирование асинхронных загрузок не более 5 одновременно

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Введение в поток данных TPL Стивена Туба

TransformManyBlock

(SelectMany, 1-m: результаты этого сопоставления «сплющены», как и LINQ SelectMany)

TransformManyBlock <TInput, TOutput> очень похож на TransformBlock <TInput, TOutput>.
Главное отличие состоит в том, что, если TransformBlock <TInput, TOutput> создает один и только один вывод для каждого входа, TransformManyBlock <TInput, TOutput> производит любое количество (ноль или более) выходов для каждого входа. Как и в ActionBlock и TransformBlock <TInput, TOutput>, эта обработка может быть задана с использованием делегатов, как для синхронной, так и для асинхронной обработки.

Для синхронного используется Func <TInput, IEnumerable>, а для асинхронных используется Func <TInput, Task <IEnumerable >>. Как и для ActionBlock, так и для TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> по умолчанию используется последовательная обработка, но может быть настроена иначе.

Делегат сопоставления перенастраивает набор элементов, которые вставляются индивидуально в выходной буфер.

введите описание изображения здесь

Асинхронный веб-сканер

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Расширение перечислимого в его составные элементы

var expanded = new TransformManyBlock<T[], T>(array => array);

Фильтрация путем перехода от 1 до 0 или 1 элементов

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Введение в поток данных TPL Стивена Туба

BatchBlock

(Группирует определенное количество последовательных элементов данных в коллекции элементов данных)

BatchBlock объединяет N отдельных элементов в один пакетный элемент, представленный как массив элементов. Экземпляр создается с определенным размером партии, и блок затем создает пакет, как только он получает это количество элементов, асинхронно выводит пакет в выходной буфер.

BatchBlock способен выполнять как жадные, так и неживые режимы.

  • В жадном режиме по умолчанию все сообщения, предлагаемые блоку из любого количества источников, принимаются и буферизуются для преобразования в партии.
    • В не жадном режиме все сообщения откладываются из источников, пока достаточное количество источников не предложит блок сообщения для создания пакета. Таким образом, BatchBlock может использоваться для приема 1 элемента из каждого из N источников, N элементов из 1 источника и множества параметров между ними.

введите описание изображения здесь

Группировка запросов в группы по 100 для отправки в базу данных

var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);

Создание партии один раз в секунду

var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);

Введение в поток данных TPL Стивена Туба

BufferBlock

(Очередь FIFO: данные, которые поступают, - это данные, которые исчезают)

Короче говоря, BufferBlock предоставляет неограниченный или ограниченный буфер для хранения экземпляров T.
Вы можете «отправить» экземпляры T в блок, которые заставляют данные, которые будут размещаться, быть сохранены в порядке первого входа в первый (FIFO) блоком.
Вы можете «получать» из блока, что позволяет синхронно или асинхронно получать экземпляры T, ранее сохраненные или доступные в будущем (опять же, FIFO).

введите описание изображения здесь

Асинхронный производитель / потребитель с дроссельной заслонкой

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Введение в поток данных TPL Стивена Туба



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow