.NET Framework
작업 병렬 라이브러리 (TPL)
수색…
비고
목적과 사용 사례
작업 병렬 라이브러리의 목적은 다중 스레드 및 병렬 코드 작성 및 유지 관리 프로세스를 단순화하는 것입니다.
일부 사용 사례 * :
- 별도의 작업에서 백그라운드 작업을 실행하여 UI를 응답 성있게 유지
- 워크로드 분산
- 클라이언트 응용 프로그램이 동시에 (요청, TCP / UDP 등) 요청을 보내고받을 수 있도록 허용
- 한 번에 여러 파일 읽기 및 쓰기
* 코드는 멀티 스레딩을 위해 사례별로 고려해야합니다. 예를 들어 루프가 몇 번의 반복 만 수행하거나 적은 양의 작업 만 수행하는 경우 병렬 처리의 오버 헤드가 이점보다 클 수 있습니다.
TPL with .Net 3.5
TPL은 NuGet 패키지에 포함 된 .Net 3.5에도 사용할 수 있으며 작업 병렬 라이브러리라고합니다.
기본 생산자 - 소비자 루프 (BlockingCollection)
var collection = new BlockingCollection<int>(5);
var random = new Random();
var producerTask = Task.Run(() => {
for(int item=1; item<=10; item++)
{
collection.Add(item);
Console.WriteLine("Produced: " + item);
Thread.Sleep(random.Next(10,1000));
}
collection.CompleteAdding();
Console.WriteLine("Producer completed!");
});
collection.CompleteAdding();
호출하지 않는다면 주목할 가치가 있습니다 collection.CompleteAdding();
컨 수머 태스크가 실행 중이더라도 컬렉션에 계속 추가 할 수 있습니다. 그냥 collection.CompleteAdding();
호출하십시오 collection.CompleteAdding();
당신이 확신 할 때 더 이상 추가 사항이 없습니다. 이 기능을 사용하면 여러 소스가 BlockingCollection에 항목을 제공하고 단일 소비자가 항목을 가져 와서 항목을 처리하는 단일 소비자 패턴으로 여러 제작자를 만들 수 있습니다. 전체 추가를 호출하기 전에 BlockingCollection이 비어 있으면 collection.GetConsumingEnumerable()
의 Enumerable은 새 항목이 컬렉션에 추가되거나 BlockingCollection.CompleteAdding (); 대기열이 비어 있습니다.
var consumerTask = Task.Run(() => {
foreach(var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consumed: " + item);
Thread.Sleep(random.Next(10,1000));
}
Console.WriteLine("Consumer completed!");
});
Task.WaitAll(producerTask, consumerTask);
Console.WriteLine("Everything completed!");
작업 : 기본 인스턴스 및 대기
Task
클래스를 직접 인스턴스화하여 Task
만들 수 있습니다 ...
var task = new Task(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");
... 또는 정적 Task.Run
메서드를 사용하여 :
Console.WriteLine("Starting task...");
var task = Task.Run(() =>
{
Console.WriteLine("Task code starting...");
Thread.Sleep(2000);
Console.WriteLine("...task code ending!");
});
task.Wait();
Console.WriteLine("Task completed!");
첫 번째 경우에만 Start
를 명시 적으로 호출해야합니다.
작업 : WaitAll 및 변수 캡처
var tasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() =>
{
Console.WriteLine("I'm task " + n);
return n;
})).ToArray();
foreach(var task in tasks) task.Start();
Task.WaitAll(tasks);
foreach(var task in tasks)
Console.WriteLine(task.Result);
작업 : WaitAny
var allTasks = Enumerable.Range(1, 5).Select(n => new Task<int>(() => n)).ToArray();
var pendingTasks = allTasks.ToArray();
foreach(var task in allTasks) task.Start();
while(pendingTasks.Length > 0)
{
var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
Console.WriteLine("Task {0} finished", finishedTask.Result);
pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}
Task.WaitAll(allTasks);
참고 : WaitAny
는 예외가 발생하지 않기 때문에 마지막 WaitAll
이 필요합니다.
작업 : 예외 처리 (Wait 사용)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
Console.WriteLine("Starting tasks...");
try
{
Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
Console.WriteLine("Task(s) failed!");
foreach(var inner in ex.InnerExceptions)
Console.WriteLine(inner.Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
작업 : 예외 처리 (대기를 사용하지 않고)
var task1 = Task.Run(() =>
{
Console.WriteLine("Task 1 code starting...");
throw new Exception("Oh no, exception from task 1!!");
});
var task2 = Task.Run(() =>
{
Console.WriteLine("Task 2 code starting...");
throw new Exception("Oh no, exception from task 2!!");
});
var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");
while(tasks.All(task => !task.IsCompleted));
foreach(var task in tasks)
{
if(task.IsFaulted)
Console.WriteLine("Task failed: " +
task.Exception.InnerExceptions.First().Message);
}
Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
작업 : CancellationToken을 사용하여 취소
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var task = new Task((state) =>
{
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(true)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
myCancellationToken.ThrowIfCancellationRequested();
}
},
cancellationToken: cancellationToken,
state: cancellationToken);
Console.WriteLine("Counting to infinity. Press any key to cancel!");
task.Start();
Console.ReadKey();
cancellationTokenSource.Cancel();
try
{
task.Wait();
}
catch(AggregateException ex)
{
ex.Handle(inner => inner is OperationCanceledException);
}
Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled
ThrowIfCancellationRequested
의 대안으로, 취소 요청은 IsCancellationRequested
로 감지 될 수 있으며 OperationCanceledException
을 수동으로 throw 할 수 있습니다.
//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;
while(!myCancellationToken.IsCancellationRequested)
{
Console.Write("{0} ", i++);
Thread.Sleep(1000);
}
Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);
취소 토큰이 cancellationToken
매개 변수의 태스크 생성자에 전달되는 방법에 유의하십시오. ThrowIfCancellationRequested
가 호출 될 때 ThrowIfCancellationRequested
이 Faulted
상태가 아닌 Canceled
상태로 전환되도록하려면이 작업이 필요합니다. 또한 같은 이유로 인해 취소 토큰은 두 번째 경우에 OperationCanceledException
생성자에서 명시 적으로 제공됩니다.
Task.WhenAny
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(async() =>
{
Console.WriteLine("I'm task " + n);
await Task.Delay(random.Next(10,1000));
return n;
}));
Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);
await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");
Task.WhenAll
var random = new Random();
IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(n => Task.Run(() =>
{
Console.WriteLine("I'm task " + n);
return n;
}));
Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5
Parallel.Invoke
var actions = Enumerable.Range(1, 10).Select(n => new Action(() =>
{
Console.WriteLine("I'm task " + n);
if((n & 1) == 0)
throw new Exception("Exception from task " + n);
})).ToArray();
try
{
Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
foreach(var inner in ex.InnerExceptions)
Console.WriteLine("Task failed: " + inner.Message);
}
Parallel.ForEach
이 예에서는 Parallel.ForEach
를 사용하여 여러 스레드를 사용하여 1에서 10000 사이의 숫자 합계를 계산합니다. 스레드 안전을 위해 Interlocked.Add
를 사용하여 숫자를 더합니다.
using System.Threading;
int Foo()
{
int total = 0;
var numbers = Enumerable.Range(1, 10000).ToList();
Parallel.ForEach(numbers,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
Parallel.For
이 예에서는 Parallel.For
를 사용하여 여러 스레드를 사용하여 1에서 10000 사이의 숫자 합계를 계산합니다. 스레드 안전을 위해 Interlocked.Add
를 사용하여 숫자를 더합니다.
using System.Threading;
int Foo()
{
int total = 0;
Parallel.For(1, 10001,
() => 0, // initial value,
(num, state, localSum) => num + localSum,
localSum => Interlocked.Add(ref total, localSum));
return total; // total = 50005000
}
AsyncLocal을 사용한 실행 컨텍스트 흐름
부모 작업에서 자식 작업으로 일부 데이터를 전달해야하므로 논리적으로 실행과 함께 흐르게되므로 AsyncLocal
클래스를 사용 AsyncLocal
.
void Main()
{
AsyncLocal<string> user = new AsyncLocal<string>();
user.Value = "initial user";
// this does not affect other tasks - values are local relative to the branches of execution flow
Task.Run(() => user.Value = "user from another task");
var task1 = Task.Run(() =>
{
Console.WriteLine(user.Value); // outputs "initial user"
Task.Run(() =>
{
// outputs "initial user" - value has flown from main method to this task without being changed
Console.WriteLine(user.Value);
}).Wait();
user.Value = "user from task1";
Task.Run(() =>
{
// outputs "user from task1" - value has flown from main method to task1
// than value was changed and flown to this task.
Console.WriteLine(user.Value);
}).Wait();
});
task1.Wait();
// ouputs "initial user" - changes do not propagate back upstream the execution flow
Console.WriteLine(user.Value);
}
참고 : 위의 예제에서 볼 수 있듯이 AsynLocal.Value
에는 copy on read
semantic이 있지만 참조 유형을 이동하고 속성을 변경하면 다른 작업에 영향을줍니다. 따라서 AsyncLocal
을 사용하는 것이 값 유형 또는 변경 불가능 유형을 사용하는 것이 가장 좋습니다.
VB.NET의 Parallel.ForEach
For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next
Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
ProcessRowParallel(currentRow, state)
End Sub)
작업 : 값 반환
값을 반환하는 작업의 반환 형식은 Task< TResult >
여기서 TResult는 반환해야하는 값 형식입니다. 작업의 결과를 Result 속성으로 쿼리 할 수 있습니다.
Task<int> t = Task.Run(() =>
{
int sum = 0;
for(int i = 0; i < 500; i++)
sum += i;
return sum;
});
Console.WriteLine(t.Result); // Outuput 124750
작업이 작업을 기다리는 것보다 비동기 적으로 실행되면 결과가 반환됩니다.
public async Task DoSomeWork()
{
WebClient client = new WebClient();
// Because the task is awaited, result of the task is assigned to response
string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}