C# Language
작업 병렬 라이브러리 (TPL) 데이터 흐름 구성
수색…
JoinBlock
(2-3 개의 입력을 모아 터플로 결합)
BatchBlock과 마찬가지로 JoinBlock <T1, T2, ...>는 여러 데이터 소스의 데이터를 그룹화 할 수 있습니다. 실제로 JoinBlock <T1, T2, ...>의 주요 목적입니다.
예를 들어 JoinBlock <string, double, int>은 ISourceBlock <Tuple <string, double, int >>입니다.
BatchBlock과 마찬가지로 JoinBlock <T1, T2, ...>는 욕심 및 비 탐욕 모드에서 모두 작동 할 수 있습니다.
- 기본 욕심 꾸러기 모드에서는 다른 대상이 튜플을 형성하는 데 필요한 데이터를 가지고 있지 않더라도 대상에 제공된 모든 데이터가 허용됩니다.
- non-greedy 모드에서 블럭의 타겟은 모든 타겟이 튜플을 생성하는 데 필요한 데이터를 제공받을 때까지 데이터를 연기 할 것입니다.이 시점에서 블럭은 2 단계 커밋 프로토콜을 사용하여 소스에서 필요한 모든 항목을 원자 적으로 검색합니다. 이 연기는 다른 주체가 그 동안 데이터를 소비하여 전체 시스템이 진행을 진행할 수있게합니다.
풀링 된 객체 수가 제한된 요청 처리
var throttle = new JoinBlock<ExpensiveObject, Request>();
for(int i=0; i<10; i++)
{
requestProcessor.Target1.Post(new ExpensiveObject());
}
var processor = new Transform<Tuple<ExpensiveObject, Request>, ExpensiveObject>(pair =>
{
var resource = pair.Item1;
var request = pair.Item2;
request.ProcessWith(resource);
return resource;
});
throttle.LinkTo(processor);
processor.LinkTo(throttle.Target1);
BroadcastBlock
(항목을 복사하고 연결된 모든 블록에 사본을 보냅니다)
BufferBlock과 달리, BroadcastBlock의 임무는 블록에서 링크 된 모든 타겟이 게시 된 모든 요소의 사본을 얻고, 지속적으로 "현재"값을 전파 한 값으로 덮어 쓰는 것입니다.
또한 BufferBlock과 달리 BroadcastBlock은 데이터를 불필요하게 유지하지 않습니다. 특정 데이터가 모든 타겟에 제공되면 해당 데이터는 다음 라인에있는 모든 데이터로 덮어 씁니다 (모든 데이터 흐름 블록과 마찬가지로 메시지는 FIFO 순서로 처리됩니다). 그 요소는 모든 목표에 제공됩니다.
Throttled Producer가있는 비동기 프로듀서 / 소비자
var ui = TaskScheduler.FromCurrentSynchronizationContext();
var bb = new BroadcastBlock<ImageData>(i => i);
var saveToDiskBlock = new ActionBlock<ImageData>(item =>
item.Image.Save(item.Path)
);
var showInUiBlock = new ActionBlock<ImageData>(item =>
imagePanel.AddImage(item.Image),
new DataflowBlockOptions { TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext() }
);
bb.LinkTo(saveToDiskBlock);
bb.LinkTo(showInUiBlock);
에이전트에서 상태 노출
public class MyAgent
{
public ISourceBlock<string> Status { get; private set; }
public MyAgent()
{
Status = new BroadcastBlock<string>();
Run();
}
private void Run()
{
Status.Post("Starting");
Status.Post("Doing cool stuff");
…
Status.Post("Done");
}
}
WriteOnceBlock
(Readonly 변수 : 첫 번째 데이터 항목을 기억하고 그 사본을 출력으로 전달합니다. 다른 모든 데이터 항목을 무시합니다)
BufferBlock이 TPL Dataflow에서 가장 기본적인 블록 인 경우 WriteOnceBlock이 가장 간단합니다.
최대 하나의 값을 저장하고 해당 값이 설정되면 대체되거나 덮어 쓰이지 않습니다.
WriteOnceBlock은 C #의 readonly 멤버 변수와 비슷하다고 생각할 수 있습니다. 단, 생성자에서 설정 가능하고 불변 인 대신 한 번만 설정할 수 있기 때문에 불변입니다.
작업의 잠재적 출력 분할
public static async void SplitIntoBlocks(this Task<T> task,
out IPropagatorBlock<T> result,
out IPropagatorBlock<Exception> exception)
{
result = new WriteOnceBlock<T>(i => i);
exception = new WriteOnceBlock<Exception>(i => i);
try
{
result.Post(await task);
}
catch(Exception ex)
{
exception.Post(ex);
}
}
BatchedJoinBlock
(2-3 개의 입력에서 특정 수의 전체 항목을 수집하여 데이터 항목의 모음으로 그룹화합니다.)
BatchedJoinBlock <T1, T2, ...>은 의미 적으로 BatchBlock과 JoinBlock <T1, T2, ...>의 조합입니다.
JoinBlock <T1, T2, ...>는 각 대상의 입력을 하나의 튜플로 집계하는 데 사용되고 BatchBlock은 N 개의 입력을 모음으로 집계하는 데 사용되지만 BatchedJoinBlock <T1, T2, ...>은 N 입력을 가로 질러 수집하는 데 사용됩니다 모든 목표는 컬렉션의 튜플에 포함됩니다.
분산 형 / 집적 형
N 작업이 시작되고 일부 작업은 성공하고 문자열 출력을 생성 할 수있는 작업과 실패하고 예외를 생성 할 수있는 분산 / 수집 문제를 고려하십시오.
var batchedJoin = new BatchedJoinBlock<string, Exception>(10);
for (int i=0; i<10; i++)
{
Task.Factory.StartNew(() => {
try { batchedJoin.Target1.Post(DoWork()); }
catch(Exception ex) { batchJoin.Target2.Post(ex); }
});
}
var results = await batchedJoin.ReceiveAsync();
foreach(string s in results.Item1)
{
Console.WriteLine(s);
}
foreach(Exception e in results.Item2)
{
Console.WriteLine(e);
}
TransformBlock
(선택, 일대일)
ActionBlock과 마찬가지로, TransformBlock <TInput, TOutput>은 델리게이트의 실행이 각 입력 데이텀에 대해 어떤 동작을 수행 할 수 있도록합니다. ActionBlock과 달리,이 처리에는 출력이 있습니다. 이 델리게이트는 Func <TInput, TOutput>이 될 수 있으며,이 경우 델리게이트가 반환 할 때 해당 요소의 처리가 완료된 것으로 간주되거나 Func <TInput, Task> 일 수 있습니다.이 경우 해당 요소의 처리는 완료되지 않은 것으로 간주됩니다 대리자가 반환하지만 반환 된 작업이 완료되면. LINQ에 익숙한 사람들은 Select ()와 비슷한 점이 있습니다. Select는 입력을 받아 어떤 방식 으로든 변환 한 다음 출력을 생성합니다.
기본적으로 TransformBlock <TInput, TOutput>은 MaxDegreeOfParallelism이 1 인 순차적으로 데이터를 처리합니다. 버퍼링 된 입력을 수신하고 처리하는 것 외에도이 블록은 처리 된 모든 출력과 버퍼도 취합니다 처리 된 데이터 및 처리 된 데이터).
두 가지 작업이 있습니다. 하나는 데이터를 처리하고 다른 하나는 데이터를 다음 블록으로 푸시하는 작업입니다.
동시 파이프 라인
var compressor = new TransformBlock<byte[], byte[]>(input => Compress(input));
var encryptor = new TransformBlock<byte[], byte[]>(input => Encrypt(input));
compressor.LinkTo(Encryptor);
액션 블록
(각각)
이 클래스는 논리적으로 데이터를 처리하기위한 작업과 함께 처리 할 데이터의 버퍼와 데이터 흐름 블록을 모두 관리하는 버퍼로 생각할 수 있습니다. 가장 기본적인 사용법에서는 ActionBlock을 인스턴스화하고 그것에 ActionBlock을 "게시"할 수 있습니다. ActionBlock의 구조에서 제공되는 델리게이트는 게시 된 모든 데이터에 대해 비동기 적으로 실행됩니다.
동기 계산
var ab = new ActionBlock<TInput>(i =>
{
Compute(i);
});
…
ab.Post(1);
ab.Post(2);
ab.Post(3);
비동기 다운로드를 동시에 최대 5 개까지 조절
var downloader = new ActionBlock<string>(async url =>
{
byte [] imageData = await DownloadAsync(url);
Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 });
downloader.Post("http://website.com/path/to/images");
downloader.Post("http://another-website.com/path/to/images");
TransformManyBlock
(SelectMany, 1-m :이 매핑 결과는 LINQ의 SelectMany처럼 "평평하게"됩니다.)
TransformManyBlock <TInput, TOutput>은 TransformBlock <TInput, TOutput>과 매우 비슷합니다.
주요 차이점은 TransformBlock <TInput, TOutput>이 각 입력에 대해 하나의 출력 만 생성하는 반면 TransformManyBlock <TInput, TOutput>은 각 입력에 대해 숫자 (0 또는 그 이상의) 출력을 생성한다는 점입니다. ActionBlock 및 TransformBlock <TInput, TOutput>과 마찬가지로이 처리는 동기 및 비동기 처리를 위해 대리자를 사용하여 지정할 수 있습니다.
Func <TInput, IEnumerable>은 동기식으로 사용되고 Func <TInput, Task <IEnumerable >>은 비동기식으로 사용됩니다. ActionBlock과 TransformBlock <TInput, TOutput>과 같이 TransformManyBlock <TInput, TOutput>의 기본값은 순차적 처리이지만 그렇지 않으면 구성 될 수 있습니다.
매핑 대리자는 개별적으로 출력 버퍼에 삽입되는 항목 컬렉션을 다시 가져옵니다.
비동기 웹 크롤러
var downloader = new TransformManyBlock<string, string>(async url =>
{
Console.WriteLine(“Downloading “ + url);
try
{
return ParseLinks(await DownloadContents(url));
}
catch{}
return Enumerable.Empty<string>();
});
downloader.LinkTo(downloader);
Enumerable을 구성 요소로 확장
var expanded = new TransformManyBlock<T[], T>(array => array);
1에서 0 또는 1 요소로 이동하여 필터링
public IPropagatorBlock<T> CreateFilteredBuffer<T>(Predicate<T> filter)
{
return new TransformManyBlock<T, T>(item =>
filter(item) ? new [] { item } : Enumerable.Empty<T>());
}
BatchBlock
(특정 수의 순차적 데이터 항목을 데이터 항목의 모음으로 그룹화)
BatchBlock은 N 개의 단일 항목을 하나의 배치 항목으로 결합하여 요소 배열로 표시합니다. 특정 일괄 처리 크기로 인스턴스가 생성 된 다음 블록은 해당 수의 요소를 수신하자마자 일괄 처리를 생성하여 일괄 처리를 출력 버퍼로 비동기 적으로 출력합니다.
BatchBlock은 욕심 및 비 탐욕 모드에서 모두 실행할 수 있습니다.
- 기본 욕심 꾸러기 형태에서는, 어떤 수의 근원에서 구획에 제안 된 모든 메시지는 받아 들여지고 뭉치로 바뀌기 위하여 완충된다.
- 욕심이 아닌 모드에서는 배치를 만들기에 충분한 소스가 메시지를 블록에 제공 할 때까지 모든 메시지가 소스에서 연기됩니다. 따라서 BatchBlock을 사용하여 N 개의 소스 각각에서 1 개의 요소를, 1 개의 소스에서 N 개의 요소를, 그리고 그 사이에 무수히 많은 옵션을 수신 할 수 있습니다.
요청을 100 개 그룹으로 묶어 데이터베이스에 제출
var batchRequests = new BatchBlock<Request>(batchSize:100);
var sendToDb = new ActionBlock<Request[]>(reqs => SubmitToDatabase(reqs));
batchRequests.LinkTo(sendToDb);
한 번에 한 번 배치 만들기
var batch = new BatchBlock<T>(batchSize:Int32.MaxValue);
new Timer(() => { batch.TriggerBatch(); }).Change(1000, 1000);
BufferBlock
(FIFO 대기열 : 들어오는 데이터가 나가는 데이터입니다)
요컨대, BufferBlock은 T의 인스턴스를 저장하기위한 제한되지 않거나 제한된 버퍼를 제공합니다.
T의 인스턴스를 블록에 "게시"할 수 있습니다. 그러면 게시되는 데이터가 블록에 의해 선입 선출 (FIFO) 순서로 저장됩니다.
블록에서 "수신"하면 이전에 저장되었거나 향후 사용할 수있는 T의 인스턴스를 동 기적 또는 비동기 적으로 얻을 수 있습니다 (다시 FIFO).
Throttled Producer가있는 비동기 프로듀서 / 소비자
// Hand-off through a bounded BufferBlock<T>
private static BufferBlock<int> _Buffer = new BufferBlock<int>(
new DataflowBlockOptions { BoundedCapacity = 10 });
// Producer
private static async void Producer()
{
while(true)
{
await _Buffer.SendAsync(Produce());
}
}
// Consumer
private static async Task Consumer()
{
while(true)
{
Process(await _Buffer.ReceiveAsync());
}
}
// Start the Producer and Consumer
private static async Task Run()
{
await Task.WhenAll(Producer(), Consumer());
}