Buscar..


Introducción

La computación concurrente es una forma de computación en la cual varios cálculos se ejecutan simultáneamente en lugar de secuencialmente. El lenguaje Java está diseñado para admitir la programación concurrente mediante el uso de subprocesos. Los objetos y recursos pueden ser accedidos por múltiples hilos; cada subproceso puede potencialmente acceder a cualquier objeto en el programa y el programador debe garantizar que los accesos de lectura y escritura a los objetos estén correctamente sincronizados entre los subprocesos.

Observaciones

Tema (s) relacionado (s) en StackOverflow:

Multihilo básico

Si tiene muchas tareas que ejecutar y todas estas tareas no dependen del resultado de las anteriores, puede usar Multithreading para su computadora para realizar todas estas tareas al mismo tiempo con más procesadores si su computadora puede. Esto puede hacer que la ejecución de su programa sea más rápida si tiene algunas grandes tareas independientes.

class CountAndPrint implements Runnable {

    private final String name;

    CountAndPrint(String name) {
        this.name = name;
    }

    /** This is what a CountAndPrint will do */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println(this.name + ": " + i);
        }
    }

    public static void main(String[] args) {
        // Launching 4 parallel threads
        for (int i = 1; i <= 4; i++) {
            // `start` method will call the `run` method 
            // of CountAndPrint in another thread
            new Thread(new CountAndPrint("Instance " + i)).start();
        }

        // Doing some others tasks in the main Thread
        for (int i = 0; i < 10000; i++) {
            System.out.println("Main: " + i);
        }
    }
}

El código del método de ejecución de las distintas instancias de CountAndPrint se ejecutará en un orden no predecible. Un fragmento de una ejecución de muestra podría verse así:

Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...

Productor-consumidor

Un ejemplo simple de solución de problema productor-consumidor. Tenga en cuenta que las clases JDK ( AtomicBoolean y BlockingQueue ) se utilizan para la sincronización, lo que reduce la posibilidad de crear una solución no válida. Consulte Javadoc para varios tipos de BlockingQueue ; elegir una implementación diferente puede cambiar drásticamente el comportamiento de este ejemplo (como DelayQueue o Priority Queue ).

public class Producer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Producer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int producedCount = 0;
        try {
            while (true) {
                producedCount++;
                //put throws an InterruptedException when the thread is interrupted
                queue.put(new ProducedData());
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            producedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Produced " + producedCount + " objects");
    }
}

public class Consumer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Consumer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int consumedCount = 0;
        try {
            while (true) {
                //put throws an InterruptedException when the thread is interrupted
                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
                // process data
                consumedCount++;
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            consumedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed " + consumedCount + " objects");
    }
}


public class ProducerConsumerExample {
    static class ProducedData {    
        // empty data object
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
        // choice of queue determines the actual behavior: see various BlockingQueue implementations

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();

        Thread.sleep(1000);
        producer.interrupt();
        Thread.sleep(10);
        consumer.interrupt();
    }
}

Usando ThreadLocal

Una herramienta útil en Java Concurrency es ThreadLocal : esto le permite tener una variable que será única para un hilo determinado. Por lo tanto, si el mismo código se ejecuta en subprocesos diferentes, estas ejecuciones no compartirán el valor, sino que cada subproceso tiene su propia variable que es local al subproceso .

Por ejemplo, esto se usa con frecuencia para establecer el contexto (como la información de autorización) del manejo de una solicitud en un servlet. Podrías hacer algo como esto:

private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

public static MyUserContext getContext() {
    return contexts.get(); // get returns the variable unique to this thread
}

public void doGet(...) {
    MyUserContext context = magicGetContextFromRequest(request); 
    contexts.put(context); // save that context to our thread-local - other threads
                           // making this call don't overwrite ours
    try {
        // business logic
    } finally {
        contexts.remove(); // 'ensure' removal of thread-local variable
    }
}

Ahora, en lugar de pasar MyUserContext a cada método, puede usar MyServlet.getContext() donde lo necesite. Ahora, por supuesto, esto introduce una variable que necesita ser documentada, pero es segura para subprocesos, lo que elimina muchas de las desventajas de usar una variable de alto alcance.

La ventaja clave aquí es que cada hilo tiene su propia variable local de hilo en ese contenedor de contexts . Siempre que lo use desde un punto de entrada definido (como exigir que cada servlet mantenga su contexto, o quizás agregando un filtro de servlet) puede confiar en que este contexto estará allí cuando lo necesite.

CountDownLatch

CountDownLatch

Una ayuda de sincronización que permite que uno o más subprocesos esperen hasta que se complete un conjunto de operaciones que se están realizando en otros subprocesos.

  1. Un CountDownLatch se inicializa con un recuento dado.
  2. El bloque de métodos de espera hasta que el conteo actual llegue a cero debido a invocaciones del método countDown() , después de lo cual se liberan todos los subprocesos en espera y cualquier invocación posterior de espera regresa de inmediato.
  3. Este es un fenómeno de un solo disparo: el conteo no se puede restablecer. Si necesita una versión que restablezca la cuenta, considere usar un CyclicBarrier .

Métodos clave:

public void await() throws InterruptedException

Provoca que el subproceso actual espere hasta que el pestillo haya descendido hasta cero, a menos que se interrumpa el subproceso.

public void countDown()

Reduce el conteo del pestillo, liberando todos los hilos en espera si el conteo llega a cero.

Ejemplo:

import java.util.concurrent.*;

class DoSomethingInAThread implements Runnable {
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch) {
        this.latch = latch;
    } 
    public void run() {
        try {
            System.out.println("Do some thing");
            latch.countDown();
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try {
            int numberOfThreads = 5;
            if (args.length < 1) {
                System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
                return;
            }
            try {
                numberOfThreads = Integer.parseInt(args[0]);
            } catch(NumberFormatException ne) {
            
            }
            CountDownLatch latch = new CountDownLatch(numberOfThreads);
            for (int n = 0; n < numberOfThreads; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

salida:

java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads

Explicación:

  1. CountDownLatch se inicializa con un contador de 5 en el subproceso principal
  2. El hilo principal está esperando usando el método await() .
  3. Se han creado cinco instancias de DoSomethingInAThread . Cada instancia decrementó el contador con el método countDown() .
  4. Una vez que el contador se vuelve cero, el hilo principal se reanudará

Sincronización

En Java, hay un mecanismo integrado de bloqueo de nivel de idioma: el bloque synchronized , que puede usar cualquier objeto Java como un bloqueo intrínseco (es decir, cada objeto Java puede tener un monitor asociado).

Los bloqueos intrínsecos proporcionan atomicidad a grupos de afirmaciones. Para entender lo que eso significa para nosotros, echemos un vistazo a un ejemplo donde la synchronized es útil:

private static int t = 0;
private static Object mutex = new Object();

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            synchronized (mutex) {
                t++;
                System.out.println(MessageFormat.format("t: {0}", t));
            }
        });
    }
    executorService.shutdown();
}

En este caso, si no fuera por el bloque synchronized , habría habido múltiples problemas de concurrencia involucrados. El primero sería con el operador de incremento posterior (no es atómico en sí mismo), y el segundo sería que observaríamos el valor de t después de que una cantidad arbitraria de otros subprocesos haya tenido la oportunidad de modificarlo. Sin embargo, desde que adquirimos un bloqueo intrínseco, no habrá condiciones de carrera aquí y la salida contendrá números del 1 al 100 en su orden normal.

Los bloqueos intrínsecos en Java son exclusiones mutuas (es decir, bloqueos de ejecución mutua). La ejecución mutua significa que si un subproceso ha adquirido el bloqueo, el segundo se verá obligado a esperar a que el primero lo libere antes de que pueda adquirir el bloqueo por sí mismo. Nota: Una operación que puede poner el hilo en el estado de espera (reposo) se denomina operación de bloqueo . Por lo tanto, la adquisición de un bloqueo es una operación de bloqueo.

Los bloqueos intrínsecos en Java son reentrantes . Esto significa que si un hilo intenta adquirir un bloqueo que ya posee, no lo bloqueará y lo logrará con éxito. Por ejemplo, el siguiente código no se bloqueará cuando se le llame:

public void bar(){
    synchronized(this){
        ...
    }
}
public void foo(){
    synchronized(this){
        bar();
    }
}

Además de synchronized bloques synchronized , también hay métodos synchronized .

Los siguientes bloques de código son prácticamente equivalentes (aunque el código de bytes parece ser diferente):

  1. Bloque synchronized en this :

    public void foo() {
        synchronized(this) {
            doStuff();
        }
    }
    
  2. método synchronized :

     public synchronized void foo() {
         doStuff();
     }
    

Igualmente para static métodos static , esto:

class MyClass {
    ...
    public static void bar() {
        synchronized(MyClass.class) {
            doSomeOtherStuff();
        }
    }
}

tiene el mismo efecto que este:

class MyClass {
    ...
    public static synchronized void bar() {
        doSomeOtherStuff();
    }
}

Operaciones atómicas

Una operación atómica es una operación que se ejecuta "todo a la vez", sin ninguna posibilidad de que otros hilos observen o modifiquen el estado durante la ejecución de la operación atómica.

Consideremos un mal ejemplo .

private static int t = 0;

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            t++;
            System.out.println(MessageFormat.format("t: {0}", t));
        });
    }
    executorService.shutdown();
}

En este caso, hay dos cuestiones. El primer problema es que el operador de incremento posterior no es atómico. Se compone de varias operaciones: obtener el valor, agregar 1 al valor, establecer el valor. Es por eso que si ejecutamos el ejemplo, es probable que no veamos t: 100 en la salida; dos subprocesos pueden obtener el valor al mismo tiempo, incrementarlo y establecerlo: digamos que el valor de t es 10, y dos subprocesos están aumentando t. Ambos subprocesos establecerán el valor de t en 11, ya que el segundo subproceso observa el valor de t antes de que el primer subproceso haya terminado de incrementarlo.

El segundo tema es sobre cómo estamos observando t. Cuando estamos imprimiendo el valor de t, es posible que el valor ya haya sido cambiado por un hilo diferente después de la operación de incremento de este hilo.

Para solucionar esos problemas, usaremos el java.util.concurrent.atomic.AtomicInteger , que tiene muchas operaciones atómicas para que utilicemos.

private static AtomicInteger t = new AtomicInteger(0);

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            int currentT = t.incrementAndGet();
            System.out.println(MessageFormat.format("t: {0}", currentT));
        });
    }
    executorService.shutdown();
}

El método incrementAndGet de AtomicInteger incrementa y devuelve el nuevo valor, eliminando así la condición de carrera anterior. Tenga en cuenta que, en este ejemplo, las líneas aún estarán fuera de orden porque no hacemos ningún esfuerzo por secuenciar las llamadas de println y que esto queda fuera del alcance de este ejemplo, ya que requeriría sincronización y el objetivo de este ejemplo es mostrar cómo usar AtomicInteger para eliminar las condiciones de la raza concernientes al estado.

Creación de un sistema básico de punto muerto.

Se produce un punto muerto cuando dos acciones en competencia esperan a que la otra termine, y por lo tanto ninguna de las dos lo hace. En java hay un bloqueo asociado a cada objeto. Para evitar la modificación simultánea realizada por varios subprocesos en un solo objeto, podemos usar palabras clave synchronized , pero todo tiene un costo. El uso incorrecto de una palabra clave synchronized puede llevar a sistemas atascados llamados sistemas bloqueados.

Considere que hay 2 subprocesos trabajando en 1 instancia, permite llamar a los subprocesos como primero y segundo, y digamos que tenemos 2 recursos R1 y R2. First adquiere R1 y también necesita R2 para completarse, mientras que Second adquiere R2 y necesita R1 para completarse.

digamos en el tiempo t = 0,

Primero tiene R1 y Segundo tiene R2. ahora First está esperando a R2 mientras que Second está esperando a R1. esta espera es indefinida y esto lleva a un punto muerto.

public class Example2 {
    
    public static void main(String[] args) throws InterruptedException {
        final DeadLock dl = new DeadLock();
        Thread t1 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                dl.methodA();
            }
        });
   
        Thread t2 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    dl.method2();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        t1.setName("First");
        t2.setName("Second");
        t1.start();
        t2.start();
    }
}

class DeadLock {
    
    Object mLock1 = new Object();
    Object mLock2 = new Object();
    

    public void methodA() {
        System.out.println("methodA wait for mLock1  " + Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("methodA mLock1 acquired   " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
                method2();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public void method2() throws InterruptedException {
        System.out.println("method2 wait for mLock2  " + Thread.currentThread().getName());
        synchronized (mLock2) {
            System.out.println("method2  mLock2 acquired   " + Thread.currentThread().getName());
            Thread.sleep(100);
            method3();
        }
    }
    public void method3() throws InterruptedException {
        System.out.println("method3  mLock1  "+ Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("method3   mLock1 acquired  " + Thread.currentThread().getName());
        }
    }
}

Salida de este programa:

methodA wait for mLock1  First
method2 wait for mLock2  Second
method2  mLock2 acquired   Second
methodA mLock1 acquired   First
method3  mLock1  Second
method2 wait for mLock2  First

Pausa de ejecucion

Thread.sleep hace que el subproceso actual suspenda la ejecución durante un período específico. Este es un medio eficaz para hacer que el tiempo del procesador esté disponible para los otros subprocesos de una aplicación u otras aplicaciones que puedan estar ejecutándose en un sistema informático. Hay dos métodos de sleep sobrecargados en la clase Thread.

Uno que especifica el tiempo de reposo al milisegundo.

public static void sleep(long millis) throws InterruptedException

Una que especifica el tiempo de reposo al nanosegundo.

public static void sleep(long millis, int nanos)

Pausando Ejecución por 1 segundo

Thread.sleep(1000);

Es importante tener en cuenta que esto es una sugerencia para el programador del kernel del sistema operativo. Esto puede no ser necesariamente preciso, y algunas implementaciones ni siquiera consideran el parámetro de nanosegundo (posiblemente redondeando al milisegundo más cercano).

Se recomienda incluir una llamada a Thread.sleep en try / catch and catch InterruptedException .

Visualizar barreras de lectura / escritura mientras se usa sincronizado / volátil

Como sabemos, debemos usar palabras clave synchronized para hacer que la ejecución de un método o bloque sea exclusiva. Pero es posible que pocos de nosotros no estemos conscientes de un aspecto más importante del uso de palabras clave synchronized y volatile : además de hacer una unidad de código atómico, también proporciona una barrera de lectura / escritura . ¿Qué es esta barrera de lectura / escritura? Vamos a discutir esto usando un ejemplo:

class Counter {

  private Integer count = 10;

  public synchronized void incrementCount() {
    count++;
  }

  public Integer getCount() {
    return count;
  }
}

Supongamos que un subproceso A llama a incrementCount() primero y luego otro subproceso B llama a getCount() . En este escenario, no hay garantía de que B vea el valor actualizado del count . Todavía puede ver el count como 10 , incluso es posible que nunca vea el valor actualizado del count nunca.

Para comprender este comportamiento, debemos comprender cómo se integra el modelo de memoria Java con la arquitectura de hardware. En Java, cada hilo tiene su propia pila de hilos. Esta pila contiene: pila de llamada a método y variable local creada en ese hilo. En un sistema de varios núcleos, es muy posible que dos subprocesos se ejecuten simultáneamente en núcleos separados. En tal escenario, es posible que parte de la pila de un hilo se encuentre dentro del registro / caché de un núcleo. Si dentro de un hilo, se accede a un objeto usando una palabra clave synchronized (o volatile ), después del bloque synchronized ese hilo sincroniza su copia local de esa variable con la memoria principal. Esto crea una barrera de lectura / escritura y se asegura de que el hilo vea el último valor de ese objeto.

Pero en nuestro caso, dado que el subproceso B no ha utilizado el acceso sincronizado para el count , puede ser un valor referencial del count almacenado en el registro y puede que nunca vea las actualizaciones del subproceso A. Para asegurarse de que B ve el valor más reciente del recuento, necesitamos hacer getCount() sincronizado también.

public synchronized Integer getCount() {
  return count;
}

Ahora, cuando el subproceso A finaliza con el count actualizaciones, desbloquea la instancia de Counter , al mismo tiempo crea una barrera de escritura y borra todos los cambios realizados dentro de ese bloque en la memoria principal. De manera similar, cuando el subproceso B adquiere el bloqueo en la misma instancia de Counter , entra en la barrera de lectura y lee el valor del count desde la memoria principal y ve todas las actualizaciones.

visibilidad

El mismo efecto de visibilidad se volatile lecturas / escrituras volatile . Todas las variables actualizadas antes de escribir en volatile se vaciarán en la memoria principal y todas las lecturas después de la lectura de la variable volatile serán de la memoria principal.

Creando una instancia java.lang.Thread

Hay dos enfoques principales para crear un hilo en Java. En esencia, crear un hilo es tan fácil como escribir el código que se ejecutará en él. Los dos enfoques difieren en donde se define ese código.

En Java, un subproceso está representado por un objeto: una instancia de java.lang.Thread o su subclase. Entonces, el primer enfoque es crear esa subclase y anular el método run () .

Nota : usaré Thread para referirme a la clase java.lang.Thread y thread para referirse al concepto lógico de threads.

class MyThread extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running!");
        }
    }
}

Ahora que ya hemos definido el código que se ejecutará, el hilo se puede crear simplemente como:

MyThread t = new MyThread();

La clase Thread también contiene un constructor que acepta una cadena, que se utilizará como el nombre del hilo. Esto puede ser particularmente útil cuando se depura un programa multihilo.

class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running! ");
        }
    }
}

MyThread t = new MyThread("Greeting Producer");

El segundo enfoque es definir el código utilizando java.lang.Runnable y su único método run () . La clase Thread le permite ejecutar ese método en un hilo separado. Para lograr esto, cree el hilo usando un constructor que acepte una instancia de la interfaz Runnable .

Thread t = new Thread(aRunnable);

Esto puede ser muy poderoso cuando se combina con lambdas o referencias de métodos (solo Java 8):

Thread t = new Thread(operator::hardWork);

También puedes especificar el nombre del hilo.

Thread t = new Thread(operator::hardWork, "Pi operator");

Hablando de manera práctica, puede utilizar ambos enfoques sin preocupaciones. Sin embargo, la sabiduría general dice usar este último.


Para cada uno de los cuatro constructores mencionados, también hay una alternativa que acepta una instancia de java.lang.ThreadGroup como primer parámetro.

ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");

El ThreadGroup representa un conjunto de hilos. Solo puede agregar un Thread a un ThreadGroup usando un constructor de Thread . El ThreadGroup se puede utilizar para gestionar toda su rosca s juntos, así como el hilo se puede obtener información de su ThreadGroup .

Para resumir, el subproceso se puede crear con uno de estos constructores públicos:

Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)

El último nos permite definir el tamaño de pila deseado para el nuevo hilo.


A menudo, la legibilidad del código sufre al crear y configurar muchos subprocesos con las mismas propiedades o desde el mismo patrón. Ahí es cuando se puede usar java.util.concurrent.ThreadFactory . Esta interfaz le permite encapsular el procedimiento de crear el hilo a través del patrón de fábrica y su único método, newThread (Runnable) .

class WorkerFactory implements ThreadFactory {
    private int id = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "Worker " + id++);
    }
}

Interrupción de hilo / hilos de parada

Cada subproceso de Java tiene un indicador de interrupción, que inicialmente es falso. Interrumpir un hilo no es más que establecer esa bandera en verdadero. El código que se ejecuta en ese hilo puede verificar la bandera en ocasiones y actuar sobre ella. El código también puede ignorarlo por completo. Pero ¿por qué cada Hilo tiene una bandera así? Después de todo, tener una bandera booleana en un hilo es algo que podemos organizarnos siempre que lo necesitemos. Bueno, hay métodos que se comportan de una manera especial cuando se interrumpe el hilo en el que se están ejecutando. Estos métodos se denominan métodos de bloqueo. Estos son métodos que ponen el hilo en el estado WAITING o TIMED_WAITING. Cuando un subproceso se encuentra en este estado, al interrumpirlo, se generará una InterruptedException en el subproceso interrumpido, en lugar de que el indicador de interrupción se establezca en verdadero, y el subproceso se RUNNABLE nuevamente. El código que invoca un método de bloqueo está obligado a tratar con la excepción InterruptedException, ya que es una excepción comprobada. Entonces, y de ahí su nombre, una interrupción puede tener el efecto de interrumpir una ESPERA, terminándola de manera efectiva. Tenga en cuenta que no todos los métodos que están esperando de alguna manera (por ejemplo, bloqueando IO) responden a la interrupción de esa manera, ya que no ponen el hilo en un estado de espera. Por último, un subproceso que tiene establecido su indicador de interrupción, que ingresa a un método de bloqueo (es decir, intenta pasar al estado de espera), lanzará inmediatamente una InterruptedException y el indicador de interrupción se borrará.

Aparte de estas mecánicas, Java no asigna ningún significado semántico especial a la interrupción. El código es libre de interpretar una interrupción de la manera que quiera. Pero la mayoría de las veces, la interrupción se utiliza para señalar a un subproceso que debe dejar de ejecutarse lo antes posible. Pero, como debe quedar claro de lo anterior, depende del código de ese hilo reaccionar adecuadamente a esa interrupción para detener la ejecución. Detener un hilo es una colaboración. Cuando un hilo se interrumpe, su código de ejecución puede tener varios niveles de profundidad en el seguimiento de pila. La mayoría de los códigos no llama a un método de bloqueo, y termina lo suficientemente oportuno como para no retrasar demasiado la detención del hilo. El código que debería preocuparse principalmente de responder a la interrupción, es un código que se encuentra en una tarea de manejo de bucle hasta que no queda ninguno, o hasta que se establece un indicador que lo indica para detener ese bucle. Los bucles que manejan tareas posiblemente infinitas (es decir, se siguen ejecutando en principio) deben verificar el indicador de interrupción para salir del bucle. Para bucles finitos, la semántica puede dictar que todas las tareas deben terminarse antes de finalizar, o puede ser apropiado dejar algunas tareas sin manejar. El código que llama a los métodos de bloqueo se verá obligado a lidiar con la excepción InterruptedException. Si es semánticamente posible, puede simplemente propagar la InterruptedException y declarar que se debe lanzar. Como tal, se convierte en un método de bloqueo en sí mismo en relación con sus llamadores. Si no puede propagar la excepción, al menos debería establecer el indicador interrumpido, para que las personas que llaman más arriba en la pila también sepan que el hilo fue interrumpido. En algunos casos, el método debe continuar esperando, independientemente de la InterruptedException, en cuyo caso debe demorar la configuración del indicador interrumpido hasta que, una vez que se termina de esperar, esto puede implicar establecer una variable local, que debe verificarse antes de salir del método para entonces interrumpe su hilo.

Ejemplos:

Ejemplo de código que deja de manejar tareas en caso de interrupción.

class TaskHandler implements Runnable {
    
    private final BlockingQueue<Task> queue;

    TaskHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
            try {
                Task task = queue.take(); // blocking call, responsive to interruption
                handle(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
            }
        }
    }
    
    private void handle(Task task) {
        // actual handling
    }
}

Ejemplo de código que retrasa la configuración de la bandera de interrupción hasta que esté completamente terminado:

class MustFinishHandler implements Runnable {

    private final BlockingQueue<Task> queue;

    MustFinishHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean shouldInterrupt = false;
        
        while (true) {
            try {
                Task task = queue.take();
                if (task.isEndOfTasks()) {
                    if (shouldInterrupt) {
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                handle(task);
            } catch (InterruptedException e) {
                shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
            }
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Ejemplo de código que tiene una lista fija de tareas pero que puede cerrarse antes cuando se interrumpe

class GetAsFarAsPossible implements Runnable {

    private final List<Task> tasks = new ArrayList<>();

    @Override
    public void run() {
        for (Task task : tasks) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            handle(task);
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Ejemplo de productor / consumidor múltiple con cola global compartida

A continuación el código muestra múltiples programas de Productor / Consumidor. Tanto las hebras del productor como las del consumidor comparten la misma cola global.

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

salida:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

y así ................

Explicación:

  1. sharedQueue , que es un LinkedBlockingQueue se comparte entre todos los subprocesos Producer y Consumer.
  2. Los hilos de producción producen un número entero por cada 200 milisegundos de forma continua y lo sharedQueue a sharedQueue
  3. Hilo de Consumer consume entero de sharedQueue continuamente.
  4. Este programa se implementa sin construcciones explícitamente synchronized o de Lock . BlockingQueue es la clave para lograrlo.

Las implementaciones de BlockingQueue están diseñadas para ser utilizadas principalmente para colas productor-consumidor.

Las implementaciones de BlockingQueue son seguras para subprocesos. Todos los métodos de colas logran sus efectos de forma atómica utilizando bloqueos internos u otras formas de control de concurrencia.

Acceso exclusivo de lectura / lectura simultánea

A veces se requiere que un proceso escriba y lea simultáneamente los mismos "datos".

La interfaz ReadWriteLock y su implementación ReentrantReadWriteLock permiten un patrón de acceso que se puede describir de la siguiente manera:

  1. Puede haber cualquier número de lectores concurrentes de los datos. Si hay al menos un acceso de lector otorgado, entonces no es posible el acceso de un escritor.
  2. Puede haber como máximo un único escritor para los datos. Si hay un acceso de escritor otorgado, entonces ningún lector puede acceder a los datos.

Una implementación podría verse como:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {

// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();

// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;

/** This will write to the data, in an exclusive access */
public static void writeToData() {
    RW_LOCK.writeLock().lock();
    try {
        data++;
    } finally {
        RW_LOCK.writeLock().unlock();
    }
}

public static int readData() {
    RW_LOCK.readLock().lock();
    try {
        return data;
    } finally {
        RW_LOCK.readLock().unlock();
    }
}

}

NOTA 1 : Este caso de uso preciso tiene una solución más limpia que usa AtomicInteger , pero lo que se describe aquí es un patrón de acceso, que funciona independientemente del hecho de que los datos aquí son un número entero que es una variante atómica.

NOTA 2 : El bloqueo en la parte de lectura es realmente necesario, aunque podría no parecerlo para el lector ocasional. De hecho, si no se bloquea en el lado del lector, cualquier cosa puede salir mal, entre las cuales:

  1. No se garantiza que las escrituras de valores primitivos sean atómicas en todas las JVM, por lo que el lector podría ver, por ejemplo, que solo se escriben 32 bits de 64 bits si los data fueran de 64 bits.
  2. La visibilidad de la escritura desde un subproceso que no la realizó está garantizada por la JVM solo si establecemos la relación Suceder antes de entre las escrituras y las lecturas. Esta relación se establece cuando tanto los lectores como los escritores usan sus respectivos bloqueos, pero no de otra manera
Java SE 8

En caso de que se requiera un mayor rendimiento, y bajo ciertos tipos de uso, hay un tipo de bloqueo más rápido disponible, llamado StampedLock , que entre otras cosas implementa un modo de bloqueo optimista. Este bloqueo funciona de manera muy diferente de ReadWriteLock , y esta muestra no es transponible.

Objeto ejecutable

La interfaz Runnable define un solo método, run() , destinado a contener el código ejecutado en el hilo.

El objeto Runnable se pasa al constructor Thread . Y se llama el método start() Thread.

Ejemplo

public class HelloRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("Hello from a thread");
    }

    public static void main(String[] args) {
        new Thread(new HelloRunnable()).start();
    }
}

Ejemplo en Java8:

public static void main(String[] args) {
    Runnable r = () -> System.out.println("Hello world");
    new Thread(r).start();
}

Subclase ejecutable vs subproceso

El empleo de un objeto Runnable es más general, porque el objeto Runnable puede subclasificar una clase que no sea Thread .

Thread subclasificación de Thread es más fácil de usar en aplicaciones simples, pero está limitada por el hecho de que su clase de tarea debe ser descendiente de Thread .

Un objeto Runnable es aplicable a las API de administración de subprocesos de alto nivel.

Semáforo

Un semáforo es un sincronizador de alto nivel que mantiene un conjunto de permisos que pueden ser adquiridos y liberados por subprocesos. Un semáforo se puede imaginar como un contador de permisos que se reducirá cuando un hilo adquiera, y se incrementará cuando un hilo se libere. Si la cantidad de permisos es 0 cuando un subproceso intenta adquirir, el subproceso se bloqueará hasta que un permiso esté disponible (o hasta que el subproceso se interrumpa).

Un semáforo se inicializa como:

Semaphore semaphore = new Semaphore(1); // The int value being the number of permits

El constructor Semaphore acepta un parámetro booleano adicional para ser justos. Cuando se establece como falso, esta clase no ofrece garantías sobre el orden en que los subprocesos adquieren permisos. Cuando la imparcialidad se establece como verdadera, el semáforo garantiza que los subprocesos que invocan cualquiera de los métodos de adquisición se seleccionan para obtener permisos en el orden en que se procesó su invocación de dichos métodos. Se declara de la siguiente manera:

Semaphore semaphore = new Semaphore(1, true);

Ahora veamos un ejemplo de javadocs, donde Semaphore se usa para controlar el acceso a un conjunto de elementos. En este ejemplo se usa un semáforo para proporcionar una funcionalidad de bloqueo a fin de garantizar que siempre haya elementos que se obtendrán cuando se getItem() .

class Pool {
    /*
     * Note that this DOES NOT bound the amount that may be released!
     * This is only a starting value for the Semaphore and has no other
     * significant meaning UNLESS you enforce this inside of the
     * getNextAvailableItem() and markAsUnused() methods
     */
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Obtains the next available item and reduces the permit count by 1. 
     * If there are no items available, block.
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Puts the item into the pool and add 1 permit.
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    private Object getNextAvailableItem() {
        // Implementation
    }

    private boolean markAsUnused(Object o) {
        // Implementation
    }
}

Agregue dos arreglos `int` usando un Threadpool

Un Threadpool tiene una cola de tareas, de las cuales cada una se ejecutará en uno de estos Threads.

El siguiente ejemplo muestra cómo agregar dos matrices int utilizando un Threadpool.

Java SE 8
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };

ExecutorService pool = Executors.newCachedThreadPool(); 

// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
    final int worker = i;
    pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}

// Wait for all Workers to finish:
try {
    // execute all submitted tasks
    pool.shutdown();
    // waits until all workers finish, or the timeout ends
    pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    pool.shutdownNow(); //kill thread
}

System.out.println(Arrays.toString(result));

Notas:

  1. Este ejemplo es puramente ilustrativo. En la práctica, no habrá ningún aumento de velocidad mediante el uso de subprocesos para una tarea tan pequeña. Es probable que se produzca una ralentización, ya que los gastos generales de la creación y programación de tareas sobrecargarán el tiempo necesario para ejecutar una tarea.

  2. Si estuviera utilizando Java 7 y anteriores, usaría clases anónimas en lugar de lambdas para implementar las tareas.

Obtenga el estado de todas las hebras iniciadas por su programa, excluyendo las hebras del sistema

Fragmento de código:

import java.util.Set;

public class ThreadStatus {
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 5; i++){
            Thread t = new Thread(new MyThread());
            t.setName("MyThread:" + i);
            t.start();
        }
        int threadCount = 0;
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet) {
            if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
                System.out.println("Thread :" + t + ":" + "state:" + t.getState());
                ++threadCount;
            }
        }
        System.out.println("Thread count started by Main thread:" + threadCount);
    }
}

class MyThread implements Runnable {
    public void run() {
        try {
            Thread.sleep(2000);
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

Salida:

Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6

Explicación:

Thread.getAllStackTraces().keySet() devuelve todos los Thread , incluidos los subprocesos de aplicaciones y los subprocesos del sistema. Si solo está interesado en el estado de los Thread , iniciado por su aplicación, itere el conjunto de subprocesos marcando el Grupo de subprocesos de un subproceso en particular con el subproceso principal del programa.

En ausencia de la condición de ThreadGroup anterior, el programa devuelve el estado de los subprocesos del sistema a continuación:

Reference Handler
Signal Dispatcher
Attach Listener
Finalizer

Callable y Futuro

Si bien Runnable proporciona un medio para envolver el código para que se ejecute en un subproceso diferente, tiene una limitación en cuanto a que no puede devolver un resultado de la ejecución. La única forma de obtener algún valor de retorno de la ejecución de un Runnable es asignar el resultado a una variable accesible en un ámbito fuera del Runnable .

Callable se introdujo en Java 5 como un par de Runnable . Callable es esencialmente el mismo, excepto que tiene un método de call lugar de run . El método de call tiene la capacidad adicional de devolver un resultado y también se le permite lanzar excepciones marcadas.

El resultado de un envío de tarea callable está disponible para ser pulsado a través de un futuro

Future puede considerarse un contenedor de clases que alberga el resultado de la computación Callable . El cálculo de la llamada puede continuar en otro hilo, y cualquier intento de tocar el resultado de un Future se bloqueará y solo devolverá el resultado una vez que esté disponible.

Interfaz invocable

public interface Callable<V> {
    V call() throws Exception;
}

Futuro

interface Future<V> {
    V get();
    V get(long timeout, TimeUnit unit);
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}

Usando ejemplo llamable y futuro:

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newSingleThreadExecutor();
          
    System.out.println("Time At Task Submission : " + new Date());
    Future<String> result = es.submit(new ComplexCalculator());
    // the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
    System.out.println("Result of Complex Calculation is : " + result.get());
    System.out.println("Time At the Point of Printing the Result : " + new Date());
}

Nuestro Callable que hace un largo cálculo.

public class ComplexCalculator implements Callable<String> {

    @Override
    public String call() throws Exception {
        // just sleep for 10 secs to simulate a lengthy computation
        Thread.sleep(10000);            
        System.out.println("Result after a lengthy 10sec calculation");
        return "Complex Result"; // the result 
    }
}

Salida

Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016

Otras operaciones permitidas en el futuro

Si bien get() es el método para extraer el resultado real, Future tiene provisión

  • get(long timeout, TimeUnit unit) define el período de tiempo máximo durante la get(long timeout, TimeUnit unit) actual esperará un resultado;
  • Para cancelar la tarea, cancele la llamada cancel(mayInterruptIfRunning) . La bandera mayInterrupt indica que la tarea debe interrumpirse si se inició y se está ejecutando en este momento;
  • Para verificar si la tarea se completó / terminó llamando a isDone() ;
  • Para verificar si la tarea prolongada se canceló, se cancela isCancelled() .

Bloqueos como ayudas de sincronización

Antes de la introducción del paquete concurrente de Java 5, el subproceso era más bajo. La introducción de este paquete proporcionaba varias ayudas / construcciones de programación concurrente de nivel superior.

Los bloqueos son mecanismos de sincronización de subprocesos que sirven esencialmente para el mismo propósito que los bloques sincronizados o palabras clave.

Bloqueo intrínseco

int count = 0; // shared among multiple threads

public void doSomething() {
    synchronized(this) {
        ++count; // a non-atomic operation
    }
}

Sincronización mediante Locks

int count = 0; // shared among multiple threads

Lock lockObj = new ReentrantLock();
public void doSomething() {
    try {
        lockObj.lock();
        ++count; // a non-atomic operation
    } finally {    
        lockObj.unlock(); // sure to release the lock without fail
    }
}

Los bloqueos también tienen una funcionalidad disponible que el bloqueo intrínseco no ofrece, como el bloqueo pero que responde a la interrupción, o intenta bloquear, y no bloquear cuando no puede hacerlo.

Bloqueo, sensible a la interrupción.

class Locky {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        try {
            try {
                lockObj.lockInterruptibly();
                ++count; // a non-atomic operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stopping
            }
        } finally {
            if (!Thread.currentThread().isInterrupted()) {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Solo hacer algo cuando se pueda bloquear.

public class Locky2 {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        boolean locked = lockObj.tryLock(); // returns true upon successful lock
        if (locked) {
            try {
                ++count; // a non-atomic operation
            } finally {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Hay varias variantes de bloqueo disponibles. Para más detalles, consulte los documentos api aquí



Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow