Szukaj…


JoinBlock

(Zbiera 2-3 dane wejściowe i łączy je w krotkę)

Podobnie jak BatchBlock, JoinBlock <T1, T2,…> może grupować dane z wielu źródeł danych. W rzeczywistości jest to główny cel JoinBlock <T1, T2,…>.

Na przykład JoinBlock <string, double, int> to ISourceBlock <Tuple <string, double, int >>.

Podobnie jak w przypadku BatchBlock, JoinBlock <T1, T2,…> może działać zarówno w trybie chciwym, jak i chciwym.

  • W domyślnym trybie chciwym wszystkie dane oferowane celom są akceptowane, nawet jeśli drugi cel nie ma danych niezbędnych do utworzenia krotki.
  • W trybie bez chciwości cele bloku odłożą dane, dopóki wszystkie cele nie otrzymają danych niezbędnych do utworzenia krotki, w którym to momencie blok wejdzie w dwufazowy protokół zatwierdzania, aby atomowo pobrać wszystkie niezbędne elementy ze źródeł. Odroczenie to umożliwia innemu podmiotowi wykorzystanie danych w międzyczasie, tak aby cały system mógł poczynić postępy.

wprowadź opis zdjęcia tutaj

Przetwarzanie żądań z ograniczoną liczbą połączonych obiektów

var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++) 
{
    requestProcessor.Target1.Post(new ExpensiveObject()); 
}

var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
    var resource = pair.Item1;
    var request = pair.Item2;
    
    request.ProcessWith(resource);
    
    return resource;
});

throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

BroadcastBlock

(Skopiuj element i wyślij kopie do każdego bloku, z którym jest powiązany)

W przeciwieństwie do BufferBlock, misją BroadcastBlock w życiu jest umożliwienie wszystkim celom powiązanym z bloku pobierania kopii każdego elementu, stale nadpisując „bieżącą” wartość tymi, które zostały do niej przekazane.

Ponadto, w przeciwieństwie do BufferBlock, BroadcastBlock niepotrzebnie nie przechowuje danych. Po zaoferowaniu konkretnej bazy danych wszystkim celom, element ten zostanie zastąpiony przez dowolną kolejną część danych (podobnie jak w przypadku wszystkich bloków przepływu danych, wiadomości są obsługiwane w kolejności FIFO). Ten element będzie oferowany wszystkim celom i tak dalej.

wprowadź opis zdjęcia tutaj

Asynchroniczny producent / konsument z ograniczonym producentem

var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);

var saveToDiskBlock = new ActionBlock<ImageData>(item =>
    item.Image.Save(item.Path)
);

var showInUiBlock = new ActionBlock<ImageData>(item =>
    imagePanel.AddImage(item.Image), 
    new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);

bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);

Ujawnianie statusu od agenta

public class MyAgent
{
    public ISourceBlock<string> Status { get; private set; }
    
    public MyAgent()
    {
        Status = new BroadcastBlock<string>();
        Run();
    } 

    private void Run()
    {
        Status.Post("Starting");
        Status.Post("Doing cool stuff");
        …
        Status.Post("Done");
    }
}

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

WriteOnceBlock

(Zmienna tylko do odczytu: Zapamiętuje swój pierwszy element danych i przekazuje jego kopie jako dane wyjściowe. Ignoruje wszystkie inne elementy danych)

Jeśli BufferBlock jest najbardziej podstawowym blokiem w przepływie danych TPL, WriteOnceBlock jest najprostszy.
Przechowuje co najwyżej jedną wartość, a po ustawieniu tej wartości nigdy nie zostanie zastąpiona ani zastąpiona.

Możesz myśleć o WriteOnceBlock jako podobnej do zmiennej składowej tylko do odczytu w C #, z tym wyjątkiem, że zamiast być ustawialnym w konstruktorze, a następnie niezmiennym, można go ustawić tylko raz, a następnie jest niezmienny.

wprowadź opis zdjęcia tutaj

Podział potencjalnych wyników zadania

public static async void SplitIntoBlocks(this Task<T> task,
    out IPropagatorBlock<T> result, 
    out IPropagatorBlock<Exception> exception)
{
    result = new WriteOnceBlock<T>(i => i);
    exception = new WriteOnceBlock<Exception>(i => i);

    try 
    { 
        result.Post(await task); 
    }
    catch(Exception ex) 
    { 
        exception.Post(ex); 
    }
}

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

BatchedJoinBlock

(Zbiera określoną liczbę wszystkich elementów z 2-3 danych wejściowych i grupuje je w kręgu zbiorów elementów danych)

BatchedJoinBlock <T1, T2,…> jest w pewnym sensie kombinacją BatchBlock i JoinBlock <T1, T2,…>.
Podczas gdy JoinBlock <T1, T2,…> służy do agregacji jednego wejścia z każdego celu w krotkę, a BatchBlock służy do agregacji N danych wejściowych w kolekcji, BatchedJoinBlock <T1, T2,…> służy do gromadzenia N danych wejściowych z całej wszystkie cele w krotki kolekcji.

wprowadź opis zdjęcia tutaj

Rozproszenie / Zebranie

Rozważ problem rozproszenia / gromadzenia, w którym uruchamianych jest N operacji, z których niektóre mogą zakończyć się powodzeniem i wygenerować ciąg znaków, a inne mogą zakończyć się niepowodzeniem i wygenerować wyjątki.

var batchedJoin = new BatchedJoinBlock<string, Exception>(10);

for (int i=0; i<10; i++)
{
    Task.Factory.StartNew(() => {
        try { batchedJoin.Target1.Post(DoWork()); }
        catch(Exception ex) { batchJoin.Target2.Post(ex); }
    });
}

var results = await batchedJoin.ReceiveAsync();

foreach(string s in results.Item1) 
{
    Console.WriteLine(s);
}

foreach(Exception e in results.Item2) 
{
    Console.WriteLine(e);
}

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

TransformBlock

(Wybierz, jeden do jednego)

Podobnie jak w przypadku ActionBlock, TransformBlock <TInput, TOutput> umożliwia wykonanie delegata w celu wykonania pewnej akcji dla każdego wejściowego układu odniesienia; w przeciwieństwie do ActionBlock, przetwarzanie to ma wynik. Ten delegat może być Func <TInput, TOutput>, w którym przetwarzanie tego elementu jest uważane za zakończone, gdy delegat powraca, lub może być Func <TInput, Zadanie>, w którym to przypadku przetwarzanie tego elementu jest uważane za zakończone kiedy delegat powróci, ale kiedy zakończone zadanie zostanie zakończone. Dla tych, którzy znają LINQ, jest on nieco podobny do Select (), ponieważ pobiera dane wejściowe, przekształca je w jakiś sposób, a następnie generuje dane wyjściowe.

Domyślnie TransformBlock <TInput, TOutput> przetwarza swoje dane sekwencyjnie z MaxDegreeOfParallelism równą 1. Oprócz odbierania buforowanych danych wejściowych i przetwarzania, ten blok pobierze również wszystkie przetworzone dane wyjściowe i buforuje je (dane, które nie zostały przetwarzane oraz dane, które zostały przetworzone).

Ma 2 zadania: jedno do przetwarzania danych i jedno do przesyłania danych do następnego bloku.

wprowadź opis zdjęcia tutaj

Równoległy rurociąg

var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));

compressor.LinkTo(Encryptor); 

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

ActionBlock

(dla każdego)

Tę klasę można traktować logicznie jako bufor do przetwarzania danych w połączeniu z zadaniami przetwarzania tych danych, przy czym „blok przepływu danych” zarządza obydwoma. W najbardziej podstawowym użyciu możemy utworzyć ActionBlock i „wysłać” dane do niego; delegat podany w konstrukcji ActionBlock zostanie wykonany asynchronicznie dla każdego wysłanego elementu.

wprowadź opis zdjęcia tutaj

Obliczenia synchroniczne

var ab = new ActionBlock<TInput>(i => 
{
    Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);

Ograniczanie asynchronicznych pobrań do maksymalnie 5 jednocześnie

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

TransformManyBlock

(SelectMany, 1-m: Wyniki tego mapowania są „spłaszczone”, podobnie jak SelectMany LINQ)

TransformManyBlock <TInput, TOutput> jest bardzo podobny do TransformBlock <TInput, TOutput>.
Kluczowa różnica polega na tym, że podczas gdy TransformBlock <TInput, TOutput> wytwarza jedno i tylko jedno wyjście dla każdego wejścia, TransformManyBlock <TInput, TOutput> wytwarza dowolną liczbę (zero lub więcej) wyjść dla każdego wejścia. Podobnie jak w przypadku ActionBlock i TransformBlock <TInput, TOutput>, przetwarzanie to można określić za pomocą delegatów, zarówno do przetwarzania synchronicznego, jak i asynchronicznego.

Func <TInput, IEnumerable> jest używany do synchronizacji, a Func <TInput, zadanie <IEnumerable>> jest używany do asynchronizacji. Podobnie jak w przypadku ActionBlock i TransformBlock <TInput, TOutput>, TransformManyBlock <TInput, TOutput> domyślnie jest przetwarzany sekwencyjnie, ale może zostać skonfigurowany w inny sposób.

Delegat mapowania ponownie sprawdza kolekcję elementów, które są wstawiane indywidualnie do bufora wyjściowego.

wprowadź opis zdjęcia tutaj

Asynchroniczny przeszukiwacz sieci

var downloader = new TransformManyBlock<string, string>(async url =>
{
    Console.WriteLine(“Downloading “ + url);
    try 
    { 
        return ParseLinks(await DownloadContents(url)); 
    } 
    catch{}
    
    return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);

Rozwijanie elementu policzalnego na jego elementy składowe

var expanded = new TransformManyBlock<T[], T>(array => array);

Filtrowanie według liczby od 1 do 0 lub 1 elementów

public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
    return new TransformManyBlock<T, T>(item =>
        filter(item) ? new [] { item } : Enumerable.Empty<T>());
}

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

BatchBlock

(Grupuje pewną liczbę sekwencyjnych elementów danych w kolekcje elementów danych)

BatchBlock łączy N pojedynczych elementów w jeden element wsadowy, reprezentowany jako tablica elementów. Instancja jest tworzona z określonym rozmiarem partii, a następnie blok tworzy partię, gdy tylko otrzyma taką liczbę elementów, asynchronicznie wysyłając partię do bufora wyjściowego.

BatchBlock może działać zarówno w trybie chciwym, jak i chciwym.

  • W domyślnym trybie chciwym wszystkie wiadomości oferowane blokowi z dowolnej liczby źródeł są akceptowane i buforowane do konwersji na partie.
    • W trybie bez chciwości wszystkie wiadomości są odraczane ze źródeł, dopóki wystarczająca liczba źródeł nie zaoferuje wiadomości do bloku, aby utworzyć partię. Tak więc BatchBlock może być użyty do otrzymania 1 elementu z każdego z N źródeł, N elementów z 1 źródła i niezliczonej liczby opcji pomiędzy nimi.

wprowadź opis zdjęcia tutaj

Grupowanie żądań w grupy po 100 w celu przesłania do bazy danych

var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));

batchRequests.LinkTo(sendToDb);

Tworzenie partii raz na sekundę

var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub

BufferBlock

(Kolejka FIFO: Dane, które przychodzą, to dane, które wychodzą)

Krótko mówiąc, BufferBlock zapewnia nieograniczony lub ograniczony bufor do przechowywania instancji T.
Możesz „zaksięgować” wystąpienia T do bloku, co powoduje, że dane są zapisywane w bloku według kolejności „pierwsze weszło, pierwsze wyszło” (FIFO).
Możesz „odbierać” z bloku, co pozwala synchronicznie lub asynchronicznie uzyskiwać instancje T uprzednio zapisane lub dostępne w przyszłości (ponownie FIFO).

wprowadź opis zdjęcia tutaj

Asynchroniczny producent / konsument z ograniczonym producentem

// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
    new DataflowBlockOptions { BoundedCapacity = 10 });

// Producer
private static async void Producer()
{
    while(true)
    {
        await _Buffer.SendAsync(Produce());
    }
}

// Consumer
private static async Task Consumer()
{
    while(true)
    {
        Process(await _Buffer.ReceiveAsync());
    } 
}

// Start the Producer and Consumer
private static async Task Run()
{
    await Task.WhenAll(Producer(), Consumer());
}

Wprowadzenie do przepływu danych TPL autorstwa Stephena Toub



Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow