Java Language
Executor、ExecutorService、Threadsプール
サーチ…
前書き
JavaのExecutorインタフェースは、スレッドの使用、スケジューリングなどの詳細を含む、各タスクの実行方法の仕組みからのタスクの提出を切り離す方法を提供します。通常、明示的にスレッドを作成する代わりにExecutorが使用されます。 Executorsでは、開発者はプログラムのタスク実行ポリシーを簡単に調整できるようにコードを大幅に書き直す必要はありません。
備考
落とし穴
- 繰り返し実行するタスクをスケジュールすると、使用されるScheduledExecutorServiceによっては、タスクの実行によって処理されない例外が発生した場合、タスクはそれ以上実行されなくなる可能性があります。 ScheduledExecutorService!を参照してください。
火災と忘れる - 実行可能なタスク
エグゼキュータは、他のスレッドで実行される(潜在的に計算上の、または長時間実行されている、または長時間実行されている)コードを含むjava.lang.Runnable
を受け入れjava.lang.Runnable
。
使用方法は次のとおりです。
Executor exec = anExecutor;
exec.execute(new Runnable() {
@Override public void run() {
//offloaded work, no need to get result back
}
});
このエグゼキュータでは、計算された値を戻す手段がないことに注意してください。
Java 8では、lambdaを使用してコード例を短縮することができます。
Executor exec = anExecutor;
exec.execute(() -> {
//offloaded work, no need to get result back
});
ThreadPoolExecutor
使用される共通のExecutorはThreadPoolExecutor
、これはスレッドの処理を担当します。実行する人が多いときには、エグゼキュータが常に保たなければならないスレッドの最小量(コア・サイズと呼ばれます)と、プールが増える最大スレッド・サイズを構成できます。ワークロードが低下すると、PoolはMinのサイズに達するまでスレッドのカウントを徐々に減らします。
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, // keep at least one thread ready,
// even if no Runnables are executed
5, // at most five Runnables/Threads
// executed in parallel
1, TimeUnit.MINUTES, // idle Threads terminated after one
// minute, when min Pool size exceeded
new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
pool.execute(new Runnable() {
@Override public void run() {
//code to run
}
});
注意 無制限キューを使用してThreadPoolExecutor
をコンフィグレーションすると、スレッドが満杯の場合にのみ新しいスレッドが作成されるため、スレッド数はcorePoolSize
超えません。
すべてのパラメータを持つThreadPoolExecutor:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
JavaDocから
corePoolSize以上で、maximumPoolSizeより小さいスレッドが実行されている場合は、キューがいっぱいになった場合にのみ新しいスレッドが作成されます。
利点:
BlockingQueueサイズを制御し、メモリ不足シナリオを回避することができます。限られた制限キュー・サイズでは、アプリケーションのパフォーマンスは低下しません。
既存のRejection Handlerポリシーを使用することも、新しいRejection Handlerポリシーを作成することもできます。
デフォルトのThreadPoolExecutor.AbortPolicyでは、拒否されると、ハンドラは実行時RejectedExecutionExceptionをスローします。
ThreadPoolExecutor.CallerRunsPolicy
では、execute自体を呼び出すスレッドがタスクを実行します。これは、新しいタスクが提出される速度を遅くする単純なフィードバック制御メカニズムを提供する。ThreadPoolExecutor.DiscardPolicy
では、実行できないタスクは単に破棄されます。ThreadPoolExecutor.DiscardOldestPolicy
では、エグゼキュータがシャットダウンされていない場合、作業キューの先頭にあるタスクが破棄され、実行が再試行されます(これが再度失敗する可能性があります)。
カスタム
ThreadFactory
を設定することができます。これは便利です:- よりわかりやすいスレッド名を設定するには
- スレッドデーモンのステータスを設定するには
- スレッドの優先順位を設定するには
ここで使用する方法の例があるThreadPoolExecutor
計算からの値の取得 - 呼び出し可能
計算が後で必要となる戻り値を生成する場合、単純なRunnableタスクでは不十分です。このような場合、実行が完了した後に値を返すExecutorService.submit(
Callable
<T>)
を使用できます。
サービスは、タスクの実行結果を取得するために使用できるFuture
を返します。
// Submit a callable for execution
ExecutorService pool = anExecutorService;
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override public Integer call() {
//do some computation
return new Random().nextInt();
}
});
// ... perform other tasks while future is executed in a different thread
将来の結果を得る必要があるときは、 future.get()
呼び出します。
未来のために無限に待って、結果を待つ。
try { // Blocks current thread until future is completed Integer result = future.get(); catch (InterruptedException || ExecutionException e) { // handle appropriately }
将来が終了するのを待ちますが、指定された時間を超えてはなりません。
try { // Blocks current thread for a maximum of 500 milliseconds. // If the future finishes before that, result is returned, // otherwise TimeoutException is thrown. Integer result = future.get(500, TimeUnit.MILLISECONDS); catch (InterruptedException || ExecutionException || TimeoutException e) { // handle appropriately }
スケジュールされたタスクまたは実行中のタスクの結果が不要になった場合は、 Future.cancel(boolean)
を呼び出してキャンセルすることができます。
-
cancel(false)
を呼び出すと、実行するタスクのキューからタスクが削除されます。 -
cancel(true)
を呼び出すと、現在実行中のタスクも中断されます。
遅延または繰り返しの後、一定時間にタスクを実行するようにスケジューリングする
ScheduledExecutorService
クラスは、さまざまな方法で単一タスクまたは繰り返しタスクのスケジューリングを行うメソッドを提供します。次のコードサンプルでは、 pool
が宣言され、次のように初期化されていると想定しています。
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
通常のExecutorService
メソッドに加えて、 ScheduledExecutorService
APIには、タスクをScheduledFuture
オブジェクトを返す4つのメソッドが追加さScheduledFuture
ます。後者を使用して結果を取得したり、タスクをキャンセルしたりすることができます。
一定の遅延後にタスクを開始する
次の例では、10分後にタスクを開始するようにスケジュールを設定しています。
ScheduledFuture<Integer> future = pool.schedule(new Callable<>() {
@Override public Integer call() {
// do something
return 42;
}
},
10, TimeUnit.MINUTES);
一定の速度でタスクを開始する
次の例では、10分後にタスクを開始し、1分に1回の割合でタスクを繰り返し実行するようにスケジュールしています。
ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
@Override public void run() {
// do something
}
},
10, 1, TimeUnit.MINUTES);
タスクの実行は、 pool
がシャットダウンされたり、 future
がキャンセルされたり、タスクの1つが例外に遭遇するまでスケジュールに従って継続されます。
指定scheduledAtFixedRate
れたscheduledAtFixedRate
コールによってスケジュールされたタスクは、時間的に重複しないことが保証されています。タスクが所定の期間より長くかかると、次回以降のタスクの実行が遅く開始することがあります。
固定遅延によるタスクの開始
次の例では、10分後にタスクを開始し、1つのタスクが終了してから次のタスクが開始するまでに1分の遅延を繰り返してスケジュールします。
ScheduledFuture<?> future = pool.scheduleWithFixedDelay(new Runnable() {
@Override public void run() {
// do something
}
},
10, 1, TimeUnit.MINUTES);
タスクの実行は、 pool
がシャットダウンされたり、 future
がキャンセルされたり、タスクの1つが例外に遭遇するまでスケジュールに従って継続されます。
拒否された実行を処理する
もし
- シャットダウンExecutorにタスクをサブミットしようとするか、または
- キューは飽和しています(結合されたスレッドでのみ可能です)、スレッドの最大数に達しました。
RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
が呼び出されます。
デフォルトの動作では、呼び出し側でRejectedExecutionExceptionがスローされます。しかし、より多くの事前定義された動作が利用可能です。
- ThreadPoolExecutor.AbortPolicy (デフォルト、REEをスローします)
- ThreadPoolExecutor.CallerRunsPolicy (呼び出し元のスレッドでタスクを実行してスレッドをブロックします )
- ThreadPoolExecutor.DiscardPolicy (タスクをサイレントに破棄)
- ThreadPoolExecutor.DiscardOldestPolicy (キュー内の最も古いタスクを静かに破棄し、新しいタスクの実行を再試行する)
ThreadPool コンストラクタの 1つを使用してそれらを設定することができます:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) // <--
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) // <--
RejectedExecutionHandlerインターフェイスを拡張することで、独自の動作を実装することもできます。
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
submit()とexecute()の例外処理の違い
一般に、execute()コマンドはfire呼び出しとforget呼び出し(結果を解析する必要なし)に使用され、submit()コマンドはfutureオブジェクトの結果の解析に使用されます。
これらの2つのコマンドの間の例外処理メカニズムの主な違いを認識する必要があります。
あなたがそれらをキャッチしなかった場合、submit()の例外はフレームワークによって飲み込まれます。
違いを理解するためのコード例:
ケース1:例外を報告するexecute()コマンドでRunnableを送信します。
import java.util.concurrent.*;
import java.util.*;
public class ExecuteSubmitDemo {
public ExecuteSubmitDemo() {
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(2);
//ExtendedExecutor service = new ExtendedExecutor();
for (int i = 0; i < 2; i++){
service.execute(new Runnable(){
public void run(){
int a = 4, b = 0;
System.out.println("a and b=" + a + ":" + b);
System.out.println("a/b:" + (a / b));
System.out.println("Thread Name in Runnable after divide by zero:"+Thread.currentThread().getName());
}
});
}
service.shutdown();
}
public static void main(String args[]){
ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
}
}
class ExtendedExecutor extends ThreadPoolExecutor {
public ExtendedExecutor() {
super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
出力:
creating service
a and b=4:0
a and b=4:0
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
ケース2:execute()をsubmit()に置き換えます service.submit(new Runnable(){
この場合、例外はrun()メソッドが明示的にキャッチしなかったためフレームワークによって取り込まれます。
出力:
creating service
a and b=4:0
a and b=4:0
ケース3:newFixedThreadPoolをExtendedExecutorに変更する
//ExecutorService service = Executors.newFixedThreadPool(2);
ExtendedExecutor service = new ExtendedExecutor();
出力:
creating service
a and b=4:0
java.lang.ArithmeticException: / by zero
a and b=4:0
java.lang.ArithmeticException: / by zero
この例では、2つのトピックについて説明しました。カスタムThreadPoolExecutorを使用し、カスタムThreadPoolExecutorでExectpionを処理します。
上記の問題に対する他の簡単な解決策:通常のExecutorServiceおよびsubmitコマンドを使用している場合は、Future()のsubmit()コマンド呼び出しget()APIからFutureオブジェクトを取得します。 afterExecuteメソッドの実装で引用されている3つの例外をキャッチします。このアプローチに対するカスタムThreadPoolExecutorの利点:例外処理メカニズムは、カスタムThreadPoolExecutorという1つの場所でのみ処理する必要があります。
さまざまな種類の同時実行性構造のユースケース
ExecutorService executor = Executors.newFixedThreadPool(50);
シンプルで使いやすいです。
ThreadPoolExecutor
低レベルの詳細を隠します。Callable/Runnable
タスクの数が少なく、無制限キュー内のタスクを積み重ねてもメモリが増えず、システムのパフォーマンスが低下する場合は、この方法をお勧めします。CPU/Memory
制約がある場合は、タスクのRejectedExecutionHandler
を処理するために、容量制約&RejectedExecutionHandler
を使用してThreadPoolExecutor
を使用することをお勧めします。CountDownLatch
は、指定されたカウントで初期化されます。このカウントは、countDown()
メソッドの呼び出しによって減分されます。このカウントがゼロになるのを待っているスレッドは、await()
メソッドの1つを呼び出すことができます。await()
呼び出すと、カウントがゼロになるまでスレッドがブロックされます。 このクラスは、他のスレッドセットがタスクを完了するまでJavaスレッドを待機させます。ユースケース:
最大限の並列性を達成する:並列化を最大限に達成するために同時に複数のスレッドを開始することがあります
実行を開始する前にN個のスレッドが完了するまで待ちます。
デッドロック検出。
ThreadPoolExecutor :より多くの制御を提供します。保留中のRunnable / Callableタスクの数によってアプリケーションが制約されている場合は、最大容量を設定して境界キューを使用できます。キューが最大容量に達すると、RejectionHandlerを定義できます。 Javaには4種類の
RejectedExecutionHandler
ポリシーが用意RejectedExecutionHandler
ます。ThreadPoolExecutor.AbortPolicy
、ハンドラは拒否されると実行時にRejectedExecutionExceptionをスローします。ThreadPoolExecutor.CallerRunsPolicy`、executeを呼び出すスレッドがタスクを実行します。これは、新しいタスクが提出される速度を遅くする単純なフィードバック制御メカニズムを提供する。
ThreadPoolExecutor.DiscardPolicy
では、実行できないタスクは単に破棄されます。ThreadPoolExecutor.DiscardOldestPolicy
、エグゼキュータがシャットダウンされていない場合は、作業キューの先頭にあるタスクが破棄され、実行が再試行されます(再実行が失敗する可能性があります)。
CountDownLatch
動作をシミュレートする場合は、 invokeAll()
メソッドを使用できます。
あなたが引用しなかったもう一つのメカニズムはForkJoinPoolです
ForkJoinPool
はJava 7のJavaに追加されましたForkJoinPool
はJavaExecutorService
似ていますが、1つの違いがあります。ForkJoinPool
すると、タスクを自分の仕事を分割して小さなタスクに分割し、ForkJoinPool
することが容易にForkJoinPool
ます。空きワーカースレッドがビジー状態のワーカースレッドキューからタスクを盗むとき、ForkJoinPool
タスクが盗み出されます。Java 8は、 Work stealing poolを作成するためにExecutorServiceにもう1つのAPIを導入しました。
RecursiveTask
とRecursiveAction
を作成する必要はありませんが、ForkJoinPool
は使用できます。public static ExecutorService newWorkStealingPool()
使用可能なすべてのプロセッサーをターゲット並列処理レベルとして使用して、ワークスティールスレッドプールを作成します。
デフォルトでは、パラメータとしてCPUコア数が使用されます。
これら4つのメカニズムはすべて相互に補完的です。制御したい細かさのレベルに応じて、適切なものを選択する必要があります。
ExecutorService内のすべてのタスクの完了を待つ
Executorに提出されたタスクの完了を待つ様々なオプションを見てみましょう
- ExecutorService
invokeAll()
指定されたタスクを実行し、すべてが完了したときにステータスと結果を保持するFuturesのリストを返します。
例:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllDemo{
public InvokeAllDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<MyCallable> futureList = new ArrayList<MyCallable>();
for (int i = 0; i < 10; i++){
MyCallable myCallable = new MyCallable((long)i);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
} catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllDemo demo = new InvokeAllDemo();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
// Add your business logic
return id;
}
}
}
他のスレッドで実行されている一連の操作が完了するまで、1つまたは複数のスレッドを待機させる同期化補助機能。
CountDownLatchは、指定されたカウントで初期化されます。 awaitメソッドは、
countDown()
メソッドの呼び出しによって現在のカウントがゼロになるまでブロックします。その後、待機中のすべてのスレッドが解放され、その後のすべてのwaitの呼び出しがすぐに戻ります。これはワンショット現象です。カウントをリセットすることはできません。カウントをリセットするバージョンが必要な場合は、 CyclicBarrierの使用を検討してください 。ForkJoinPoolまたは
newWorkStealingPool()
でエグゼキューExecutorService
に送信した後に作成されたすべてのFuture
オブジェクトを繰り返し処理するExecutorServiceの Oracleドキュメント・ページからのシャットダウンの推奨方法:
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
shutdown():
以前にサブミットされたタスクが実行され、新しいタスクは受け入れられない、正常なシャットダウンを開始します。shutdownNow():
アクティブに実行中のタスクをすべて停止しようとし、待機中のタスクの処理を停止し、実行待ちのタスクのリストを返します。上記の例では、タスクの完了に時間がかかる場合は、条件をwhileに変更することができます
置換
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
〜と
while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000);
}
異なるタイプのExecutorServiceの使用例
エグゼクティブは、特定のニーズに対応するさまざまなタイプのThreadPoolを返します。
public static ExecutorService newSingleThreadExecutor()
バインドされていないキューで動作する単一のワーカースレッドを使用するExecutorを作成します。
newFixedThreadPool(1)
とnewSingleThreadExecutor()
は、javaドキュメントが後者について言うように違いがあります。その他の同等のnewFixedThreadPool(1)とは異なり、返されたエグゼキュータは、追加スレッドを使用するように再構成できないことが保証されています。
つまり、
newFixedThreadPool
は、newFixedThreadPool
プログラムで再構成することができます。((ThreadPoolExecutor) fixedThreadPool).setMaximumPoolSize(10)
これはnewSingleThreadExecutor
では不可能ですユースケース:
- 送信されたタスクを順番に実行したいとします。
- すべてのリクエストを処理するスレッドは1つだけ必要です
短所:
- 無制限のキューは有害です
public static ExecutorService newFixedThreadPool(int nThreads)
共有無制限キューを操作して固定数のスレッドを再利用するスレッドプールを作成します。どの時点でも、たいていのnThreadsスレッドはアクティブな処理タスクになります。すべてのスレッドがアクティブなときに追加のタスクが送信されると、スレッドが使用可能になるまでキュー内で待機します
ユースケース:
- 使用可能なコアの効果的な使用。
nThreads
をRuntime.getRuntime().availableProcessors()
として設定しますnThreads
Runtime.getRuntime().availableProcessors()
- スレッド数がスレッドプール内の数値を超えないようにすることを決定すると
短所:
- 無制限のキューは有害です。
- 使用可能なコアの効果的な使用。
public static ExecutorService newCachedThreadPool()
必要に応じて新しいスレッドを作成するスレッドプールを作成します。ただし、以前に構築されたスレッドが使用可能になった場合に再利用します
ユースケース:
- 短命の非同期タスク
短所:
- 無制限のキューは有害です。
- 既存のスレッドがすべてビジー状態になると、新しいスレッドが作成されます。タスクの実行時間が長くなると、より多くのスレッドが作成され、システムのパフォーマンスが低下します。この場合の代替:
newFixedThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
指定された遅延の後に実行するコマンド、または定期的に実行するコマンドをスケジュールできるスレッドプールを作成します。
ユースケース:
- 遅延を伴う定期的なイベントの処理は、将来一定の間隔で行われます。
短所:
- 無制限のキューは有害です。
5.
public static ExecutorService newWorkStealingPool()
使用可能なすべてのプロセッサーをターゲット並列処理レベルとして使用して、ワークスティールスレッドプールを作成します。
ユースケース:
- 分割して征服するタイプの問題。
- アイドル状態のスレッドの効果的な使用。アイドル状態のスレッドは、ビジー状態のスレッドからタスクを盗みます。
短所:
- 無制限のキューサイズは有害です。
これらのすべてのExecutorServiceには、無制限のキューという共通の欠点が1つあります。これは、 ThreadPoolExecutorで対処されます。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor
、
- スレッドプールサイズを動的に制御する
-
BlockingQueue
の容量を設定する - キューがいっぱいになると
RejectionExecutionHander
定義する -
CustomThreadFactory
を使用してスレッド作成時に追加機能を追加する(public Thread newThread(Runnable r)
スレッドプールの使用
スレッドプールは、主にExecutorService
メソッドを呼び出すために使用されます。
次のメソッドを使用して、実行する作業を送信できます。
方法 | 説明 |
---|---|
submit | 提出された作品を実行し、結果を得るために使用できる将来を返す |
execute | 将来的に何らかの戻り値を得ることなくタスクを実行する |
invokeAll | タスクのリストを実行し、未来のリストを返す |
invokeAny | すべてを実行するが、成功した結果のみを返す(例外なし) |
スレッドプールを終了すると、 shutdown()
を呼び出してスレッドプールを終了できます。保留中のすべてのタスクが実行されます。すべてのタスクが実行されるのを待つために、 awaitTermination
またはisShutdown()
ループすることができます。