Recherche…


JoinBlock

(Recueille 2-3 entrées et les combine dans un tuple)

Comme BatchBlock, JoinBlock <T1, T2,…> peut regrouper des données provenant de plusieurs sources de données. En fait, c'est l'objectif principal de JoinBlock <T1, T2,…>.

Par exemple, un JoinBlock <string, double, int> est un ISourceBlock <Tuple <string, double, int >>.

Comme avec BatchBlock, JoinBlock <T1, T2,…> est capable de fonctionner en mode gourmand et non gourmand.

  • Dans le mode gourmand par défaut, toutes les données proposées aux cibles sont acceptées, même si l'autre cible ne dispose pas des données nécessaires pour former un tuple.
  • En mode non gourmand, les cibles du bloc reporteront les données jusqu'à ce que toutes les cibles se soient vues proposer les données nécessaires à la création d'un tuple. À ce moment, le bloc s'engagera dans un protocole de validation en deux phases. Ce report permet à une autre entité de consommer les données entre-temps afin de permettre au système global d’avancer.

entrer la description de l'image ici

Traitement des demandes avec un nombre limité d'objets groupés

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Introduction à TPL Dataflow par Stephen Toub

BroadcastBlock

(Copier un article et envoyer les copies à chaque bloc auquel il est lié)

Contrairement à BufferBlock, la mission de BroadcastBlock dans la vie est de permettre à toutes les cibles liées à partir du bloc d'obtenir une copie de chaque élément publié, en écrasant continuellement la valeur «actuelle» avec celles qui lui sont propagées.

En outre, contrairement à BufferBlock, BroadcastBlock ne conserve pas inutilement les données. Après qu'une donnée particulière a été offerte à toutes les cibles, cet élément sera écrasé par n'importe quelle donnée suivante (comme pour tous les blocs de flux de données, les messages sont traités dans l'ordre FIFO). Cet élément sera offert à toutes les cibles, et ainsi de suite.

entrer la description de l'image ici

Producteur / consommateur asynchrone avec un producteur étranglé

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Statut d'exposition d'un agent

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Introduction à TPL Dataflow par Stephen Toub

WriteOnceBlock

(Variable en lecture seule: Mémorise son premier élément de données et en distribue des copies en tant que sortie. Ignore tous les autres éléments de données)

Si BufferBlock est le bloc le plus fondamental dans TPL Dataflow, WriteOnceBlock est le plus simple.
Il stocke au plus une valeur et, une fois cette valeur définie, elle ne sera jamais remplacée ou remplacée.

Vous pouvez penser à WriteOnceBlock comme étant similaire à une variable membre readonly en C #, sauf qu'au lieu d'être uniquement paramétrable dans un constructeur et ensuite immuable, il est seulement définissable une fois et est alors immuable.

entrer la description de l'image ici

Fractionnement des sorties potentielles d'une tâche

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Introduction à TPL Dataflow par Stephen Toub

BatchedJoinBlock

(Recueille un certain nombre d'éléments totaux de 2-3 entrées et les regroupe dans un Tuple de collections d'éléments de données)

BatchedJoinBlock <T1, T2,…> est en quelque sorte une combinaison de BatchBlock et de JoinBlock <T1, T2,…>.
Alors que JoinBlock <T1, T2,…> est utilisé pour agréger une entrée de chaque cible dans un tuple et que BatchBlock est utilisé pour agréger N entrées dans une collection, BatchedJoinBlock <T1, T2,…> est utilisé pour collecter N entrées toutes les cibles en tuples de collections.

entrer la description de l'image ici

Scatter / Gather

Considérons un problème de dispersion / rassemblement où N opérations sont lancées, dont certaines peuvent réussir et produire des sorties de chaînes, et d'autres peuvent échouer et produire des exceptions.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Introduction à TPL Dataflow par Stephen Toub

TransformBlock

(Sélectionnez un à un)

Comme avec ActionBlock, TransformBlock <TInput, TOutput> permet l'exécution d'un délégué pour effectuer certaines actions pour chaque donnée d'entrée; contrairement à ActionBlock, ce traitement a une sortie. Ce délégué peut être un Func <TInput, TOutput>, auquel cas le traitement de cet élément est considéré comme terminé lorsque le délégué revient, ou il peut s'agir d'un Func <TInput, Task>, auquel cas le traitement de cet élément est considéré comme terminé. lorsque le délégué renvoie, mais lorsque la tâche renvoyée est terminée. Pour ceux qui sont familiers avec LINQ, c'est un peu similaire à Select () dans la mesure où il prend une entrée, transforme cette entrée d'une certaine manière, puis produit une sortie.

Par défaut, TransformBlock <TInput, TOutput> traite ses données séquentiellement avec un MaxDegreeOfParallelism égal à 1. En plus de recevoir une entrée en mémoire tampon et de le traiter, ce bloc prend également toute sa sortie et son tampon traités (données qui n'ont pas été traitées et les données traitées).

Il comporte deux tâches: une pour traiter les données et une pour transmettre les données au bloc suivant.

entrer la description de l'image ici

Un pipeline concomitant

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Introduction à TPL Dataflow par Stephen Toub

ActionBlock

(pour chaque)

Cette classe peut être considérée logiquement comme un tampon pour les données à traiter combinées avec des tâches pour traiter ces données, avec le «bloc de flux de données» gérant les deux. Dans son utilisation la plus élémentaire, nous pouvons instancier un ActionBlock et y «publier» des données; le délégué fourni lors de la construction du ActionBlock sera exécuté de manière asynchrone pour chaque donnée envoyée.

entrer la description de l'image ici

Calcul synchrone

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Limiter les téléchargements asynchrones à 5 au maximum simultanément

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Introduction à TPL Dataflow par Stephen Toub

TransformManyBlock

(SelectMany, 1-m: les résultats de ce mappage sont «aplatis», tout comme le SelectMany de LINQ)

TransformManyBlock <TInput, TOutput> est très similaire à TransformBlock <TInput, TOutput>.
La principale différence est qu'un TransformBlock <TInput, TOutput> produit une et une seule sortie pour chaque entrée, TransformManyBlock <TInput, TOutput> produit un nombre quelconque (zéro ou plus) de sorties pour chaque entrée. Comme avec ActionBlock et TransformBlock <TInput, TOutput>, ce traitement peut être spécifié à l'aide de délégués, à la fois pour le traitement synchrone et asynchrone.

Un Func <TInput, IEnumerable> est utilisé pour synchrone et un Func <TInput, Task <IEnumerable >> est utilisé pour asynchrone. Comme avec ActionBlock et TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> utilise par défaut un traitement séquentiel, mais peut être configuré autrement.

Le délégué de mappage exécute une collection d'éléments qui sont insérés individuellement dans le tampon de sortie.

entrer la description de l'image ici

Asynchrone Web Crawler

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Élargir un Enumerable dans ses éléments constitutifs

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtrage en passant de 1 à 0 ou 1 éléments

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Introduction à TPL Dataflow par Stephen Toub

BatchBlock

(Regroupe un certain nombre d'éléments de données séquentiels dans des ensembles d'éléments de données)

BatchBlock combine N éléments uniques en un seul lot, représenté sous la forme d'un tableau d'éléments. Une instance est créée avec une taille de lot spécifique, et le bloc crée ensuite un lot dès qu'il reçoit ce nombre d'éléments, produisant de manière asynchrone le lot dans le tampon de sortie.

BatchBlock est capable de s'exécuter dans des modes gourmands et non gourmands.

  • Dans le mode gourmand par défaut, tous les messages proposés au bloc depuis un nombre quelconque de sources sont acceptés et mis en mémoire tampon pour être convertis en lots.
    • En mode non gourmand, tous les messages sont reportés des sources jusqu'à ce que suffisamment de sources aient offert des messages au bloc pour créer un lot. Ainsi, un BatchBlock peut être utilisé pour recevoir 1 élément de chacune des N sources, N éléments provenant de 1 source et une myriade d'options entre les deux.

entrer la description de l'image ici

Classement des demandes en groupes de 100 pour les soumettre à une base de données

var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);

Créer un lot une fois par seconde

var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);

Introduction à TPL Dataflow par Stephen Toub

BufferBlock

(FIFO Queue: Les données qui entrent sont les données qui sortent)

En bref, BufferBlock fournit un tampon non lié ou limité pour stocker les instances de T.
Vous pouvez «publier» des instances de T sur le bloc, ce qui entraîne le stockage des données à enregistrer dans un ordre FIFO (first-in-first-out) par le bloc.
Vous pouvez «recevoir» du bloc, ce qui vous permet d'obtenir de manière synchrone ou asynchrone des instances de T précédemment stockées ou disponibles (encore une fois, FIFO).

entrer la description de l'image ici

Producteur / consommateur asynchrone avec un producteur étranglé

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Introduction à TPL Dataflow par Stephen Toub



Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow