C# Language
Task Parallel Library (TPL) Dataflow-constructen
Zoeken…
JoinBlock
(Verzamelt 2-3 ingangen en combineert ze in een Tuple)
Net als BatchBlock kan JoinBlock <T1, T2, ...> gegevens uit meerdere gegevensbronnen groeperen. Dat is eigenlijk het hoofddoel van JoinBlock <T1, T2, ...>.
Een JoinBlock <string, double, int> is bijvoorbeeld een ISourceBlock <Tuple <string, double, int >>.
Net als bij BatchBlock kan JoinBlock <T1, T2, ...> zowel in hebzuchtige als niet-hebzuchtige modus werken.
- In de standaard hebzuchtige modus worden alle gegevens die aan doelen worden aangeboden geaccepteerd, zelfs als het andere doel niet de benodigde gegevens heeft om een tuple te vormen.
- In de niet-hebzuchtige modus zullen de doelen van het blok gegevens uitstellen totdat alle doelen de nodige gegevens zijn aangeboden om een tuple te maken, waarna het blok een tweefasen-vastleggingsprotocol aangaat om alle benodigde items atomisch uit de bronnen te halen. Dit uitstel maakt het voor een andere entiteit mogelijk om de gegevens in de tussentijd te consumeren, zodat het totale systeem vooruitgang kan boeken.
Verwerkingsverzoeken met een beperkt aantal gepoolde objecten
var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++)
{
requestProcessor.Target1.Post(new ExpensiveObject());
}
var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
var resource = pair.Item1;
var request = pair.Item2;
request.ProcessWith(resource);
return resource;
});
throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);
Inleiding tot TPL Dataflow door Stephen Toub
BroadcastBlock
(Kopieer een item en stuur de kopieën naar elk blok waaraan het is gekoppeld)
In tegenstelling tot BufferBlock is de missie van BroadcastBlock in het leven om alle gekoppelde doelen uit het blok in staat te stellen een kopie te krijgen van elk gepubliceerd element, waarbij de "huidige" waarde voortdurend wordt overschreven met de elementen die eraan worden doorgegeven.
Bovendien houdt BroadcastBlock, anders dan BufferBlock, gegevens niet onnodig vast. Nadat een bepaald gegeven aan alle doelen is aangeboden, wordt dat element overschreven door het volgende gegeven in de rij (zoals bij alle gegevensstroomblokken, worden berichten in FIFO-volgorde afgehandeld). Dat element wordt aan alle doelen aangeboden, enzovoort.
Asynchrone producent / consument met een vertraagde producent
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
item.Image.Save(item.Path)
);
var showInUiBlock = new ActionBlock<ImageData>(item =>
imagePanel.AddImage(item.Image),
new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
Blootstellingsstatus van een agent
public class MyAgent
{
public ISourceBlock<string> Status { get; private set; }
public MyAgent()
{
Status = new BroadcastBlock<string>();
Run();
}
private void Run()
{
Status.Post("Starting");
Status.Post("Doing cool stuff");
…
Status.Post("Done");
}
}
Inleiding tot TPL Dataflow door Stephen Toub
WriteOnceBlock
(Alleen-variabel: onthoudt het eerste gegevensitem en geeft kopieën ervan weg als uitvoer. Negeert alle andere gegevensitems)
Als BufferBlock het meest fundamentele blok in TPL Dataflow is, is WriteOnceBlock het eenvoudigst.
Er wordt maximaal één waarde opgeslagen en als die waarde eenmaal is ingesteld, wordt deze nooit meer vervangen of overschreven.
Je kunt denken dat WriteOnceBlock vergelijkbaar is met een read-only lidvariabele in C #, behalve dat het niet alleen instelbaar is in een constructor en vervolgens onveranderlijk is, maar slechts eenmaal instelbaar is en dan onveranderlijk is.
De potentiële output van een taak splitsen
public static async void SplitIntoBlocks(this Task<T> task,
out IPropagatorBlock<T> result,
out IPropagatorBlock<Exception> exception)
{
result = new WriteOnceBlock<T>(i => i);
exception = new WriteOnceBlock<Exception>(i => i);
try
{
result.Post(await task);
}
catch(Exception ex)
{
exception.Post(ex);
}
}
Inleiding tot TPL Dataflow door Stephen Toub
BatchedJoinBlock
(Verzamelt een bepaald aantal items uit 2-3 ingangen en groepeert ze in een Tuple van verzamelingen gegevensitems)
BatchedJoinBlock <T1, T2, ...> is in zekere zin een combinatie van BatchBlock en JoinBlock <T1, T2, ...>.
Terwijl JoinBlock <T1, T2, ...> wordt gebruikt om één input van elk doel te aggregeren in een tuple, en BatchBlock wordt gebruikt om N inputs in een verzameling te aggregeren, BatchedJoinBlock <T1, T2, ...> wordt gebruikt om N inputs van overal te verzamelen alle doelen in tupels collecties.
Scatter / Gather
Overweeg een spreidings- / verzamelprobleem waarbij N-bewerkingen worden gestart, waarvan sommige kunnen slagen en stringuitgangen kunnen produceren, en andere mislukken en uitzonderingen kunnen produceren.
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i=0; i<10; i++)
{
Task.Factory.StartNew(() => {
try { batchedJoin.Target1.Post(DoWork()); }
catch(Exception ex) { batchJoin.Target2.Post(ex); }
});
}
var results = await batchedJoin.ReceiveAsync();
foreach(string s in results.Item1)
{
Console.WriteLine(s);
}
foreach(Exception e in results.Item2)
{
Console.WriteLine(e);
}
Inleiding tot TPL Dataflow door Stephen Toub
TransformBlock
(Selecteer één op één)
Net als bij ActionBlock maakt TransformBlock <TInput, TOutput> het uitvoeren van een gemachtigde mogelijk om een actie uit te voeren voor elk ingangsgegeven; in tegenstelling tot ActionBlock heeft deze verwerking een uitvoer. Deze afgevaardigde kan een Func <TInput, TOutput> zijn, in welk geval de verwerking van dat element als voltooid wordt beschouwd wanneer de afgevaardigde terugkeert, of het kan een Func <TInput, Tas> zijn, in welk geval de verwerking van dat element als voltooid wordt beschouwd, niet wanneer de gemachtigde terugkeert maar wanneer de geretourneerde taak is voltooid. Voor degenen die bekend zijn met LINQ, is het enigszins vergelijkbaar met Select () omdat het een invoer vereist, die invoer op een bepaalde manier transformeert en vervolgens een uitvoer produceert.
TransformBlock <TInput, TOutput> verwerkt standaard zijn gegevens opeenvolgend met een MaxDegreeOfParallelism gelijk aan 1. Naast het ontvangen van gebufferde invoer en het verwerken ervan, neemt dit blok ook al zijn verwerkte uitvoer op en buffert dat ook (gegevens die niet zijn verwerkt en gegevens die zijn verwerkt).
Het heeft 2 taken: één om de gegevens te verwerken en één om gegevens naar het volgende blok te duwen.
Een gelijktijdige pijpleiding
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(Encryptor);
Inleiding tot TPL Dataflow door Stephen Toub
ActionBlock
(Foreach)
Deze klasse kan logisch gezien worden als een buffer voor te verwerken gegevens in combinatie met taken voor het verwerken van die gegevens, waarbij het "gegevensstroomblok" beide beheert. In het meest elementaire gebruik kunnen we een ActionBlock instantiëren en er gegevens naartoe 'posten'; de afgevaardigde die bij de constructie van ActionBlock wordt verstrekt, wordt asynchroon uitgevoerd voor elk geplaatst stuk gegevens.
Synchrone berekening
var ab = new ActionBlock<TInput>(i =>
{
Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
Asynchrone downloads beperken tot maximaal 5 tegelijkertijd
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 });
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
Inleiding tot TPL Dataflow door Stephen Toub
TransformManyBlock
(SelectMany, 1-m: de resultaten van deze mapping zijn 'afgeplat', net als LINQ's SelectMany)
TransformManyBlock <TInput, TOutput> lijkt sterk op TransformBlock <TInput, TOutput>.
Het belangrijkste verschil is dat terwijl een TransformBlock <TInput, TOutput> één en slechts één uitgang produceert voor elke ingang, TransformManyBlock <TInput, TOutput> een willekeurig aantal (nul of meer) uitgangen produceert voor elke ingang. Net als bij ActionBlock en TransformBlock <TInput, TOutput>, kan deze verwerking worden opgegeven met behulp van afgevaardigden, zowel voor synchrone als asynchrone verwerking.
Een Func <TInput, IEnumerable> wordt gebruikt voor synchroon, en een Func <TInput, Task <IEnumerable>> voor asynchroon. Zoals met zowel ActionBlock als TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> standaard ingesteld op sequentiële verwerking, maar kan anders worden geconfigureerd.
De toewijzingsafgevaardigde retourneert een verzameling items die afzonderlijk in de uitvoerbuffer worden ingevoegd.
Asynchrone webcrawler
var downloader = new TransformManyBlock<string, string>(async url =>
{
Console.WriteLine(“Downloading “ + url);
try
{
return ParseLinks(await DownloadContents(url));
}
catch{}
return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);
Uitbreiding van een ontelbare in zijn samenstellende elementen
var expanded = new TransformManyBlock<T[], T>(array => array);
Filteren door van 1 naar 0 of 1 elementen te gaan
public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
return new TransformManyBlock<T, T>(item =>
filter(item) ? new [] { item } : Enumerable.Empty<T>());
}
Inleiding tot TPL Dataflow door Stephen Toub
BatchBlock
(Groepeert een bepaald aantal opeenvolgende gegevensitems in verzamelingen gegevensitems)
BatchBlock combineert N afzonderlijke artikelen in één batchartikel, weergegeven als een reeks elementen. Er wordt een exemplaar gemaakt met een specifieke batchgrootte en het blok maakt vervolgens een batch zodra het aantal elementen is ontvangen, waarbij de batch asynchroon wordt uitgevoerd naar de uitvoerbuffer.
BatchBlock kan zowel hebzuchtig als niet-hebzuchtig worden uitgevoerd.
- In de standaard hebzuchtige modus worden alle berichten die vanuit een willekeurig aantal bronnen aan het blok worden aangeboden, geaccepteerd en gebufferd om in batches te worden omgezet.
- In niet-hebzuchtige modus worden alle berichten uit bronnen uitgesteld totdat voldoende bronnen berichten aan het blok hebben aangeboden om een batch te maken. Zo kan een BatchBlock worden gebruikt om 1 element te ontvangen van elk van de N-bronnen, N-elementen van 1 bron en een groot aantal opties daartussenin.
Batchverzoeken in groepen van 100 om in te dienen bij een database
var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
Eén keer per seconde een batch maken
var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
Inleiding tot TPL Dataflow door Stephen Toub
BufferBlock
(FIFO Queue: de gegevens die binnenkomen zijn de gegevens die uitgaan)
Kortom, BufferBlock biedt een onbeperkte of begrensde buffer voor het opslaan van instanties van T.
U kunt T-exemplaren naar het blok "posten", waardoor de gegevens die worden gepost in een FIFO-volgorde (first-in-first-out) worden opgeslagen.
U kunt "ontvangen" van het blok, waarmee u synchroon of asynchroon exemplaren van T kunt downloaden die eerder zijn opgeslagen of beschikbaar zijn in de toekomst (nogmaals, FIFO).
Asynchrone producent / consument met een vertraagde producent
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer
private static async void Producer()
{
while(true)
{
await _Buffer.SendAsync(Produce());
}
}
// Consumer
private static async Task Consumer()
{
while(true)
{
Process(await _Buffer.ReceiveAsync());
}
}
// Start the Producer and Consumer
private static async Task Run()
{
await Task.WhenAll(Producer(), Consumer());
}
Inleiding tot TPL Dataflow door Stephen Toub