Java Language
Programowanie równoległe (wątki)
Szukaj…
Wprowadzenie
Przetwarzanie równoległe to forma obliczeń, w której kilka obliczeń jest wykonywanych jednocześnie zamiast sekwencyjnie. Język Java został zaprojektowany do obsługi współbieżnego programowania za pomocą wątków. Dostęp do obiektów i zasobów można uzyskać za pomocą wielu wątków; każdy wątek może potencjalnie uzyskać dostęp do dowolnego obiektu w programie, a programista musi zapewnić, że dostęp do odczytu i zapisu do obiektów jest odpowiednio zsynchronizowany między wątkami.
Uwagi
Powiązane tematy na StackOverflow:
Podstawowy wielowątkowość
Jeśli masz wiele zadań do wykonania, a wszystkie te zadania nie są zależne od wyników poprzednich, możesz użyć wielowątkowości dla swojego komputera, aby wykonać wszystkie te zadania w tym samym czasie, używając większej liczby procesorów, jeśli komputer może. Może to przyspieszyć wykonywanie programu, jeśli masz duże zadania niezależne.
class CountAndPrint implements Runnable {
private final String name;
CountAndPrint(String name) {
this.name = name;
}
/** This is what a CountAndPrint will do */
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
System.out.println(this.name + ": " + i);
}
}
public static void main(String[] args) {
// Launching 4 parallel threads
for (int i = 1; i <= 4; i++) {
// `start` method will call the `run` method
// of CountAndPrint in another thread
new Thread(new CountAndPrint("Instance " + i)).start();
}
// Doing some others tasks in the main Thread
for (int i = 0; i < 10000; i++) {
System.out.println("Main: " + i);
}
}
}
Kod metody uruchamiania różnych instancji CountAndPrint
będzie wykonywany w nieprzewidywalnej kolejności. Fragment przykładowego wykonania może wyglądać następująco:
Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...
Producent-konsument
Prosty przykład rozwiązania problemu producent-konsument. Zwróć uwagę, że klasy JDK ( AtomicBoolean
i BlockingQueue
) są używane do synchronizacji, co zmniejsza ryzyko utworzenia nieprawidłowego rozwiązania. Skonsultuj się z Javadoc w sprawie różnych rodzajów BlockingQueue ; wybranie innej implementacji może drastycznie zmienić zachowanie tego przykładu (np. DelayQueue lub Priority Queue ).
public class Producer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Producer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int producedCount = 0;
try {
while (true) {
producedCount++;
//put throws an InterruptedException when the thread is interrupted
queue.put(new ProducedData());
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
producedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Produced " + producedCount + " objects");
}
}
public class Consumer implements Runnable {
private final BlockingQueue<ProducedData> queue;
public Consumer(BlockingQueue<ProducedData> queue) {
this.queue = queue;
}
public void run() {
int consumedCount = 0;
try {
while (true) {
//put throws an InterruptedException when the thread is interrupted
ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
// process data
consumedCount++;
}
} catch (InterruptedException e) {
// the thread has been interrupted: cleanup and exit
consumedCount--;
//re-interrupt the thread in case the interrupt flag is needeed higher up
Thread.currentThread().interrupt();
}
System.out.println("Consumed " + consumedCount + " objects");
}
}
public class ProducerConsumerExample {
static class ProducedData {
// empty data object
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
// choice of queue determines the actual behavior: see various BlockingQueue implementations
Thread producer = new Thread(new Producer(queue));
Thread consumer = new Thread(new Consumer(queue));
producer.start();
consumer.start();
Thread.sleep(1000);
producer.interrupt();
Thread.sleep(10);
consumer.interrupt();
}
}
Korzystanie z ThreadLocal
Przydatnym narzędziem w Java Concurrency jest ThreadLocal
- pozwala to mieć zmienną, która będzie unikalna dla danego wątku. Zatem jeśli ten sam kod działa w różnych wątkach, te wykonania nie będą dzielić tej wartości, ale zamiast tego każdy wątek ma swoją zmienną lokalną dla wątku .
Na przykład jest to często używane do ustalenia kontekstu (takiego jak informacje o autoryzacji) obsługi żądania w serwletu. Możesz zrobić coś takiego:
private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();
public static MyUserContext getContext() {
return contexts.get(); // get returns the variable unique to this thread
}
public void doGet(...) {
MyUserContext context = magicGetContextFromRequest(request);
contexts.put(context); // save that context to our thread-local - other threads
// making this call don't overwrite ours
try {
// business logic
} finally {
contexts.remove(); // 'ensure' removal of thread-local variable
}
}
Teraz zamiast przekazywać MyUserContext
do każdej metody, możesz zamiast tego użyć MyServlet.getContext()
tam, gdzie jest to potrzebne. Teraz oczywiście wprowadza to zmienną, którą należy udokumentować, ale jest bezpieczna dla wątków, co eliminuje wiele wad stosowania tak wysoce zmiennej zmiennej.
Kluczową zaletą jest to, że każdy wątek ma własną zmienną lokalną wątku w kontenerze contexts
. Tak długo, jak używasz go z określonego punktu wejścia (np. Wymagając, aby każdy serwlet zachowywał swój kontekst, lub być może dodając filtr serwletu), możesz polegać na tym, że ten kontekst istnieje tam, gdzie jest potrzebny.
CountDownLatch
Pomoc w synchronizacji, która pozwala co najmniej jednemu wątkowi poczekać, aż zakończy się zestaw operacji wykonywanych w innych wątkach.
-
CountDownLatch
jest inicjowany z podaną liczbą. - Blokuje metody oczekujące, aż bieżąca liczba osiągnie zero z powodu wywołań metody
countDown()
, po czym wszystkie oczekujące wątki są zwalniane, a wszelkie kolejne wywołania oczekiwania natychmiast wracają. - Jest to zjawisko jednorazowe - liczby nie można zresetować. Jeśli potrzebujesz wersji, która resetuje licznik, rozważ użycie
CyclicBarrier
.
Kluczowe metody:
public void await() throws InterruptedException
Powoduje, że bieżący wątek czeka, aż zatrzask zostanie odliczony do zera, chyba że wątek zostanie przerwany.
public void countDown()
Zmniejsza liczbę zatrzasku, zwalniając wszystkie oczekujące wątki, jeśli liczba osiągnie zero.
Przykład:
import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable {
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
System.out.println("Do some thing");
latch.countDown();
} catch(Exception err) {
err.printStackTrace();
}
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
try {
int numberOfThreads = 5;
if (args.length < 1) {
System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
return;
}
try {
numberOfThreads = Integer.parseInt(args[0]);
} catch(NumberFormatException ne) {
}
CountDownLatch latch = new CountDownLatch(numberOfThreads);
for (int n = 0; n < numberOfThreads; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
} catch(Exception err) {
err.printStackTrace();
}
}
}
wynik:
java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads
Wyjaśnienie:
-
CountDownLatch
jest inicjowany licznikiem 5 w wątku głównym - Główny wątek czeka za pomocą metody
await()
. - Utworzono pięć instancji
DoSomethingInAThread
. Każda instancja zmniejszała licznik zacountDown()
metodycountDown()
. - Gdy licznik osiągnie zero, wątek główny zostanie wznowiony
Synchronizacja
W Javie istnieje wbudowany mechanizm blokowania na poziomie języka: synchronized
blok, który może wykorzystywać dowolny obiekt Java jako wewnętrzną blokadę (tzn. Każdy obiekt Java może być powiązany z monitorem).
Zamki wewnętrzne zapewniają atomowość grupom instrukcji. Aby zrozumieć, co to dla nas oznacza, rzućmy okiem na przykład, w którym synchronized
jest przydatna:
private static int t = 0;
private static Object mutex = new Object();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
synchronized (mutex) {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
}
});
}
executorService.shutdown();
}
W takim przypadku, gdyby nie blok synchronized
, wiązałoby się to z wieloma problemami dotyczącymi współbieżności. Pierwszy byłby z operatorem inkrementacji postu (sam w sobie nie jest atomowy), a drugi to, że obserwowalibyśmy wartość t po tym, jak dowolna ilość innych wątków miała szansę ją zmodyfikować. Ponieważ jednak uzyskaliśmy wewnętrzną blokadę, nie będzie tutaj warunków wyścigu, a dane wyjściowe będą zawierać liczby od 1 do 100 w normalnej kolejności.
Blokady wewnętrzne w Javie to muteksy (tj. Blokady wzajemnego wykonywania). Wzajemne wykonanie oznacza, że jeśli jeden wątek uzyskał zamek, drugi będzie musiał poczekać, aż pierwszy go zwolni, zanim będzie mógł uzyskać zamek dla siebie. Uwaga: Operacja, która może wprowadzić wątek w stan oczekiwania (uśpienia), nazywana jest operacją blokowania . Zatem uzyskanie blokady jest operacją blokującą.
Wewnętrzne blokady w Javie są ponownie wysyłane . Oznacza to, że jeśli wątek spróbuje uzyskać blokadę, którą już posiada, nie zostanie zablokowany i pomyślnie go uzyska. Na przykład następujący kod nie zostanie zablokowany po wywołaniu:
public void bar(){
synchronized(this){
...
}
}
public void foo(){
synchronized(this){
bar();
}
}
Oprócz synchronized
bloków istnieją również metody synchronized
.
Następujące bloki kodu są praktycznie równoważne (mimo że kod bajtowy wydaje się inny):
synchronized
blok nathis
:public void foo() { synchronized(this) { doStuff(); } }
metoda
synchronized
:public synchronized void foo() { doStuff(); }
Podobnie w przypadku metod static
:
class MyClass {
...
public static void bar() {
synchronized(MyClass.class) {
doSomeOtherStuff();
}
}
}
ma taki sam efekt jak ten:
class MyClass {
...
public static synchronized void bar() {
doSomeOtherStuff();
}
}
Operacje atomowe
Operacja atomowa to operacja wykonywana „naraz”, bez szansy na zaobserwowanie lub zmianę stanu przez inne wątki podczas wykonywania operacji atomowej.
Rozważmy ZŁY PRZYKŁAD .
private static int t = 0;
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
t++;
System.out.println(MessageFormat.format("t: {0}", t));
});
}
executorService.shutdown();
}
W takim przypadku występują dwa problemy. Pierwszą kwestią jest to, że operator przyrostowy nie jest atomowy. Składa się z wielu operacji: pobierz wartość, dodaj 1 do wartości, ustaw wartość. Dlatego jeśli uruchomimy przykład, prawdopodobnie nie zobaczymy t: 100
na wyjściu - dwa wątki mogą jednocześnie uzyskać wartość, zwiększyć ją i ustawić: powiedzmy, że wartość t wynosi 10, a dwa wątki zwiększają t. Oba wątki ustawią wartość t na 11, ponieważ drugi wątek obserwuje wartość t, zanim pierwszy wątek skończy ją zwiększać.
Druga kwestia dotyczy tego, w jaki sposób obserwujemy t. Gdy drukujemy wartość t, wartość ta mogła już zostać zmieniona przez inny wątek po operacji przyrostu tego wątku.
Aby rozwiązać te problemy, użyjemy java.util.concurrent.atomic.AtomicInteger
, który ma wiele operacji atomowych do użycia.
private static AtomicInteger t = new AtomicInteger(0);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
for (int i = 0; i < 100; i++) {
executorService.execute(() -> {
int currentT = t.incrementAndGet();
System.out.println(MessageFormat.format("t: {0}", currentT));
});
}
executorService.shutdown();
}
Metoda incrementAndGet
AtomicInteger
zwiększa atomowo i zwraca nową wartość, eliminując w ten sposób poprzednie warunki wyścigu. Należy pamiętać, że w tym przykładzie linie nadal będą nieczynne, ponieważ nie podejmujemy żadnego wysiłku w celu sekwencjonowania println
i że wykracza to poza zakres tego przykładu, ponieważ wymagałoby to synchronizacji, a celem tego przykładu jest pokazanie, w jaki sposób używać AtomicInteger
celu wyeliminowania warunków wyścigu dotyczących stanu.
Tworzenie podstawowego systemu zakleszczonego
Zakleszczenie występuje, gdy dwie konkurencyjne akcje czekają na zakończenie drugiej, a zatem żadna z nich nigdy się nie kończy. W java jest jeden zamek powiązany z każdym obiektem. Aby uniknąć jednoczesnej modyfikacji wykonanej przez wiele wątków na jednym obiekcie, możemy użyć synchronized
słowa kluczowego, ale wszystko kosztuje. synchronized
użycie synchronized
słowa kluczowego może doprowadzić do zablokowania systemów nazywanych systemem zakleszczonym.
Weź pod uwagę, że na 1 instancję działają 2 wątki. Pozwala na wywoływanie wątków jako Pierwszy i Drugi, i powiedzmy, że mamy 2 zasoby R1 i R2. Pierwszy nabywa R1, a także potrzebuje R2 do ukończenia, a drugi nabywa R2 i potrzebuje R1 do ukończenia.
powiedzmy, że w czasie t = 0,
Pierwszy ma R1, a drugi R2. teraz Pierwszy czeka na R2, podczas gdy Drugi czeka na R1. to oczekiwanie jest nieokreślone i prowadzi do impasu.
public class Example2 {
public static void main(String[] args) throws InterruptedException {
final DeadLock dl = new DeadLock();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
dl.methodA();
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
try {
dl.method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
t1.setName("First");
t2.setName("Second");
t1.start();
t2.start();
}
}
class DeadLock {
Object mLock1 = new Object();
Object mLock2 = new Object();
public void methodA() {
System.out.println("methodA wait for mLock1 " + Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("methodA mLock1 acquired " + Thread.currentThread().getName());
try {
Thread.sleep(100);
method2();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void method2() throws InterruptedException {
System.out.println("method2 wait for mLock2 " + Thread.currentThread().getName());
synchronized (mLock2) {
System.out.println("method2 mLock2 acquired " + Thread.currentThread().getName());
Thread.sleep(100);
method3();
}
}
public void method3() throws InterruptedException {
System.out.println("method3 mLock1 "+ Thread.currentThread().getName());
synchronized (mLock1) {
System.out.println("method3 mLock1 acquired " + Thread.currentThread().getName());
}
}
}
Wynik tego programu:
methodA wait for mLock1 First
method2 wait for mLock2 Second
method2 mLock2 acquired Second
methodA mLock1 acquired First
method3 mLock1 Second
method2 wait for mLock2 First
Wstrzymywanie wykonania
Thread.sleep
powoduje zawieszenie wykonywania bieżącego wątku na określony czas. Jest to skuteczny sposób udostępniania czasu procesora innym wątkom aplikacji lub innym aplikacjom, które mogą być uruchomione w systemie komputerowym. Istnieją dwie przeciążone metody sleep
w klasie Thread.
Jeden, który określa czas uśpienia do milisekundy
public static void sleep(long millis) throws InterruptedException
Jeden, który określa czas snu do nanosekundy
public static void sleep(long millis, int nanos)
Wstrzymywanie wykonania na 1 sekundę
Thread.sleep(1000);
Należy zauważyć, że jest to wskazówka dla programu planującego jądro systemu operacyjnego. To niekoniecznie musi być precyzyjne, a niektóre implementacje nawet nie uwzględniają parametru nanosekundowego (być może zaokrąglenia do najbliższej milisekundy).
Zalecane jest dołączenie wywołania do Thread.sleep
w try / catch i catch InterruptedException
.
Wizualizacja barier odczytu / zapisu przy użyciu synchronizacji / niestabilności
Ponieważ wiemy, że powinniśmy używać synchronized
słowa kluczowego, aby wyłączać wykonanie metody lub bloku. Ale niewielu z nas może nie zdawać sobie sprawy z jeszcze jednego ważnego aspektu korzystania ze synchronized
i volatile
słowa kluczowego: oprócz stworzenia atomowej jednostki kodu, zapewnia również barierę odczytu / zapisu . Czym jest ta bariera odczytu / zapisu? Omówmy to na przykładzie:
class Counter {
private Integer count = 10;
public synchronized void incrementCount() {
count++;
}
public Integer getCount() {
return count;
}
}
Załóżmy, że wątek A wywołuje najpierw incrementCount()
a następnie inny wątek B wywołuje getCount()
. W tym scenariuszu nie ma gwarancji, że B zobaczy zaktualizowaną wartość count
. Nadal może być count
jako 10
, nawet możliwe, że nigdy nie zobaczy zaktualizowanej wartości count
.
Aby zrozumieć to zachowanie, musimy zrozumieć, w jaki sposób model pamięci Java integruje się z architekturą sprzętową. W Javie każdy wątek ma własny stos wątków. Ten stos zawiera: stos wywołań metod i zmienną lokalną utworzoną w tym wątku. W systemie wielordzeniowym całkiem możliwe jest, że dwa wątki pracują jednocześnie w osobnych rdzeniach. W takim scenariuszu możliwe jest, że część stosu wątku znajduje się w rejestrze / pamięci podręcznej rdzenia. Wewnątrz wątku dostęp do obiektu uzyskuje się za pomocą słowa kluczowego synchronized
(lub volatile
), po synchronized
bloku tego wątku synchronizuje się jego lokalną kopię tej zmiennej z pamięcią główną. Tworzy to barierę odczytu / zapisu i zapewnia, że wątek widzi najnowszą wartość tego obiektu.
Ale w naszym przypadku, ponieważ wątek B nie wykorzystał zsynchronizowanego dostępu do count
, może to być wartość referencyjna count
przechowywana w rejestrze i może nigdy nie zobaczyć aktualizacji z wątku A. Aby upewnić się, że B widzi ostatnią wartość count, musimy getCount()
zsynchronizowane.
public synchronized Integer getCount() {
return count;
}
Teraz, gdy wątek A jest zrobione z aktualizacją count
to odblokowuje Counter
instancji, w tym samym czasie tworzy barierę zapisu i wypłukuje wszystkie zmiany wykonywane wewnątrz tego bloku do pamięci głównej. Podobnie, gdy wątek B uzyskuje blokadę na tym samym wystąpieniu Counter
, wchodzi w barierę odczytu i odczytuje wartość count
z pamięci głównej i widzi wszystkie aktualizacje.
Ten sam efekt widoczności dotyczy również volatile
odczytu / zapisu. Wszystkie zmienne zaktualizowane przed zapisem do pamięci volatile
zostaną opróżnione do pamięci głównej, a wszystkie odczyty po odczytaniu zmiennych volatile
będą z pamięci głównej.
Tworzenie instancji java.lang.Thread
Istnieją dwa główne podejścia do tworzenia wątku w Javie. Zasadniczo utworzenie wątku jest tak proste, jak napisanie kodu, który zostanie w nim wykonany. Te dwa podejścia różnią się tym, gdzie definiujesz ten kod.
W Javie wątek jest reprezentowany przez obiekt - instancję java.lang.Thread lub jego podklasę. Zatem pierwszym podejściem jest utworzenie tej podklasy i przesłonięcie metody run () .
Uwaga : Wątek użyję do odwołania się do klasy java.lang.Thread i wątku do odniesienia do logicznej koncepcji wątków.
class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running!");
}
}
}
Ponieważ już zdefiniowaliśmy kod do wykonania, wątek można utworzyć po prostu jako:
MyThread t = new MyThread();
Klasa Thread zawiera również konstruktor akceptujący ciąg znaków, który będzie używany jako nazwa wątku. Może to być szczególnie przydatne podczas debugowania programu wielowątkowego.
class MyThread extends Thread {
public MyThread(String name) {
super(name);
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("Thread running! ");
}
}
}
MyThread t = new MyThread("Greeting Producer");
Drugim podejściem jest zdefiniowanie kodu za pomocą java.lang.Runnable i jego jedynej metody run () . Klasa Thread pozwala następnie wykonać tę metodę w oddzielnym wątku. Aby to osiągnąć, utwórz wątek za pomocą konstruktora akceptującego instancję interfejsu Runnable .
Thread t = new Thread(aRunnable);
Może to być bardzo wydajne w połączeniu z lambdami lub referencjami metod (tylko Java 8):
Thread t = new Thread(operator::hardWork);
Możesz także podać nazwę wątku.
Thread t = new Thread(operator::hardWork, "Pi operator");
Praktycznie rzecz biorąc, możesz używać obu podejść bez obaw. Jednak ogólna mądrość mówi, aby używać tego drugiego.
Dla każdego z czterech wspomnianych konstruktorów istnieje również alternatywa zaakceptowania instancji java.lang.ThreadGroup jako pierwszego parametru.
ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");
ThreadGroup reprezentuje zestaw wątków. Możesz dodać wątek do grupy wątków tylko za pomocą konstruktora wątku . ThreadGroup mogą być następnie wykorzystane do zarządzania wszystkimi jej gwint s razem, jak również wątek może uzyskać informacje od jego ThreadGroup .
Podsumowując, Wątek można utworzyć za pomocą jednego z tych publicznych konstruktorów:
Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)
Ostatni pozwala nam zdefiniować pożądany rozmiar stosu dla nowego wątku.
Często podczas odczytu i konfiguracji wielu wątków o takich samych właściwościach lub z tego samego wzorca ma to wpływ na czytelność kodu. Wtedy można użyć java.util.concurrent.ThreadFactory . Ten interfejs umożliwia enkapsulację procedury tworzenia wątku za pomocą wzorca fabrycznego i jego jedynej metody newThread (Runnable) .
class WorkerFactory implements ThreadFactory {
private int id = 0;
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Worker " + id++);
}
}
Przerwanie wątku / zatrzymanie wątków
Każdy wątek Java ma flagę przerwania, która początkowo jest fałszywa. Przerwanie wątku jest w zasadzie niczym więcej niż ustawieniem tej flagi na true. Kod działający w tym wątku może czasami sprawdzać flagę i działać zgodnie z nią. Kod może również całkowicie go zignorować. Ale dlaczego każdy wątek miałby taką flagę? W końcu posiadanie flagi logicznej na wątku jest czymś, co możemy po prostu zorganizować, jeśli i kiedy tego potrzebujemy. Są metody, które zachowują się w specjalny sposób, gdy wątek, na którym działają, jest przerwany. Metody te nazywane są metodami blokowania. Są to metody, które wprowadzają wątek w stan OCZEKIWANIA lub CZASU OCZEKIWANIA. Gdy wątek jest w tym stanie, jego przerwanie spowoduje zgłoszenie wyjątku InterruptedException na przerwany wątek, a nie ustawienie flagi przerwania na wartość true, a wątek ponownie stanie się ZWRACANY. Kod wywołujący metodę blokowania jest zmuszony do obsługi wyjątku InterruptedException, ponieważ jest to sprawdzony wyjątek. Tak więc, i stąd jego nazwa, przerwanie może skutkować przerwaniem OCZEKIWANIA, a tym samym jego zakończeniem. Zauważ, że nie wszystkie metody, które w jakiś sposób czekają (np. Blokowanie IO) reagują w ten sposób na przerwanie, ponieważ nie wprowadzają wątku w stan oczekiwania. W końcu wątek, który ma ustawioną flagę przerwania, która wchodzi do metody blokowania (tj. Próbuje dostać się do stanu oczekiwania), natychmiast wyrzuci InterruptedException i flaga przerwania zostanie wyczyszczona.
Poza tymi mechanizmami Java nie przypisuje żadnego specjalnego znaczenia semantycznego przerwaniu. Kod może dowolnie interpretować przerwanie w dowolny sposób. Ale najczęściej przerwanie służy do sygnalizowania wątkowi, że powinien przestać działać w najbliższym dogodnym momencie. Ale, jak powinno być jasne z powyższego, to do kodu w tym wątku należy odpowiednio zareagować na to przerwanie, aby przestać działać. Zatrzymanie wątku to współpraca. Gdy wątek zostanie przerwany, jego kod może mieć kilka poziomów w głąb stosu. Większość kodu nie wywołuje metody blokowania i kończy się wystarczająco wcześnie, aby nie opóźniać nadmiernie zatrzymania wątku. Kod, który powinien przede wszystkim dotyczyć reagowania na przerwy, to kod, który wykonuje zadania obsługi pętli, dopóki ich nie pozostanie, lub dopóki nie zostanie ustawiona flaga sygnalizująca zatrzymanie tej pętli. Pętle, które obsługują prawdopodobnie nieskończone zadania (tj. Nadal działają w zasadzie), powinny sprawdzić flagę przerwania, aby wyjść z pętli. W przypadku pętli skończonych semantyka może dyktować, że wszystkie zadania muszą zostać zakończone przed zakończeniem, lub może być właściwe pozostawienie niektórych zadań nieobsługiwanych. Kod wywołujący metody blokowania będzie zmuszony do obsługi wyjątku InterruptedException. Jeśli to semantycznie możliwe, może po prostu propagować wyjątek InterruptedException i zadeklarować jego wyrzucenie. Jako taki sam staje się metodą blokującą w stosunku do swoich rozmówców. Jeśli nie może propagować wyjątku, powinien przynajmniej ustawić flagę przerwania, więc osoby wywołujące wyżej stos wiedzą również, że wątek został przerwany. W niektórych przypadkach metoda musi kontynuować oczekiwanie bez względu na wyjątek InterruptedException, w którym to przypadku musi opóźnić ustawienie flagi przerwanej, aż do momentu zakończenia oczekiwania, może to obejmować ustawienie zmiennej lokalnej, którą należy sprawdzić przed wyjściem z metody następnie przerwij wątek.
Przykłady:
Przykład kodu, który przestaje obsługiwać zadania po przerwie
class TaskHandler implements Runnable {
private final BlockingQueue<Task> queue;
TaskHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
try {
Task task = queue.take(); // blocking call, responsive to interruption
handle(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
}
}
}
private void handle(Task task) {
// actual handling
}
}
Przykład kodu, który opóźnia ustawienie flagi przerwania, aż do całkowitego zakończenia:
class MustFinishHandler implements Runnable {
private final BlockingQueue<Task> queue;
MustFinishHandler(BlockingQueue<Task> queue) {
this.queue = queue;
}
@Override
public void run() {
boolean shouldInterrupt = false;
while (true) {
try {
Task task = queue.take();
if (task.isEndOfTasks()) {
if (shouldInterrupt) {
Thread.currentThread().interrupt();
}
return;
}
handle(task);
} catch (InterruptedException e) {
shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
}
}
}
private void handle(Task task) {
// actual handling
}
}
Przykład kodu, który ma ustaloną listę zadań, ale może zostać zamknięty wcześniej, gdy zostanie przerwany
class GetAsFarAsPossible implements Runnable {
private final List<Task> tasks = new ArrayList<>();
@Override
public void run() {
for (Task task : tasks) {
if (Thread.currentThread().isInterrupted()) {
return;
}
handle(task);
}
}
private void handle(Task task) {
// actual handling
}
}
Przykład wielu producentów / konsumentów ze wspólną globalną kolejką
Poniższy kod przedstawia wiele programów producenta / konsumenta. Wątki producenta i konsumenta mają tę samą globalną kolejkę.
import java.util.concurrent.*;
import java.util.Random;
public class ProducerConsumerWithES {
public static void main(String args[]) {
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue, 1));
pes.submit(new Producer(sharedQueue, 2));
ces.submit(new Consumer(sharedQueue, 1));
ces.submit(new Consumer(sharedQueue, 2));
pes.shutdown();
ces.shutdown();
}
}
/* Different producers produces a stream of integers continuously to a shared queue,
which is shared between all Producers and consumers */
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
private Random random = new Random();
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
// Producer produces a continuous stream of numbers for every 200 milli seconds
while (true) {
try {
int number = random.nextInt(1000);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
Thread.sleep(200);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
// Consumer consumes numbers generated from Producer threads continuously
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
wynik:
Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1
i tak dalej ................
Wyjaśnienie:
-
sharedQueue
, która jestLinkedBlockingQueue
jest współużytkowana przez wszystkie wątki producenta i konsumenta. -
sharedQueue
producenta produkują jedną liczbę całkowitą na każde 200 mili sekund w sposób ciągły i dołączają ją dosharedQueue
- Wątek
Consumer
stale zużywa liczbę całkowitą zsharedQueue
. - Ten program jest implementowany bez jawnych
synchronized
lubLock
konstrukcji. BlockingQueue jest kluczem do osiągnięcia tego celu.
Implementacje BlockingQueue są przeznaczone głównie do użytku w kolejkach producent-konsument.
Implementacje BlockingQueue są bezpieczne dla wątków. Wszystkie metody kolejkowania osiągają swoje efekty atomowo za pomocą wewnętrznych blokad lub innych form kontroli współbieżności.
Wyłączny dostęp do zapisu / jednoczesnego odczytu
Czasami proces wymaga jednoczesnego zapisu i odczytu tych samych „danych”.
Interfejs ReadWriteLock
i jego implementacja ReentrantReadWriteLock
pozwala na wzorzec dostępu, który można opisać następująco:
- Może istnieć dowolna liczba współbieżnych czytników danych. Jeśli przyznano przynajmniej jeden dostęp do czytnika, oznacza to, że nie jest możliwy dostęp do zapisu.
- Może istnieć co najwyżej jeden zapisujący dane. Jeśli przyznano dostęp do zapisu, żaden czytelnik nie może uzyskać dostępu do danych.
Implementacja może wyglądać następująco:
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {
// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();
// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;
/** This will write to the data, in an exclusive access */
public static void writeToData() {
RW_LOCK.writeLock().lock();
try {
data++;
} finally {
RW_LOCK.writeLock().unlock();
}
}
public static int readData() {
RW_LOCK.readLock().lock();
try {
return data;
} finally {
RW_LOCK.readLock().unlock();
}
}
}
UWAGA 1 : Ten precyzyjny przypadek użycia ma czystsze rozwiązanie przy użyciu AtomicInteger
, ale tutaj opisano wzorzec dostępu, który działa niezależnie od faktu, że dane tutaj są liczbami całkowitymi, które jako wariant Atomic.
UWAGA 2 : Blokada części do czytania jest naprawdę potrzebna, chociaż może nie wyglądać tak dla zwykłego czytelnika. Rzeczywiście, jeśli nie zablokujesz się po stronie czytnika, dowolna liczba rzeczy może się nie udać, między innymi:
- Zapis pierwotnych wartości nie jest gwarantowany jako atomowy na wszystkich maszynach JVM, więc czytnik może zobaczyć np. Tylko 32-bitowe zapisy 64-bitowe, jeśli
data
są typu 64-bitowego - JVM gwarantuje widoczność zapisu z wątku, który go nie wykonał, tylko jeśli ustalimy relację Happen Before między zapisami i odczytami. Ta relacja jest ustalana, gdy zarówno czytelnicy, jak i pisarze używają swoich odpowiednich blokad, ale nie inaczej
W przypadku, gdy wymagana jest wyższa wydajność, przy pewnych typach użycia, dostępny jest szybszy typ blokady, zwany StampedLock
, który między innymi realizuje optymistyczny tryb blokady. Ta blokada działa zupełnie inaczej niż ReadWriteLock
, a tej próbki nie można przenosić.
Obiekt Runnable
Interfejs Runnable
definiuje pojedynczą metodę run()
, która ma zawierać kod wykonywany w wątku.
Obiekt Runnable
jest przekazywany do konstruktora Thread
. I wywoływana jest metoda start()
wątku.
Przykład
public class HelloRunnable implements Runnable {
@Override
public void run() {
System.out.println("Hello from a thread");
}
public static void main(String[] args) {
new Thread(new HelloRunnable()).start();
}
}
Przykład w Javie 8:
public static void main(String[] args) {
Runnable r = () -> System.out.println("Hello world");
new Thread(r).start();
}
Podklasa Runnable vs Thread
Zastosowanie obiektu Runnable
jest bardziej ogólne, ponieważ obiekt Runnable
może podklasować klasę inną niż Thread
.
Podklasowanie Thread
jest łatwiejsze w użyciu w prostych aplikacjach, ale jest ograniczone faktem, że klasa zadań musi być potomkiem Thread
.
Obiekt Runnable
ma zastosowanie do interfejsów API zarządzania wątkami wysokiego poziomu.
Semafor
Semafor to synchronizator wysokiego poziomu, który utrzymuje zestaw zezwoleń, które mogą być pozyskiwane i zwalniane przez wątki. Semafor można sobie wyobrazić jako licznik zezwoleń, który będzie zmniejszany, gdy wątek się zdobędzie, i będzie zwiększany, gdy wątek zostanie zwolniony. Jeśli liczba zezwoleń wynosi 0
gdy wątek próbuje uzyskać, wątek zostanie zablokowany, dopóki pozwolenie nie zostanie udostępnione (lub dopóki wątek nie zostanie przerwany).
Semafor jest inicjowany jako:
Semaphore semaphore = new Semaphore(1); // The int value being the number of permits
Konstruktor semafora akceptuje dodatkowy parametr boolowski dla zapewnienia uczciwości. Po ustawieniu wartości false ta klasa nie daje żadnych gwarancji dotyczących kolejności uzyskiwania zezwoleń przez wątki. Gdy uczciwość jest ustawiona na prawdę, semafor gwarantuje, że wątki wywołujące dowolną z metod pozyskiwania są wybierane w celu uzyskania pozwoleń w kolejności, w jakiej przetworzono ich wywołanie tych metod. Deklaruje się go w następujący sposób:
Semaphore semaphore = new Semaphore(1, true);
Spójrzmy teraz na przykład z javadocs, w których semafor służy do kontrolowania dostępu do puli przedmiotów. W tym przykładzie zastosowano getItem()
aby zapewnić funkcję blokowania w celu zapewnienia, że zawsze są elementy do uzyskania, gdy getItem()
jest getItem()
.
class Pool {
/*
* Note that this DOES NOT bound the amount that may be released!
* This is only a starting value for the Semaphore and has no other
* significant meaning UNLESS you enforce this inside of the
* getNextAvailableItem() and markAsUnused() methods
*/
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
/**
* Obtains the next available item and reduces the permit count by 1.
* If there are no items available, block.
*/
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
/**
* Puts the item into the pool and add 1 permit.
*/
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private Object getNextAvailableItem() {
// Implementation
}
private boolean markAsUnused(Object o) {
// Implementation
}
}
Dodaj dwie tablice `int` przy użyciu Threadpool
Pula wątków ma kolejkę zadań, z których każde zostanie wykonane na jednym z tych wątków.
Poniższy przykład pokazuje, jak dodać dwie tablice int
przy użyciu puli wątków.
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };
ExecutorService pool = Executors.newCachedThreadPool();
// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
final int worker = i;
pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}
// Wait for all Workers to finish:
try {
// execute all submitted tasks
pool.shutdown();
// waits until all workers finish, or the timeout ends
pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
pool.shutdownNow(); //kill thread
}
System.out.println(Arrays.toString(result));
Uwagi:
Ten przykład jest wyłącznie ilustracyjny. W praktyce nie będzie żadnego przyspieszenia przy użyciu wątków do tak małego zadania. Możliwe jest spowolnienie, ponieważ narzuty związane z tworzeniem i planowaniem zadań pochłoną czas potrzebny do uruchomienia zadania.
Jeśli korzystasz z Javy 7 i wcześniejszych, do wykonania zadań użyjesz anonimowych klas zamiast lambdas.
Uzyskaj status wszystkich wątków uruchomionych przez Twój program, z wyjątkiem wątków systemowych
Fragment kodu:
import java.util.Set;
public class ThreadStatus {
public static void main(String args[]) throws Exception {
for (int i = 0; i < 5; i++){
Thread t = new Thread(new MyThread());
t.setName("MyThread:" + i);
t.start();
}
int threadCount = 0;
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread t : threadSet) {
if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
System.out.println("Thread :" + t + ":" + "state:" + t.getState());
++threadCount;
}
}
System.out.println("Thread count started by Main thread:" + threadCount);
}
}
class MyThread implements Runnable {
public void run() {
try {
Thread.sleep(2000);
} catch(Exception err) {
err.printStackTrace();
}
}
}
Wynik:
Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6
Wyjaśnienie:
Thread.getAllStackTraces().keySet()
zwraca wszystkie Thread
s włącznie wątków aplikacji i wątków systemowych. Jeśli interesuje Cię tylko status wątków, uruchomiony przez aplikację, iteruj zestaw Thread
, sprawdzając grupę wątków danego wątku względem głównego wątku programu.
W przypadku braku powyższego stanu ThreadGroup program zwraca status poniższych wątków systemowych:
Reference Handler
Signal Dispatcher
Attach Listener
Finalizer
Callable and Future
Chociaż Runnable
zapewnia sposób na zawijanie kodu, który ma zostać wykonany w innym wątku, ma on ograniczenie polegające na tym, że nie może zwrócić wyniku z wykonania. Jedynym sposobem na uzyskanie pewnej wartości zwrotnej z wykonania Runnable
jest przypisanie wyniku do zmiennej dostępnej w zakresie poza Runnable
.
Callable
został wprowadzony w Javie 5 jako peer do Runnable
. Callable
jest zasadniczo takie samo, z tym wyjątkiem, że ma metodę call
zamiast run
. Metoda call
ma dodatkową możliwość zwrócenia wyniku i może również generować sprawdzone wyjątki.
Wynik przesłanego zadania na żądanie można wykorzystać w przyszłości
Future
można uznać za rodzaj kontenera, w którym mieści się wynik obliczeń na Callable
. Obliczanie wywołania może być kontynuowane w innym wątku, a każda próba dotknięcia wyniku Future
zostanie zablokowana i zwróci wynik tylko wtedy, gdy będzie dostępny.
Interfejs na żądanie
public interface Callable<V> {
V call() throws Exception;
}
Przyszłość
interface Future<V> {
V get();
V get(long timeout, TimeUnit unit);
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
}
Przykład Callable i Future:
public static void main(String[] args) throws Exception {
ExecutorService es = Executors.newSingleThreadExecutor();
System.out.println("Time At Task Submission : " + new Date());
Future<String> result = es.submit(new ComplexCalculator());
// the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
System.out.println("Result of Complex Calculation is : " + result.get());
System.out.println("Time At the Point of Printing the Result : " + new Date());
}
Nasz program wywołujący, który wykonuje długie obliczenia
public class ComplexCalculator implements Callable<String> {
@Override
public String call() throws Exception {
// just sleep for 10 secs to simulate a lengthy computation
Thread.sleep(10000);
System.out.println("Result after a lengthy 10sec calculation");
return "Complex Result"; // the result
}
}
Wynik
Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016
Inne operacje dozwolone w przyszłości
Podczas gdy get()
jest metodą do wyodrębnienia faktycznego wyniku, który Future udostępnia
-
get(long timeout, TimeUnit unit)
określa maksymalny okres czasu, w którym bieżący wątek będzie oczekiwał na wynik; - Aby anulować wywołanie zadania
cancel(mayInterruptIfRunning)
. Flaga możemayInterrupt
wskazuje, że zadanie powinno zostać przerwane, jeśli zostało uruchomione i działa teraz; - Aby sprawdzić, czy zadanie zostało zakończone / zakończone przez wywołanie
isDone()
; - Aby sprawdzić, czy długie zadanie zostało anulowane, należy
isCancelled()
.
Blokuje jako pomoc w synchronizacji
Przed współbieżnym wprowadzeniem pakietu Java 5 wątki były na niskim poziomie. Wprowadzenie tego pakietu zapewniło kilka pomocniczych / konstrukcyjnych programów wyższego poziomu.
Zamki to mechanizmy synchronizacji wątków, które zasadniczo służą temu samemu celowi, co zsynchronizowane bloki lub słowa kluczowe.
Blokowanie iskrobezpieczne
int count = 0; // shared among multiple threads
public void doSomething() {
synchronized(this) {
++count; // a non-atomic operation
}
}
Synchronizacja za pomocą blokad
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
lockObj.lock();
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
Blokady mają również dostępną funkcjonalność, której nie zapewnia blokowanie wewnętrzne, takie jak blokowanie, ale reagowanie na przerwanie lub próba zablokowania i nie blokowanie, gdy nie jest to możliwe.
Blokowanie, reagujące na zakłócenia
class Locky {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
try {
try {
lockObj.lockInterruptibly();
++count; // a non-atomic operation
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // stopping
}
} finally {
if (!Thread.currentThread().isInterrupted()) {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Zrób coś tylko wtedy, gdy możesz zablokować
public class Locky2 {
int count = 0; // shared among multiple threads
Lock lockObj = new ReentrantLock();
public void doSomething() {
boolean locked = lockObj.tryLock(); // returns true upon successful lock
if (locked) {
try {
++count; // a non-atomic operation
} finally {
lockObj.unlock(); // sure to release the lock without fail
}
}
}
}
Dostępnych jest kilka wariantów zamka. Więcej informacji można znaleźć tutaj