Suche…


JoinBlock

(Sammelt 2-3 Eingaben und kombiniert sie zu einem Tupel)

Wie bei BatchBlock können mit JoinBlock <T1, T2,…> Daten aus mehreren Datenquellen gruppiert werden. Tatsächlich ist dies der Hauptzweck von JoinBlock <T1, T2,…>.

Ein JoinBlock <string, double, int> ist beispielsweise ein ISourceBlock <Tuple <string, double, int >>.

Wie bei BatchBlock kann JoinBlock <T1, T2,…> sowohl im gierigen als auch im nicht gierigen Modus arbeiten.

  • Im standardmäßigen Greedy-Modus werden alle den Zielen angebotenen Daten akzeptiert, auch wenn das andere Ziel nicht über die erforderlichen Daten verfügt, um ein Tupel zu bilden.
  • Im nicht-gierigen Modus verschieben die Ziele des Blocks die Daten, bis allen Zielen die erforderlichen Daten zum Erstellen eines Tupels zur Verfügung gestellt wurden. Zu diesem Zeitpunkt verwendet der Block ein zweiphasiges Festschreibungsprotokoll, um alle erforderlichen Elemente aus den Quellen atomar abzurufen. Diese Verschiebung ermöglicht es einer anderen Entität, die Daten zwischenzeitlich zu verbrauchen, um dem Gesamtsystem einen Fortschritt zu ermöglichen.

Geben Sie hier die Bildbeschreibung ein

Anfragen mit einer begrenzten Anzahl gepoolter Objekte bearbeiten

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Einführung in TPL Dataflow von Stephen Toub

BroadcastBlock

(Kopieren Sie ein Objekt und senden Sie die Kopien an jeden Block, mit dem es verlinkt ist.)

Im Gegensatz zu BufferBlock besteht das Ziel von BroadcastBlock darin, allen mit dem Block verknüpften Zielen zu ermöglichen, eine Kopie jedes veröffentlichten Elements zu erhalten, wobei der "aktuelle" Wert ständig mit den an ihn weitergegebenen Objekten überschrieben wird.

Im Gegensatz zu BufferBlock hält BroadcastBlock die Daten nicht unnötig an. Nachdem ein bestimmtes Datum für alle Ziele angeboten wurde, wird dieses Element durch das nächste Datenelement überschrieben (wie bei allen Datenflussblöcken werden Nachrichten in der FIFO-Reihenfolge behandelt). Dieses Element wird allen Zielen angeboten und so weiter.

Geben Sie hier die Bildbeschreibung ein

Asynchroner Producer / Consumer mit einem gedrosselten Producer

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Anzeigen des Status eines Agenten

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Einführung in TPL Dataflow von Stephen Toub

WriteOnceBlock

(Readonly-Variable: Speichert das erste Datenelement und gibt Kopien davon als Ausgabe aus. Ignoriert alle anderen Datenelemente.)

Wenn BufferBlock der grundlegendste Block in TPL Dataflow ist, ist WriteOnceBlock der einfachste.
Es speichert höchstens einen Wert. Sobald dieser Wert festgelegt wurde, wird er niemals ersetzt oder überschrieben.

Sie können sich WriteOnceBlock wie eine readonly-Membervariable in C # vorstellen, mit der Ausnahme, dass Sie nicht nur in einem Konstruktor einstellbar und dann unveränderlich sind, sondern nur einmal einstellbar und dann unveränderlich sind.

Geben Sie hier die Bildbeschreibung ein

Potentielle Ausgaben einer Aufgabe aufteilen

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Einführung in TPL Dataflow von Stephen Toub

BatchedJoinBlock

(Sammelt eine bestimmte Anzahl von Gesamtelementen aus 2-3 Eingaben und gruppiert sie in einem Tuple von Sammlungen von Datenelementen.)

BatchedJoinBlock <T1, T2, ...> ist gewissermaßen eine Kombination aus BatchBlock und JoinBlock <T1, T2, ...>.
Während JoinBlock <T1, T2,…> verwendet wird, um eine Eingabe von jedem Ziel zu einem Tupel zu aggregieren, und BatchBlock, um N Eingaben in einer Sammlung zu aggregieren, wird BatchedJoinBlock <T1, T2,…> verwendet, um N Eingaben von across zu sammeln alle Ziele in Tupel von Sammlungen.

Geben Sie hier die Bildbeschreibung ein

Streuung / Sammeln

Stellen Sie sich ein Scatter / Gather-Problem vor, bei dem N-Operationen gestartet werden, von denen einige erfolgreich sein können und String-Ausgaben erzeugen können, und andere, die möglicherweise fehlschlagen und Ausnahmen erzeugen.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Einführung in TPL Dataflow von Stephen Toub

TransformBlock

(Wählen Sie eins zu eins aus)

Wie bei ActionBlock ermöglicht TransformBlock <TInput, TOutput> die Ausführung eines Delegaten, um eine Aktion für jedes Eingabedatum auszuführen. Im Gegensatz zu ActionBlock hat diese Verarbeitung eine Ausgabe. Dieser Delegat kann ein Func <TInput, TOutput> sein. In diesem Fall wird die Verarbeitung dieses Elements als abgeschlossen betrachtet, wenn der Delegat zurückgegeben wird, oder er kann ein Func <TInput, Task> sein. In diesem Fall wird die Verarbeitung dieses Elements als nicht abgeschlossen betrachtet wenn der Delegat zurückkehrt, aber wenn die zurückgegebene Aufgabe abgeschlossen ist. Für diejenigen, die mit LINQ vertraut sind, ist es Select () ein wenig ähnlich, dass es eine Eingabe übernimmt, diese Eingabe in eine bestimmte Weise transformiert und dann eine Ausgabe erzeugt.

Standardmäßig verarbeitet TransformBlock <TInput, TOutput> seine Daten sequentiell mit einem MaxDegreeOfParallelism-Wert von 1. Zusätzlich zum Empfang von gepufferten Eingaben und deren Verarbeitung nimmt dieser Block seine gesamte verarbeitete Ausgabe auf und puffert diese (auch nicht vorhandene Daten) verarbeitet und Daten, die verarbeitet wurden).

Es hat zwei Aufgaben: eine für die Verarbeitung der Daten und eine für die Weitergabe der Daten an den nächsten Block.

Geben Sie hier die Bildbeschreibung ein

Eine gleichzeitige Pipeline

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Einführung in TPL Dataflow von Stephen Toub

ActionBlock

(für jeden)

Diese Klasse kann logisch als Puffer für zu verarbeitende Daten mit Aufgaben zur Verarbeitung dieser Daten betrachtet werden, wobei der „Datenflussblock“ beide verwaltet. In der einfachsten Verwendung können wir einen ActionBlock instanziieren und Daten darauf "buchen". Der bei der ActionBlock-Konstruktion bereitgestellte Delegat wird asynchron für alle gesendeten Daten ausgeführt.

Geben Sie hier die Bildbeschreibung ein

Synchrone Berechnung

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Drosseln asynchroner Downloads auf maximal 5 gleichzeitig

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Einführung in TPL Dataflow von Stephen Toub

TransformManyBlock

(SelectMany, 1-m: Die Ergebnisse dieser Zuordnung werden wie bei SelectMany von LINQ "abgeflacht".)

TransformManyBlock <TInput, TOutput> ist TransformBlock <TInput, TOutput> sehr ähnlich.
Der Hauptunterschied besteht darin, dass ein TransformBlock <TInput, TOutput> für jeden Eingang nur einen Ausgang erzeugt, während TransformManyBlock <TInput, TOutput> für jeden Eingang eine beliebige Anzahl (null oder mehr) Ausgänge erzeugt. Wie bei ActionBlock und TransformBlock <TInput, TOutput> kann diese Verarbeitung mithilfe von Delegaten angegeben werden, und zwar sowohl für die synchrone als auch für die asynchrone Verarbeitung.

Ein Func <TInput, IEnumerable> wird für synchron verwendet, und ein Func <TInput, Task <IEnumerable> wird für asynchron verwendet. Wie bei ActionBlock und TransformBlock <TInput wird bei TOutput>, TransformManyBlock <TInput, TOutput> standardmäßig die sequentielle Verarbeitung verwendet, sie kann jedoch auch anders konfiguriert werden.

Der Zuordnungsdelegierte führt eine Sammlung von Elementen erneut aus, die einzeln in den Ausgabepuffer eingefügt werden.

Geben Sie hier die Bildbeschreibung ein

Asynchroner Web Crawler

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Ausdehnung eines Aufzählers in seine Bestandteile

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtern durch Wechseln von 1 zu 0 oder 1 Elementen

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Einführung in TPL Dataflow von Stephen Toub

BatchBlock

(Gruppiert eine bestimmte Anzahl von sequentiellen Datenelementen in Sammlungen von Datenelementen.)

BatchBlock kombiniert N einzelne Elemente in einem Stapelelement, das als Array von Elementen dargestellt wird. Eine Instanz wird mit einer bestimmten Stapelgröße erstellt. Der Block erstellt dann einen Stapel, sobald er diese Anzahl von Elementen erhalten hat, und gibt den Stapel asynchron in den Ausgabepuffer aus.

BatchBlock kann sowohl im gierigen als auch im nicht gierigen Modus ausgeführt werden.

  • Im voreingestellten Greedy-Modus werden alle Meldungen, die aus einer beliebigen Anzahl von Quellen zum Block angeboten werden, akzeptiert und zwischengespeichert, um in Stapel umgewandelt zu werden.
    • Im nicht-gierigen Modus werden alle Nachrichten aus Quellen verschoben, bis genügend Quellen dem Block Angebote zum Erstellen eines Stapels angeboten haben. Somit kann ein BatchBlock verwendet werden, um 1 Element von jeder der N Quellen, N Elemente von einer Quelle und eine Vielzahl von Optionen dazwischen zu empfangen.

Geben Sie hier die Bildbeschreibung ein

Batch-Anforderungen in Gruppen von 100 zum Übermitteln an eine Datenbank

var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);

Einmal pro Sekunde einen Stapel erstellen

var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);

Einführung in TPL Dataflow von Stephen Toub

BufferBlock

(FIFO-Warteschlange: Die eingehenden Daten sind die Daten, die ausgehen.)

Kurz gesagt, BufferBlock stellt einen unbegrenzten oder beschränkten Puffer zum Speichern von Instanzen von T bereit.
Sie können Instanzen von T in den Block "buchen", wodurch die gesendeten Daten in einer FIFO-Reihenfolge (First-In-First-Out) des Blocks gespeichert werden.
Sie können von dem Block aus "empfangen", wodurch Sie synchron oder asynchron Instanzen von T erhalten können, die zuvor gespeichert wurden oder in der Zukunft verfügbar sind (wiederum FIFO).

Geben Sie hier die Bildbeschreibung ein

Asynchroner Producer / Consumer mit einem gedrosselten Producer

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Einführung in TPL Dataflow von Stephen Toub



Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow