Java Language
Gelijktijdig programmeren (threads)
Zoeken…
Invoering
Concurrent computing is een vorm van computing waarin verschillende berekeningen tegelijkertijd worden uitgevoerd in plaats van opeenvolgend. Java-taal is ontworpen om gelijktijdig programmeren te ondersteunen door het gebruik van threads. Objecten en bronnen zijn toegankelijk via meerdere threads; elke thread kan potentieel toegang krijgen tot elk object in het programma en het programmeerapparaat moet ervoor zorgen dat lees- en schrijftoegang tot objecten correct is gesynchroniseerd tussen threads.
Opmerkingen
Verwant onderwerp (en) op StackOverflow:
Basic Multithreading
Als u veel taken moet uitvoeren en al deze taken niet afhankelijk zijn van het resultaat van de voorgaande taken, kunt u Multithreading voor uw computer gebruiken om al deze taken tegelijkertijd uit te voeren met behulp van meer processors als uw computer dat kan. Dit kan je programma sneller uitvoeren als je een aantal grote onafhankelijke taken hebt.
class CountAndPrint implements Runnable {
private final String name;
CountAndPrint(String name) {
this.name = name;
}
/** This is what a CountAndPrint will do */
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println(this.name + ": " + i);
}
}
public static void main(String[] args) {
// Launching 4 parallel threads
for (int i = 1; i <= 4; i++) {
// `start` method will call the `run` method
// of CountAndPrint in another thread
new Thread(new CountAndPrint("Instance " + i)).start();
}
// Doing some others tasks in the main Thread
for (int i = 0; i < 10000; i++) {
System.out.println("Main: " + i);
}
}
}
De code van de uitvoeringsmethode van de verschillende CountAndPrint
instanties wordt in niet-voorspelbare volgorde uitgevoerd. Een fragment van een voorbeelduitvoering kan er zo uitzien:
Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...
Producer-Consumer
Een eenvoudig voorbeeld van een probleemoplossing tussen producent en consument. Merk op dat JDK-klassen ( AtomicBoolean
en BlockingQueue
) worden gebruikt voor synchronisatie, waardoor de kans op het maken van een ongeldige oplossing wordt verkleind. Raadpleeg Javadoc voor verschillende soorten BlockingQueue ; het kiezen van een andere implementatie kan het gedrag van dit voorbeeld (zoals DelayQueue of Priority Queue ) drastisch veranderen.
public class Producer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Producer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int producedCount = 0;
try {
while (true) {
producedCount++;
//put throws an InterruptedException when the thread is interrupted
queue.put(new ProducedData());
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
producedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Produced " + producedCount + " objects");
}
}
public class Consumer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Consumer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int consumedCount = 0;
try {
while (true) {
//put throws an InterruptedException when the thread is interrupted
ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
// process data
consumedCount++;
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
consumedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Consumed " + consumedCount + " objects");
}
}
public class ProducerConsumerExample {
static class ProducedData {
// empty data object
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
// choice of queue determines the actual behavior: see various BlockingQueue implementations
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
Thread.sleep(1000);
producer.interrupt();
Thread.sleep(10);
consumer.interrupt();
}
}
ThreadLocal gebruiken
Een handig hulpmiddel in Java Concurrency is ThreadLocal
- hiermee kunt u een variabele hebben die uniek is voor een bepaalde thread. Als dezelfde code dus in verschillende threads wordt uitgevoerd, delen deze uitvoeringen de waarde niet, maar heeft elke thread een eigen variabele die lokaal is voor de thread .
Dit wordt bijvoorbeeld vaak gebruikt om de context (zoals autorisatie-informatie) van het verwerken van een aanvraag in een servlet te bepalen. Je zou zoiets als dit kunnen doen:
private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();
public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}
public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}
Nu, in plaats van MyUserContext
in elke methode door te geven, kunt u in plaats daarvan MyServlet.getContext()
waar u het nodig hebt. Nu introduceert dit natuurlijk een variabele die moet worden gedocumenteerd, maar het is thread-safe, wat veel nadelen van het gebruik van een dergelijke zeer uitgebreide variabele elimineert.
Het belangrijkste voordeel hier is dat elke thread zijn eigen thread lokale variabele heeft in die contexts
. Zolang u het gebruikt vanaf een gedefinieerd beginpunt (zoals eisen dat elke servlet zijn context behoudt, of misschien door een servletfilter toe te voegen), kunt u erop vertrouwen dat deze context er is wanneer u het nodig hebt.
CountDownLatch
Een synchronisatiehulpmiddel waarmee een of meer threads kunnen wachten tot een reeks bewerkingen die in andere threads worden uitgevoerd, is voltooid.
- Een
CountDownLatch
wordt geïnitialiseerd met een bepaalde telling. - De methoden wachten wachten tot de huidige telling nul bereikt vanwege aanroepen van de methode
countDown()
, waarna alle wachtende threads worden vrijgegeven en alle daaropvolgende aanroepen van wachten onmiddellijk terugkeren. - Dit is een eenmalig fenomeen - de telling kan niet worden gereset. Als u een versie nodig hebt die de telling opnieuw
CyclicBarrier
, kunt u overwegen eenCyclicBarrier
.
Belangrijkste methoden:
public void await() throws InterruptedException
Zorgt ervoor dat de huidige thread wacht tot de vergrendeling tot nul is afgeteld, tenzij de thread wordt onderbroken.
public void countDown()
Verlaagt de telling van de vergrendeling, waarbij alle wachtende threads worden vrijgegeven als de telling nul bereikt.
Voorbeeld:
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable {
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
System.out.println("Do some thing");
latch.countDown();
} catch(Exception err) {
err.printStackTrace();
}
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
try {
int numberOfThreads = 5;
if (args.length < 1) {
System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
return;
}
try {
numberOfThreads = Integer.parseInt(args[0]);
} catch(NumberFormatException ne) {
}
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int n = 0; n < numberOfThreads; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
} catch(Exception err) {
err.printStackTrace();
}
}
}
output:
java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads
Uitleg:
-
CountDownLatch
wordt geïnitialiseerd met een teller van 5 in Hoofdthread - Hoofdthread wacht met de methode
await()
. - Er zijn vijf instanties van
DoSomethingInAThread
gemaakt. Elke instantie verminderde de teller met de methodecountDown()
. - Zodra de teller nul wordt, wordt de hoofddraad hervat
Synchronisatie
In Java is er een ingebouwd vergrendelingsmechanisme op taalniveau: het synchronized
blok, dat elk Java-object als intrinsiek slot kan gebruiken (dat wil zeggen dat elk Java-object een bijbehorende monitor kan hebben).
Intrinsieke sloten zorgen voor atomiciteit van groepen uitspraken. Laten we, om te begrijpen wat dat voor ons betekent, een voorbeeld bekijken waarin synchronized
nuttig is:
private static int t = 0;
private static Object mutex = new Object();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
synchronized (mutex) {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
}
});
}
executorService.shutdown();
}
In dit geval zouden er meerdere gelijktijdigheidsproblemen zijn geweest als er geen synchronized
blok was. De eerste zou met de post-increment-operator zijn (het is op zichzelf niet atomair), en de tweede zou zijn dat we de waarde van t zouden observeren nadat een willekeurig aantal andere threads de kans heeft gehad om het te wijzigen. Aangezien we echter een intrinsiek slot hebben verkregen, zijn er hier geen raceomstandigheden en bevat de uitvoer nummers van 1 tot 100 in hun normale volgorde.
Intrinsieke sloten in Java zijn mutexen (dwz sloten voor wederzijdse uitvoering). Wederzijdse uitvoering betekent dat als een thread het slot heeft verkregen, de tweede zal moeten wachten tot de eerste het slot loslaat voordat deze het slot zelf kan verwerven. Opmerking: een bewerking die de thread in de wachtstand (slaapstand) kan brengen, wordt een blokkeerbewerking genoemd . Het verkrijgen van een slot is dus een blokkeerbewerking.
Intrinsieke sluizen in Java zijn reentrant. Dit betekent dat als een thread probeert een slot te verwerven dat het al bezit, het niet zal blokkeren en het met succes zal verwerven. De volgende code wordt bijvoorbeeld niet geblokkeerd wanneer deze wordt aangeroepen:
public void bar(){
synchronized(this){
...
}
}
public void foo(){
synchronized(this){
bar();
}
}
Naast synchronized
blokken zijn er ook synchronized
methoden.
De volgende codeblokken zijn praktisch equivalent (hoewel de bytecode anders lijkt te zijn):
synchronized
blokthis
:public void foo() { synchronized(this) { doStuff(); } }
synchronized
methode:public synchronized void foo() { doStuff(); }
Evenzo voor static
methoden, dit:
class MyClass {
...
public static void bar() {
synchronized(MyClass.class) {
doSomeOtherStuff();
}
}
}
heeft hetzelfde effect als dit:
class MyClass {
...
public static synchronized void bar() {
doSomeOtherStuff();
}
}
Atoom operaties
Een atomaire operatie is een operatie die "allemaal tegelijk" wordt uitgevoerd, zonder enige kans dat andere threads de staat observeren of wijzigen tijdens de uitvoering van de atomaire operatie.
Laten we een SLECHT VOORBEELD overwegen.
private static int t = 0;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
});
}
executorService.shutdown();
}
In dit geval zijn er twee problemen. Het eerste probleem is dat de post-increment operator niet atomair is. Het bestaat uit meerdere bewerkingen: haal de waarde op, voeg 1 toe aan de waarde, stel de waarde in. Dat is de reden waarom als we het voorbeeld uitvoeren, het waarschijnlijk is dat we t: 100
in de uitvoer zien - twee threads kunnen tegelijkertijd de waarde krijgen, verhogen en instellen: laten we zeggen dat de waarde van t 10 is en twee threads worden oplopend t. Beide threads stellen de waarde van t in op 11, omdat de tweede thread de waarde van t waarneemt voordat de eerste thread deze had verhoogd.
Het tweede probleem is hoe we t waarnemen. Wanneer we de waarde van t afdrukken, is de waarde mogelijk al gewijzigd door een andere thread na de incrementbewerking van deze thread.
Om deze problemen op te lossen, gebruiken we de java.util.concurrent.atomic.AtomicInteger
, die voor ons veel atomaire bewerkingen heeft.
private static AtomicInteger t = new AtomicInteger(0);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
int currentT = t.incrementAndGet();
System.out.println(MessageFormat.format("t: {0}", currentT));
});
}
executorService.shutdown();
}
De methode incrementAndGet
van AtomicInteger
verhoogt en retourneert atomair de nieuwe waarde, waardoor de vorige race-conditie wordt geëlimineerd. Houd er rekening mee dat in dit voorbeeld de regels nog steeds buiten gebruik zijn, omdat we geen moeite doen om de println
aanroepen in volgorde te plaatsen en dat dit buiten het bereik van dit voorbeeld valt, omdat dit synchronisatie vereist en het doel van dit voorbeeld is om te laten zien hoe AtomicInteger
gebruiken om race-omstandigheden met betrekking tot de staat te elimineren.
Eenvoudig vastgelopen systeem maken
Een impasse treedt op wanneer twee concurrerende acties wachten tot de ander klaar is, en dus doet geen van beide ooit. In Java is er een slot gekoppeld aan elk object. Om gelijktijdige aanpassing door meerdere threads op één object te voorkomen, kunnen we synchronized
trefwoord gebruiken, maar aan alles zijn kosten verbonden. Onjuist gebruik van synchronized
trefwoord kan leiden tot vastgelopen systemen die als vastgelopen systeem worden genoemd.
Overweeg dat er 2 threads aan 1 exemplaar werken, laten we threads als First en Second aanroepen en laten we zeggen dat we 2 bronnen R1 en R2 hebben. Verwerft eerst R1 en heeft ook R2 nodig voor de voltooiing ervan, en ten tweede verwerft R2 en heeft R1 nodig voor voltooiing.
dus zeg op tijdstip t = 0,
Eerst heeft R1 en tweede heeft R2. nu wacht First op R2 terwijl Second op R1 wacht. dit wachten is onbepaald en dit leidt tot een impasse.
public class Example2 {
public static void main(String[] args) throws InterruptedException {
final DeadLock dl = new DeadLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
dl.methodA();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
dl.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t1.setName("First");
t2.setName("Second");
t1.start();
t2.start();
}
}
class DeadLock {
Object mLock1 = new Object();
Object mLock2 = new Object();
public void methodA() {
System.out.println("methodA wait for mLock1 " + Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("methodA mLock1 acquired " + Thread.currentThread().getName());
try {
Thread.sleep(100);
method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void method2() throws InterruptedException {
System.out.println("method2 wait for mLock2 " + Thread.currentThread().getName());
synchronized (mLock2) {
System.out.println("method2 mLock2 acquired " + Thread.currentThread().getName());
Thread.sleep(100);
method3();
}
}
public void method3() throws InterruptedException {
System.out.println("method3 mLock1 "+ Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("method3 mLock1 acquired " + Thread.currentThread().getName());
}
}
}
Output van dit programma:
methodA wait for mLock1 First
method2 wait for mLock2 Second
method2 mLock2 acquired Second
methodA mLock1 acquired First
method3 mLock1 Second
method2 wait for mLock2 First
Uitvoering onderbreken
Thread.sleep
zorgt ervoor dat de huidige thread de uitvoering gedurende een opgegeven periode opschort. Dit is een efficiënt middel om processortijd beschikbaar te maken voor de andere threads van een toepassing of andere toepassingen die mogelijk op een computersysteem worden uitgevoerd. Er zijn twee overbelaste sleep
in de klasse Thread.
Een die de slaaptijd tot de milliseconde aangeeft
public static void sleep(long millis) throws InterruptedException
Een die de slaaptijd voor de nanoseconde aangeeft
public static void sleep(long millis, int nanos)
Uitvoering 1 seconde onderbreken
Thread.sleep(1000);
Het is belangrijk op te merken dat dit een hint is naar de planner van de kernel van het besturingssysteem. Dit hoeft niet noodzakelijk nauwkeurig te zijn, en sommige implementaties houden zelfs geen rekening met de nanoseconde-parameter (mogelijk afgerond op de dichtstbijzijnde milliseconde).
Het wordt aanbevolen om een gesprek naar Thread.sleep
in te sluiten in try / catch en InterruptedException
vangen.
Visualiseer lees- / schrijfbarrières bij gebruik van gesynchroniseerd / vluchtig
Zoals we weten, moeten we synchronized
trefwoord gebruiken om de uitvoering van een methode of block exclusief te maken. Maar weinigen van ons zijn zich misschien niet bewust van nog een belangrijk aspect van het gebruik van een synchronized
en volatile
trefwoord: naast het maken van een atoomeenheid, biedt het ook een lees / schrijf-barrière . Wat is deze lees / schrijf-barrière? Laten we dit aan de hand van een voorbeeld bespreken:
class Counter {
private Integer count = 10;
public synchronized void incrementCount() {
count++;
}
public Integer getCount() {
return count;
}
}
Laten we aannemen dat een thread A eerst incrementCount()
aanroept en vervolgens een andere thread B die getCount()
. In dit scenario is er geen garantie dat B de bijgewerkte count
zal zien. Het kan nog steeds count
als 10
, ook al is het ook mogelijk dat het nooit geactualiseerde waarde van ziet count
ooit.
Om dit gedrag te begrijpen, moeten we begrijpen hoe het Java-geheugenmodel kan worden geïntegreerd met de hardware-architectuur. In Java heeft elke thread zijn eigen thread-stack. Deze stapel bevat: methodeaanroepstapel en lokale variabele gemaakt in die thread. In een multi-core systeem is het heel goed mogelijk dat twee threads tegelijkertijd in afzonderlijke cores worden uitgevoerd. In een dergelijk scenario is het mogelijk dat een deel van de stapel van een thread in register / cache van een kern ligt. Als binnen een thread een object wordt benaderd met behulp van een synchronized
(of volatile
) sleutelwoord, synchronized
die thread na een synchronized
blok zijn lokale kopie van die variabele met het hoofdgeheugen. Dit creëert een lees / schrijf-barrière en zorgt ervoor dat de thread de laatste waarde van dat object ziet.
Maar in ons geval, omdat thread B geen gesynchroniseerde toegang heeft gebruikt om te count
, verwijst het mogelijk naar de waarde van de count
opgeslagen in het register en ziet mogelijk nooit updates van thread A. Om ervoor te zorgen dat B de laatste waarde van count ziet, moeten we getCount()
gesynchroniseerd.
public synchronized Integer getCount() {
return count;
}
Wanneer thread A klaar is met het bijwerken van count
, wordt de Counter
instantie ontgrendeld en wordt tegelijkertijd de schrijfbarrière gecreëerd en worden alle wijzigingen in dat blok naar het hoofdgeheugen gewist. Evenzo wanneer thread B vergrendeling op dezelfde instantie van Counter
verwerft, gaat deze de leesbarrière binnen en leest de waarde van count
uit het hoofdgeheugen en ziet alle updates.
Hetzelfde zichtbaarheidseffect geldt ook voor volatile
lezen / schrijven. Alle variabelen die zijn bijgewerkt voordat ze naar volatile
zijn geschreven, worden naar het hoofdgeheugen gespoeld en alle waarden na het lezen van volatile
variabelen komen uit het hoofdgeheugen.
Een java.lang.Thread-instantie maken
Er zijn twee hoofdbenaderingen voor het maken van een thread in Java. In essentie is het maken van een thread net zo eenvoudig als het schrijven van de code die erin wordt uitgevoerd. De twee benaderingen verschillen in waar u die code definieert.
In Java wordt een thread vertegenwoordigd door een object - een instantie van java.lang.Thread of de subklasse ervan. Dus de eerste benadering is om die subklasse te maken en de methode run () te overschrijven.
Opmerking : ik gebruik Thread om te verwijzen naar de java.lang.Thread- klasse en thread om te verwijzen naar het logische concept van threads.
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running!");
}
}
}
Nu we de code die moet worden uitgevoerd al hebben gedefinieerd, kan de thread eenvoudig worden gemaakt als:
MyThread t = new MyThread();
De klasse Thread bevat ook een constructor die een tekenreeks accepteert, die wordt gebruikt als de naam van de thread. Dit kan met name handig zijn bij het debuggen van een multi-threadprogramma.
class MyThread extends Thread {
public MyThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running! ");
}
}
}
MyThread t = new MyThread("Greeting Producer");
De tweede benadering is om de code te definiëren met behulp van java.lang.Runnable en de enige methode run () . Met de klasse Thread kunt u die methode vervolgens uitvoeren in een gescheiden thread. Om dit te bereiken, maakt u de thread met behulp van een constructor die een exemplaar van de Runnable- interface accepteert.
Thread t = new Thread(aRunnable);
Dit kan zeer krachtig zijn in combinatie met lambdas of methodeverwijzingen (alleen Java 8):
Thread t = new Thread(operator::hardWork);
U kunt ook de naam van de thread opgeven.
Thread t = new Thread(operator::hardWork, "Pi operator");
Praktisch gezien kunt u beide benaderingen zonder zorgen gebruiken. De algemene wijsheid zegt echter om de laatste te gebruiken.
Voor elk van de vier genoemde constructors is er ook een alternatief dat een instantie van java.lang.ThreadGroup accepteert als de eerste parameter.
ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");
De ThreadGroup vertegenwoordigt een set threads. U kunt alleen een thread aan een ThreadGroup toevoegen met de constructor van een Thread . De ThreadGroup kan vervolgens worden gebruikt om alle Thread 's samen te beheren, en de Thread kan informatie verkrijgen van zijn ThreadGroup .
Dus om het samen te vatten, de draad kan worden gemaakt met een van deze openbare constructeurs:
Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)
Met de laatste kunnen we de gewenste stapelgrootte voor de nieuwe draad definiëren.
Vaak lijdt de leesbaarheid van de code bij het maken en configureren van veel threads met dezelfde eigenschappen of met hetzelfde patroon. Dat is wanneer java.util.concurrent.ThreadFactory kan worden gebruikt. Met deze interface kunt u de procedure voor het maken van de thread inkapselen via het fabriekspatroon en de enige methode newThread (Runnable) .
class WorkerFactory implements ThreadFactory {
private int id = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Worker " + id++);
}
}
Draadonderbreking / Draadstoppen stoppen
Elke Java-thread heeft een interrupt-vlag, die aanvankelijk niet waar is. Een thread onderbreken, is in wezen niets meer dan die vlag op true zetten. De code die op die thread wordt uitgevoerd, kan af en toe de vlag controleren en erop reageren. De code kan deze ook volledig negeren. Maar waarom zou elke draad zo'n vlag hebben? Een booleaanse vlag op een draad hebben is tenslotte iets dat we onszelf gewoon kunnen organiseren, als en wanneer we het nodig hebben. Nou, er zijn methoden die zich op een speciale manier gedragen wanneer de thread waarop ze draaien wordt onderbroken. Deze methoden worden blokkeermethoden genoemd. Dit zijn methoden die de thread in de status WAITING of TIMED_WAITING plaatsen. Wanneer een thread zich in deze status bevindt, zal het onderbreken ervan ertoe leiden dat een InterruptedException op de onderbroken thread wordt gegooid, in plaats van dat de interrupt-vlag wordt ingesteld op true en de thread opnieuw RUNNABLE wordt. Code die een blokkeermethode oproept, wordt gedwongen om te gaan met de InterruptedException, omdat het een aangevinkte uitzondering is. Dus, en vandaar de naam, kan een interrupt een WAIT onderbreken en effectief beëindigen. Merk op dat niet alle methoden die op de een of andere manier wachten (bijv. IO blokkeren) op die manier reageren op onderbreking, omdat ze de thread niet in een wachttoestand plaatsen. Ten slotte zal een thread waarvoor de interrupt-vlag is ingesteld en die een blokkeermethode gebruikt (dat wil zeggen proberen in een wachttoestand te komen), onmiddellijk een InterruptedException genereren en de interrupt-vlag wordt gewist.
Afgezien van deze mechanismen kent Java geen speciale semantische betekenis toe aan onderbreking. Code is vrij om een onderbreking naar eigen inzicht te interpreteren. Maar meestal wordt onderbreking gebruikt om aan een thread te melden dat deze zo snel mogelijk moet stoppen met werken. Maar zoals duidelijk moet zijn uit het bovenstaande, is het aan de code op die thread om op gepaste wijze op die onderbreking te reageren om te stoppen met rennen. Een thread stoppen is een samenwerking. Wanneer een thread wordt onderbroken, kan de lopende code meerdere niveaus diep in de stacktrace zijn. De meeste code roept geen blokkeermethode op en wordt tijdig genoeg voltooid om het stoppen van de thread niet onnodig te vertragen. De code die zich vooral bezig moet houden met reageren op onderbrekingen, is code die in een lus wordt verwerkt totdat er niets meer over is, of totdat een vlag is ingesteld om aan te geven dat die lus moet worden gestopt. Lussen die mogelijk oneindige taken afhandelen (dwz ze blijven in principe doorlopen) moeten de interrupt-vlag controleren om de lus te verlaten. Voor eindige lussen kan de semantiek dicteren dat alle taken moeten worden voltooid voordat ze worden beëindigd, of het kan passend zijn om sommige taken ongewijzigd te laten. Code die blokkeermethoden aanroept, wordt gedwongen om te gaan met de InterruptedException. Als het überhaupt semantisch mogelijk is, kan het eenvoudig de InterruptedException propageren en verklaren het te gooien. Als zodanig wordt het zelf een blokkeermethode met betrekking tot zijn bellers. Als het de uitzondering niet kan verspreiden, moet het op zijn minst de onderbroken vlag instellen, zodat bellers hoger op de stapel ook weten dat de thread is onderbroken. In sommige gevallen moet de methode blijven wachten ongeacht de InterruptedException, in welk geval het instellen van de onderbroken vlag moet worden uitgesteld tot nadat het klaar is met wachten, dit kan het instellen van een lokale variabele inhouden, die moet worden gecontroleerd voordat de methode wordt afgesloten om onderbreek dan zijn draad.
Voorbeelden:
Voorbeeld van code die de verwerking van taken bij onderbreking stopt
class TaskHandler implements Runnable {
private final BlockingQueue<Task> queue;
TaskHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
try {
Task task = queue.take(); // blocking call, responsive to interruption
handle(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
}
}
}
private void handle(Task task) {
// actual handling
}
}
Voorbeeld van code die het instellen van de interrupt-vlag vertraagt totdat deze volledig is voltooid:
class MustFinishHandler implements Runnable {
private final BlockingQueue<Task> queue;
MustFinishHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean shouldInterrupt = false;
while (true) {
try {
Task task = queue.take();
if (task.isEndOfTasks()) {
if (shouldInterrupt) {
Thread.currentThread().interrupt();
}
return;
}
handle(task);
} catch (InterruptedException e) {
shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
}
}
}
private void handle(Task task) {
// actual handling
}
}
Voorbeeld van code met een vaste takenlijst, maar die mogelijk vroegtijdig wordt afgesloten wanneer deze wordt onderbroken
class GetAsFarAsPossible implements Runnable {
private final List<Task> tasks = new ArrayList<>();
@Override
public void run() {
for (Task task : tasks) {
if (Thread.currentThread().isInterrupted()) {
return;
}
handle(task);
}
}
private void handle(Task task) {
// actual handling
}
}
Voorbeeld van meerdere producenten / consumenten met gedeelde wereldwijde wachtrij
Onderstaande code toont meerdere Producer / Consumer-programma's. Zowel Producer- als Consumer-threads delen dezelfde globale wachtrij.
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
output:
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1
enzovoorts ................
Uitleg:
-
sharedQueue
, wat eenLinkedBlockingQueue
is, wordt gedeeld met alle producent- en consumentendraden. - Producentthreads produceren continu één geheel getal voor elke 200 milli seconden en voegen dit toe aan
sharedQueue
-
Consumer
verbruikt continu een geheel getal uitsharedQueue
. - Dit programma is geïmplementeerd zonder expliciete
synchronized
ofLock
constructies. BlockingQueue is de sleutel om dit te bereiken.
BlockingQueue-implementaties zijn ontworpen om voornamelijk te worden gebruikt voor wachtrijen tussen producent en consument.
BlockingQueue-implementaties zijn thread-safe. Alle wachtrijmethoden bereiken hun effecten atomair met behulp van interne vergrendelingen of andere vormen van concurrency-controle.
Exclusieve schrijf / gelijktijdige leestoegang
Soms is het vereist voor een proces om gelijktijdig dezelfde "gegevens" te schrijven en te lezen.
De ReadWriteLock
interface en de ReentrantReadWriteLock
implementatie zorgen voor een toegangspatroon dat als volgt kan worden beschreven:
- Er kan een willekeurig aantal gelijktijdige lezers van de gegevens zijn. Als er ten minste één leestoegang wordt verleend, is schrijverstoegang niet mogelijk.
- De gegevens kunnen maximaal één schrijver bevatten. Als er schrijverstoegang is verleend, heeft geen enkele lezer toegang tot de gegevens.
Een implementatie kan er als volgt uitzien:
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;
/** This will write to the data, in an exclusive access */
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}
public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}
}
OPMERKING 1 : Deze precieze use case heeft een schonere oplossing met AtomicInteger
, maar wat hier wordt beschreven, is een toegangspatroon, dat werkt ongeacht het feit dat gegevens hier een geheel getal zijn dat als een Atomic-variant.
OPMERKING 2 : Het slot op het leesgedeelte is echt nodig, hoewel het misschien niet zo lijkt voor de toevallige lezer. Inderdaad, als u de lezer niet op slot zet, kan een aantal dingen misgaan, waaronder:
- Het schrijven van primitieve waarden is niet gegarandeerd atoom op alle JVM's, dus de lezer kon bijvoorbeeld alleen 32 bits van een 64 bits zien schrijven als de
data
type 64 bits waren - De zichtbaarheid van het schrijven van een thread die het niet heeft uitgevoerd, wordt alleen gegarandeerd door de JVM als we de relatie Happen Before vaststellen tussen het schrijven en het lezen. Deze relatie ontstaat wanneer zowel lezers als schrijvers hun respectieve vergrendelingen gebruiken, maar niet anders
Als hogere prestaties vereist zijn, en onder bepaalde gebruikstypes, is er een sneller StampedLock
beschikbaar, genaamd de StampedLock
, dat onder andere een optimistische vergrendelingsmodus implementeert. Deze vergrendeling werkt heel anders dan de ReadWriteLock
en dit voorbeeld is niet transponeerbaar.
Runnable Object
De Runnable
interface definieert een enkele methode, run()
, bedoeld om de code te bevatten die in de thread wordt uitgevoerd.
Het object Runnable
wordt doorgegeven aan de Thread
constructor. En de methode start()
Thread wordt aangeroepen.
Voorbeeld
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("Hello from a thread");
}
public static void main(String[] args) {
new Thread(new HelloRunnable()).start();
}
}
Voorbeeld in Java8:
public static void main(String[] args) {
Runnable r = () -> System.out.println("Hello world");
new Thread(r).start();
}
Runnable versus Thread-subklasse
Een Runnable
object-tewerkstelling is algemener, omdat het Runnable
object een andere klasse dan Thread
kan classificeren.
Thread
threads is eenvoudiger te gebruiken in eenvoudige toepassingen, maar wordt beperkt door het feit dat uw taakklasse een afstammeling moet zijn van Thread
.
Een Runnable
object is van toepassing op de thread-API's op hoog niveau.
seinpaal
Een Semafoor is een synchronisatieprogramma op hoog niveau dat een reeks vergunningen handhaaft die kunnen worden verkregen en vrijgegeven door threads. Een semafoor kan worden voorgesteld als een teller van vergunningen die worden verlaagd wanneer een thread wordt verkregen, en verhoogd wanneer een thread wordt vrijgegeven. Als het aantal vergunningen 0
wanneer een thread probeert te verkrijgen, wordt de thread geblokkeerd totdat er een vergunning beschikbaar is (of totdat de thread is onderbroken).
Een semafoor wordt geïnitialiseerd als:
Semaphore semaphore = new Semaphore(1); // The int value being the number of permits
De Semaphore-constructor accepteert een extra Booleaanse parameter voor eerlijkheid. Indien ingesteld op false, geeft deze klasse geen garanties over de volgorde waarin threads vergunningen verkrijgen. Wanneer eerlijkheid wordt ingesteld, garandeert de semafoor dat threads die een van de verkrijgingsmethoden gebruiken, worden geselecteerd om vergunningen te verkrijgen in de volgorde waarin hun aanroep van die methoden is verwerkt. Het wordt op de volgende manier verklaard:
Semaphore semaphore = new Semaphore(1, true);
Laten we nu eens kijken naar een voorbeeld van javadocs, waar Semaphore wordt gebruikt om de toegang tot een verzameling items te beheren. In dit voorbeeld wordt een Semafoor gebruikt om blokkeerfunctionaliteit te bieden om ervoor te zorgen dat er altijd items zijn die kunnen worden verkregen wanneer getItem()
wordt aangeroepen.
class Pool {
/*
* Note that this DOES NOT bound the amount that may be released!
* This is only a starting value for the Semaphore and has no other
* significant meaning UNLESS you enforce this inside of the
* getNextAvailableItem() and markAsUnused() methods
*/
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/**
* Obtains the next available item and reduces the permit count by 1.
* If there are no items available, block.
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* Puts the item into the pool and add 1 permit.
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private Object getNextAvailableItem() {
// Implementation
}
private boolean markAsUnused(Object o) {
// Implementation
}
}
Voeg twee `int` arrays toe met behulp van een Threadpool
Een Threadpool heeft een wachtrij met taken, die elk op een van deze threads worden uitgevoerd.
Het volgende voorbeeld laat zien hoe u twee int
arrays kunt toevoegen met behulp van een Threadpool.
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };
ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}
// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}
System.out.println(Arrays.toString(result));
Opmerkingen:
Dit voorbeeld is louter illustratief. In de praktijk zal er geen versnelling zijn door threads te gebruiken voor een zo kleine taak. Een vertraging is waarschijnlijk, omdat de overheadkosten van het maken en plannen van taken de tijd die nodig is om een taak uit te voeren, zal ondermijnen.
Als u Java 7 en eerder zou gebruiken, zou u anonieme klassen gebruiken in plaats van lambdas om de taken te implementeren.
Ontvang de status van alle threads die door uw programma zijn gestart, behalve systeemthreads
Codefragment:
import java.util.Set;
public class ThreadStatus {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 5; i++){
Thread t = new Thread(new MyThread());
t.setName("MyThread:" + i);
t.start();
}
int threadCount = 0;
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread t : threadSet) {
if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
System.out.println("Thread :" + t + ":" + "state:" + t.getState());
++threadCount;
}
}
System.out.println("Thread count started by Main thread:" + threadCount);
}
}
class MyThread implements Runnable {
public void run() {
try {
Thread.sleep(2000);
} catch(Exception err) {
err.printStackTrace();
}
}
}
Output:
Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6
Uitleg:
Thread.getAllStackTraces().keySet()
geeft alle Thread
s inbegrip van de toepassing draad en stelsel draden. Als u alleen geïnteresseerd bent in de status van threads, gestart door uw toepassing, itereert u de ingestelde Thread
door Thread Group van een bepaalde thread te vergelijken met uw hoofdprogrammathread.
Bij afwezigheid van de bovenstaande ThreadGroup-voorwaarde retourneert het programma de status van de onderstaande systeemthreads:
Reference Handler
Signal Dispatcher
Attach Listener
Finalizer
Oproepbaar en toekomst
Hoewel Runnable
een manier biedt om code in een andere thread te laten lopen, heeft het een beperking dat het geen resultaat van de uitvoering kan retourneren. De enige manier om wat retourwaarde te krijgen van de uitvoering van een Runnable
is door het resultaat toe te wijzen aan een variabele die toegankelijk is in een bereik buiten het Runnable
.
Callable
werd geïntroduceerd in Java 5 als een peer to Runnable
. Callable
is in wezen hetzelfde, behalve dat het een call
plaats van run
. De call
heeft de extra mogelijkheid om een resultaat terug te geven en mag ook aangevinkte uitzonderingen gooien.
Het resultaat van een Callable-taakverzending is beschikbaar om te worden afgetapt via een toekomst
Future
kan worden beschouwd als een soort container waarin het resultaat van de Callable
berekening is Callable
. De berekening van het afroepbare bedrag kan in een andere thread worden voortgezet en elke poging om op het resultaat van een Future
te tikken, wordt geblokkeerd en retourneert het resultaat alleen wanneer het beschikbaar is.
Oproepbare interface
public interface Callable<V> {
V call() throws Exception;
}
Toekomst
interface Future<V> {
V get();
V get(long timeout, TimeUnit unit);
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
Voorbeeld van Callable en Future gebruiken:
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
System.out.println("Time At Task Submission : " + new Date());
Future<String> result = es.submit(new ComplexCalculator());
// the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
System.out.println("Result of Complex Calculation is : " + result.get());
System.out.println("Time At the Point of Printing the Result : " + new Date());
}
Onze Callable die een lange berekening uitvoert
public class ComplexCalculator implements Callable<String> {
@Override
public String call() throws Exception {
// just sleep for 10 secs to simulate a lengthy computation
Thread.sleep(10000);
System.out.println("Result after a lengthy 10sec calculation");
return "Complex Result"; // the result
}
}
uitgang
Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016
Andere bewerkingen toegestaan op Future
Terwijl get()
de methode is om het daadwerkelijke resultaat te extraheren, heeft Future een voorziening
-
get(long timeout, TimeUnit unit)
definieert de maximale tijdsperiode gedurende de huidige thread wacht op een resultaat; - Om de
cancel(mayInterruptIfRunning)
te annuleren,cancel(mayInterruptIfRunning)
. De vlagmayInterrupt
geeft aan dat de taak moet worden onderbroken als deze is gestart en nu wordt uitgevoerd; - Om te controleren of de taak is voltooid / voltooid door aan te roepen
isDone()
; - Om te controleren of de lange taak is geannuleerd, is
isCancelled()
.
Vergrendelt als synchronisatiehulpmiddelen
Vóór de introductie van het gelijktijdige pakket van Java 5 was de threading van een lager niveau. De introductie van dit pakket bood verschillende gelijktijdige programmeerhulpmiddelen / constructen op hoger niveau.
Sloten zijn draadsynchronisatiemechanismen die in wezen hetzelfde doel dienen als gesynchroniseerde blokken of sleutelwoorden.
Intrinsieke vergrendeling
int count = 0; // shared among multiple threads
public void doSomething() {
synchronized(this) {
++count; // a non-atomic operation
}
}
Synchronisatie met behulp van sloten
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
lockObj.lock();
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
Sloten hebben ook functionaliteit beschikbaar die intrinsieke vergrendeling niet biedt, zoals vergrendelen maar reageren op onderbrekingen of proberen te vergrendelen en niet blokkeren wanneer dit niet mogelijk is.
Vergrendelen, reageren op onderbrekingen
class Locky {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
try {
lockObj.lockInterruptibly();
++count; // a non-atomic operation
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // stopping
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Doe alleen iets als u kunt vergrendelen
public class Locky2 {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
boolean locked = lockObj.tryLock(); // returns true upon successful lock
if (locked) {
try {
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Er zijn verschillende varianten van vergrendeling beschikbaar. Raadpleeg de api-documentatie hier voor meer informatie