Recherche…


Introduction

L'informatique concurrente est une forme de calcul dans laquelle plusieurs calculs sont exécutés simultanément au lieu d'être séquentiels. Le langage Java est conçu pour prendre en charge la programmation simultanée via l'utilisation de threads. Les objets et les ressources sont accessibles par plusieurs threads; chaque thread peut potentiellement accéder à n'importe quel objet du programme et le programmeur doit s'assurer que les accès en lecture et en écriture aux objets sont correctement synchronisés entre les threads.

Remarques

Sujets apparentés sur StackOverflow:

Multithreading de base

Si vous avez beaucoup de tâches à exécuter et que toutes ces tâches ne dépendent pas du résultat des précédentes, vous pouvez utiliser Multithreading pour que votre ordinateur effectue toutes ces tâches en même temps en utilisant plus de processeurs si votre ordinateur le peut. Cela peut accélérer l' exécution de votre programme si vous avez d'importantes tâches indépendantes.

class CountAndPrint implements Runnable {

    private final String name;

    CountAndPrint(String name) {
        this.name = name;
    }

    /** This is what a CountAndPrint will do */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println(this.name + ": " + i);
        }
    }

    public static void main(String[] args) {
        // Launching 4 parallel threads
        for (int i = 1; i <= 4; i++) {
            // `start` method will call the `run` method 
            // of CountAndPrint in another thread
            new Thread(new CountAndPrint("Instance " + i)).start();
        }

        // Doing some others tasks in the main Thread
        for (int i = 0; i < 10000; i++) {
            System.out.println("Main: " + i);
        }
    }
}

Le code de la méthode d'exécution des différentes instances de CountAndPrint s'exécutera dans un ordre non prévisible. Un extrait d'un exemple d'exécution peut ressembler à ceci:

Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...

Producteur-consommateur

Un exemple simple de solution du problème producteur-consommateur. Notez que les classes JDK ( AtomicBoolean et BlockingQueue ) sont utilisées pour la synchronisation, ce qui réduit le risque de création d'une solution non valide. Consultez Javadoc pour différents types de BlockingQueue ; Le choix d'une implémentation différente peut modifier radicalement le comportement de cet exemple (comme DelayQueue ou Priority Queue ).

public class Producer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Producer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int producedCount = 0;
        try {
            while (true) {
                producedCount++;
                //put throws an InterruptedException when the thread is interrupted
                queue.put(new ProducedData());
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            producedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Produced " + producedCount + " objects");
    }
}

public class Consumer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Consumer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int consumedCount = 0;
        try {
            while (true) {
                //put throws an InterruptedException when the thread is interrupted
                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
                // process data
                consumedCount++;
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            consumedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed " + consumedCount + " objects");
    }
}


public class ProducerConsumerExample {
    static class ProducedData {    
        // empty data object
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
        // choice of queue determines the actual behavior: see various BlockingQueue implementations

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();

        Thread.sleep(1000);
        producer.interrupt();
        Thread.sleep(10);
        consumer.interrupt();
    }
}

Utiliser ThreadLocal

Un outil utile dans la concurrence Java est ThreadLocal - cela vous permet d'avoir une variable qui sera unique à un thread donné. Ainsi, si le même code s'exécute dans des threads différents, ces exécutions ne partageront pas la valeur, mais chaque thread aura sa propre variable locale au thread .

Par exemple, cela est fréquemment utilisé pour établir le contexte (tel que les informations d'autorisation) du traitement d'une demande dans une servlet. Vous pourriez faire quelque chose comme ceci:

private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

public static MyUserContext getContext() {
    return contexts.get(); // get returns the variable unique to this thread
}

public void doGet(...) {
    MyUserContext context = magicGetContextFromRequest(request); 
    contexts.put(context); // save that context to our thread-local - other threads
                           // making this call don't overwrite ours
    try {
        // business logic
    } finally {
        contexts.remove(); // 'ensure' removal of thread-local variable
    }
}

Maintenant, au lieu de passer MyUserContext à chaque méthode, vous pouvez utiliser MyServlet.getContext() là où vous en avez besoin. Maintenant, bien sûr, cela introduit une variable qui doit être documentée, mais elle est adaptée aux threads, ce qui élimine beaucoup des inconvénients liés à l'utilisation d'une telle variable.

Le principal avantage est que chaque thread a sa propre variable locale de thread dans ce conteneur de contexts . Tant que vous l'utilisez à partir d'un point d'entrée défini (comme si vous exigiez que chaque servlet conserve son contexte, ou peut-être en ajoutant un filtre de servlet), vous pouvez compter sur ce contexte lorsque vous en avez besoin.

CountDownLatch

CountDownLatch

Une aide à la synchronisation qui permet à un ou plusieurs threads d'attendre qu'un ensemble d'opérations soit exécuté dans d'autres threads.

  1. Un CountDownLatch est initialisé avec un nombre donné.
  2. Les méthodes d’attente bloquent jusqu’à ce que le nombre actuel atteigne zéro en raison des countDown() méthode countDown() , après quoi tous les threads en attente sont libérés et toutes les invocations d’attente suivantes sont countDown() immédiatement.
  3. Il s’agit d’un phénomène ponctuel: le compte ne peut pas être réinitialisé. Si vous avez besoin d'une version qui réinitialise le compte, envisagez d'utiliser un CyclicBarrier .

Méthodes clés:

public void await() throws InterruptedException

Fait en sorte que le thread en cours attende que le verrou ait été ramené à zéro, à moins que le thread ne soit interrompu.

public void countDown()

Diminue le nombre de verrous, libérant tous les threads en attente si le nombre atteint zéro.

Exemple:

import java.util.concurrent.*;

class DoSomethingInAThread implements Runnable {
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch) {
        this.latch = latch;
    } 
    public void run() {
        try {
            System.out.println("Do some thing");
            latch.countDown();
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try {
            int numberOfThreads = 5;
            if (args.length < 1) {
                System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
                return;
            }
            try {
                numberOfThreads = Integer.parseInt(args[0]);
            } catch(NumberFormatException ne) {
            
            }
            CountDownLatch latch = new CountDownLatch(numberOfThreads);
            for (int n = 0; n < numberOfThreads; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

sortie:

java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads

Explication:

  1. CountDownLatch est initialisé avec un compteur de 5 dans le thread principal
  2. Le thread principal attend en utilisant la méthode await() .
  3. Cinq instances de DoSomethingInAThread ont été créées. Chaque instance décrémente le compteur avec la méthode countDown() .
  4. Une fois que le compteur devient nul, le thread principal reprendra

Synchronisation

En Java, il existe un mécanisme de verrouillage intégré au niveau de la langue: le bloc synchronized , qui peut utiliser n'importe quel objet Java en tant que verrou intrinsèque (chaque objet Java peut être associé à un moniteur).

Les verrous intrinsèques fournissent l'atomicité à des groupes d'instructions. Pour comprendre ce que cela signifie pour nous, examinons un exemple où la synchronized est utile:

private static int t = 0;
private static Object mutex = new Object();

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            synchronized (mutex) {
                t++;
                System.out.println(MessageFormat.format("t: {0}", t));
            }
        });
    }
    executorService.shutdown();
}

Dans ce cas, s'il n'y avait pas le bloc synchronized , il y aurait eu plusieurs problèmes de simultanéité. Le premier serait avec l'opérateur post-incrémentation (ce n'est pas atomique en lui-même), et le second serait que nous observerions la valeur de t après qu'une quantité arbitraire d'autres threads ait eu la chance de le modifier. Cependant, comme nous avons acquis un verrou intrinsèque, il n'y aura pas de conditions de course ici et la sortie contiendra des nombres de 1 à 100 dans leur ordre normal.

Les verrous intrinsèques de Java sont des mutex (c'est-à-dire des verrous d'exécution mutuelle). L'exécution mutuelle signifie que si un thread a acquis le verrou, le second sera obligé d'attendre que le premier le libère avant de pouvoir acquérir le verrou pour lui-même. Remarque: Une opération pouvant placer le thread dans l'état wait (sleep) est appelée opération de blocage . Ainsi, l'acquisition d'un verrou est une opération de blocage.

Les serrures intrinsèques en Java sont réentrantes . Cela signifie que si un thread tente d’acquérir un verrou qu’il possède déjà, il ne le bloquera pas et l’acquérera avec succès. Par exemple, le code suivant ne bloquera pas lorsqu'il sera appelé:

public void bar(){
    synchronized(this){
        ...
    }
}
public void foo(){
    synchronized(this){
        bar();
    }
}

Outre synchronized blocs synchronized , il existe également synchronized méthodes synchronized .

Les blocs de code suivants sont pratiquement équivalents (même si le bytecode semble être différent):

  1. bloc synchronized sur this :

    public void foo() {
        synchronized(this) {
            doStuff();
        }
    }
    
  2. méthode synchronized :

     public synchronized void foo() {
         doStuff();
     }
    

De même pour static méthodes static , ceci:

class MyClass {
    ...
    public static void bar() {
        synchronized(MyClass.class) {
            doSomeOtherStuff();
        }
    }
}

a le même effet que celui-ci:

class MyClass {
    ...
    public static synchronized void bar() {
        doSomeOtherStuff();
    }
}

Opérations atomiques

Une opération atomique est une opération exécutée "tout à la fois", sans aucune chance que d'autres threads observent ou modifient l'état pendant l'exécution de l'opération atomique.

Considérons un mauvais exemple .

private static int t = 0;

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            t++;
            System.out.println(MessageFormat.format("t: {0}", t));
        });
    }
    executorService.shutdown();
}

Dans ce cas, il y a deux problèmes. Le premier problème est que l'opérateur de post-incrémentation n'est pas atomique. Il est composé de plusieurs opérations: obtenir la valeur, ajouter 1 à la valeur, définir la valeur. C'est pourquoi si nous lançons l'exemple, il est probable que nous ne verrons pas t: 100 dans la sortie - deux threads peuvent obtenir simultanément la valeur, l'incrémenter et la définir: disons que la valeur de t est 10 et deux les threads incrémentent t. Les deux threads définiront la valeur de t à 11, puisque le deuxième thread observe la valeur de t avant que le premier thread ne l'ait fini.

Le deuxième problème concerne la façon dont nous observons t. Lorsque nous imprimons la valeur de t, la valeur peut avoir déjà été modifiée par un thread différent après l'opération d'incrémentation de ce thread.

Pour résoudre ces problèmes, nous utiliserons le java.util.concurrent.atomic.AtomicInteger , qui a plusieurs opérations atomiques à utiliser.

private static AtomicInteger t = new AtomicInteger(0);

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            int currentT = t.incrementAndGet();
            System.out.println(MessageFormat.format("t: {0}", currentT));
        });
    }
    executorService.shutdown();
}

La méthode incrementAndGet d' AtomicInteger incrémente et retourne la nouvelle valeur, éliminant ainsi la condition de course précédente. Veuillez noter que dans cet exemple, les lignes seront toujours hors service car nous ne faisons aucun effort pour séquencer les appels println et que cela sort du cadre de cet exemple, car cela nécessiterait une synchronisation et l'objectif de cet exemple est de montrer comment utiliser AtomicInteger pour éliminer les conditions de course concernant l’état.

Création d'un système bloqué de base

Une impasse se produit lorsque deux actions concurrentes attendent que l'autre se termine et que, par conséquent, aucune de ces actions ne se produit jamais. En Java, un verrou est associé à chaque objet. Pour éviter les modifications simultanées effectuées par plusieurs threads sur un seul objet, nous pouvons utiliser synchronized mot clé synchronized , mais tout est payant. L'utilisation incorrecte de mots clés synchronized peut conduire à des systèmes bloqués appelés systèmes bloqués.

Considérons qu'il y a 2 threads travaillant sur 1 instance, Lets appelle les threads comme First et Second, et disons que nous avons 2 ressources R1 et R2. First acquiert R1 et a également besoin de R2 pour son achèvement tandis que Second acquiert R2 et a besoin de R1 pour son achèvement.

donc à l'instant t = 0,

Le premier a R1 et le second a R2. maintenant First attend R2 alors que Second attend R1. cette attente est indéfinie et cela conduit à une impasse.

public class Example2 {
    
    public static void main(String[] args) throws InterruptedException {
        final DeadLock dl = new DeadLock();
        Thread t1 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                dl.methodA();
            }
        });
   
        Thread t2 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    dl.method2();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        t1.setName("First");
        t2.setName("Second");
        t1.start();
        t2.start();
    }
}

class DeadLock {
    
    Object mLock1 = new Object();
    Object mLock2 = new Object();
    

    public void methodA() {
        System.out.println("methodA wait for mLock1  " + Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("methodA mLock1 acquired   " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
                method2();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public void method2() throws InterruptedException {
        System.out.println("method2 wait for mLock2  " + Thread.currentThread().getName());
        synchronized (mLock2) {
            System.out.println("method2  mLock2 acquired   " + Thread.currentThread().getName());
            Thread.sleep(100);
            method3();
        }
    }
    public void method3() throws InterruptedException {
        System.out.println("method3  mLock1  "+ Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("method3   mLock1 acquired  " + Thread.currentThread().getName());
        }
    }
}

Sortie de ce programme:

methodA wait for mLock1  First
method2 wait for mLock2  Second
method2  mLock2 acquired   Second
methodA mLock1 acquired   First
method3  mLock1  Second
method2 wait for mLock2  First

Faire une pause d'exécution

Thread.sleep provoque la suspension de l'exécution du thread en cours pour une période spécifiée. C'est un moyen efficace de rendre le temps processeur disponible pour les autres threads d'une application ou d'autres applications pouvant s'exécuter sur un système informatique. Il existe deux méthodes de sleep surchargées dans la classe Thread.

Un qui spécifie le temps de sommeil à la milliseconde

public static void sleep(long millis) throws InterruptedException

Celui qui spécifie le temps de sommeil à la nanoseconde

public static void sleep(long millis, int nanos)

Mettre en pause l'exécution pendant 1 seconde

Thread.sleep(1000);

Il est important de noter que ceci est un indice pour le planificateur du noyau du système d'exploitation. Cela n'est pas forcément précis, et certaines implémentations ne prennent même pas en compte le paramètre nanoseconde (arrondissant éventuellement à la milliseconde près).

Il est recommandé de joindre un appel à Thread.sleep dans try / catch et intercepter InterruptedException .

Visualisation des barrières de lecture / écriture lors de l'utilisation de synchronized / volatile

Comme nous le savons, nous devons utiliser le mot-clé synchronized pour rendre l'exécution d'une méthode ou d'un bloc exclusive. Mais peu d’entre nous ne sont peut-être pas au courant d’un autre aspect important de l’utilisation de mots clés synchronized et volatile : en plus de créer une unité de code atomique, elle fournit également une barrière en lecture / écriture . Quelle est cette barrière de lecture / écriture? Discutons-en en utilisant un exemple:

class Counter {

  private Integer count = 10;

  public synchronized void incrementCount() {
    count++;
  }

  public Integer getCount() {
    return count;
  }
}

Supposons qu'un thread A appelle incrementCount() puis un autre thread B appelle getCount() . Dans ce scénario, il n'y a aucune garantie que B verra la valeur actualisée du count . Il peut toujours voir count comme 10 , même s'il est également possible qu'il ne voit jamais la valeur mise à jour du count jamais.

Pour comprendre ce comportement, nous devons comprendre comment le modèle de mémoire Java s'intègre à l'architecture matérielle. En Java, chaque thread a sa propre pile de threads. Cette pile contient: la pile des appels de méthode et la variable locale créée dans ce thread. Dans un système multi-core, il est fort possible que deux threads s'exécutent simultanément dans des cœurs distincts. Dans un tel scénario, il est possible qu'une partie de la pile d'un thread se trouve dans le registre / cache d'un core. Si dans un thread, on accède à un objet en utilisant le mot-clé synchronized (ou volatile ), après le blocage synchronized , le thread synchronise sa copie locale de cette variable avec la mémoire principale. Cela crée une barrière de lecture / écriture et s'assure que le thread voit la dernière valeur de cet objet.

Mais dans notre cas, puisque le thread B n'a pas utilisé l' accès synchronisé à count , il est peut - être la valeur du référant count stocké dans le registre et ne peut jamais voir les mises à jour de fil A. Pour vous assurer que B voit dernière valeur de comptage que nous devons faire getCount() synchronisé également.

public synchronized Integer getCount() {
  return count;
}

Désormais, lorsque le thread A est terminé avec le count mise à jour, il déverrouille l'instance Counter , crée en même temps une barrière d'écriture et vide toutes les modifications effectuées à l'intérieur de ce bloc dans la mémoire principale. De même, lorsque le thread B acquiert le verrou sur la même instance de Counter , il entre dans la barrière de lecture et lit la valeur de count dans la mémoire principale et voit toutes les mises à jour.

visibilité

Même effet de visibilité pour volatile lectures / écritures volatile . Toutes les variables mises à jour avant d'écrire dans volatile seront vidées dans la mémoire principale et toutes les lectures après lecture d' volatile variable volatile proviendront de la mémoire principale.

Créer une instance java.lang.Thread

Il existe deux approches principales pour créer un thread en Java. En résumé, créer un thread est aussi simple que d’écrire le code qui sera exécuté. Les deux approches diffèrent dans la définition de ce code.

En Java, un thread est représenté par un objet - une instance de java.lang.Thread ou de sa sous-classe. La première approche consiste donc à créer cette sous-classe et à remplacer la méthode run () .

Remarque : j'utiliserai Thread pour faire référence à la classe java.lang.Thread et au thread pour faire référence au concept logique des threads.

class MyThread extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running!");
        }
    }
}

Maintenant que nous avons déjà défini le code à exécuter, le thread peut être créé simplement comme:

MyThread t = new MyThread();

La classe Thread contient également un constructeur acceptant une chaîne, qui sera utilisée comme nom du thread. Cela peut être particulièrement utile lors du débogage d'un programme multi-thread.

class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running! ");
        }
    }
}

MyThread t = new MyThread("Greeting Producer");

La deuxième approche consiste à définir le code à l'aide de java.lang.Runnable et de sa seule méthode run () . La classe Thread vous permet alors d'exécuter cette méthode dans un thread séparé. Pour ce faire, créez le thread en utilisant un constructeur acceptant une instance de l'interface Runnable .

Thread t = new Thread(aRunnable);

Cela peut être très puissant lorsqu'il est combiné avec des références lambdas ou méthodes (Java 8 uniquement):

Thread t = new Thread(operator::hardWork);

Vous pouvez également spécifier le nom du thread.

Thread t = new Thread(operator::hardWork, "Pi operator");

Pratiquement parlant, vous pouvez utiliser les deux approches sans soucis. Cependant, la sagesse générale dit d'utiliser ce dernier.


Pour chacun des quatre constructeurs mentionnés, il existe également une alternative acceptant une instance de java.lang.ThreadGroup comme premier paramètre.

ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");

Le ThreadGroup représente un ensemble de threads. Vous ne pouvez ajouter un thread à un ThreadGroup qu'en utilisant le constructeur d'un thread . Le ThreadGroup peut alors être utilisé pour gérer tous ses threads ensemble, de même que le thread peut obtenir des informations à partir de son ThreadGroup .

Donc, pour résumer, le thread peut être créé avec l'un de ces constructeurs publics:

Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)

Le dernier nous permet de définir la taille de pile souhaitée pour le nouveau thread.


Souvent, la lisibilité du code souffre lors de la création et de la configuration de nombreux threads avec les mêmes propriétés ou à partir du même modèle. C'est à ce moment que java.util.concurrent.ThreadFactory peut être utilisé. Cette interface vous permet d'encapsuler la procédure de création du thread via le modèle de fabrique et sa seule méthode newThread (Runnable) .

class WorkerFactory implements ThreadFactory {
    private int id = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "Worker " + id++);
    }
}

Thread Interruption / Stopping Threads

Chaque thread Java a un indicateur d'interruption, qui est initialement faux. Interrompre un thread, ce n’est rien d’autre que de lui attribuer la valeur true. Le code exécuté sur ce thread peut vérifier le drapeau à l'occasion et agir en conséquence. Le code peut également l'ignorer complètement. Mais pourquoi chaque fil aurait-il un tel drapeau? Après tout, avoir un drapeau booléen sur un fil est quelque chose que nous pouvons simplement organiser nous-mêmes, si et quand nous en avons besoin. Eh bien, il existe des méthodes qui se comportent de manière particulière lorsque le thread sur lequel elles s'exécutent est interrompu. Ces méthodes sont appelées méthodes de blocage. Ce sont des méthodes qui placent le thread dans l'état WAITING ou TIMED_WAITING. Lorsqu'un thread est dans cet état, l'interrompre provoquera une exception InterruptedException sur le thread interrompu, au lieu que l'indicateur d'interruption soit défini sur true et que le thread redevienne RUNNABLE. Le code qui appelle une méthode de blocage est obligé de gérer l'exception Interrupted, car il s'agit d'une exception vérifiée. Donc, et par conséquent son nom, une interruption peut avoir pour effet d'interrompre un WAIT, le terminant effectivement. Notez que toutes les méthodes en attente (par exemple, le blocage des E / S) ne réagissent pas de cette manière aux interruptions, car elles ne mettent pas le thread en attente. Enfin, un thread dont l’indicateur d’interruption est défini, qui entre dans une méthode de blocage (c’est-à-dire qui tente d’entrer dans un état en attente), lancera immédiatement une exception InterruptedException et l’indicateur d’interruption sera effacé.

Outre ces mécanismes, Java n’attribue aucune signification sémantique particulière à l’interruption. Le code est libre d'interpréter une interruption comme bon lui semble. Mais le plus souvent, l'interruption est utilisée pour signaler à un thread qu'il doit cesser de fonctionner dès que possible. Mais, comme il ressort clairement de ce qui précède, il appartient au code de ce thread de réagir de manière appropriée à cette interruption afin de ne plus fonctionner. Arrêter un thread est une collaboration. Lorsqu'un thread est interrompu, son code en cours peut avoir plusieurs niveaux dans la trace de la pile. La plupart du code n'appelle pas de méthode de blocage et se termine suffisamment rapidement pour ne pas retarder indûment l'arrêt du thread. Le code qui devrait principalement concerner la réactivité aux interruptions, est le code qui est en boucle et gère les tâches jusqu’à ce qu’il n’y en ait plus, ou jusqu’à ce qu’un indicateur soit défini pour l’interrompre. Les boucles qui traitent des tâches éventuellement infinies (c'est-à-dire qu'elles continuent à fonctionner en principe) devraient vérifier l'indicateur d'interruption afin de quitter la boucle. Pour les boucles finies, la sémantique peut exiger que toutes les tâches soient terminées avant de se terminer, ou il peut être approprié de laisser certaines tâches non gérées. Le code qui appelle les méthodes de blocage sera obligé de gérer l'exception Interrupted. S'il est sémantiquement possible, il peut simplement propager l'InterruptedException et déclarer le lancer. En tant que tel, il devient une méthode de blocage en ce qui concerne ses appelants. S'il ne peut pas propager l'exception, il doit au minimum définir l'indicateur interrompu, afin que les appelants plus haut dans la pile sachent également que le thread a été interrompu. Dans certains cas, la méthode doit continuer d'attendre quelle que soit l'interruption, auquel cas elle doit retarder la définition de l'indicateur interrompu jusqu'à la fin de l'attente, cela peut impliquer la définition d'une variable locale à vérifier avant de quitter la méthode. puis interrompre son fil.

Exemples :

Exemple de code qui arrête la gestion des tâches lors de l'interruption

class TaskHandler implements Runnable {
    
    private final BlockingQueue<Task> queue;

    TaskHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
            try {
                Task task = queue.take(); // blocking call, responsive to interruption
                handle(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
            }
        }
    }
    
    private void handle(Task task) {
        // actual handling
    }
}

Exemple de code qui retarde la définition de l'indicateur d'interruption jusqu'à sa complète exécution:

class MustFinishHandler implements Runnable {

    private final BlockingQueue<Task> queue;

    MustFinishHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean shouldInterrupt = false;
        
        while (true) {
            try {
                Task task = queue.take();
                if (task.isEndOfTasks()) {
                    if (shouldInterrupt) {
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                handle(task);
            } catch (InterruptedException e) {
                shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
            }
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Exemple de code qui a une liste de tâches fixe mais qui peut quitter tôt lorsqu'il est interrompu

class GetAsFarAsPossible implements Runnable {

    private final List<Task> tasks = new ArrayList<>();

    @Override
    public void run() {
        for (Task task : tasks) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            handle(task);
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Exemple de producteur / consommateur multiple avec file d'attente globale partagée

Le code ci-dessous présente plusieurs programmes Producteur / Consommateur. Les threads Producteur et Consommateur partagent la même file d'attente globale.

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

sortie:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

etc ................

Explication:

  1. sharedQueue , qui est un LinkedBlockingQueue est partagé entre tous les threads Producer et Consumer.
  2. Les threads de producteurs produisent un entier pour 200 millisecondes en continu et l'ajoutent à sharedQueue
  3. Consumer thread Consumer consomme un nombre entier de sharedQueue continu.
  4. Ce programme est implémenté sans constructions explicites synchronized ou Lock . BlockingQueue est la clé pour y parvenir.

Les implémentations BlockingQueue sont conçues pour être utilisées principalement pour les files d'attente producteur-consommateur.

Les implémentations BlockingQueue sont sécurisées pour les threads. Toutes les méthodes de mise en file d'attente atteignent leurs effets de manière atomique en utilisant des verrous internes ou d'autres formes de contrôle de concurrence.

Écriture exclusive / accès en lecture simultanée

Un processus doit parfois écrire et lire simultanément les mêmes "données".

L'interface ReadWriteLock et son implémentation ReentrantReadWriteLock permettent un modèle d'accès pouvant être décrit comme suit:

  1. Il peut y avoir un nombre quelconque de lecteurs simultanés des données. Si au moins un accès au lecteur est autorisé, aucun accès au graveur n'est possible.
  2. Il peut y avoir au plus un seul auteur pour les données. Si un accès en écriture est accordé, aucun lecteur ne peut accéder aux données.

Une implémentation pourrait ressembler à:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {

// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();

// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;

/** This will write to the data, in an exclusive access */
public static void writeToData() {
    RW_LOCK.writeLock().lock();
    try {
        data++;
    } finally {
        RW_LOCK.writeLock().unlock();
    }
}

public static int readData() {
    RW_LOCK.readLock().lock();
    try {
        return data;
    } finally {
        RW_LOCK.readLock().unlock();
    }
}

}

NOTE 1 : Ce cas d'utilisation précis a une solution plus propre utilisant AtomicInteger , mais ce qui est décrit ici est un modèle d'accès, qui fonctionne indépendamment du fait que les données ici sont un entier atomique.

NOTE 2 : Le verrouillage de la partie lecture est vraiment nécessaire, même si cela peut ne pas paraître au lecteur occasionnel. En effet, si vous ne verrouillez pas le lecteur, un certain nombre de problèmes peuvent survenir, parmi lesquels:

  1. Les écritures de valeurs primitives ne sont pas forcément atomiques sur toutes les JVM, de sorte que le lecteur pourrait voir, par exemple, que seuls 32 bits sur 64 bits écrivent si les data étaient de type 64 bits
  2. La visibilité de l'écriture à partir d'un thread qui ne l'a pas effectuée est garantie par la JVM uniquement si nous établissons une relation Happen Before entre les écritures et les lectures. Cette relation est établie lorsque les lecteurs et les écrivains utilisent leurs verrous respectifs, mais pas autrement
Java SE 8

StampedLock des performances plus élevées sont requises, sous certains types d'utilisation, il existe un type de verrouillage plus rapide, appelé StampedLock , qui, entre autres, implémente un mode de verrouillage optimiste. Ce verrou fonctionne très différemment du ReadWriteLock , et cet exemple n'est pas transposable.

Objet Runnable

L'interface Runnable définit une méthode unique, run() , destinée à contenir le code exécuté dans le thread.

L'objet Runnable est transmis au constructeur Thread . Et la méthode start() de Thread est appelée.

Exemple

public class HelloRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("Hello from a thread");
    }

    public static void main(String[] args) {
        new Thread(new HelloRunnable()).start();
    }
}

Exemple dans Java8:

public static void main(String[] args) {
    Runnable r = () -> System.out.println("Hello world");
    new Thread(r).start();
}

Sous-classe Runnable vs Thread

Un emploi d'objet Runnable est plus général, car l'objet Runnable peut sous-classer une classe autre que Thread .

Thread sous- Thread est plus facile à utiliser dans des applications simples, mais il est limité par le fait que votre classe de tâches doit être un descendant de Thread .

Un objet Runnable est applicable aux API de gestion des threads de haut niveau.

Sémaphore

Un sémaphore est un synchroniseur de haut niveau qui conserve un ensemble d' autorisations pouvant être acquises et libérées par les threads. Un sémaphore peut être imaginé comme un compteur de permis qui sera décrémenté lorsqu'un fil acquiert et incrémenté lors de la sortie d'un thread. Si le nombre de permis est égal à 0 lorsqu'un thread tente d'acquérir, le thread se bloque jusqu'à ce qu'un permis soit disponible (ou jusqu'à ce que le thread soit interrompu).

Un sémaphore est initialisé comme:

Semaphore semaphore = new Semaphore(1); // The int value being the number of permits

Le constructeur Semaphore accepte un paramètre booléen supplémentaire pour l'équité. Lorsqu'elle est définie sur false, cette classe ne garantit pas l'ordre dans lequel les threads acquièrent l'autorisation. Lorsque l'équité est définie sur true, le sémaphore garantit que les threads appelant l'une des méthodes d'acquisition sont sélectionnés pour obtenir les autorisations dans l'ordre dans lequel leur appel de ces méthodes a été traité. Il est déclaré de la manière suivante:

Semaphore semaphore = new Semaphore(1, true);

Examinons maintenant un exemple de javadocs, où Semaphore est utilisé pour contrôler l'accès à un pool d'éléments. Un sémaphore est utilisé dans cet exemple pour fournir une fonctionnalité de blocage afin de s’assurer qu’il ya toujours des éléments à obtenir lorsque getItem() est appelé.

class Pool {
    /*
     * Note that this DOES NOT bound the amount that may be released!
     * This is only a starting value for the Semaphore and has no other
     * significant meaning UNLESS you enforce this inside of the
     * getNextAvailableItem() and markAsUnused() methods
     */
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Obtains the next available item and reduces the permit count by 1. 
     * If there are no items available, block.
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Puts the item into the pool and add 1 permit.
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    private Object getNextAvailableItem() {
        // Implementation
    }

    private boolean markAsUnused(Object o) {
        // Implementation
    }
}

Ajouter deux tableaux `int` à l'aide d'un Threadpool

Un Threadpool a une file d'attente de tâches, dont chacune sera exécutée sur l'un de ces threads.

L'exemple suivant montre comment ajouter deux tableaux int aide d'un Threadpool.

Java SE 8
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };

ExecutorService pool = Executors.newCachedThreadPool(); 

// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
    final int worker = i;
    pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}

// Wait for all Workers to finish:
try {
    // execute all submitted tasks
    pool.shutdown();
    // waits until all workers finish, or the timeout ends
    pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    pool.shutdownNow(); //kill thread
}

System.out.println(Arrays.toString(result));

Remarques:

  1. Cet exemple est purement illustratif. En pratique, il n'y aura pas d'accélération en utilisant des threads pour une tâche aussi petite. Un ralentissement est probable, car les frais généraux liés à la création et à la planification des tâches saturent le temps nécessaire à l'exécution d'une tâche.

  2. Si vous utilisiez Java 7 et versions antérieures, vous utiliseriez des classes anonymes au lieu de lambdas pour implémenter les tâches.

Obtenir l'état de tous les threads démarrés par votre programme, à l'exception des threads système

Extrait de code:

import java.util.Set;

public class ThreadStatus {
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 5; i++){
            Thread t = new Thread(new MyThread());
            t.setName("MyThread:" + i);
            t.start();
        }
        int threadCount = 0;
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet) {
            if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
                System.out.println("Thread :" + t + ":" + "state:" + t.getState());
                ++threadCount;
            }
        }
        System.out.println("Thread count started by Main thread:" + threadCount);
    }
}

class MyThread implements Runnable {
    public void run() {
        try {
            Thread.sleep(2000);
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

Sortie:

Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6

Explication:

Thread.getAllStackTraces().keySet() renvoie tous les Thread y compris les threads d'application et les threads système. Si vous êtes uniquement intéressé par le statut des threads, démarré par votre application, effectuez une itération du jeu de Thread en vérifiant le groupe de threads d'un thread particulier par rapport au thread de votre programme principal.

En l'absence de la condition ThreadGroup ci-dessus, le programme renvoie le statut des threads système ci-dessous:

Reference Handler
Signal Dispatcher
Attach Listener
Finalizer

Callable et Future

Bien que Runnable fournisse un moyen d’emballer le code pour qu’il soit exécuté dans un thread différent, il présente une limitation en ce sens qu’il ne peut pas renvoyer un résultat de l’exécution. La seule façon d'obtenir une valeur de retour à partir de l'exécution d'un Runnable est d'affecter le résultat à une variable accessible dans une étendue en dehors de Runnable .

Callable été introduit dans Java 5 en tant que pair à Runnable . Callable est essentiellement le même sauf qu'il a une méthode d' call au lieu de run . La méthode call a la capacité supplémentaire de renvoyer un résultat et est également autorisée à lancer des exceptions vérifiées.

Le résultat d'une soumission de tâche Callable est disponible pour être exploité via un avenir

Future peut être considéré comme un conteneur contenant le résultat du calcul Callable . Le calcul du callable peut se poursuivre dans un autre thread, et toute tentative de toucher le résultat d'un Future bloquera et ne renverra le résultat qu'une fois disponible.

Interface appelable

public interface Callable<V> {
    V call() throws Exception;
}

Avenir

interface Future<V> {
    V get();
    V get(long timeout, TimeUnit unit);
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}

En utilisant l'exemple Callable et Future:

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newSingleThreadExecutor();
          
    System.out.println("Time At Task Submission : " + new Date());
    Future<String> result = es.submit(new ComplexCalculator());
    // the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
    System.out.println("Result of Complex Calculation is : " + result.get());
    System.out.println("Time At the Point of Printing the Result : " + new Date());
}

Notre callable qui fait un long calcul

public class ComplexCalculator implements Callable<String> {

    @Override
    public String call() throws Exception {
        // just sleep for 10 secs to simulate a lengthy computation
        Thread.sleep(10000);            
        System.out.println("Result after a lengthy 10sec calculation");
        return "Complex Result"; // the result 
    }
}

Sortie

Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016

Autres opérations autorisées sur Future

Alors que get() est la méthode pour extraire le résultat réel, Future a provision

  • get(long timeout, TimeUnit unit) définit la durée maximale pendant laquelle le thread en cours attendra un résultat;
  • Pour annuler l'appel de tâche, cancel(mayInterruptIfRunning) . L'indicateur mayInterrupt indique que la tâche doit être interrompue si elle a été démarrée et s'exécute maintenant.
  • Pour vérifier si la tâche est terminée / terminée en appelant isDone() ;
  • Pour vérifier si la longue tâche a été annulée isCancelled() .

Serrures comme aides à la synchronisation

Avant l'introduction du package simultané de Java 5, le thread était de niveau inférieur. L'introduction de ce package fournissait plusieurs aides / constructions de programmation concurrentes de niveau supérieur.

Les verrous sont des mécanismes de synchronisation de threads qui ont essentiellement le même objectif que les blocs synchronisés ou les mots-clés.

Verrouillage intrinsèque

int count = 0; // shared among multiple threads

public void doSomething() {
    synchronized(this) {
        ++count; // a non-atomic operation
    }
}

Synchronisation à l'aide de verrous

int count = 0; // shared among multiple threads

Lock lockObj = new ReentrantLock();
public void doSomething() {
    try {
        lockObj.lock();
        ++count; // a non-atomic operation
    } finally {    
        lockObj.unlock(); // sure to release the lock without fail
    }
}

Les verrous ont également des fonctionnalités disponibles que le verrouillage intrinsèque n'offre pas, telles que le verrouillage, mais en restant réactif aux interruptions, ou en essayant de se verrouiller, et non de bloquer le cas échéant.

Verrouillage, sensible aux interruptions

class Locky {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        try {
            try {
                lockObj.lockInterruptibly();
                ++count; // a non-atomic operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stopping
            }
        } finally {
            if (!Thread.currentThread().isInterrupted()) {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Ne fais quelque chose que lorsque tu es capable de verrouiller

public class Locky2 {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        boolean locked = lockObj.tryLock(); // returns true upon successful lock
        if (locked) {
            try {
                ++count; // a non-atomic operation
            } finally {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Il existe plusieurs variantes de verrouillage disponibles .



Modified text is an extract of the original Stack Overflow Documentation
Sous licence CC BY-SA 3.0
Non affilié à Stack Overflow