Buscar..


JoinBlock

(Recoge 2-3 entradas y las combina en una tupla)

Al igual que BatchBlock, JoinBlock <T1, T2, ...> puede agrupar datos de múltiples fuentes de datos. De hecho, ese es el propósito principal de JoinBlock <T1, T2, ...>.

Por ejemplo, un JoinBlock <string, double, int> es un ISourceBlock <Tuple <string, double, int >>.

Al igual que con BatchBlock, JoinBlock <T1, T2, ...> es capaz de operar tanto en modo codicioso como no codicioso.

  • En el modo codicioso predeterminado, se aceptan todos los datos ofrecidos a los objetivos, incluso si el otro objetivo no tiene los datos necesarios para formar una tupla.
  • En el modo no ambicioso, los objetivos del bloque pospondrán los datos hasta que a todos los objetivos se les ofrezcan los datos necesarios para crear una tupla, momento en el que el bloque se involucrará en un protocolo de confirmación de dos fases para recuperar atómicamente todos los elementos necesarios de las fuentes. Este aplazamiento hace posible que otra entidad consuma los datos mientras tanto para permitir que el sistema en general avance hacia adelante.

introduzca la descripción de la imagen aquí

Procesamiento de solicitudes con un número limitado de objetos agrupados

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Introducción al flujo de datos TPL por Stephen Toub

BroadcastBlock

(Copie un elemento y envíe las copias a cada bloque al que esté vinculado)

A diferencia de BufferBlock, la misión de BroadcastBlock en la vida es permitir que todos los objetivos vinculados desde el bloque obtengan una copia de cada elemento publicado, sobrescribiendo continuamente el valor "actual" con los propagados a él.

Además, a diferencia de BufferBlock, BroadcastBlock no retiene datos innecesariamente. Después de que se haya ofrecido un dato en particular a todos los destinos, ese elemento será sobrescrito por cualquier pieza de datos que se encuentre en la siguiente línea (como ocurre con todos los bloques de flujo de datos, los mensajes se manejan en orden FIFO). Ese elemento se ofrecerá a todos los objetivos, y así sucesivamente.

introduzca la descripción de la imagen aquí

Productor asincrónico / consumidor con un productor estrangulado

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Exponer el estado de un agente

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Introducción al flujo de datos TPL por Stephen Toub

WriteOnceBlock

(Variable de solo lectura: Memoriza su primer elemento de datos y distribuye copias de él como salida. Ignora todos los demás elementos de datos)

Si BufferBlock es el bloque más fundamental en el flujo de datos TPL, WriteOnceBlock es el más simple.
Almacena a lo sumo un valor, y una vez que se ha establecido ese valor, nunca se reemplazará ni se sobrescribirá.

Puede pensar que WriteOnceBlock es similar a una variable miembro de solo lectura en C #, excepto que en lugar de ser solo configurable en un constructor y luego ser inmutable, solo se puede configurar una vez y luego es inmutable.

introduzca la descripción de la imagen aquí

Dividir las salidas potenciales de una tarea

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Introducción al flujo de datos TPL por Stephen Toub

BatchedJoinBlock

(Recopila una cierta cantidad de elementos totales de 2-3 entradas y los agrupa en una tupla de colecciones de elementos de datos)

BatchedJoinBlock <T1, T2, ...> es en cierto sentido una combinación de BatchBlock y JoinBlock <T1, T2, ...>.
Mientras que JoinBlock <T1, T2, ...> se usa para agregar una entrada de cada objetivo en una tupla, y BatchBlock se usa para agregar N entradas en una colección, BatchedJoinBlock <T1, T2, ...> se usa para reunir N entradas desde Todos los objetivos en tuplas de colecciones.

introduzca la descripción de la imagen aquí

Dispersión / reunión

Considere un problema de dispersión / recopilación en el que se inician N operaciones, algunas de las cuales pueden tener éxito y producir resultados de cadena, y otras pueden fallar y producir Excepciones.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Introducción al flujo de datos TPL por Stephen Toub

TransformBlock

(Seleccione, uno a uno)

Al igual que con ActionBlock, TransformBlock <TInput, TOutput> permite la ejecución de un delegado para realizar alguna acción para cada dato de entrada; a diferencia de ActionBlock, este procesamiento tiene una salida. Este delegado puede ser un Func <TInput, TOutput>, en cuyo caso el procesamiento de ese elemento se considera completado cuando el delegado regresa, o puede ser un Func <TInput, Task>, en cuyo caso el procesamiento de ese elemento se considera completado no cuando el delegado regresa pero cuando la tarea devuelta se completa. Para aquellos familiarizados con LINQ, es algo similar a Select () en que toma una entrada, transforma esa entrada de alguna manera y luego produce una salida.

De forma predeterminada, TransformBlock <TInput, TOutput> procesa sus datos secuencialmente con un MaxDegreeOfParallelism igual a 1. Además de recibir una entrada almacenada en el búfer y procesarla, este bloque también tomará toda su salida procesada y el búfer (datos que no han sido procesados). procesados, y datos que han sido procesados).

Tiene 2 tareas: una para procesar los datos y otra para enviar los datos al siguiente bloque.

introduzca la descripción de la imagen aquí

Un oleoducto concurrente

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Introducción al flujo de datos TPL por Stephen Toub

ActionBlock

(para cada)

Esta clase puede considerarse lógicamente como un búfer para que los datos se procesen combinados con tareas para procesar esos datos, con el "bloque de flujo de datos" gestionando ambos. En su uso más básico, podemos instanciar un ActionBlock y "publicar" datos en él; El delegado proporcionado en la construcción del ActionBlock se ejecutará de forma asincrónica para cada pieza de datos publicada.

introduzca la descripción de la imagen aquí

Computación sincrónica

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Aceleración de descargas asíncronas a un máximo de 5 al mismo tiempo

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Introducción al flujo de datos TPL por Stephen Toub

TransformManyBlock

(SelectMany, 1-m: Los resultados de este mapeo son "aplanados", al igual que SelectMany de LINQ)

TransformManyBlock <TInput, TOutput> es muy similar a TransformBlock <TInput, TOutput>.
La diferencia clave es que mientras un TransformBlock <TInput, TOutput> produce una y solo una salida para cada entrada, TransformManyBlock <TInput, TOutput> produce cualquier número de salidas (cero o más) para cada entrada. Al igual que con ActionBlock y TransformBlock <TInput, TOutput>, este procesamiento se puede especificar utilizando delegados, tanto para el procesamiento síncrono como para el asíncrono.

Un Func <TInput, IEnumerable> se usa para síncrono, y un Func <TInput, Task <IEnumerable>> se usa para asíncrono. Al igual que con ActionBlock y TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput>, los valores predeterminados son procesadores secuenciales, pero pueden configurarse de otro modo.

El delegado de asignación vuelve a ejecutar una colección de elementos, que se insertan individualmente en el búfer de salida.

introduzca la descripción de la imagen aquí

Rastreador Web Asíncrono

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Expandiendo un Enumerable en sus Elementos Constituyentes

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtrado pasando de 1 a 0 o 1 elementos.

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Introducción al flujo de datos TPL por Stephen Toub

BatchBlock

(Agrupa un cierto número de elementos de datos secuenciales en colecciones de elementos de datos)

BatchBlock combina N elementos únicos en un artículo por lotes, representado como una matriz de elementos. Se crea una instancia con un tamaño de lote específico, y luego el bloque crea un lote tan pronto como recibe ese número de elementos, enviando el lote de forma asíncrona al búfer de salida.

BatchBlock es capaz de ejecutarse en modo codicioso y no codicioso.

  • En el modo codicioso predeterminado, todos los mensajes ofrecidos al bloque desde cualquier número de fuentes se aceptan y almacenan en búfer para convertirlos en lotes.
    • En el modo no codicioso, todos los mensajes se posponen desde las fuentes hasta que suficientes fuentes hayan ofrecido mensajes al bloque para crear un lote. Por lo tanto, se puede usar un BatchBlock para recibir 1 elemento de cada una de N fuentes, N elementos de 1 fuente y una gran cantidad de opciones entre ellas.

introduzca la descripción de la imagen aquí

Solicitudes por lotes en grupos de 100 para enviar a una base de datos

var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);

Creando un lote una vez por segundo

var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);

Introducción al flujo de datos TPL por Stephen Toub

BufferBlock

(Cola FIFO: los datos que entran son los datos que salen)

En resumen, BufferBlock proporciona un búfer ilimitado o limitado para almacenar instancias de T.
Puede "publicar" instancias de T en el bloque, lo que hace que los datos que se están publicando se almacenen en un orden de primero en entrar, primero en salir (FIFO) por el bloque.
Puede "recibir" del bloque, lo que le permite obtener de forma sincrónica o asíncrona instancias de T previamente almacenadas o disponibles en el futuro (nuevamente, FIFO).

introduzca la descripción de la imagen aquí

Productor asincrónico / consumidor con un productor estrangulado

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Introducción al flujo de datos TPL por Stephen Toub



Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow