Java Language
Executor, ExecutorService och trådpooler
Sök…
Introduktion
Executor- gränssnittet i Java tillhandahåller ett sätt att avkoppla inlämning av uppgifter från mekaniken för hur varje uppgift ska köras, inklusive detaljer om trådanvändning, schemaläggning, etc. En Executor används normalt istället för att uttryckligen skapa trådar. Med Executors behöver utvecklare inte skriva om sin kod betydligt för att enkelt kunna ställa in programmets policy för att utföra uppgifter.
Anmärkningar
Fallgropar gropar~~POS=HEADCOMP
- När du planerar en uppgift för upprepad körning, beroende på vilken ScheduledExecutorService som används, kan din uppgift avbrytas från ytterligare utförande om en exekvering av din uppgift orsakar ett undantag som inte hanteras. Se Mor F ** k ScheduledExecutorService!
Eld och glöm - körbara uppgifter
Exekutorer accepterar en java.lang.Runnable
som innehåller (potentiellt beräkningsmässigt eller på annat sätt långvarig eller tung) kod som ska köras i en annan tråd.
Användning skulle vara:
Executor exec = anExecutor;
exec.execute(new Runnable() {
@Override public void run() {
//offloaded work, no need to get result back
}
});
Observera att du inte har några sätt att få tillbaka beräknad värde med den här exekutorn.
Med Java 8 kan man använda lambdas för att förkorta kodexemplet.
Executor exec = anExecutor;
exec.execute(() -> {
//offloaded work, no need to get result back
});
ThreadPoolExecutor
En vanlig Executor som används är ThreadPoolExecutor
, som tar hand om trådhantering. Du kan konfigurera den minimala mängden trådar som exekutorn alltid måste underhålla när det inte finns mycket att göra (det kallas kärnstorlek) och en maximal trådstorlek som poolen kan växa till, om det finns mer arbete att göra. När arbetsbelastningen minskar, minskar poolen långsamt trådantalet tills den når minstorlek.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, // keep at least one thread ready,
// even if no Runnables are executed
5, // at most five Runnables/Threads
// executed in parallel
1, TimeUnit.MINUTES, // idle Threads terminated after one
// minute, when min Pool size exceeded
new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
pool.execute(new Runnable() {
@Override public void run() {
//code to run
}
});
Obs! Om du konfigurerar ThreadPoolExecutor
med en obegränsad kö, kommer trådantalet inte att överstiga corePoolSize
eftersom nya trådar bara skapas om kön är full:
ThreadPoolExecutor med alla parametrar:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
från JavaDoc
Om det finns fler än corePoolSize men mindre än maximumPoolSize-trådarna körs, skapas en ny tråd endast om kön är full.
fördelar:
BlockingQueue-storlek kan kontrolleras och scenarier utanför minnet kan undvikas. Applikationsprestanda försämras inte med begränsad avgränsad köstorlek.
Du kan använda befintliga eller skapa nya riktlinjer för avslag.
I standard ThreadPoolExecutor.AbortPolicy kastar hanteraren en runtime RejectedExecutionException vid avslag.
I
ThreadPoolExecutor.CallerRunsPolicy
tråden som åberopar köra sig själv. Detta tillhandahåller en enkel feedbackkontrollmekanism som bromsar hastigheten för att nya uppgifter skickas in.I
ThreadPoolExecutor.DiscardPolicy
en uppgift som inte kan köras helt enkelt.I
ThreadPoolExecutor.DiscardOldestPolicy
, om exekutorn inte är avstängd, tappas uppgiften i spetsen för arbetskön och sedan körs exekveringen igen (vilket kan misslyckas igen, vilket gör att detta upprepas.)
Custom
ThreadFactory
kan konfigureras, vilket är användbart:- För att ställa in ett mer beskrivande trådnamn
- Ställer in status för tråddemon
- Ställa in trådprioritet
Här är ett exempel på hur du använder ThreadPoolExecutor
Hämta värde från beräkningen - Callable
Om din beräkning ger ett visst returvärde som senare krävs räcker det inte med en enkel körbar uppgift. I sådana fall kan du använda ExecutorService.submit(
Callable
<T>)
som returnerar ett värde efter att exekveringen är klar.
Tjänsten kommer att returnera en Future
som du kan använda för att hämta resultatet av utförandet av uppgiften.
// Submit a callable for execution
ExecutorService pool = anExecutorService;
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override public Integer call() {
//do some computation
return new Random().nextInt();
}
});
// ... perform other tasks while future is executed in a different thread
När du behöver få framtidens resultat kan du ringa future.get()
Vänta på obestämd tid för framtiden att avsluta med ett resultat.
try { // Blocks current thread until future is completed Integer result = future.get(); catch (InterruptedException || ExecutionException e) { // handle appropriately }
Vänta tills framtiden är klar, men inte längre än angiven tid.
try { // Blocks current thread for a maximum of 500 milliseconds. // If the future finishes before that, result is returned, // otherwise TimeoutException is thrown. Integer result = future.get(500, TimeUnit.MILLISECONDS); catch (InterruptedException || ExecutionException || TimeoutException e) { // handle appropriately }
Om resultatet av en schemalagd eller körande uppgift inte längre krävs kan du ringa Future.cancel(boolean)
att avbryta det.
- Om du
cancel(false)
kommer du bara att ta bort uppgiften från kön med uppgifter som ska köras. - Samtalsavbrytning
cancel(true)
kommer också att avbryta uppgiften om den för närvarande körs.
Schemaläggningsuppgifter att köras vid en fast tid, efter en försening eller upprepade gånger
Klassen ScheduledExecutorService
tillhandahåller metoder för att schemalägga enstaka eller upprepade uppgifter på ett antal sätt. Följande kodprov antar att pool
har deklarerats och initialiserats enligt följande:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
Förutom de normala ExecutorService
metoderna lägger ScheduledExecutorService
API till 4 metoder som schemaläggar uppgifter och returnerar ScheduledFuture
objekt. Det senare kan användas för att hämta resultat (i vissa fall) och avbryta uppgifter.
Starta en uppgift efter en fast fördröjning
Följande exempel planerar en uppgift att starta efter tio minuter.
ScheduledFuture<Integer> future = pool.schedule(new Callable<>() {
@Override public Integer call() {
// do something
return 42;
}
},
10, TimeUnit.MINUTES);
Starta uppgifter med en fast takt
Följande exempel planerar en uppgift att starta efter tio minuter, och sedan upprepade gånger med en hastighet en gång var minut.
ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() {
@Override public void run() {
// do something
}
},
10, 1, TimeUnit.MINUTES);
Uppgiftens körning kommer att fortsätta enligt schemat tills pool
stängs av, future
avbryts eller någon av uppgifterna möter ett undantag.
Det är garanterat att uppgifterna schemalagda av ett givet scheduledAtFixedRate
samtal inte överlappar i tid. Om en uppgift tar längre tid än den föreskrivna perioden, kan nästa och efterföljande utförande av uppgifter starta sent.
Starta uppgifter med en fast fördröjning
Följande exempel planerar en uppgift att starta efter tio minuter, och sedan upprepade gånger med en försening på en minut mellan en uppgift som slutar och den nästa startar.
ScheduledFuture<?> future = pool.scheduleWithFixedDelay(new Runnable() {
@Override public void run() {
// do something
}
},
10, 1, TimeUnit.MINUTES);
Uppgiftens körning kommer att fortsätta enligt schemat tills pool
stängs av, future
avbryts eller någon av uppgifterna möter ett undantag.
Hantera avvisad körning
Om
- du försöker skicka uppgifter till en avstängningsexekutör eller
- kön är mättad (endast möjligt med avgränsade) och maximalt antal trådar har uppnåtts,
RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
kommer att kallas.
Standardbeteendet är att du får en RejectedExecutionException kastad på den som ringer. Men det finns mer fördefinierade beteenden tillgängliga:
- ThreadPoolExecutor.AbortPolicy (standard kommer att kasta REE)
- ThreadPoolExecutor.CallerRunsPolicy (kör uppgiften på den som ringer tråd - blockerar den )
- ThreadPoolExecutor.DiscardPolicy (tyst kassera uppgift)
- ThreadPoolExecutor.DiscardOldestPolicy (tyst kasta den äldsta uppgiften i kö och försök att utföra den nya uppgiften igen)
Du kan ställa in dem med en av ThreadPool- konstruktörerna :
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) // <--
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) // <--
Du kan också implementera ditt eget beteende genom att utöka gränssnittet RejectedExecutionHandler :
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
skicka () vs exekvera () skillnader i hantering av undantag
Generellt kör () kommando används för brand och glöm samtal (utan att behöva analysera resultatet) och kommandot skicka () används för att analysera resultatet av Future-objektet.
Vi bör vara medvetna om den viktigaste skillnaden i mekanismerna för undantagshantering mellan dessa två kommandon.
Undantag från skicka () sväljs av ramverk om du inte fångade dem.
Kodsexempel för att förstå skillnaden:
Fall 1: skicka kommandot Runnable with execute () som rapporterar undantaget.
import java.util.concurrent.*;
import java.util.*;
public class ExecuteSubmitDemo {
public ExecuteSubmitDemo() {
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(2);
//ExtendedExecutor service = new ExtendedExecutor();
for (int i = 0; i < 2; i++){
service.execute(new Runnable(){
public void run(){
int a = 4, b = 0;
System.out.println("a and b=" + a + ":" + b);
System.out.println("a/b:" + (a / b));
System.out.println("Thread Name in Runnable after divide by zero:"+Thread.currentThread().getName());
}
});
}
service.shutdown();
}
public static void main(String args[]){
ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
}
}
class ExtendedExecutor extends ThreadPoolExecutor {
public ExtendedExecutor() {
super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
produktion:
creating service
a and b=4:0
a and b=4:0
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Fall 2: Ersätt exekvering () med skicka (): service.submit(new Runnable(){
I detta fall sväljer undantag av ramverk sedan metoden för körning) inte fångade dem uttryckligen.
produktion:
creating service
a and b=4:0
a and b=4:0
Fall 3: Ändra den nya FixedThreadPool till ExtendedExecutor
//ExecutorService service = Executors.newFixedThreadPool(2);
ExtendedExecutor service = new ExtendedExecutor();
produktion:
creating service
a and b=4:0
java.lang.ArithmeticException: / by zero
a and b=4:0
java.lang.ArithmeticException: / by zero
Jag har visat detta exempel för att täcka två ämnen: Använd din anpassade ThreadPoolExecutor och hantera Exectpion med anpassad ThreadPoolExecutor.
En annan enkel lösning på ovanstående problem: När du använder normalt ExecutorService & skicka kommando, hämta Future-objektet från skicka () kommandosamtalet få () API på Future. Fånga de tre undantagen, som har citerats i implementering av efterexekutmetod. Fördel med anpassad ThreadPoolExecutor över denna metod: Du måste hantera undantagshanteringsmekanismen på bara ett ställe - Custom ThreadPoolExecutor.
Använd fall för olika typer av samtidighetskonstruktioner
ExecutorService executor = Executors.newFixedThreadPool(50);
Det är enkelt och enkelt att använda. Det döljer detaljer på låg nivå för
ThreadPoolExecutor
.Jag föredrar den här när antalet
Callable/Runnable
uppgifter är små i antal och stapling av uppgifter i obegränsad kö inte ökar minnet och försämrar systemets prestanda. Om du harCPU/Memory
minnesbegränsningar föredrar jag att användaThreadPoolExecutor
med kapacitetsbegränsningar &RejectedExecutionHandler
att hantera avslag på uppgifter.CountDownLatch
kommer att initialiseras med ett givet antal. Detta räkning minskas av samtal tillcountDown()
. Trådar som väntar på att denna räkning når noll kan ringa en av metoderna för attawait()
. Samtalawait()
blockerar tråden tills räkningen når noll. Denna klass gör det möjligt för en java-tråd att vänta tills andra uppsättningar av trådar slutför sina uppgifter.Använd fall:
Uppnå maximal parallellism: Ibland vill vi starta ett antal trådar på samma gång för att uppnå maximal parallellitet
Vänta N-trådarna för att slutföras innan körningen startas
Upplåsning av dödlås.
ThreadPoolExecutor : Det ger mer kontroll. Om applikationen begränsas av antalet pågående Runnable / Callable-uppgifter kan du använda avgränsad kö genom att ställa in maxkapaciteten. När kön når maximal kapacitet kan du definiera RejectionHandler. Java tillhandahåller fyra typer av
RejectedExecutionHandler
policyer .ThreadPoolExecutor.AbortPolicy
, hanteraren kastar en runtime RejectedExecutionException vid avslag.ThreadPoolExecutor.CallerRunsPolicy`, tråden som åberopar köra sig själv kör uppgiften. Detta tillhandahåller en enkel feedbackkontrollmekanism som bromsar hastigheten för att nya uppgifter skickas in.
I
ThreadPoolExecutor.DiscardPolicy
en uppgift som inte kan köras helt enkelt.ThreadPoolExecutor.DiscardOldestPolicy
, om exekutorn inte stängs av, släcks uppgiften i spetsen för arbetskön och sedan körs exekveringen igen (vilket kan misslyckas igen, vilket gör att detta upprepas.)
Om du vill simulera CountDownLatch
beteende kan du använda invokeAll()
-metoden.
En annan mekanism som du inte citerade är ForkJoinPool
ForkJoinPool
lades till Java i Java 7.ForkJoinPool
liknar JavaExecutorService
men med en skillnad.ForkJoinPool
gör det enkelt för uppgifterna att dela upp sitt arbete i mindre uppgifter som sedan skickas tillForkJoinPool
också. Uppgiftsstjälning sker iForkJoinPool
när fria arbetartrådar stjäl uppgifter från upptagen arbetartrådkö.Java 8 har introducerat ytterligare ett API i ExecutorService för att skapa arbetsstjälpool. Du behöver inte skapa
RecursiveTask
ochRecursiveAction
men kan fortfarande användaForkJoinPool
.public static ExecutorService newWorkStealingPool()
Skapar en arbetsstjälande trådpool med alla tillgängliga processorer som målparallellismnivå.
Som standard tar det antal CPU-kärnor som parameter.
Alla dessa fyra mekanismer är komplementära till varandra. Beroende på graden av granularitet du vill kontrollera måste du välja rätt.
Vänta på att alla uppgifter i ExecutorService har slutförts
Låt oss ta en titt på olika alternativ för att vänta på att de uppgifter som skickats till Executor har slutförts
- ExecutorService
invokeAll()
Utför de givna uppgifterna och returnerar en lista över Futures med status och resultat när allt är klart.
Exempel:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllDemo{
public InvokeAllDemo(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
List<MyCallable> futureList = new ArrayList<MyCallable>();
for (int i = 0; i < 10; i++){
MyCallable myCallable = new MyCallable((long)i);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
} catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllDemo demo = new InvokeAllDemo();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
// Add your business logic
return id;
}
}
}
Ett synkroniseringshjälpmedel som gör att en eller flera trådar kan vänta tills en uppsättning operationer som utförs i andra trådar slutförs.
En CountDownLatch initialiseras med ett givet antal. Väntarmetoderna blockerar tills det aktuella antalet når noll på grund av invokationer av
countDown()
, varefter alla väntetrådar släpps och eventuella efterföljande invokationer av väntar återgår omedelbart. Detta är ett fenomen med en skott - räkningen kan inte återställas. Om du behöver en version som återställer räkningen kan du överväga att använda en CyclicBarrier .ForkJoinPool eller
newWorkStealingPool()
i ExecutorsIterera genom alla
Future
objekt som skapats efter att ha skickats tillExecutorService
Rekommenderat sätt att stänga av från Oracle-dokumentationssidan för ExecutorService :
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
shutdown():
Initierar en ordnad avstängning där tidigare skickade uppgifter utförs, men inga nya uppgifter accepteras.shutdownNow():
I exemplet ovan, om dina uppgifter tar mer tid att slutföra, kan du ändra om villkor till villkor
Byta ut
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
med
while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000);
}
Använd ärenden för olika typer av ExecutorService
Executors returnerar olika typer av ThreadPools-catering till specifika behov.
public static ExecutorService newSingleThreadExecutor()
Skapar en Executor som använder en enda arbetartråd som fungerar utan en obegränsad kö
Det finns en skillnad mellan
newFixedThreadPool(1)
ochnewSingleThreadExecutor()
som java doc säger för det senare:Till skillnad från den annars likvärdiga newFixedThreadPool (1) garanteras den returnerade exekutören att den inte kan konfigureras för att använda ytterligare trådar.
Vilket innebär att en
newFixedThreadPool
kan konfigureras senare i programmet med:((ThreadPoolExecutor) fixedThreadPool).setMaximumPoolSize(10)
Detta är inte möjligt förnewSingleThreadExecutor
Använd fall:
- Du vill utföra de skickade uppgifterna i en sekvens.
- Du behöver bara en tråd för att hantera alla dina önskemål
Nackdelar:
- Obegränsad kö är skadligt
public static ExecutorService newFixedThreadPool(int nThreads)
Skapar en trådpool som återanvänder ett fast antal trådar som fungerar från en delad obegränsad kö. När som helst kommer nThreads-trådar som mest vara aktiva bearbetningsuppgifter. Om ytterligare uppgifter skickas in när alla trådar är aktiva, väntar de i kön tills en tråd är tillgänglig
Använd fall:
- Effektiv användning av tillgängliga kärnor. Konfigurera
nThreads
somRuntime.getRuntime().availableProcessors()
nThreads
Runtime.getRuntime().availableProcessors()
- När du bestämmer att antalet trådar inte ska överstiga ett antal i trådpoolen
Nackdelar:
- Obegränsad kö är skadligt.
- Effektiv användning av tillgängliga kärnor. Konfigurera
public static ExecutorService newCachedThreadPool()
Skapar en trådpool som skapar nya trådar efter behov, men kommer att återanvända tidigare konstruerade trådar när de är tillgängliga
Använd fall:
- För kortlivade asynkrona uppgifter
Nackdelar:
- Obegränsad kö är skadligt.
- Varje ny uppgift skapar en ny tråd om alla befintliga trådar är upptagna. Om uppgiften tar lång tid kommer fler antal trådar att skapas, vilket kommer att försämra systemets prestanda. Alternativ i det här fallet:
newFixedThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
Skapar en trådpool som kan schemalägga kommandon som ska köras efter en viss försening eller att utföra periodiskt.
Använd fall:
- Hantera återkommande händelser med förseningar, vilket kommer att hända i framtiden vid vissa tider
Nackdelar:
- Obegränsad kö är skadligt.
5.
public static ExecutorService newWorkStealingPool()
Skapar en arbetsstjälande trådpool med alla tillgängliga processorer som målparallellismnivå
Använd fall:
- För att dela och erövra typ av problem.
- Effektiv användning av lediga trådar. Tomgångstrådar stjäl uppgifter från upptagna trådar.
Nackdelar:
- Obegränsad köstorlek är skadlig.
Du kan se en vanlig nackdel i alla dessa ExecutorService: obegränsad kö. Detta kommer att behandlas med ThreadPoolExecutor
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
Med ThreadPoolExecutor
kan du göra det
- Kontrollera trådens poolstorlek dynamiskt
- Ställ in kapaciteten för
BlockingQueue
- Definiera
RejectionExecutionHander
när kön är full -
CustomThreadFactory
att lägga till lite ytterligare funktionalitet under skapandet av tråd(public Thread newThread(Runnable r)
Använda trådbassänger
Trådpooler används mestadels samtalsmetoder i ExecutorService
.
Följande metoder kan användas för att skicka in arbete för utförande:
Metod | Beskrivning |
---|---|
submit | Utför det insända arbetet och returnera en framtid som kan användas för att få resultatet |
execute | Kör uppgiften någon gång i framtiden utan att få något returvärde |
invokeAll | Kör en lista med uppgifter och returnera en lista med Futures |
invokeAny | Utför alla men returnerar bara resultatet av en som har lyckats (utan undantag) |
När du är klar med trådpoolen kan du ringa shutdown()
att avsluta trådpoolen. Detta kör alla väntande uppgifter. För att vänta på att alla uppgifter ska utföras kan du slinga runt awaitTermination
eller isShutdown()
.