Java Language
Исполнители, Исполнительные службы и пулы потоков
Поиск…
Вступление
Интерфейс Executor в Java обеспечивает способ расцепления заданий от механики того, как будет выполняться каждая задача, включая сведения о потреблении потоков, планировании и т. Д. Обычно используется Executor вместо явного создания потоков. С Executors разработчикам не придется значительно переписывать свой код, чтобы иметь возможность легко настраивать политику выполнения своих задач.
замечания
Ловушки
- Когда вы планируете задачу для повторного выполнения, в зависимости от используемого ScheduledExecutorService, ваша задача может быть приостановлена с любого последующего исполнения, если выполнение вашей задачи вызывает исключение, которое не обрабатывается. См. Mother F ** k ScheduledExecutorService!
Огонь и Забыть - Управляемые задачи
Исполнители принимают java.lang.Runnable
который содержит (потенциально вычислительный или иной долговременный или тяжелый) код для запуска в другом потоке.
Использование:
Executor exec = anExecutor;
exec.execute(new Runnable() {
@Override public void run() {
//offloaded work, no need to get result back
}
});
Обратите внимание, что с этим исполнителем у вас нет средств для возврата какого-либо вычисленного значения.
С Java 8 можно использовать lambdas, чтобы сократить пример кода.
Executor exec = anExecutor;
exec.execute(() -> {
//offloaded work, no need to get result back
});
ThreadPoolExecutor
Обычным Исполнителем является ThreadPoolExecutor
, который занимается обработкой потоков. Вы можете настроить минимальное количество потоков, которые исполнитель всегда должен поддерживать, когда не так много делать (это называется размером ядра) и максимальный размер потока, к которому может увеличиваться пул, если есть больше работы. Как только рабочая нагрузка снижается, пул медленно уменьшает количество потоков, пока не достигнет минимального размера.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, // keep at least one thread ready,
// even if no Runnables are executed
5, // at most five Runnables/Threads
// executed in parallel
1, TimeUnit.MINUTES, // idle Threads terminated after one
// minute, when min Pool size exceeded
new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
pool.execute(new Runnable() {
@Override public void run() {
//code to run
}
});
Примечание. Если вы сконфигурируете ThreadPoolExecutor
с неограниченной очередью, то количество потоков не будет превышать corePoolSize
поскольку новые потоки создаются только в том случае, если очередь заполнена:
ThreadPoolExecutor со всеми параметрами:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
от JavaDoc
Если существует больше, чем corePoolSize, но меньше потоков MaximumPoolSize, новый поток будет создан только в том случае, если очередь заполнена.
Преимущества:
BlockingQueue можно контролировать, а сценарии отсутствия памяти можно избежать. Производительность приложения не будет снижена с ограниченным ограниченным размером очереди.
Вы можете использовать существующие или создавать новые политики Отклонения Обработчика.
В по умолчанию ThreadPoolExecutor.AbortPolicy обработчик выдает исключение ExjectExceptionException после отклонения.
В
ThreadPoolExecutor.CallerRunsPolicy
поток, запускающий выполнение, запускает задачу. Это обеспечивает простой механизм управления с обратной связью, который замедляет скорость подачи новых задач.В
ThreadPoolExecutor.DiscardPolicy
задача, которая не может быть выполнена, просто удаляется.В
ThreadPoolExecutor.DiscardOldestPolicy
, если исполнитель не закрыт, задача во главе рабочей очереди отбрасывается, а затем выполняется повторное выполнение (что может снова потерпеть неудачу, в результате чего это будет повторяться).
Пользовательский
ThreadFactory
может быть настроен, что полезно:- Чтобы задать более описательное имя потока
- Чтобы установить статус демона нити
- Чтобы установить приоритет потока
Вот пример использования ThreadPoolExecutor
Извлечение значения из вычисления - Callable
Если ваше вычисление дает некоторое возвращаемое значение, которое требуется позже, простая задача Runnable недостаточна. Для таких случаев вы можете использовать ExecutorService.submit(
Callable
<T>)
который возвращает значение после завершения выполнения.
Служба вернет Future
которое вы можете использовать для получения результата выполнения задачи.
// Submit a callable for execution
ExecutorService pool = anExecutorService;
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override public Integer call() {
//do some computation
return new Random().nextInt();
}
});
// ... perform other tasks while future is executed in a different thread
Когда вам нужно получить результат в будущем, вызовите future.get()
Подождите бесконечно для будущего, чтобы закончить результат.
try { // Blocks current thread until future is completed Integer result = future.get(); catch (InterruptedException || ExecutionException e) { // handle appropriately }
Подождите, пока закончится будущее, но не больше указанного времени.
try { // Blocks current thread for a maximum of 500 milliseconds. // If the future finishes before that, result is returned, // otherwise TimeoutException is thrown. Integer result = future.get(500, TimeUnit.MILLISECONDS); catch (InterruptedException || ExecutionException || TimeoutException e) { // handle appropriately }
Если результат запланированной или запущенной задачи больше не требуется, вы можете вызвать Future.cancel(boolean)
чтобы отменить его.
- Вызов
cancel(false)
просто удалит задачу из очереди задач для запуска. - Вызов
cancel(true)
также прервет задачу, если она в данный момент запущена.
Планирование задач для запуска в определенное время, после задержки или многократного
Класс ScheduledExecutorService
предоставляет методы для планирования отдельных или повторяющихся задач несколькими способами. В следующем примере кода предполагается, что pool
был объявлен и инициализирован следующим образом:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
В дополнение к обычным методам ExecutorService
API ScheduledExecutorService
добавляет 4 метода, которые планируют задачи и возвращают объекты ScheduledFuture
. Последний может использоваться для получения результатов (в некоторых случаях) и отмены задач.
Запуск задачи после фиксированной задержки
В следующем примере планируется запуск задачи через десять минут.
ScheduledFuture<Integer> future = pool.schedule(new Callable<>() {
@Override public Integer call() {
// do something
return 42;
}
},
10, TimeUnit.MINUTES);
Запуск заданий с фиксированной скоростью
В следующем примере планируется запуск задачи через десять минут, а затем несколько раз со скоростью один раз в минуту.
ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
@Override public void run() {
// do something
}
},
10, 1, TimeUnit.MINUTES);
Выполнение задачи будет продолжаться в соответствии с графиком до тех пор, пока pool
будет закрыт, future
будет отменено или одна из задач встретит исключение.
Гарантируется, что задачи, запланированные по заданному вызову scheduledAtFixedRate
, не будут перекрываться во времени. Если задача занимает больше времени, чем заданный период, то последующие и последующие задания могут начинаться с опоздания.
Запуск задач с фиксированной задержкой
В следующем примере планируется запуск задачи через десять минут, а затем несколько раз с задержкой в одну минуту между завершением одной задачи и последующим запуском.
ScheduledFuture<?> future = pool.scheduleWithFixedDelay(new Runnable() {
@Override public void run() {
// do something
}
},
10, 1, TimeUnit.MINUTES);
Выполнение задачи будет продолжаться в соответствии с графиком до тех пор, пока pool
будет закрыт, future
будет отменено или одна из задач встретит исключение.
Отказ от отказа
Если
- вы пытаетесь отправить задания на выключение Executor или
- очередь является насыщенной (возможно только с ограниченными) и достигается максимальное количество потоков,
RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
.
Поведение по умолчанию заключается в том, что вы получите исключение RejectedExecutionException, которое вызывается вызывающим. Но есть более предопределенное поведение:
- ThreadPoolExecutor.AbortPolicy (по умолчанию выбрасывает REE)
- ThreadPoolExecutor.CallerRunsPolicy (выполняет задачу по потоку вызывающего абонента - блокирует ее )
- ThreadPoolExecutor.DiscardPolicy (тихо отмените задачу)
- ThreadPoolExecutor.DiscardOldestPolicy (тихо отбрасывает старую задачу в очереди и повторяет выполнение новой задачи)
Вы можете установить их с помощью одного из конструкторов ThreadPool:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) // <--
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) // <--
Вы можете также реализовать свое собственное поведение, расширив интерфейс RejectedExecutionHandler :
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
Отправить () vs execute () отличия обработки исключений
Обычно команда execute () используется для вызовов огня и забвения (без необходимости анализа результата), а команда submit () используется для анализа результата объекта Future.
Мы должны знать о ключевой разнице механизмов обработки исключений между этими двумя командами.
Исключения из submit () проглатываются каркасом, если вы их не поймали.
Пример кода, чтобы понять разницу:
Случай 1: подайте команду Runnable with execute (), которая сообщает об исключении.
import java.util.concurrent.*;
import java.util.*;
public class ExecuteSubmitDemo {
public ExecuteSubmitDemo() {
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(2);
//ExtendedExecutor service = new ExtendedExecutor();
for (int i = 0; i < 2; i++){
service.execute(new Runnable(){
public void run(){
int a = 4, b = 0;
System.out.println("a and b=" + a + ":" + b);
System.out.println("a/b:" + (a / b));
System.out.println("Thread Name in Runnable after divide by zero:"+Thread.currentThread().getName());
}
});
}
service.shutdown();
}
public static void main(String args[]){
ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
}
}
class ExtendedExecutor extends ThreadPoolExecutor {
public ExtendedExecutor() {
super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
выход:
creating service
a and b=4:0
a and b=4:0
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Случай 2: Заменить execute () с помощью submit (): service.submit(new Runnable(){
В этом случае Исключения проглатываются инфраструктурой, так как метод run () не обнаружил их явно.
выход:
creating service
a and b=4:0
a and b=4:0
Случай 3: измените newFixedThreadPool на ExtendedExecutor
//ExecutorService service = Executors.newFixedThreadPool(2);
ExtendedExecutor service = new ExtendedExecutor();
выход:
creating service
a and b=4:0
java.lang.ArithmeticException: / by zero
a and b=4:0
java.lang.ArithmeticException: / by zero
Я продемонстрировал этот пример, чтобы охватить две темы: используйте свой собственный ThreadPoolExecutor и обработайте Exectpion с помощью пользовательского ThreadPoolExecutor.
Другое простое решение вышеуказанной проблемы: когда вы используете обычную команду ExecutorService & submit, получите объект Future из submit () команды call get () API on Future. Вызовите три исключения, которые были указаны в реализации метода afterExecute. Преимущество пользовательского ThreadPoolExecutor по этому подходу: вы должны обрабатывать механизм обработки исключений только в одном месте - Custom ThreadPoolExecutor.
Примеры использования для различных типов конструкций параллелизма
ExecutorService executor = Executors.newFixedThreadPool(50);
Он прост и удобен в использовании. Он скрывает детали уровня
ThreadPoolExecutor
.Я предпочитаю это, когда количество задач
Callable/Runnable
малочисленно, а нагромождение задач в неограниченной очереди не увеличивает память и ухудшает производительность системы. Если у вас есть ограничения наCPU/Memory
, я предпочитаю использоватьThreadPoolExecutor
с ограничениями емкости иRejectedExecutionHandler
для обработки отказа от задач.CountDownLatch
будет инициализирован с заданным счетом. Этот счет уменьшается с помощью вызовов методаcountDown()
. Темы, ожидающие, что этот счет достигнет нуля, могут вызвать один из методовawait()
. Вызовawait()
блокирует поток до тех пор, пока счетчик не достигнет нуля. Этот класс позволяет потоку java ждать, пока другой набор потоков завершит выполнение своих задач.Случаи применения:
Достижение максимального параллелизма. Иногда мы хотим начать несколько потоков одновременно для достижения максимального параллелизма
Подождите N потоков до завершения перед запуском
Обнаружение взаимоблокировки.
ThreadPoolExecutor : он обеспечивает больше контроля. Если приложение ограничено количеством ожидающих задач Runnable / Callable, вы можете использовать ограниченную очередь, установив максимальную емкость. Когда очередь достигает максимальной емкости, вы можете определить RejectionHandler. Java предоставляет четыре типа политик
RejectedExecutionHandler
.ThreadPoolExecutor.AbortPolicy
, обработчик выдает исключение ExjectExceptionException после отклонения.ThreadPoolExecutor.CallerRunsPolicy`, поток, который вызывает выполнение выполнения, запускает задачу. Это обеспечивает простой механизм управления с обратной связью, который замедляет скорость подачи новых задач.
В
ThreadPoolExecutor.DiscardPolicy
задача, которая не может быть выполнена, просто удаляется.ThreadPoolExecutor.DiscardOldestPolicy
, если исполнитель не закрыт, задача во главе рабочей очереди отбрасывается, а затем выполняется повторное выполнение (что может снова потерпеть неудачу, в результате чего это будет повторяться).
Если вы хотите имитировать поведение CountDownLatch
, вы можете использовать invokeAll()
.
Еще один механизм, который вы не указали, - ForkJoinPool
ForkJoinPool
был добавлен в Java на Java 7.ForkJoinPool
похож на JavaExecutorService
но с одним отличием.ForkJoinPool
упрощает задачу разделения работы на более мелкие задачи, которые затем отправляются вForkJoinPool
.ForkJoinPool
задачи происходит вForkJoinPool
когда потоки рабочего потока крадут задачи из очереди занятых рабочих потоков.Java 8 представила еще один API в ExecutorService для создания пула кражи работы. Вам не нужно создавать
RecursiveTask
иRecursiveAction
но все равно использоватьForkJoinPool
.public static ExecutorService newWorkStealingPool()
Создает пул потоков, обрабатывающих работу, используя все доступные процессоры в качестве целевого уровня параллелизма.
По умолчанию в качестве параметра потребуется количество ядер процессора.
Все эти четыре механизма дополняют друг друга. В зависимости от уровня детализации, который вы хотите контролировать, вы должны выбрать правильные.
Подождите завершения всех задач в ExecutorService
Давайте рассмотрим различные варианты ожидания выполнения задач, представленных Исполнителю
- ExecutorService
invokeAll()
Выполняет заданные задачи, возвращая список фьючерсов, подтверждающих их статус и результаты, когда все будет завершено.
Пример:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllDemo{
public InvokeAllDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<MyCallable> futureList = new ArrayList<MyCallable>();
for (int i = 0; i < 10; i++){
MyCallable myCallable = new MyCallable((long)i);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
} catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllDemo demo = new InvokeAllDemo();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
// Add your business logic
return id;
}
}
}
Вспомогательное средство синхронизации, которое позволяет одному или нескольким потокам дождаться завершения набора операций в других потоках.
CountDownLatch инициализируется с заданным подсчетом. Методы ожидания выполняются до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов метода
countDown()
, после чего все ожидающие потоки освобождаются, и любые последующие вызовы ожидания возвращаются немедленно. Это одноразовый феномен - счетчик не может быть сброшен. Если вам нужна версия, которая сбрасывает счетчик, рассмотрите возможность использования CyclicBarrier .ForkJoinPool или
newWorkStealingPool()
для исполнителейИтерация через все объекты
Future
созданные после отправки вExecutorService
Рекомендуемый способ закрытия страницы документации Oracle по адресу : ExecutorService :
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
shutdown():
инициирует упорядоченное завершение работы, в котором выполняются ранее поставленные задачи, но новые задачи не будут приняты.shutdownNow():
пытается остановить все активное выполнение задач, останавливает обработку ожидающих задач и возвращает список задач, ожидающих выполнения.В приведенном выше примере, если ваши задачи занимают больше времени для завершения, вы можете изменить, если условие на условие
замещать
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
с
while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000);
}
Случаи использования различных типов ExecutorService
Исполнители возвращают разный тип ThreadPools, удовлетворяющий конкретным потребностям.
public static ExecutorService newSingleThreadExecutor()
Создает Исполнителя, который использует один рабочий поток, работающий с неограниченной очередью
Существует разница между
newFixedThreadPool(1)
иnewSingleThreadExecutor()
как сообщает java-документ для последнего:В отличие от эквивалентного newFixedThreadPool (1), возвращенный исполнитель гарантированно не может быть перенастроен для использования дополнительных потоков.
Это означает, что
newFixedThreadPool
можно переконфигурировать позже в программе:((ThreadPoolExecutor) fixedThreadPool).setMaximumPoolSize(10)
Это невозможно дляnewSingleThreadExecutor
Случаи применения:
- Вы хотите выполнить поставленные задачи в последовательности.
- Вам нужен только один поток для обработки всего вашего запроса
Минусы:
- Неограниченная очередь вредна
public static ExecutorService newFixedThreadPool(int nThreads)
Создает пул потоков, который повторно использует фиксированное количество потоков, работающих с общей неограниченной очередью. В любой момент, в большинстве случаев nThreads будут активными задачами обработки. Если дополнительные задачи передаются, когда все потоки активны, они будут ждать в очереди до тех пор, пока поток не будет доступен
Случаи применения:
- Эффективное использование доступных ядер. Настроить
nThreads
какRuntime.getRuntime().availableProcessors()
- Когда вы решите, что количество потоков не должно превышать число в пуле потоков
Минусы:
- Неограниченная очередь является вредной.
- Эффективное использование доступных ядер. Настроить
public static ExecutorService newCachedThreadPool()
Создает пул потоков, который при необходимости создает новые потоки, но будет повторно использовать ранее созданные потоки, когда они будут доступны
Случаи применения:
- Для недолговечных асинхронных задач
Минусы:
- Неограниченная очередь является вредной.
- Каждая новая задача создаст новый поток, если все существующие потоки заняты. Если задача занимает много времени, будет создано больше потоков, что ухудшит производительность системы. Альтернатива в этом случае:
newFixedThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
Создает пул потоков, который может планировать выполнение команд после заданной задержки или выполнять их периодически.
Случаи применения:
- Обработка повторяющихся событий с задержками, которые будут происходить в будущем в определенный промежуток времени
Минусы:
- Неограниченная очередь является вредной.
5.
public static ExecutorService newWorkStealingPool()
Создает пул потоков, обрабатывающих работу, используя все доступные процессоры в качестве целевого уровня параллелизма
Случаи применения:
- Для деления и преодоления типа проблем.
- Эффективное использование простаивающих потоков. Холостые потоки крадут задачи из занятых потоков.
Минусы:
- Неограниченный размер очереди вреден.
Вы можете увидеть один из общих недостатков во всех этих ExecutorService: неограниченная очередь. Это будет рассмотрено с помощью ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
С ThreadPoolExecutor
вы можете
- Размер пула потоков управления динамически
- Установите емкость для
BlockingQueue
- Определить
RejectionExecutionHander
когда очередь заполнена -
CustomThreadFactory
для добавления дополнительных функций во время создания темы(public Thread newThread(Runnable r)
Использование пулов потоков
В пулах потоков в основном используются методы вызова в ExecutorService
.
Следующие способы могут быть использованы для отправки работ для выполнения:
метод | Описание |
---|---|
submit | Выполняет представленную работу и возвращает будущее, которое может быть использовано для получения результата |
execute | Выполнять задачу в будущем, не получая никакого возвращаемого значения |
invokeAll | Выполните список задач и верните список фьючерсов |
invokeAny | Выполняет все, но возвращает только результат успешной (без исключений) |
Как только вы закончите с пулом потоков, вы можете вызвать shutdown()
чтобы завершить пул потоков. Это выполняет все ожидающие задачи. Чтобы дождаться выполнения всех задач, вы можете выполнить цикл вокруг awaitTermination
или isShutdown()
.