Java Language
Programmazione simultanea (thread)
Ricerca…
introduzione
Il calcolo simultaneo è una forma di calcolo in cui diversi calcoli vengono eseguiti contemporaneamente anziché in modo sequenziale. Il linguaggio Java è progettato per supportare la programmazione concorrente attraverso l'uso di thread. Gli oggetti e le risorse sono accessibili da più thread; ogni thread può potenzialmente accedere a qualsiasi oggetto nel programma e il programmatore deve garantire che l'accesso in lettura e scrittura agli oggetti sia sincronizzato correttamente tra i thread.
Osservazioni
Argomenti correlati su StackOverflow:
Multithreading di base
Se hai molte attività da eseguire e tutte queste attività non dipendono dal risultato di quelle precedenti, puoi utilizzare il Multithreading per il tuo computer per eseguire tutte queste attività contemporaneamente utilizzando più processori, se il tuo computer lo può fare. Questo può rendere più veloce l' esecuzione del tuo programma se hai delle grosse attività indipendenti.
class CountAndPrint implements Runnable {
private final String name;
CountAndPrint(String name) {
this.name = name;
}
/** This is what a CountAndPrint will do */
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println(this.name + ": " + i);
}
}
public static void main(String[] args) {
// Launching 4 parallel threads
for (int i = 1; i <= 4; i++) {
// `start` method will call the `run` method
// of CountAndPrint in another thread
new Thread(new CountAndPrint("Instance " + i)).start();
}
// Doing some others tasks in the main Thread
for (int i = 0; i < 10000; i++) {
System.out.println("Main: " + i);
}
}
}
Il codice del metodo di esecuzione delle varie istanze CountAndPrint
verrà eseguito in un ordine non prevedibile. Un frammento di un'esecuzione di esempio potrebbe essere simile a questo:
Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...
Producer-Consumer
Un semplice esempio di soluzione di problemi produttore-consumatore. Si noti che le classi JDK ( AtomicBoolean
e BlockingQueue
) vengono utilizzate per la sincronizzazione, riducendo la possibilità di creare una soluzione non valida. Consultare Javadoc per vari tipi di BlockingQueue ; la scelta di un'implementazione differente può modificare drasticamente il comportamento di questo esempio (come DelayQueue o Priority Queue ).
public class Producer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Producer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int producedCount = 0;
try {
while (true) {
producedCount++;
//put throws an InterruptedException when the thread is interrupted
queue.put(new ProducedData());
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
producedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Produced " + producedCount + " objects");
}
}
public class Consumer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Consumer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int consumedCount = 0;
try {
while (true) {
//put throws an InterruptedException when the thread is interrupted
ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
// process data
consumedCount++;
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
consumedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Consumed " + consumedCount + " objects");
}
}
public class ProducerConsumerExample {
static class ProducedData {
// empty data object
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
// choice of queue determines the actual behavior: see various BlockingQueue implementations
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
Thread.sleep(1000);
producer.interrupt();
Thread.sleep(10);
consumer.interrupt();
}
}
Usando ThreadLocal
Uno strumento utile in Java Concurrency è ThreadLocal
- questo ti permette di avere una variabile che sarà unica per un dato thread. Pertanto, se lo stesso codice viene eseguito in thread diversi, queste esecuzioni non condivideranno il valore, ma ogni thread ha una propria variabile che è locale al thread .
Ad esempio, questo è frequentemente usato per stabilire il contesto (come le informazioni di autorizzazione) della gestione di una richiesta in un servlet. Potresti fare qualcosa del genere:
private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();
public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}
public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}
Ora, invece di passare MyUserContext
in ogni singolo metodo, puoi invece utilizzare MyServlet.getContext()
dove ti serve. Ora, naturalmente, questo introduce una variabile che deve essere documentata, ma è thread-safe, che elimina molti aspetti negativi nell'utilizzo di una variabile così ad alto scope.
Il vantaggio chiave qui è che ogni thread ha la propria variabile locale del thread nel contenitore dei contexts
. Finché lo si utilizza da un punto di ingresso definito (come richiedere che ogni servlet mantenga il suo contesto, o magari aggiungendo un filtro servlet), si può fare affidamento sul fatto che questo contesto sia lì quando ne hai bisogno.
CountDownLatch
Un aiuto di sincronizzazione che consente a uno o più thread di attendere fino al completamento di un insieme di operazioni eseguite in altri thread.
- Un
CountDownLatch
viene inizializzato con un determinato conteggio. - Il blocco
countDown()
i metodi fino a quando il conteggio corrente non raggiunge lo zero a causa di invocazioni del metodocountDown()
, dopo il quale tutti i thread in attesa vengono rilasciati e qualsiasi successiva chiamata di attesa attende immediatamente. - Questo è un fenomeno one-shot: il conteggio non può essere resettato. Se è necessaria una versione che azzeri il conteggio, prendere in considerazione l'uso di
CyclicBarrier
.
Metodi chiave:
public void await() throws InterruptedException
Fa in modo che il thread corrente attenda fino a quando il latch non viene contato fino a zero, a meno che il thread non venga interrotto.
public void countDown()
Riduce il conteggio del latch, rilasciando tutti i thread in attesa se il conteggio raggiunge lo zero.
Esempio:
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable {
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
System.out.println("Do some thing");
latch.countDown();
} catch(Exception err) {
err.printStackTrace();
}
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
try {
int numberOfThreads = 5;
if (args.length < 1) {
System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
return;
}
try {
numberOfThreads = Integer.parseInt(args[0]);
} catch(NumberFormatException ne) {
}
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int n = 0; n < numberOfThreads; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
} catch(Exception err) {
err.printStackTrace();
}
}
}
produzione:
java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads
Spiegazione:
-
CountDownLatch
è inizializzato con un contatore di 5 in thread principale - Il thread principale è in attesa usando il metodo
await()
. - Sono state create cinque istanze di
DoSomethingInAThread
. Ogni istanza decrementava il contatore con il metodocountDown()
. - Quando il contatore diventa zero, il thread principale riprenderà
Sincronizzazione
In Java, esiste un meccanismo di blocco a livello di lingua incorporato: il blocco synchronized
, che può utilizzare qualsiasi oggetto Java come blocco intrinseco (ad esempio, ogni oggetto Java può avere un monitor ad esso associato).
I blocchi intrinseci forniscono l'atomicità a gruppi di dichiarazioni. Per capire cosa questo significhi per noi, diamo un'occhiata a un esempio in cui la synchronized
è utile:
private static int t = 0;
private static Object mutex = new Object();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
synchronized (mutex) {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
}
});
}
executorService.shutdown();
}
In questo caso, se non fosse stato per il blocco synchronized
, sarebbero stati coinvolti più problemi di concorrenza. Il primo sarebbe con l'operatore post-incremento (non è atomico in sé), e il secondo sarebbe che osserveremmo il valore di t dopo che una quantità arbitraria di altri thread ha avuto la possibilità di modificarla. Tuttavia, poiché abbiamo acquisito un blocco intrinseco, qui non ci saranno condizioni di gara e l'uscita conterrà numeri da 1 a 100 nel loro ordine normale.
I blocchi intrinseci in Java sono mutex (cioè blocchi di esecuzione reciproca). L'esecuzione reciproca significa che se un thread ha acquisito il blocco, il secondo sarà costretto ad aspettare che il primo lo rilasci prima che possa acquisire il blocco per se stesso. Nota: un'operazione che può mettere il thread nello stato di attesa (sospensione) viene chiamata un'operazione di blocco . Pertanto, l'acquisizione di un blocco è un'operazione di blocco.
I blocchi intrinseci in Java sono rientranti . Ciò significa che se un thread tenta di acquisire un lock che già possiede, non lo bloccherà e lo acquisterà con successo. Ad esempio, il seguente codice non si bloccherà se chiamato:
public void bar(){
synchronized(this){
...
}
}
public void foo(){
synchronized(this){
bar();
}
}
Accanto ai blocchi synchronized
, ci sono anche metodi synchronized
.
I seguenti blocchi di codice sono praticamente equivalenti (anche se il bytecode sembra essere diverso):
blocco
synchronized
suthis
:public void foo() { synchronized(this) { doStuff(); } }
metodo
synchronized
:public synchronized void foo() { doStuff(); }
Allo stesso modo per static
metodi static
, questo:
class MyClass {
...
public static void bar() {
synchronized(MyClass.class) {
doSomeOtherStuff();
}
}
}
ha lo stesso effetto di questo:
class MyClass {
...
public static synchronized void bar() {
doSomeOtherStuff();
}
}
Operazioni atomiche
Un'operazione atomica è un'operazione che viene eseguita "tutto in una volta", senza alcuna possibilità che altri thread osservino o modificino lo stato durante l'esecuzione dell'operazione atomica.
Considera un BAD EXAMPLE .
private static int t = 0;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
});
}
executorService.shutdown();
}
In questo caso, ci sono due problemi. Il primo problema è che l'operatore di post-incremento non è atomico. Comprende più operazioni: ottieni il valore, aggiungi 1 al valore, imposta il valore. Ecco perché se eseguiamo l'esempio, è probabile che non vedremo t: 100
nell'output: due thread potrebbero contemporaneamente ottenere il valore, incrementarlo e impostarlo: diciamo che il valore di t è 10 e due i thread stanno aumentando t. Entrambi i thread imposteranno il valore da t a 11, poiché il secondo thread osserva il valore di t prima che il primo thread abbia finito di incrementarlo.
Il secondo problema riguarda il modo in cui stiamo osservando t. Quando stampiamo il valore di t, il valore potrebbe essere già stato modificato da un thread diverso dopo l'operazione di incremento di questo thread.
Per risolvere questi problemi, utilizzeremo java.util.concurrent.atomic.AtomicInteger
, che ha molte operazioni atomiche da utilizzare.
private static AtomicInteger t = new AtomicInteger(0);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
int currentT = t.incrementAndGet();
System.out.println(MessageFormat.format("t: {0}", currentT));
});
}
executorService.shutdown();
}
Il metodo incrementAndGet
di AtomicInteger
incrementa atomicamente e restituisce il nuovo valore, eliminando così la condizione di gara precedente. Si noti che in questo esempio le linee saranno ancora fuori servizio perché non facciamo nessuno sforzo per sequenziare le chiamate println
e questo non rientra nell'ambito di questo esempio, poiché richiederebbe la sincronizzazione e l'obiettivo di questo esempio è mostrare come utilizzare AtomicInteger
per eliminare le condizioni di gara relative allo stato.
Creazione di un sistema deadlocked di base
Un deadlock si verifica quando due azioni in competizione aspettano che l'altro finisca, e quindi non lo fa mai. In Java è presente un blocco associato a ciascun oggetto. Per evitare modifiche simultanee eseguite da più thread su un singolo oggetto, possiamo utilizzare synchronized
parola chiave synchronized
, ma tutto ha un costo. L'uso errato di parole chiave synchronized
può portare a sistemi bloccati chiamati sistemi con deadlock.
Considera che ci sono 2 thread che lavorano su 1 istanza, Lets chiama i thread come First e Second, e diciamo che abbiamo 2 risorse R1 e R2. Prima acquisisce R1 e ha bisogno anche di R2 per il suo completamento mentre Second acquisisce R2 e necessita di R1 per il completamento.
quindi dì al tempo t = 0,
Prima ha R1 e Second ha R2. ora First sta aspettando R2 mentre Second è in attesa di R1. questa attesa è indefinita e questo porta a un punto morto.
public class Example2 {
public static void main(String[] args) throws InterruptedException {
final DeadLock dl = new DeadLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
dl.methodA();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
dl.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t1.setName("First");
t2.setName("Second");
t1.start();
t2.start();
}
}
class DeadLock {
Object mLock1 = new Object();
Object mLock2 = new Object();
public void methodA() {
System.out.println("methodA wait for mLock1 " + Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("methodA mLock1 acquired " + Thread.currentThread().getName());
try {
Thread.sleep(100);
method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void method2() throws InterruptedException {
System.out.println("method2 wait for mLock2 " + Thread.currentThread().getName());
synchronized (mLock2) {
System.out.println("method2 mLock2 acquired " + Thread.currentThread().getName());
Thread.sleep(100);
method3();
}
}
public void method3() throws InterruptedException {
System.out.println("method3 mLock1 "+ Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("method3 mLock1 acquired " + Thread.currentThread().getName());
}
}
}
Uscita di questo programma:
methodA wait for mLock1 First
method2 wait for mLock2 Second
method2 mLock2 acquired Second
methodA mLock1 acquired First
method3 mLock1 Second
method2 wait for mLock2 First
Mettere in pausa l'esecuzione
Thread.sleep
fa in modo che il thread corrente sospenda l'esecuzione per un periodo specificato. Questo è un mezzo efficace per rendere il tempo del processore disponibile per gli altri thread di un'applicazione o altre applicazioni che potrebbero essere in esecuzione su un sistema informatico. Ci sono due metodi di sleep
sovraccarico nella classe Thread.
Uno che specifica il tempo di sonno al millisecondo
public static void sleep(long millis) throws InterruptedException
Uno che specifica il tempo di sonno per il nanosecondo
public static void sleep(long millis, int nanos)
Metti in pausa l'esecuzione per 1 secondo
Thread.sleep(1000);
È importante notare che questo è un suggerimento per lo scheduler del kernel del sistema operativo. Questo potrebbe non essere necessariamente preciso, e alcune implementazioni non considerano nemmeno il parametro nanosecondo (possibilmente arrotondando al millisecondo più vicino).
Si consiglia di includere una chiamata a Thread.sleep
in try / catch e catch InterruptedException
.
Visualizzazione di barriere di lettura / scrittura durante l'utilizzo sincronizzato / volatile
Come sappiamo, dovremmo usare synchronized
parola chiave synchronized
per rendere l'esecuzione di un metodo o blocco esclusivo. Ma pochi di noi potrebbero non essere a conoscenza di un aspetto più importante dell'utilizzo di parole chiave synchronized
e volatile
: oltre a rendere atomica un'unità di codice, fornisce anche una barriera di lettura / scrittura . Cos'è questa barriera di lettura / scrittura? Discutiamo di questo usando un esempio:
class Counter {
private Integer count = 10;
public synchronized void incrementCount() {
count++;
}
public Integer getCount() {
return count;
}
}
Supponiamo che un thread A richiami incrementCount()
prima di un altro thread B chiama getCount()
. In questo scenario non è garantito che B vedrà il valore aggiornato del count
. Può ancora vedere count
come 10
, anche è anche possibile che non veda mai il valore aggiornato di count
mai.
Per comprendere questo comportamento, è necessario capire come il modello di memoria Java si integra con l'architettura hardware. In Java, ogni thread ha il proprio stack di thread. Questo stack contiene: stack di chiamata del metodo e variabile locale creata in quel thread. In un sistema multi core, è del tutto possibile che due thread vengano eseguiti contemporaneamente in core separati. In tale scenario è possibile che parte della pila di un thread si trovi all'interno del registro / cache di un core. Se all'interno di un thread, si accede a un oggetto usando synchronized
parola chiave synchronized
(o volatile
), dopo synchronized
blocco synchronized
quel thread sincronizza la sua copia locale di quella variabile con la memoria principale. Questo crea una barriera di lettura / scrittura e si assicura che il thread veda l'ultimo valore di quell'oggetto.
Ma nel nostro caso, poiché il thread B non ha usato l'accesso sincronizzato per count
, potrebbe essere il valore di riferimento del count
memorizzato nel registro e potrebbe non vedere mai gli aggiornamenti dal thread A. Per assicurarsi che B veda l'ultimo valore di conteggio dobbiamo fare getCount()
sincronizzato pure.
public synchronized Integer getCount() {
return count;
}
Ora, quando il thread A viene eseguito con il count
aggiornamento, sblocca l'istanza di Counter
, allo stesso tempo crea una barriera di scrittura e svuota tutte le modifiche apportate all'interno di quel blocco alla memoria principale. Allo stesso modo quando il thread B acquisisce il lock sulla stessa istanza di Counter
, entra in una barriera di lettura e legge il valore del count
dalla memoria principale e vede tutti gli aggiornamenti.
Lo stesso effetto di visibilità vale anche per volatile
letture / scritture volatile
. Tutte le variabili aggiornate prima della scrittura in volatile
verranno scaricate nella memoria principale e tutte le letture dopo la lettura della variabile volatile
saranno dalla memoria principale.
Creazione di un'istanza java.lang.Thread
Esistono due approcci principali per la creazione di un thread in Java. In sostanza, creare un thread è facile come scrivere il codice che verrà eseguito in esso. I due approcci differiscono nel punto in cui definisci quel codice.
In Java, un thread è rappresentato da un oggetto: un'istanza di java.lang.Thread o la sua sottoclasse. Quindi il primo approccio è creare quella sottoclasse e sovrascrivere il metodo run () .
Nota : userò Thread per fare riferimento alla classe e al thread java.lang.Thread per fare riferimento al concetto logico dei thread.
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running!");
}
}
}
Da quando abbiamo già definito il codice da eseguire, il thread può essere creato semplicemente come:
MyThread t = new MyThread();
La classe Thread contiene anche un costruttore che accetta una stringa, che verrà utilizzata come nome del thread. Questo può essere particolarmente utile durante il debug di un programma multi-thread.
class MyThread extends Thread {
public MyThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running! ");
}
}
}
MyThread t = new MyThread("Greeting Producer");
Il secondo approccio è definire il codice usando java.lang.Runnable e il suo unico metodo run () . La classe Thread consente quindi di eseguire tale metodo in un thread separato. Per ottenere ciò, creare il thread utilizzando un costruttore che accetta un'istanza dell'interfaccia Runnable .
Thread t = new Thread(aRunnable);
Questo può essere molto potente se combinato con riferimenti a lambda o metodi (solo Java 8):
Thread t = new Thread(operator::hardWork);
Puoi anche specificare il nome del thread.
Thread t = new Thread(operator::hardWork, "Pi operator");
Praticamente parlando, puoi usare entrambi gli approcci senza preoccupazioni. Tuttavia la saggezza generale dice di usare quest'ultimo.
Per ognuno dei quattro costruttori menzionati, esiste anche un'alternativa che accetta un'istanza di java.lang.ThreadGroup come primo parametro.
ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");
Il ThreadGroup rappresenta un set di thread. È possibile solo aggiungere una discussione a un ThreadGroup utilizzando il costruttore di Thread . Il ThreadGroup può quindi essere utilizzato per gestire tutti i suoi Thread insieme, così come il Thread può ottenere informazioni dal suo ThreadGroup .
Quindi per sumarize, il Thread può essere creato con uno di questi costruttori pubblici:
Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)
L'ultimo ci consente di definire le dimensioni dello stack desiderate per il nuovo thread.
Spesso la leggibilità del codice soffre quando si creano e si configurano molti thread con le stesse proprietà o con lo stesso modello. È qui che può essere utilizzato java.util.concurrent.ThreadFactory . Questa interfaccia consente di incapsulare la procedura di creazione del thread attraverso il modello factory e il suo unico metodo newThread (Runnable) .
class WorkerFactory implements ThreadFactory {
private int id = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Worker " + id++);
}
}
Discussione Interruzione / interruzione di thread
Ogni thread Java ha un flag di interrupt, che inizialmente è falso. L'interruzione di un thread, in sostanza, non è altro che impostare quel flag su true. Il codice in esecuzione su quel thread può controllare la bandiera occasionalmente e agire su di esso. Il codice può anche ignorarlo completamente. Ma perché ogni discussione ha una tale bandiera? Dopotutto, avere una bandiera booleana su un thread è qualcosa che possiamo semplicemente organizzarci, se e quando ne abbiamo bisogno. Bene, ci sono metodi che si comportano in un modo speciale quando il thread su cui stanno girando viene interrotto. Questi metodi sono chiamati metodi di blocco. Questi sono metodi che inseriscono il thread nello stato WAITING o TIMED_WAITING. Quando un thread si trova in questo stato, interrompendolo, si genera un InterruptedException sul thread interrotto, anziché il flag di interrupt impostato su true e il thread diventa nuovamente RUNNABLE. Il codice che richiama un metodo di blocco è costretto ad occuparsi di InterruptedException, poiché si tratta di un'eccezione controllata. Quindi, e quindi il suo nome, un interrupt può avere l'effetto di interrompere un WAIT, chiudendolo efficacemente. Si noti che non tutti i metodi che sono in qualche modo in attesa (ad es. Bloccando IO) rispondono all'interruzione in questo modo, poiché non mettono il thread in uno stato di attesa. Infine, un thread con il proprio flag di interrupt, che immette un metodo di blocco (cioè tenta di entrare in uno stato di attesa), genererà immediatamente un InterruptedException e il flag di interrupt verrà cancellato.
Oltre a questi meccanismi, Java non assegna alcun significato semantico speciale all'interruzione. Il codice è libero di interpretare un'interruzione come preferisce. Ma il più delle volte l'interruzione viene utilizzata per segnalare a un thread che dovrebbe smettere di funzionare al più presto. Ma, come dovrebbe essere chiaro da quanto sopra, è il codice su quel thread a reagire a tale interruzione in modo appropriato al fine di interrompere la corsa. Fermare un thread è una collaborazione. Quando un thread viene interrotto, il codice in esecuzione può essere a diversi livelli in profondità nello stacktrace. La maggior parte del codice non chiama un metodo di blocco e termina abbastanza tempestivamente per non ritardare indebitamente l'interruzione del thread. Il codice che dovrebbe preoccuparsi principalmente di essere reattivo all'interruzione, è il codice che è in un ciclo che gestisce le attività finché non ne rimane nessuna, o finché non viene impostato un flag che lo segnala per interrompere quel ciclo. I loop che gestiscono attività potenzialmente infinite (ovvero continuano a funzionare in linea di principio) dovrebbero controllare il flag di interrupt per uscire dal ciclo. Per i cicli finiti, la semantica può stabilire che tutte le attività devono essere completate prima di terminare, oppure potrebbe essere opportuno lasciare alcune attività non gestite. Il codice che richiama i metodi di blocco sarà costretto ad occuparsi di InterruptedException. Se tutto è semanticamente possibile, può semplicemente propagare l'InterruptedException e dichiarare di lanciarlo. In quanto tale, diventa esso stesso un metodo di blocco per quanto riguarda i suoi chiamanti. Se non può propagare l'eccezione, dovrebbe almeno impostare il flag interrotto, in modo che i chiamanti più in alto nello stack sappiano anche che il thread è stato interrotto. In alcuni casi, il metodo deve continuare ad attendere indipendentemente da InterruptedException, nel qual caso deve ritardare l'impostazione del flag interrotto fino a quando non viene eseguito in attesa, ciò può comportare l'impostazione di una variabile locale, che deve essere controllata prima di uscire dal metodo quindi interrompi il suo thread.
Esempi:
Esempio di codice che interrompe la gestione delle attività in caso di interruzione
class TaskHandler implements Runnable {
private final BlockingQueue<Task> queue;
TaskHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
try {
Task task = queue.take(); // blocking call, responsive to interruption
handle(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
}
}
}
private void handle(Task task) {
// actual handling
}
}
Esempio di codice che ritarda l'impostazione del flag di interrupt fino al completamento completo:
class MustFinishHandler implements Runnable {
private final BlockingQueue<Task> queue;
MustFinishHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean shouldInterrupt = false;
while (true) {
try {
Task task = queue.take();
if (task.isEndOfTasks()) {
if (shouldInterrupt) {
Thread.currentThread().interrupt();
}
return;
}
handle(task);
} catch (InterruptedException e) {
shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
}
}
}
private void handle(Task task) {
// actual handling
}
}
Esempio di codice che ha un elenco fisso di attività ma potrebbe chiudersi presto quando viene interrotto
class GetAsFarAsPossible implements Runnable {
private final List<Task> tasks = new ArrayList<>();
@Override
public void run() {
for (Task task : tasks) {
if (Thread.currentThread().isInterrupted()) {
return;
}
handle(task);
}
}
private void handle(Task task) {
// actual handling
}
}
Esempio di produttore / consumatore multiplo con coda globale condivisa
Sotto il codice vengono presentati più programmi Producer / Consumer. Entrambi i thread Producer e Consumer condividono la stessa coda globale.
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
produzione:
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1
e così via ................
Spiegazione:
-
sharedQueue
, che èLinkedBlockingQueue
è condiviso tra tutti i thread Producer e Consumer. - I thread di produzione producono continuamente un intero per ogni 200 milli secondi e lo
sharedQueue
asharedQueue
-
Consumer
threadConsumer
consuma continuamente integer dasharedQueue
. - Questo programma è implementato senza costrutti espliciti
synchronized
oLock
. BlockingQueue è la chiave per raggiungerlo.
Le implementazioni BlockingQueue sono progettate per essere utilizzate principalmente per code produttore-consumatore.
Le implementazioni di BlockingQueue sono thread-safe. Tutti i metodi di accodamento ottengono i loro effetti atomicamente utilizzando blocchi interni o altre forme di controllo della concorrenza.
Scrittura esclusiva / accesso di lettura simultaneo
Talvolta è necessario che un processo scriva e legga contemporaneamente gli stessi "dati".
L'interfaccia ReadWriteLock
e l'implementazione ReentrantReadWriteLock
consentono un pattern di accesso che può essere descritto come segue:
- Ci può essere un numero qualsiasi di lettori concorrenti dei dati. Se è concesso almeno un accesso al lettore, non è possibile l'accesso allo scrittore.
- Ci può essere al massimo un singolo writer per i dati. Se è concesso l'accesso allo scrittore, nessun lettore può accedere ai dati.
Un'implementazione potrebbe essere simile a:
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;
/** This will write to the data, in an exclusive access */
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}
public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}
}
NOTA 1 : Questo caso d'uso preciso ha una soluzione più pulita usando AtomicInteger
, ma ciò che è descritto qui è un modello di accesso, che funziona indipendentemente dal fatto che i dati qui siano un numero intero come una variante Atomica.
NOTA 2 : Il blocco sulla parte di lettura è davvero necessario, anche se potrebbe non sembrare così per il lettore casuale. Infatti, se non si blocca sul lato del lettore, qualsiasi numero di cose può andare storto, tra cui:
- Le scritture dei valori primitivi non sono garantite per essere atomiche su tutte le JVM, quindi il lettore potrebbe vedere ad esempio solo 32 bit di una scrittura a 64 bit se i
data
erano di tipo a 64 bit - La visibilità della scrittura da un thread che non l'ha eseguita è garantita dalla JVM solo se stabiliamo la relazione Happen Before tra le scritture e le letture. Questa relazione viene stabilita quando sia i lettori che gli scrittori utilizzano i rispettivi blocchi, ma non altrimenti
Nel caso in cui siano richieste prestazioni più elevate, sotto certi tipi di utilizzo, è disponibile un tipo di blocco più rapido, denominato StampedLock
, che, tra le altre cose, implementa una modalità di blocco ottimistica. Questo blocco funziona in modo molto diverso da ReadWriteLock
e questo esempio non è trasportabile.
Oggetto eseguibile
L'interfaccia Runnable
definisce un singolo metodo, run()
, pensato per contenere il codice eseguito nel thread.
L'oggetto Runnable
viene passato al costruttore Thread
. E viene chiamato il metodo start()
di Thread.
Esempio
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("Hello from a thread");
}
public static void main(String[] args) {
new Thread(new HelloRunnable()).start();
}
}
Esempio in Java8:
public static void main(String[] args) {
Runnable r = () -> System.out.println("Hello world");
new Thread(r).start();
}
Sottoclasse Runnable vs Thread
Un impiego di oggetti Runnable
è più generale, perché l'oggetto Runnable
può creare una sottoclasse di una classe diversa da Thread
.
Thread
sottoclasse di Thread
è più semplice da utilizzare nelle applicazioni semplici, ma è limitata dal fatto che la classe di attività deve essere un discendente di Thread
.
Un oggetto Runnable
è applicabile alle API di gestione del thread di alto livello.
Semaforo
Un semaforo è un sincronizzatore di alto livello che mantiene un insieme di permessi che possono essere acquisiti e rilasciati dai thread. Un semaforo può essere immaginato come un contatore di permessi che verrà decrementato quando un thread acquisisce e incrementato quando un thread viene rilasciato. Se la quantità di permessi è 0
quando un thread tenta di acquisire, il thread si bloccherà fino a quando non verrà reso disponibile un permesso (o finché il thread non viene interrotto).
Un semaforo è inizializzato come:
Semaphore semaphore = new Semaphore(1); // The int value being the number of permits
Il costruttore del semaforo accetta un parametro booleano aggiuntivo per l'equità. Se impostato su false, questa classe non fornisce garanzie sull'ordine in cui i thread acquisiscono i permessi. Quando l'equità è impostata su true, il semaforo garantisce che i thread che invocano uno dei metodi di acquisizione siano selezionati per ottenere i permessi nell'ordine in cui è stata elaborata la loro chiamata di tali metodi. È dichiarato nel modo seguente:
Semaphore semaphore = new Semaphore(1, true);
Ora diamo un'occhiata a un esempio di javadocs, in cui il semaforo viene utilizzato per controllare l'accesso a un gruppo di elementi. In questo esempio viene utilizzato un semaforo per fornire funzionalità di blocco al fine di garantire che ci siano sempre elementi da ottenere quando viene chiamato getItem()
.
class Pool {
/*
* Note that this DOES NOT bound the amount that may be released!
* This is only a starting value for the Semaphore and has no other
* significant meaning UNLESS you enforce this inside of the
* getNextAvailableItem() and markAsUnused() methods
*/
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/**
* Obtains the next available item and reduces the permit count by 1.
* If there are no items available, block.
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* Puts the item into the pool and add 1 permit.
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private Object getNextAvailableItem() {
// Implementation
}
private boolean markAsUnused(Object o) {
// Implementation
}
}
Aggiungi due array `int` usando un Threadpool
Un Threadpool ha una coda di compiti, di cui ognuno verrà eseguito su uno di questi Thread.
L'esempio seguente mostra come aggiungere due array int
usando un Threadpool.
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };
ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}
// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}
System.out.println(Arrays.toString(result));
Gli appunti:
Questo esempio è puramente illustrativo. In pratica, non ci sarà alcuna accelerazione utilizzando i thread per un'attività così piccola. È probabile un rallentamento, dal momento che i costi generali di creazione e programmazione dell'attività acquisteranno il tempo necessario per eseguire un'attività.
Se stavi usando Java 7 e versioni precedenti, dovresti utilizzare le classi anonime invece di lambda per implementare le attività.
Ottieni lo stato di tutti i thread avviati dal tuo programma escludendo i thread di sistema
Snippet di codice:
import java.util.Set;
public class ThreadStatus {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 5; i++){
Thread t = new Thread(new MyThread());
t.setName("MyThread:" + i);
t.start();
}
int threadCount = 0;
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread t : threadSet) {
if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
System.out.println("Thread :" + t + ":" + "state:" + t.getState());
++threadCount;
}
}
System.out.println("Thread count started by Main thread:" + threadCount);
}
}
class MyThread implements Runnable {
public void run() {
try {
Thread.sleep(2000);
} catch(Exception err) {
err.printStackTrace();
}
}
}
Produzione:
Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6
Spiegazione:
Thread.getAllStackTraces().keySet()
restituisce tutti i Thread
, inclusi i thread di applicazione e i thread di sistema. Se sei interessato solo allo stato di Thread, avviato dalla tua applicazione, iterare il Thread
set controllando il Thread Group di un particolare thread con il thread del tuo programma principale.
In assenza di una condizione ThreadGroup superiore, il programma restituisce lo stato di sotto Thread di sistema:
Reference Handler
Signal Dispatcher
Attach Listener
Finalizer
Callable e Future
Mentre Runnable
fornisce un mezzo per avvolgere il codice da eseguire in un thread diverso, ha una limitazione nel senso che non può restituire un risultato dell'esecuzione. L'unico modo per ottenere un valore di ritorno dall'esecuzione di un Runnable
è assegnare il risultato a una variabile accessibile in un ambito esterno a Runnable
.
Callable
stato introdotto in Java 5 come peer to Runnable
. Callable
è essenzialmente lo stesso eccetto che ha un metodo di call
invece di run
. Il metodo di call
ha la capacità aggiuntiva di restituire un risultato ed è inoltre autorizzato a generare eccezioni controllate.
Il risultato di una richiesta di attività Callable è disponibile per essere sfruttato tramite un futuro
Future
può essere considerato un contenitore di sorta che ospita il risultato del calcolo Callable
. Il calcolo del callable può continuare in un altro thread, e qualsiasi tentativo di toccare il risultato di un Future
bloccherà e restituirà il risultato solo quando sarà disponibile.
Interfaccia richiamabile
public interface Callable<V> {
V call() throws Exception;
}
Futuro
interface Future<V> {
V get();
V get(long timeout, TimeUnit unit);
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
Usando l'esempio Callable e Future:
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
System.out.println("Time At Task Submission : " + new Date());
Future<String> result = es.submit(new ComplexCalculator());
// the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
System.out.println("Result of Complex Calculation is : " + result.get());
System.out.println("Time At the Point of Printing the Result : " + new Date());
}
Il nostro Callable che esegue un lungo calcolo
public class ComplexCalculator implements Callable<String> {
@Override
public String call() throws Exception {
// just sleep for 10 secs to simulate a lengthy computation
Thread.sleep(10000);
System.out.println("Result after a lengthy 10sec calculation");
return "Complex Result"; // the result
}
}
Produzione
Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016
Altre operazioni permesse su Future
Mentre get()
è il metodo per estrarre il risultato reale, il futuro ha la possibilità
-
get(long timeout, TimeUnit unit)
definisce il periodo di tempo massimo durante il thread corrente aspetterà un risultato; - Per annullare l'operazione, annullare la chiamata
cancel(mayInterruptIfRunning)
. Il flagmayInterrupt
indica che l'attività deve essere interrotta se è stata avviata ed è in esecuzione in questo momento; - Per verificare se l'attività è completata / terminata chiamando
isDone()
; - Per verificare se la lunga attività è stata annullata
isCancelled()
.
Blocchi come aiuti di sincronizzazione
Prima dell'introduzione simultanea del pacchetto di Java 5 il threading era di livello più basso. L'introduzione di questo pacchetto forniva numerosi ausili / costrutti di programmazione simultanei di livello superiore.
I lock sono meccanismi di sincronizzazione dei thread che essenzialmente hanno lo stesso scopo dei blocchi sincronizzati o delle parole chiave.
Blocco intrinseco
int count = 0; // shared among multiple threads
public void doSomething() {
synchronized(this) {
++count; // a non-atomic operation
}
}
Sincronizzazione tramite blocchi
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
lockObj.lock();
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
I blocchi hanno anche funzionalità disponibili che il blocco intrinseco non offre, come il blocco ma che rimangono reattivi all'interruzione o che cercano di bloccare e non bloccano quando non sono in grado di farlo.
Chiusura, sensibile all'interruzione
class Locky {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
try {
lockObj.lockInterruptibly();
++count; // a non-atomic operation
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // stopping
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Fai qualcosa solo quando puoi bloccare
public class Locky2 {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
boolean locked = lockObj.tryLock(); // returns true upon successful lock
if (locked) {
try {
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Sono disponibili diverse varianti di blocco. Per ulteriori dettagli, consultare i documenti API qui