Java Language
Programmation concurrente (threads)
Recherche…
Introduction
L'informatique concurrente est une forme de calcul dans laquelle plusieurs calculs sont exécutés simultanément au lieu d'être séquentiels. Le langage Java est conçu pour prendre en charge la programmation simultanée via l'utilisation de threads. Les objets et les ressources sont accessibles par plusieurs threads; chaque thread peut potentiellement accéder à n'importe quel objet du programme et le programmeur doit s'assurer que les accès en lecture et en écriture aux objets sont correctement synchronisés entre les threads.
Remarques
Sujets apparentés sur StackOverflow:
Multithreading de base
Si vous avez beaucoup de tâches à exécuter et que toutes ces tâches ne dépendent pas du résultat des précédentes, vous pouvez utiliser Multithreading pour que votre ordinateur effectue toutes ces tâches en même temps en utilisant plus de processeurs si votre ordinateur le peut. Cela peut accélérer l' exécution de votre programme si vous avez d'importantes tâches indépendantes.
class CountAndPrint implements Runnable {
private final String name;
CountAndPrint(String name) {
this.name = name;
}
/** This is what a CountAndPrint will do */
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println(this.name + ": " + i);
}
}
public static void main(String[] args) {
// Launching 4 parallel threads
for (int i = 1; i <= 4; i++) {
// `start` method will call the `run` method
// of CountAndPrint in another thread
new Thread(new CountAndPrint("Instance " + i)).start();
}
// Doing some others tasks in the main Thread
for (int i = 0; i < 10000; i++) {
System.out.println("Main: " + i);
}
}
}
Le code de la méthode d'exécution des différentes instances de CountAndPrint
s'exécutera dans un ordre non prévisible. Un extrait d'un exemple d'exécution peut ressembler à ceci:
Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...
Producteur-consommateur
Un exemple simple de solution du problème producteur-consommateur. Notez que les classes JDK ( AtomicBoolean
et BlockingQueue
) sont utilisées pour la synchronisation, ce qui réduit le risque de création d'une solution non valide. Consultez Javadoc pour différents types de BlockingQueue ; Le choix d'une implémentation différente peut modifier radicalement le comportement de cet exemple (comme DelayQueue ou Priority Queue ).
public class Producer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Producer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int producedCount = 0;
try {
while (true) {
producedCount++;
//put throws an InterruptedException when the thread is interrupted
queue.put(new ProducedData());
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
producedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Produced " + producedCount + " objects");
}
}
public class Consumer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Consumer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int consumedCount = 0;
try {
while (true) {
//put throws an InterruptedException when the thread is interrupted
ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
// process data
consumedCount++;
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
consumedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Consumed " + consumedCount + " objects");
}
}
public class ProducerConsumerExample {
static class ProducedData {
// empty data object
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
// choice of queue determines the actual behavior: see various BlockingQueue implementations
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
Thread.sleep(1000);
producer.interrupt();
Thread.sleep(10);
consumer.interrupt();
}
}
Utiliser ThreadLocal
Un outil utile dans la concurrence Java est ThreadLocal
- cela vous permet d'avoir une variable qui sera unique à un thread donné. Ainsi, si le même code s'exécute dans des threads différents, ces exécutions ne partageront pas la valeur, mais chaque thread aura sa propre variable locale au thread .
Par exemple, cela est fréquemment utilisé pour établir le contexte (tel que les informations d'autorisation) du traitement d'une demande dans une servlet. Vous pourriez faire quelque chose comme ceci:
private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();
public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}
public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}
Maintenant, au lieu de passer MyUserContext
à chaque méthode, vous pouvez utiliser MyServlet.getContext()
là où vous en avez besoin. Maintenant, bien sûr, cela introduit une variable qui doit être documentée, mais elle est adaptée aux threads, ce qui élimine beaucoup des inconvénients liés à l'utilisation d'une telle variable.
Le principal avantage est que chaque thread a sa propre variable locale de thread dans ce conteneur de contexts
. Tant que vous l'utilisez à partir d'un point d'entrée défini (comme si vous exigiez que chaque servlet conserve son contexte, ou peut-être en ajoutant un filtre de servlet), vous pouvez compter sur ce contexte lorsque vous en avez besoin.
CountDownLatch
Une aide à la synchronisation qui permet à un ou plusieurs threads d'attendre qu'un ensemble d'opérations soit exécuté dans d'autres threads.
- Un
CountDownLatch
est initialisé avec un nombre donné. - Les méthodes d’attente bloquent jusqu’à ce que le nombre actuel atteigne zéro en raison des
countDown()
méthodecountDown()
, après quoi tous les threads en attente sont libérés et toutes les invocations d’attente suivantes sontcountDown()
immédiatement. - Il s’agit d’un phénomène ponctuel: le compte ne peut pas être réinitialisé. Si vous avez besoin d'une version qui réinitialise le compte, envisagez d'utiliser un
CyclicBarrier
.
Méthodes clés:
public void await() throws InterruptedException
Fait en sorte que le thread en cours attende que le verrou ait été ramené à zéro, à moins que le thread ne soit interrompu.
public void countDown()
Diminue le nombre de verrous, libérant tous les threads en attente si le nombre atteint zéro.
Exemple:
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable {
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
System.out.println("Do some thing");
latch.countDown();
} catch(Exception err) {
err.printStackTrace();
}
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
try {
int numberOfThreads = 5;
if (args.length < 1) {
System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
return;
}
try {
numberOfThreads = Integer.parseInt(args[0]);
} catch(NumberFormatException ne) {
}
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int n = 0; n < numberOfThreads; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
} catch(Exception err) {
err.printStackTrace();
}
}
}
sortie:
java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads
Explication:
-
CountDownLatch
est initialisé avec un compteur de 5 dans le thread principal - Le thread principal attend en utilisant la méthode
await()
. - Cinq instances de
DoSomethingInAThread
ont été créées. Chaque instance décrémente le compteur avec la méthodecountDown()
. - Une fois que le compteur devient nul, le thread principal reprendra
Synchronisation
En Java, il existe un mécanisme de verrouillage intégré au niveau de la langue: le bloc synchronized
, qui peut utiliser n'importe quel objet Java en tant que verrou intrinsèque (chaque objet Java peut être associé à un moniteur).
Les verrous intrinsèques fournissent l'atomicité à des groupes d'instructions. Pour comprendre ce que cela signifie pour nous, examinons un exemple où la synchronized
est utile:
private static int t = 0;
private static Object mutex = new Object();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
synchronized (mutex) {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
}
});
}
executorService.shutdown();
}
Dans ce cas, s'il n'y avait pas le bloc synchronized
, il y aurait eu plusieurs problèmes de simultanéité. Le premier serait avec l'opérateur post-incrémentation (ce n'est pas atomique en lui-même), et le second serait que nous observerions la valeur de t après qu'une quantité arbitraire d'autres threads ait eu la chance de le modifier. Cependant, comme nous avons acquis un verrou intrinsèque, il n'y aura pas de conditions de course ici et la sortie contiendra des nombres de 1 à 100 dans leur ordre normal.
Les verrous intrinsèques de Java sont des mutex (c'est-à-dire des verrous d'exécution mutuelle). L'exécution mutuelle signifie que si un thread a acquis le verrou, le second sera obligé d'attendre que le premier le libère avant de pouvoir acquérir le verrou pour lui-même. Remarque: Une opération pouvant placer le thread dans l'état wait (sleep) est appelée opération de blocage . Ainsi, l'acquisition d'un verrou est une opération de blocage.
Les serrures intrinsèques en Java sont réentrantes . Cela signifie que si un thread tente d’acquérir un verrou qu’il possède déjà, il ne le bloquera pas et l’acquérera avec succès. Par exemple, le code suivant ne bloquera pas lorsqu'il sera appelé:
public void bar(){
synchronized(this){
...
}
}
public void foo(){
synchronized(this){
bar();
}
}
Outre synchronized
blocs synchronized
, il existe également synchronized
méthodes synchronized
.
Les blocs de code suivants sont pratiquement équivalents (même si le bytecode semble être différent):
bloc
synchronized
surthis
:public void foo() { synchronized(this) { doStuff(); } }
méthode
synchronized
:public synchronized void foo() { doStuff(); }
De même pour static
méthodes static
, ceci:
class MyClass {
...
public static void bar() {
synchronized(MyClass.class) {
doSomeOtherStuff();
}
}
}
a le même effet que celui-ci:
class MyClass {
...
public static synchronized void bar() {
doSomeOtherStuff();
}
}
Opérations atomiques
Une opération atomique est une opération exécutée "tout à la fois", sans aucune chance que d'autres threads observent ou modifient l'état pendant l'exécution de l'opération atomique.
Considérons un mauvais exemple .
private static int t = 0;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
});
}
executorService.shutdown();
}
Dans ce cas, il y a deux problèmes. Le premier problème est que l'opérateur de post-incrémentation n'est pas atomique. Il est composé de plusieurs opérations: obtenir la valeur, ajouter 1 à la valeur, définir la valeur. C'est pourquoi si nous lançons l'exemple, il est probable que nous ne verrons pas t: 100
dans la sortie - deux threads peuvent obtenir simultanément la valeur, l'incrémenter et la définir: disons que la valeur de t est 10 et deux les threads incrémentent t. Les deux threads définiront la valeur de t à 11, puisque le deuxième thread observe la valeur de t avant que le premier thread ne l'ait fini.
Le deuxième problème concerne la façon dont nous observons t. Lorsque nous imprimons la valeur de t, la valeur peut avoir déjà été modifiée par un thread différent après l'opération d'incrémentation de ce thread.
Pour résoudre ces problèmes, nous utiliserons le java.util.concurrent.atomic.AtomicInteger
, qui a plusieurs opérations atomiques à utiliser.
private static AtomicInteger t = new AtomicInteger(0);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
int currentT = t.incrementAndGet();
System.out.println(MessageFormat.format("t: {0}", currentT));
});
}
executorService.shutdown();
}
La méthode incrementAndGet
d' AtomicInteger
incrémente et retourne la nouvelle valeur, éliminant ainsi la condition de course précédente. Veuillez noter que dans cet exemple, les lignes seront toujours hors service car nous ne faisons aucun effort pour séquencer les appels println
et que cela sort du cadre de cet exemple, car cela nécessiterait une synchronisation et l'objectif de cet exemple est de montrer comment utiliser AtomicInteger
pour éliminer les conditions de course concernant l’état.
Création d'un système bloqué de base
Une impasse se produit lorsque deux actions concurrentes attendent que l'autre se termine et que, par conséquent, aucune de ces actions ne se produit jamais. En Java, un verrou est associé à chaque objet. Pour éviter les modifications simultanées effectuées par plusieurs threads sur un seul objet, nous pouvons utiliser synchronized
mot clé synchronized
, mais tout est payant. L'utilisation incorrecte de mots clés synchronized
peut conduire à des systèmes bloqués appelés systèmes bloqués.
Considérons qu'il y a 2 threads travaillant sur 1 instance, Lets appelle les threads comme First et Second, et disons que nous avons 2 ressources R1 et R2. First acquiert R1 et a également besoin de R2 pour son achèvement tandis que Second acquiert R2 et a besoin de R1 pour son achèvement.
donc à l'instant t = 0,
Le premier a R1 et le second a R2. maintenant First attend R2 alors que Second attend R1. cette attente est indéfinie et cela conduit à une impasse.
public class Example2 {
public static void main(String[] args) throws InterruptedException {
final DeadLock dl = new DeadLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
dl.methodA();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
dl.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t1.setName("First");
t2.setName("Second");
t1.start();
t2.start();
}
}
class DeadLock {
Object mLock1 = new Object();
Object mLock2 = new Object();
public void methodA() {
System.out.println("methodA wait for mLock1 " + Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("methodA mLock1 acquired " + Thread.currentThread().getName());
try {
Thread.sleep(100);
method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void method2() throws InterruptedException {
System.out.println("method2 wait for mLock2 " + Thread.currentThread().getName());
synchronized (mLock2) {
System.out.println("method2 mLock2 acquired " + Thread.currentThread().getName());
Thread.sleep(100);
method3();
}
}
public void method3() throws InterruptedException {
System.out.println("method3 mLock1 "+ Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("method3 mLock1 acquired " + Thread.currentThread().getName());
}
}
}
Sortie de ce programme:
methodA wait for mLock1 First
method2 wait for mLock2 Second
method2 mLock2 acquired Second
methodA mLock1 acquired First
method3 mLock1 Second
method2 wait for mLock2 First
Faire une pause d'exécution
Thread.sleep
provoque la suspension de l'exécution du thread en cours pour une période spécifiée. C'est un moyen efficace de rendre le temps processeur disponible pour les autres threads d'une application ou d'autres applications pouvant s'exécuter sur un système informatique. Il existe deux méthodes de sleep
surchargées dans la classe Thread.
Un qui spécifie le temps de sommeil à la milliseconde
public static void sleep(long millis) throws InterruptedException
Celui qui spécifie le temps de sommeil à la nanoseconde
public static void sleep(long millis, int nanos)
Mettre en pause l'exécution pendant 1 seconde
Thread.sleep(1000);
Il est important de noter que ceci est un indice pour le planificateur du noyau du système d'exploitation. Cela n'est pas forcément précis, et certaines implémentations ne prennent même pas en compte le paramètre nanoseconde (arrondissant éventuellement à la milliseconde près).
Il est recommandé de joindre un appel à Thread.sleep
dans try / catch et intercepter InterruptedException
.
Visualisation des barrières de lecture / écriture lors de l'utilisation de synchronized / volatile
Comme nous le savons, nous devons utiliser le mot-clé synchronized
pour rendre l'exécution d'une méthode ou d'un bloc exclusive. Mais peu d’entre nous ne sont peut-être pas au courant d’un autre aspect important de l’utilisation de mots clés synchronized
et volatile
: en plus de créer une unité de code atomique, elle fournit également une barrière en lecture / écriture . Quelle est cette barrière de lecture / écriture? Discutons-en en utilisant un exemple:
class Counter {
private Integer count = 10;
public synchronized void incrementCount() {
count++;
}
public Integer getCount() {
return count;
}
}
Supposons qu'un thread A appelle incrementCount()
puis un autre thread B appelle getCount()
. Dans ce scénario, il n'y a aucune garantie que B verra la valeur actualisée du count
. Il peut toujours voir count
comme 10
, même s'il est également possible qu'il ne voit jamais la valeur mise à jour du count
jamais.
Pour comprendre ce comportement, nous devons comprendre comment le modèle de mémoire Java s'intègre à l'architecture matérielle. En Java, chaque thread a sa propre pile de threads. Cette pile contient: la pile des appels de méthode et la variable locale créée dans ce thread. Dans un système multi-core, il est fort possible que deux threads s'exécutent simultanément dans des cœurs distincts. Dans un tel scénario, il est possible qu'une partie de la pile d'un thread se trouve dans le registre / cache d'un core. Si dans un thread, on accède à un objet en utilisant le mot-clé synchronized
(ou volatile
), après le blocage synchronized
, le thread synchronise sa copie locale de cette variable avec la mémoire principale. Cela crée une barrière de lecture / écriture et s'assure que le thread voit la dernière valeur de cet objet.
Mais dans notre cas, puisque le thread B n'a pas utilisé l' accès synchronisé à count
, il est peut - être la valeur du référant count
stocké dans le registre et ne peut jamais voir les mises à jour de fil A. Pour vous assurer que B voit dernière valeur de comptage que nous devons faire getCount()
synchronisé également.
public synchronized Integer getCount() {
return count;
}
Désormais, lorsque le thread A est terminé avec le count
mise à jour, il déverrouille l'instance Counter
, crée en même temps une barrière d'écriture et vide toutes les modifications effectuées à l'intérieur de ce bloc dans la mémoire principale. De même, lorsque le thread B acquiert le verrou sur la même instance de Counter
, il entre dans la barrière de lecture et lit la valeur de count
dans la mémoire principale et voit toutes les mises à jour.
Même effet de visibilité pour volatile
lectures / écritures volatile
. Toutes les variables mises à jour avant d'écrire dans volatile
seront vidées dans la mémoire principale et toutes les lectures après lecture d' volatile
variable volatile
proviendront de la mémoire principale.
Créer une instance java.lang.Thread
Il existe deux approches principales pour créer un thread en Java. En résumé, créer un thread est aussi simple que d’écrire le code qui sera exécuté. Les deux approches diffèrent dans la définition de ce code.
En Java, un thread est représenté par un objet - une instance de java.lang.Thread ou de sa sous-classe. La première approche consiste donc à créer cette sous-classe et à remplacer la méthode run () .
Remarque : j'utiliserai Thread pour faire référence à la classe java.lang.Thread et au thread pour faire référence au concept logique des threads.
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running!");
}
}
}
Maintenant que nous avons déjà défini le code à exécuter, le thread peut être créé simplement comme:
MyThread t = new MyThread();
La classe Thread contient également un constructeur acceptant une chaîne, qui sera utilisée comme nom du thread. Cela peut être particulièrement utile lors du débogage d'un programme multi-thread.
class MyThread extends Thread {
public MyThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running! ");
}
}
}
MyThread t = new MyThread("Greeting Producer");
La deuxième approche consiste à définir le code à l'aide de java.lang.Runnable et de sa seule méthode run () . La classe Thread vous permet alors d'exécuter cette méthode dans un thread séparé. Pour ce faire, créez le thread en utilisant un constructeur acceptant une instance de l'interface Runnable .
Thread t = new Thread(aRunnable);
Cela peut être très puissant lorsqu'il est combiné avec des références lambdas ou méthodes (Java 8 uniquement):
Thread t = new Thread(operator::hardWork);
Vous pouvez également spécifier le nom du thread.
Thread t = new Thread(operator::hardWork, "Pi operator");
Pratiquement parlant, vous pouvez utiliser les deux approches sans soucis. Cependant, la sagesse générale dit d'utiliser ce dernier.
Pour chacun des quatre constructeurs mentionnés, il existe également une alternative acceptant une instance de java.lang.ThreadGroup comme premier paramètre.
ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");
Le ThreadGroup représente un ensemble de threads. Vous ne pouvez ajouter un thread à un ThreadGroup qu'en utilisant le constructeur d'un thread . Le ThreadGroup peut alors être utilisé pour gérer tous ses threads ensemble, de même que le thread peut obtenir des informations à partir de son ThreadGroup .
Donc, pour résumer, le thread peut être créé avec l'un de ces constructeurs publics:
Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)
Le dernier nous permet de définir la taille de pile souhaitée pour le nouveau thread.
Souvent, la lisibilité du code souffre lors de la création et de la configuration de nombreux threads avec les mêmes propriétés ou à partir du même modèle. C'est à ce moment que java.util.concurrent.ThreadFactory peut être utilisé. Cette interface vous permet d'encapsuler la procédure de création du thread via le modèle de fabrique et sa seule méthode newThread (Runnable) .
class WorkerFactory implements ThreadFactory {
private int id = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Worker " + id++);
}
}
Thread Interruption / Stopping Threads
Chaque thread Java a un indicateur d'interruption, qui est initialement faux. Interrompre un thread, ce n’est rien d’autre que de lui attribuer la valeur true. Le code exécuté sur ce thread peut vérifier le drapeau à l'occasion et agir en conséquence. Le code peut également l'ignorer complètement. Mais pourquoi chaque fil aurait-il un tel drapeau? Après tout, avoir un drapeau booléen sur un fil est quelque chose que nous pouvons simplement organiser nous-mêmes, si et quand nous en avons besoin. Eh bien, il existe des méthodes qui se comportent de manière particulière lorsque le thread sur lequel elles s'exécutent est interrompu. Ces méthodes sont appelées méthodes de blocage. Ce sont des méthodes qui placent le thread dans l'état WAITING ou TIMED_WAITING. Lorsqu'un thread est dans cet état, l'interrompre provoquera une exception InterruptedException sur le thread interrompu, au lieu que l'indicateur d'interruption soit défini sur true et que le thread redevienne RUNNABLE. Le code qui appelle une méthode de blocage est obligé de gérer l'exception Interrupted, car il s'agit d'une exception vérifiée. Donc, et par conséquent son nom, une interruption peut avoir pour effet d'interrompre un WAIT, le terminant effectivement. Notez que toutes les méthodes en attente (par exemple, le blocage des E / S) ne réagissent pas de cette manière aux interruptions, car elles ne mettent pas le thread en attente. Enfin, un thread dont l’indicateur d’interruption est défini, qui entre dans une méthode de blocage (c’est-à-dire qui tente d’entrer dans un état en attente), lancera immédiatement une exception InterruptedException et l’indicateur d’interruption sera effacé.
Outre ces mécanismes, Java n’attribue aucune signification sémantique particulière à l’interruption. Le code est libre d'interpréter une interruption comme bon lui semble. Mais le plus souvent, l'interruption est utilisée pour signaler à un thread qu'il doit cesser de fonctionner dès que possible. Mais, comme il ressort clairement de ce qui précède, il appartient au code de ce thread de réagir de manière appropriée à cette interruption afin de ne plus fonctionner. Arrêter un thread est une collaboration. Lorsqu'un thread est interrompu, son code en cours peut avoir plusieurs niveaux dans la trace de la pile. La plupart du code n'appelle pas de méthode de blocage et se termine suffisamment rapidement pour ne pas retarder indûment l'arrêt du thread. Le code qui devrait principalement concerner la réactivité aux interruptions, est le code qui est en boucle et gère les tâches jusqu’à ce qu’il n’y en ait plus, ou jusqu’à ce qu’un indicateur soit défini pour l’interrompre. Les boucles qui traitent des tâches éventuellement infinies (c'est-à-dire qu'elles continuent à fonctionner en principe) devraient vérifier l'indicateur d'interruption afin de quitter la boucle. Pour les boucles finies, la sémantique peut exiger que toutes les tâches soient terminées avant de se terminer, ou il peut être approprié de laisser certaines tâches non gérées. Le code qui appelle les méthodes de blocage sera obligé de gérer l'exception Interrupted. S'il est sémantiquement possible, il peut simplement propager l'InterruptedException et déclarer le lancer. En tant que tel, il devient une méthode de blocage en ce qui concerne ses appelants. S'il ne peut pas propager l'exception, il doit au minimum définir l'indicateur interrompu, afin que les appelants plus haut dans la pile sachent également que le thread a été interrompu. Dans certains cas, la méthode doit continuer d'attendre quelle que soit l'interruption, auquel cas elle doit retarder la définition de l'indicateur interrompu jusqu'à la fin de l'attente, cela peut impliquer la définition d'une variable locale à vérifier avant de quitter la méthode. puis interrompre son fil.
Exemples :
Exemple de code qui arrête la gestion des tâches lors de l'interruption
class TaskHandler implements Runnable {
private final BlockingQueue<Task> queue;
TaskHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
try {
Task task = queue.take(); // blocking call, responsive to interruption
handle(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
}
}
}
private void handle(Task task) {
// actual handling
}
}
Exemple de code qui retarde la définition de l'indicateur d'interruption jusqu'à sa complète exécution:
class MustFinishHandler implements Runnable {
private final BlockingQueue<Task> queue;
MustFinishHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean shouldInterrupt = false;
while (true) {
try {
Task task = queue.take();
if (task.isEndOfTasks()) {
if (shouldInterrupt) {
Thread.currentThread().interrupt();
}
return;
}
handle(task);
} catch (InterruptedException e) {
shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
}
}
}
private void handle(Task task) {
// actual handling
}
}
Exemple de code qui a une liste de tâches fixe mais qui peut quitter tôt lorsqu'il est interrompu
class GetAsFarAsPossible implements Runnable {
private final List<Task> tasks = new ArrayList<>();
@Override
public void run() {
for (Task task : tasks) {
if (Thread.currentThread().isInterrupted()) {
return;
}
handle(task);
}
}
private void handle(Task task) {
// actual handling
}
}
Exemple de producteur / consommateur multiple avec file d'attente globale partagée
Le code ci-dessous présente plusieurs programmes Producteur / Consommateur. Les threads Producteur et Consommateur partagent la même file d'attente globale.
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
sortie:
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1
etc ................
Explication:
-
sharedQueue
, qui est unLinkedBlockingQueue
est partagé entre tous les threads Producer et Consumer. - Les threads de producteurs produisent un entier pour 200 millisecondes en continu et l'ajoutent à
sharedQueue
-
Consumer
threadConsumer
consomme un nombre entier desharedQueue
continu. - Ce programme est implémenté sans constructions explicites
synchronized
ouLock
. BlockingQueue est la clé pour y parvenir.
Les implémentations BlockingQueue sont conçues pour être utilisées principalement pour les files d'attente producteur-consommateur.
Les implémentations BlockingQueue sont sécurisées pour les threads. Toutes les méthodes de mise en file d'attente atteignent leurs effets de manière atomique en utilisant des verrous internes ou d'autres formes de contrôle de concurrence.
Écriture exclusive / accès en lecture simultanée
Un processus doit parfois écrire et lire simultanément les mêmes "données".
L'interface ReadWriteLock
et son implémentation ReentrantReadWriteLock
permettent un modèle d'accès pouvant être décrit comme suit:
- Il peut y avoir un nombre quelconque de lecteurs simultanés des données. Si au moins un accès au lecteur est autorisé, aucun accès au graveur n'est possible.
- Il peut y avoir au plus un seul auteur pour les données. Si un accès en écriture est accordé, aucun lecteur ne peut accéder aux données.
Une implémentation pourrait ressembler à:
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;
/** This will write to the data, in an exclusive access */
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}
public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}
}
NOTE 1 : Ce cas d'utilisation précis a une solution plus propre utilisant AtomicInteger
, mais ce qui est décrit ici est un modèle d'accès, qui fonctionne indépendamment du fait que les données ici sont un entier atomique.
NOTE 2 : Le verrouillage de la partie lecture est vraiment nécessaire, même si cela peut ne pas paraître au lecteur occasionnel. En effet, si vous ne verrouillez pas le lecteur, un certain nombre de problèmes peuvent survenir, parmi lesquels:
- Les écritures de valeurs primitives ne sont pas forcément atomiques sur toutes les JVM, de sorte que le lecteur pourrait voir, par exemple, que seuls 32 bits sur 64 bits écrivent si les
data
étaient de type 64 bits - La visibilité de l'écriture à partir d'un thread qui ne l'a pas effectuée est garantie par la JVM uniquement si nous établissons une relation Happen Before entre les écritures et les lectures. Cette relation est établie lorsque les lecteurs et les écrivains utilisent leurs verrous respectifs, mais pas autrement
StampedLock
des performances plus élevées sont requises, sous certains types d'utilisation, il existe un type de verrouillage plus rapide, appelé StampedLock
, qui, entre autres, implémente un mode de verrouillage optimiste. Ce verrou fonctionne très différemment du ReadWriteLock
, et cet exemple n'est pas transposable.
Objet Runnable
L'interface Runnable
définit une méthode unique, run()
, destinée à contenir le code exécuté dans le thread.
L'objet Runnable
est transmis au constructeur Thread
. Et la méthode start()
de Thread est appelée.
Exemple
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("Hello from a thread");
}
public static void main(String[] args) {
new Thread(new HelloRunnable()).start();
}
}
Exemple dans Java8:
public static void main(String[] args) {
Runnable r = () -> System.out.println("Hello world");
new Thread(r).start();
}
Sous-classe Runnable vs Thread
Un emploi d'objet Runnable
est plus général, car l'objet Runnable
peut sous-classer une classe autre que Thread
.
Thread
sous- Thread
est plus facile à utiliser dans des applications simples, mais il est limité par le fait que votre classe de tâches doit être un descendant de Thread
.
Un objet Runnable
est applicable aux API de gestion des threads de haut niveau.
Sémaphore
Un sémaphore est un synchroniseur de haut niveau qui conserve un ensemble d' autorisations pouvant être acquises et libérées par les threads. Un sémaphore peut être imaginé comme un compteur de permis qui sera décrémenté lorsqu'un fil acquiert et incrémenté lors de la sortie d'un thread. Si le nombre de permis est égal à 0
lorsqu'un thread tente d'acquérir, le thread se bloque jusqu'à ce qu'un permis soit disponible (ou jusqu'à ce que le thread soit interrompu).
Un sémaphore est initialisé comme:
Semaphore semaphore = new Semaphore(1); // The int value being the number of permits
Le constructeur Semaphore accepte un paramètre booléen supplémentaire pour l'équité. Lorsqu'elle est définie sur false, cette classe ne garantit pas l'ordre dans lequel les threads acquièrent l'autorisation. Lorsque l'équité est définie sur true, le sémaphore garantit que les threads appelant l'une des méthodes d'acquisition sont sélectionnés pour obtenir les autorisations dans l'ordre dans lequel leur appel de ces méthodes a été traité. Il est déclaré de la manière suivante:
Semaphore semaphore = new Semaphore(1, true);
Examinons maintenant un exemple de javadocs, où Semaphore est utilisé pour contrôler l'accès à un pool d'éléments. Un sémaphore est utilisé dans cet exemple pour fournir une fonctionnalité de blocage afin de s’assurer qu’il ya toujours des éléments à obtenir lorsque getItem()
est appelé.
class Pool {
/*
* Note that this DOES NOT bound the amount that may be released!
* This is only a starting value for the Semaphore and has no other
* significant meaning UNLESS you enforce this inside of the
* getNextAvailableItem() and markAsUnused() methods
*/
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/**
* Obtains the next available item and reduces the permit count by 1.
* If there are no items available, block.
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* Puts the item into the pool and add 1 permit.
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private Object getNextAvailableItem() {
// Implementation
}
private boolean markAsUnused(Object o) {
// Implementation
}
}
Ajouter deux tableaux `int` à l'aide d'un Threadpool
Un Threadpool a une file d'attente de tâches, dont chacune sera exécutée sur l'un de ces threads.
L'exemple suivant montre comment ajouter deux tableaux int
aide d'un Threadpool.
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };
ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}
// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}
System.out.println(Arrays.toString(result));
Remarques:
Cet exemple est purement illustratif. En pratique, il n'y aura pas d'accélération en utilisant des threads pour une tâche aussi petite. Un ralentissement est probable, car les frais généraux liés à la création et à la planification des tâches saturent le temps nécessaire à l'exécution d'une tâche.
Si vous utilisiez Java 7 et versions antérieures, vous utiliseriez des classes anonymes au lieu de lambdas pour implémenter les tâches.
Obtenir l'état de tous les threads démarrés par votre programme, à l'exception des threads système
Extrait de code:
import java.util.Set;
public class ThreadStatus {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 5; i++){
Thread t = new Thread(new MyThread());
t.setName("MyThread:" + i);
t.start();
}
int threadCount = 0;
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread t : threadSet) {
if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
System.out.println("Thread :" + t + ":" + "state:" + t.getState());
++threadCount;
}
}
System.out.println("Thread count started by Main thread:" + threadCount);
}
}
class MyThread implements Runnable {
public void run() {
try {
Thread.sleep(2000);
} catch(Exception err) {
err.printStackTrace();
}
}
}
Sortie:
Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6
Explication:
Thread.getAllStackTraces().keySet()
renvoie tous les Thread
y compris les threads d'application et les threads système. Si vous êtes uniquement intéressé par le statut des threads, démarré par votre application, effectuez une itération du jeu de Thread
en vérifiant le groupe de threads d'un thread particulier par rapport au thread de votre programme principal.
En l'absence de la condition ThreadGroup ci-dessus, le programme renvoie le statut des threads système ci-dessous:
Reference Handler
Signal Dispatcher
Attach Listener
Finalizer
Callable et Future
Bien que Runnable
fournisse un moyen d’emballer le code pour qu’il soit exécuté dans un thread différent, il présente une limitation en ce sens qu’il ne peut pas renvoyer un résultat de l’exécution. La seule façon d'obtenir une valeur de retour à partir de l'exécution d'un Runnable
est d'affecter le résultat à une variable accessible dans une étendue en dehors de Runnable
.
Callable
été introduit dans Java 5 en tant que pair à Runnable
. Callable
est essentiellement le même sauf qu'il a une méthode d' call
au lieu de run
. La méthode call
a la capacité supplémentaire de renvoyer un résultat et est également autorisée à lancer des exceptions vérifiées.
Le résultat d'une soumission de tâche Callable est disponible pour être exploité via un avenir
Future
peut être considéré comme un conteneur contenant le résultat du calcul Callable
. Le calcul du callable peut se poursuivre dans un autre thread, et toute tentative de toucher le résultat d'un Future
bloquera et ne renverra le résultat qu'une fois disponible.
Interface appelable
public interface Callable<V> {
V call() throws Exception;
}
Avenir
interface Future<V> {
V get();
V get(long timeout, TimeUnit unit);
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
En utilisant l'exemple Callable et Future:
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
System.out.println("Time At Task Submission : " + new Date());
Future<String> result = es.submit(new ComplexCalculator());
// the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
System.out.println("Result of Complex Calculation is : " + result.get());
System.out.println("Time At the Point of Printing the Result : " + new Date());
}
Notre callable qui fait un long calcul
public class ComplexCalculator implements Callable<String> {
@Override
public String call() throws Exception {
// just sleep for 10 secs to simulate a lengthy computation
Thread.sleep(10000);
System.out.println("Result after a lengthy 10sec calculation");
return "Complex Result"; // the result
}
}
Sortie
Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016
Autres opérations autorisées sur Future
Alors que get()
est la méthode pour extraire le résultat réel, Future a provision
-
get(long timeout, TimeUnit unit)
définit la durée maximale pendant laquelle le thread en cours attendra un résultat; - Pour annuler l'appel de tâche,
cancel(mayInterruptIfRunning)
. L'indicateurmayInterrupt
indique que la tâche doit être interrompue si elle a été démarrée et s'exécute maintenant. - Pour vérifier si la tâche est terminée / terminée en appelant
isDone()
; - Pour vérifier si la longue tâche a été annulée
isCancelled()
.
Serrures comme aides à la synchronisation
Avant l'introduction du package simultané de Java 5, le thread était de niveau inférieur. L'introduction de ce package fournissait plusieurs aides / constructions de programmation concurrentes de niveau supérieur.
Les verrous sont des mécanismes de synchronisation de threads qui ont essentiellement le même objectif que les blocs synchronisés ou les mots-clés.
Verrouillage intrinsèque
int count = 0; // shared among multiple threads
public void doSomething() {
synchronized(this) {
++count; // a non-atomic operation
}
}
Synchronisation à l'aide de verrous
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
lockObj.lock();
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
Les verrous ont également des fonctionnalités disponibles que le verrouillage intrinsèque n'offre pas, telles que le verrouillage, mais en restant réactif aux interruptions, ou en essayant de se verrouiller, et non de bloquer le cas échéant.
Verrouillage, sensible aux interruptions
class Locky {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
try {
lockObj.lockInterruptibly();
++count; // a non-atomic operation
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // stopping
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Ne fais quelque chose que lorsque tu es capable de verrouiller
public class Locky2 {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
boolean locked = lockObj.tryLock(); // returns true upon successful lock
if (locked) {
try {
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Il existe plusieurs variantes de verrouillage disponibles .