.NET Framework
TPL 데이터 흐름
수색…
비고
예제에 사용 된 라이브러리
System.Threading.Tasks.Dataflow
System.Threading.Tasks
System.Net.Http
System.Net
Post와 SendAsync의 차이점
블록에 항목을 추가하려면 Post
또는 SendAsync
를 사용할 수 있습니다.
Post
는 항목을 동 기적으로 추가하고 성공했는지 여부를 나타내는 bool
반환합니다. 예를 들어 블록이 BoundedCapcity
도달하여 아직 새 항목을위한 여유 공간이 없을 때 성공하지 못할 수 있습니다. 반면에 SendAsync
는 await
수있는 완료되지 않은 Task<bool>
을 반환합니다. 이 작업은 블록이 내부 대기열을 지우고 영구적으로 감소하는 경우 (예 : 취소 결과) 더 많은 항목을 허용하거나 false
결과로 받아 들일 수있는 true
결과로 미래에 완료 될 것입니다.
ActionBlock에 게시하고 완료를 기다리고 있습니다.
// Create a block with an asynchronous action
var block = new ActionBlock<string>(async hostName =>
{
IPAddress[] ipAddresses = await Dns.GetHostAddressesAsync(hostName);
Console.WriteLine(ipAddresses[0]);
});
block.Post("google.com"); // Post items to the block's InputQueue for processing
block.Post("reddit.com");
block.Post("stackoverflow.com");
block.Complete(); // Tell the block to complete and stop accepting new items
await block.Completion; // Asynchronously wait until all items completed processingu
블록을 연결하여 파이프 라인 만들기
var httpClient = new HttpClient();
// Create a block the accepts a uri and returns its contents as a string
var downloaderBlock = new TransformBlock<string, string>(
async uri => await httpClient.GetStringAsync(uri));
// Create a block that accepts the content and prints it to the console
var printerBlock = new ActionBlock<string>(
contents => Console.WriteLine(contents));
// Make the downloaderBlock complete the printerBlock when its completed.
var dataflowLinkOptions = new DataflowLinkOptions {PropagateCompletion = true};
// Link the block to create a pipeline
downloaderBlock.LinkTo(printerBlock, dataflowLinkOptions);
// Post urls to the first block which will pass their contents to the second one.
downloaderBlock.Post("http://youtube.com");
downloaderBlock.Post("http://github.com");
downloaderBlock.Post("http://twitter.com");
downloaderBlock.Complete(); // Completion will propagate to printerBlock
await printerBlock.Completion; // Only need to wait for the last block in the pipeline
BufferBlock을 사용한 동기식 프로듀서 / 소비자
public class Producer
{
private static Random random = new Random((int)DateTime.UtcNow.Ticks);
//produce the value that will be posted to buffer block
public double Produce ( )
{
var value = random.NextDouble();
Console.WriteLine($"Producing value: {value}");
return value;
}
}
public class Consumer
{
//consume the value that will be received from buffer block
public void Consume (double value) => Console.WriteLine($"Consuming value: {value}");
}
class Program
{
private static BufferBlock<double> buffer = new BufferBlock<double>();
static void Main (string[] args)
{
//start a task that will every 1 second post a value from the producer to buffer block
var producerTask = Task.Run(async () =>
{
var producer = new Producer();
while(true)
{
buffer.Post(producer.Produce());
await Task.Delay(1000);
}
});
//start a task that will recieve values from bufferblock and consume it
var consumerTask = Task.Run(() =>
{
var consumer = new Consumer();
while(true)
{
consumer.Consume(buffer.Receive());
}
});
Task.WaitAll(new[] { producerTask, consumerTask });
}
}
Bounded BufferBlock을 사용하는 비동기 프로듀서 소비자
var bufferBlock = new BufferBlock<int>(new DataflowBlockOptions
{
BoundedCapacity = 1000
});
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;
var producerTask = Task.Run(async () =>
{
var random = new Random();
while (!cancellationToken.IsCancellationRequested)
{
var value = random.Next();
await bufferBlock.SendAsync(value, cancellationToken);
}
});
var consumerTask = Task.Run(async () =>
{
while (await bufferBlock.OutputAvailableAsync())
{
var value = bufferBlock.Receive();
Console.WriteLine(value);
}
});
await Task.WhenAll(producerTask, consumerTask);
Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow