Sök…


JoinBlock

(Samlar 2-3 ingångar och kombinerar dem till en Tuple)

Som BatchBlock kan JoinBlock <T1, T2, ...> gruppera data från flera datakällor. Faktum är att det är JoinBlock <T1, T2, ...>: s huvudsakliga syfte.

Till exempel är en JoinBlock <-sträng, dubbel, int> en ISourceBlock <Tuple <-sträng, dubbel, int >>.

Liksom med BatchBlock kan JoinBlock <T1, T2, ...> fungera i både girigt och icke-girigt läge.

  • I standard-girigt läge accepteras all data som erbjuds till mål, även om det andra målet inte har den nödvändiga informationen att skapa en tupel med.
  • I icke-girigt läge kommer blockets mål att skjuta upp data tills alla mål har erbjudits nödvändiga data för att skapa en tupel, vid vilken punkt blocket kommer att ingå i ett tvåfas åtagandeprotokoll för att atomiskt hämta alla nödvändiga objekt från källorna. Denna uppskjutning gör det möjligt för en annan enhet att konsumera uppgifterna under tiden så att det övergripande systemet kan göra framsteg.

ange bildbeskrivning här

Behandla begäranden med ett begränsat antal poolade objekt

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Introduktion till TPL Dataflow av Stephen Toub

BroadcastBlock

(Kopiera ett objekt och skicka kopiorna till varje block som det är länkat till)

Till skillnad från BufferBlock är BroadcastBlocks uppdrag i livet att göra det möjligt för alla mål som är länkade från blocket att få en kopia av varje publicerat element, och kontinuerligt skriva över det "aktuella" värdet med de som sprids till det.

Till skillnad från BufferBlock håller BroadcastBlock inte onödigt på data. Efter att ett visst datum har erbjudits till alla mål kommer det elementet att skrivas över av vilket data som finns nästa i raden (som med alla dataflödesblock hanteras meddelanden i FIFO-ordning). Det elementet kommer att erbjudas till alla mål, och så vidare.

ange bildbeskrivning här

Asynkron producent / konsument med en gasad tillverkare

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Visar status från en agent

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Introduktion till TPL Dataflow av Stephen Toub

WriteOnceBlock

(Readonly variabel: Memorerar sin första dataobjekt och skickar ut kopior av den som dess utgång. Ignorerar alla andra dataobjekt)

Om BufferBlock är det mest grundläggande blocket i TPL Dataflow, är WritOnceBlock det enklaste.
Det lagrar högst ett värde, och när detta värde har ställts in kommer det aldrig att ersättas eller skrivas över.

Du kan tänka på WritOnceBlock som att likna en skrivskyddad medlemsvariabel i C #, förutom istället för att bara kunna ställas in i en konstruktör och sedan vara oföränderlig, den kan bara ställas in en gång och är sedan oföränderlig.

ange bildbeskrivning här

Dela upp en uppgifts potentiella utgångar

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Introduktion till TPL Dataflow av Stephen Toub

BatchedJoinBlock

(Samlar in ett visst antal totala objekt från 2-3 ingångar och grupperar dem i en Tuple av samlingar av dataposter)

BatchJoinBlock <T1, T2, ...> är i en mening en kombination av BatchBlock och JoinBlock <T1, T2, ...>.
Medan JoinBlock <T1, T2, ...> används för att aggregera en ingång från varje mål till en tupel, och BatchBlock används för att aggregera N-ingångar i en samling, används BatchJoinBlock <T1, T2, ...> för att samla N-ingångar från hela alla målen i samlingarna.

ange bildbeskrivning här

Scatter / Samla

Tänk på ett spridnings- / samlingsproblem där N-operationer startas, av vilka vissa kan lyckas och producera strängutgångar, och andra kan misslyckas och producera undantag.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Introduktion till TPL Dataflow av Stephen Toub

TransformBlock

(Välj en-till-en)

Precis som med ActionBlock, möjliggör TransformBlock <TInput, TOutput> exekveringen av en delegat för att utföra någon åtgärd för varje inmatningsdatum; till skillnad från ActionBlock har denna bearbetning en utgång. Denna delegat kan vara en Func <TInput, TOutput>, i vilket fall behandling av det elementet anses vara slutfört när delegaten återvänder, eller det kan vara en Func <TInput, Task>, i vilket fall behandling av det elementet anses vara avslutat inte när delegaten återvänder men när den returnerade uppgiften är klar. För dem som är bekanta med LINQ, är det något liknande Select () genom att det tar en ingång, omvandlar den ingången på något sätt och sedan producerar en utgång.

Som standard bearbetar TransformBlock <TInput, TOutput> sina data i följd med en MaxDegreeOfParallelism lika med 1. Utöver att ta emot buffrad inmatning och bearbeta den, kommer detta block att ta all dess bearbetade output och buffert också (data som inte har varit behandlas och data som har behandlats).

Den har två uppgifter: En för att bearbeta data, och en för att driva data till nästa block.

ange bildbeskrivning här

En samtidig pipeline

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Introduktion till TPL Dataflow av Stephen Toub

ActionBlock

(för varje)

Denna klass kan logiskt betraktas som en buffert för data som ska bearbetas i kombination med uppgifter för bearbetning av dessa data, där "dataflow-blocket" hanterar båda. I dess mest grundläggande användning kan vi anpassa en ActionBlock och "posta" data till den; delegaten som tillhandahålls vid ActionBlock-konstruktionen kommer att utföras asynkront för varje uppsatt data.

ange bildbeskrivning här

Synkron beräkning

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Asrkroniserande nedladdningar till högst 5 samtidigt

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Introduktion till TPL Dataflow av Stephen Toub

TransformManyBlock

(SelectMany, 1-m: Resultaten av denna kartläggning är "plattade", precis som LINQs SelectMany)

TransformManyBlock <TInput, TOutput> liknar TransformBlock <TInput, TOutput>.
Nyckelskillnaden är att medan en TransformBlock <TInput, TOutput> producerar en och endast en utgång för varje ingång, ger TransformManyBlock <TInput, TOutput> ett valfritt antal (noll eller fler) utgångar för varje ingång. Som med ActionBlock och TransformBlock <TInput, TOutput>, kan denna bearbetning specificeras med hjälp av delegater, både för synkron och asynkron behandling.

En Func <TInput, IEnumerable> används för synkron, och en Func <TInput, Task <IEnumerable>> används för asynkron. Som med både ActionBlock och TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> är standard för sekvensbearbetning, men kan konfigureras på annat sätt.

Kartläggningsdelegaten återkallar en samling artiklar, som infogas individuellt i utgångsbufferten.

ange bildbeskrivning här

Asynkron webbrobot

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Utöka ett antal till dess beståndsdelar

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtrering genom att gå från 1 till 0 eller 1 element

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Introduktion till TPL Dataflow av Stephen Toub

BatchBlock

(Grupperar ett visst antal sekventiella dataobjekt i samlingar av dataposter)

BatchBlock kombinerar N enstaka objekt i ett batchobjekt, representerat som en rad element. En instans skapas med en specifik batchstorlek, och blocket skapar sedan en bunt så snart det har fått det antalet element, som asynkront matar ut satsen till utgångsbufferten.

BatchBlock kan köras i både giriga och icke-giriga lägen.

  • I standard girigt läge accepteras och buffras alla meddelanden som erbjuds till blocket från valfritt antal källor för att konverteras till partier.
    • I icke-girigt läge skjuts alla meddelanden från källor tills tillräckligt med källor har erbjudit meddelanden till blocket för att skapa en batch. Således kan en BatchBlock användas för att ta emot 1 element från var och en av N-källor, N-element från en källa och ett mylder av alternativ däremellan.

ange bildbeskrivning här

Batchningsförfrågningar i grupper om 100 att skicka till en databas

var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);

Skapa en bunt en gång per sekund

var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);

Introduktion till TPL Dataflow av Stephen Toub

BufferBlock

(FIFO-kö: Data som kommer in är data som släpps ut)

Kort sagt, BufferBlock tillhandahåller en obegränsad eller begränsad buffert för lagring av instanser av T.
Du kan "posta" förekomster av T till blocket, vilket gör att data som skickas lagras i en första-in-först-ut (FIFO) ordning av blocket.
Du kan "ta emot" från blocket, vilket låter dig synkront eller asynkront erhålla förekomster av T som tidigare lagrats eller är tillgängliga i framtiden (igen, FIFO).

ange bildbeskrivning här

Asynkron producent / konsument med en gasad tillverkare

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Introduktion till TPL Dataflow av Stephen Toub



Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow