C# Language
Costrutti di flusso di dati di Task Parallel Library (TPL)
Ricerca…
JoinBlock
(Raccoglie 2-3 input e li combina in una tupla)
Come BatchBlock, JoinBlock <T1, T2, ...> è in grado di raggruppare i dati da più origini dati. In effetti, questo è lo scopo principale di JoinBlock <T1, T2, ...>.
Ad esempio, un JoinBlock <string, double, int> è un ISourceBlock <Tuple <string, double, int >>.
Come con BatchBlock, JoinBlock <T1, T2, ...> è in grado di funzionare sia in modalità avida che non avida.
- Nella modalità greedy di default, tutti i dati offerti ai target sono accettati, anche se l'altro target non ha i dati necessari con cui formare una tupla.
- In modalità non greedy, i target del blocco posticiperanno i dati fino a quando a tutti i target non saranno stati offerti i dati necessari per creare una tupla, a quel punto il blocco si impegnerà in un protocollo di commit a due fasi per recuperare atomicamente tutti gli elementi necessari dalle fonti. Questo rinvio consente a un'altra entità di consumare i dati nel frattempo in modo da consentire al sistema complessivo di avanzare.
Elaborazione di richieste con un numero limitato di oggetti raggruppati
var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++)
{
requestProcessor.Target1.Post(new ExpensiveObject());
}
var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
var resource = pair.Item1;
var request = pair.Item2;
request.ProcessWith(resource);
return resource;
});
throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);
Introduzione a TPL Dataflow di Stephen Toub
BroadcastBlock
(Copia un elemento e invia le copie a ogni blocco a cui è collegato)
A differenza di BufferBlock, la missione di BroadcastBlock nella vita è di abilitare tutti i target collegati dal blocco per ottenere una copia di ogni elemento pubblicato, sovrascrivendo continuamente il valore "corrente" con quelli propagati ad esso.
Inoltre, a differenza di BufferBlock, BroadcastBlock non mantiene i dati inutilmente. Dopo che un particolare dato è stato offerto a tutti i bersagli, quell'elemento sarà sovrascritto da qualunque pezzo di dati è in linea (come con tutti i blocchi di flusso di dati, i messaggi sono gestiti in ordine FIFO). Questo elemento sarà offerto a tutti gli obiettivi, e così via.
Producer / consumer asincrono con un produttore Throttled
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
item.Image.Save(item.Path)
);
var showInUiBlock = new ActionBlock<ImageData>(item =>
imagePanel.AddImage(item.Image),
new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
Esposizione dello stato da un agente
public class MyAgent
{
public ISourceBlock<string> Status { get; private set; }
public MyAgent()
{
Status = new BroadcastBlock<string>();
Run();
}
private void Run()
{
Status.Post("Starting");
Status.Post("Doing cool stuff");
…
Status.Post("Done");
}
}
Introduzione a TPL Dataflow di Stephen Toub
WriteOnceBlock
(Variabile di sola lettura: memorizza il suo primo elemento di dati e ne distribuisce copie come output. Ignora tutti gli altri elementi di dati)
Se BufferBlock è il blocco più fondamentale in TPL Dataflow, WriteOnceBlock è il più semplice.
Memorizza al massimo un valore, e una volta che questo valore è stato impostato, non verrà mai sostituito o sovrascritto.
Puoi pensare a WriteOnceBlock come simile a una variabile membro readonly in C #, ma invece di essere solo impostabile in un costruttore e quindi immutabile, è impostabile solo una volta ed è quindi immutabile.
Dividere i potenziali output di un'attività
public static async void SplitIntoBlocks(this Task<T> task,
out IPropagatorBlock<T> result,
out IPropagatorBlock<Exception> exception)
{
result = new WriteOnceBlock<T>(i => i);
exception = new WriteOnceBlock<Exception>(i => i);
try
{
result.Post(await task);
}
catch(Exception ex)
{
exception.Post(ex);
}
}
Introduzione a TPL Dataflow di Stephen Toub
BatchedJoinBlock
(Raccoglie un certo numero di elementi totali da 2-3 input e li raggruppa in una tupla di raccolte di elementi di dati)
BatchedJoinBlock <T1, T2, ...> è in un certo senso una combinazione di BatchBlock e JoinBlock <T1, T2, ...>.
Mentre JoinBlock <T1, T2, ...> viene utilizzato per aggregare un input da ciascun target in una tupla e BatchBlock viene utilizzato per aggregare N input in una raccolta, BatchJoinBlock <T1, T2, ...> viene utilizzato per raccogliere N input da across tutti gli obiettivi in tuple di raccolte.
Scatter / gather
Si consideri un problema di dispersione / raccolta in cui vengono avviate N operazioni, alcune delle quali possono avere successo e produrre output di stringhe, e altre potrebbero non riuscire e produrre eccezioni.
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i=0; i<10; i++)
{
Task.Factory.StartNew(() => {
try { batchedJoin.Target1.Post(DoWork()); }
catch(Exception ex) { batchJoin.Target2.Post(ex); }
});
}
var results = await batchedJoin.ReceiveAsync();
foreach(string s in results.Item1)
{
Console.WriteLine(s);
}
foreach(Exception e in results.Item2)
{
Console.WriteLine(e);
}
Introduzione a TPL Dataflow di Stephen Toub
TransformBlock
(Seleziona, uno a uno)
Come con ActionBlock, TransformBlock <TInput, TOutput> consente l'esecuzione di un delegato per eseguire alcune azioni per ogni dato di input; a differenza di ActionBlock, questa elaborazione ha un output. Questo delegato può essere un Func <TInput, TOutput>, nel qual caso l'elaborazione di quell'elemento viene considerata completata al ritorno del delegato oppure può essere Func <TInput, Task>, nel qual caso l'elaborazione di quell'elemento viene considerata completata quando il delegato ritorna ma quando l'attività restituita è completa. Per chi ha familiarità con LINQ, è in qualche modo simile a Select () in quanto richiede un input, trasforma quell'input in qualche modo e quindi produce un output.
Per impostazione predefinita, TransformBlock <TInput, TOutput> elabora i suoi dati in modo sequenziale con un MaxDegreeOfParallelism uguale a 1. Oltre a ricevere l'input bufferizzato ed elaborarlo, questo blocco prenderà tutti i suoi output elaborati e anche il buffer (dati che non sono stati elaborati e dati che sono stati elaborati).
Ha 2 compiti: uno per elaborare i dati e uno per spingere i dati al blocco successivo.
Una pipeline concorrente
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(Encryptor);
Introduzione a TPL Dataflow di Stephen Toub
ActionBlock
(per ciascuno)
Questa classe può essere considerata logicamente come un buffer per i dati da elaborare in combinazione con le attività per l'elaborazione di tali dati, con il "blocco del flusso di dati" che gestisce entrambi. Nel suo uso più basilare, possiamo istanziare un oggetto ActionBlock e "postare" i dati su di esso; il delegato fornito alla costruzione di ActionBlock verrà eseguito in modo asincrono per ogni dato inviato.
Calcolo sincrono
var ab = new ActionBlock<TInput>(i =>
{
Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
Limitazione di download asincroni fino a un massimo di 5 contemporaneamente
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 });
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
Introduzione a TPL Dataflow di Stephen Toub
TransformManyBlock
(SelectMany, 1-m: i risultati di questa mappatura sono "appiattiti", proprio come SelectMany di LINQ)
TransformManyBlock <TInput, TOutput> è molto simile a TransformBlock <TInput, TOutput>.
La differenza principale è che mentre TransformBlock <TInput, TOutput> produce uno e un solo output per ogni input, TransformManyBlock <TInput, TOutput> produce qualsiasi numero (zero o più) output per ogni input. Come con ActionBlock e TransformBlock <TInput, TOutput>, questa elaborazione può essere specificata utilizzando i delegati, sia per l'elaborazione sincrona che asincrona.
Un Func <TInput, IEnumerable> viene utilizzato per synchronous e un Func <TInput, Task <IEnumerable >> viene utilizzato per asincrono. Come con ActionBlock e TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> passa automaticamente all'elaborazione sequenziale, ma può essere configurato diversamente.
Il delegato di mappatura retunisce una raccolta di elementi, che vengono inseriti singolarmente nel buffer di output.
Web crawler asincrono
var downloader = new TransformManyBlock<string, string>(async url =>
{
Console.WriteLine(“Downloading “ + url);
try
{
return ParseLinks(await DownloadContents(url));
}
catch{}
return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);
Espandere un Enumerable nei suoi elementi costitutivi
var expanded = new TransformManyBlock<T[], T>(array => array);
Filtraggio passando da 1 a 0 o 1 elemento
public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
return new TransformManyBlock<T, T>(item =>
filter(item) ? new [] { item } : Enumerable.Empty<T>());
}
Introduzione a TPL Dataflow di Stephen Toub
BatchBlock
(Raggruppa un certo numero di elementi di dati sequenziali in raccolte di elementi di dati)
BatchBlock combina N singoli elementi in un unico articolo, rappresentato come una matrice di elementi. Un'istanza viene creata con una specifica dimensione di batch e il blocco crea quindi un batch non appena viene ricevuto quel numero di elementi, in modo asincrono che invia il batch al buffer di output.
BatchBlock è in grado di eseguire in entrambe le modalità avide e non avide.
- Nella modalità greedy di default, tutti i messaggi offerti al blocco da qualsiasi numero di fonti sono accettati e memorizzati nel buffer per essere convertiti in batch.
- In modalità non avida, tutti i messaggi vengono posticipati dalle fonti finché un numero sufficiente di fonti non ha offerto messaggi al blocco per creare un batch. Quindi, un BatchBlock può essere usato per ricevere 1 elemento da ciascuna delle fonti N, da N elementi da 1 fonte e da una miriade di opzioni intermedie.
Richieste di raggruppamento in gruppi di 100 da inviare a un database
var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
Creare un batch una volta al secondo
var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
Introduzione a TPL Dataflow di Stephen Toub
BufferBlock
(Coda FIFO: i dati che arrivano sono i dati che escono)
In breve, BufferBlock fornisce un buffer illimitato o limitato per la memorizzazione di istanze di T.
È possibile "postare" istanze di T al blocco, il che fa sì che i dati inviati vengano memorizzati in un ordine FIFO (first-in-first-out) dal blocco.
È possibile "ricevere" dal blocco, che consente di ottenere in modo sincrono o asincrono istanze di T precedentemente memorizzate o disponibili in futuro (di nuovo, FIFO).
Producer / consumer asincrono con un produttore Throttled
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer
private static async void Producer()
{
while(true)
{
await _Buffer.SendAsync(Produce());
}
}
// Consumer
private static async Task Consumer()
{
while(true)
{
Process(await _Buffer.ReceiveAsync());
}
}
// Start the Producer and Consumer
private static async Task Run()
{
await Task.WhenAll(Producer(), Consumer());
}
Introduzione a TPL Dataflow di Stephen Toub