Поиск…


Вступление

Параллельные вычисления - это одна из форм вычисления, при которой несколько вычислений выполняются одновременно, а не последовательно. Язык Java предназначен для поддержки параллельного программирования посредством использования потоков. Доступ к объектам и ресурсам осуществляется несколькими потоками; каждый поток может потенциально получить доступ к любому объекту в программе, и программист должен обеспечить, чтобы доступ для чтения и записи к объектам был правильно синхронизирован между потоками.

замечания

Связанная тема (ы) по StackOverflow:

Базовая многопоточность

Если у вас много задач для выполнения, и все эти задачи не зависят от результата предыдущих, вы можете использовать Multithreading для вашего компьютера для выполнения всех этих задач одновременно с использованием большего количества процессоров, если ваш компьютер может. Это может ускорить выполнение вашей программы, если у вас есть большие независимые задачи.

class CountAndPrint implements Runnable {

    private final String name;

    CountAndPrint(String name) {
        this.name = name;
    }

    /** This is what a CountAndPrint will do */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println(this.name + ": " + i);
        }
    }

    public static void main(String[] args) {
        // Launching 4 parallel threads
        for (int i = 1; i <= 4; i++) {
            // `start` method will call the `run` method 
            // of CountAndPrint in another thread
            new Thread(new CountAndPrint("Instance " + i)).start();
        }

        // Doing some others tasks in the main Thread
        for (int i = 0; i < 10000; i++) {
            System.out.println("Main: " + i);
        }
    }
}

Код метода run для различных экземпляров CountAndPrint будет выполняться в непредсказуемом порядке. Отрывок примера выполнения может выглядеть так:

Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...

Производитель-Потребитель

Простой пример решения проблемы производителя и потребителя. Обратите внимание, что для синхронизации используются классы JDK ( AtomicBoolean и BlockingQueue ), что уменьшает вероятность создания недопустимого решения. Проконсультируйтесь с Javadoc для различных типов BlockingQueue ; выбор другой реализации может кардинально изменить поведение этого примера (например, DelayQueue или Priority Queue ).

public class Producer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Producer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int producedCount = 0;
        try {
            while (true) {
                producedCount++;
                //put throws an InterruptedException when the thread is interrupted
                queue.put(new ProducedData());
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            producedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Produced " + producedCount + " objects");
    }
}

public class Consumer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Consumer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int consumedCount = 0;
        try {
            while (true) {
                //put throws an InterruptedException when the thread is interrupted
                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
                // process data
                consumedCount++;
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            consumedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed " + consumedCount + " objects");
    }
}


public class ProducerConsumerExample {
    static class ProducedData {    
        // empty data object
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
        // choice of queue determines the actual behavior: see various BlockingQueue implementations

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();

        Thread.sleep(1000);
        producer.interrupt();
        Thread.sleep(10);
        consumer.interrupt();
    }
}

Использование ThreadLocal

Полезным инструментом в Java Concurrency является ThreadLocal - это позволяет вам иметь переменную, которая будет уникальной для данного потока. Таким образом, если один и тот же код работает в разных потоках, эти исполнения не будут совместно использовать значение, но вместо этого каждый поток имеет свою собственную переменную, которая является локальной для потока .

Например, это часто используется для установления контекста (например, информации авторизации) обработки запроса в сервлете. Вы можете сделать что-то вроде этого:

private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

public static MyUserContext getContext() {
    return contexts.get(); // get returns the variable unique to this thread
}

public void doGet(...) {
    MyUserContext context = magicGetContextFromRequest(request); 
    contexts.put(context); // save that context to our thread-local - other threads
                           // making this call don't overwrite ours
    try {
        // business logic
    } finally {
        contexts.remove(); // 'ensure' removal of thread-local variable
    }
}

Теперь вместо того, чтобы передавать MyUserContext в каждый отдельный метод, вы можете вместо этого использовать MyServlet.getContext() где он вам нужен. Теперь, конечно, это вводит переменную, которая должна быть документирована, но она поточно-безопасна, что устраняет множество недостатков для использования такой переменной с высокой степенью охвата.

Ключевым преимуществом здесь является то, что каждый поток имеет свою собственную локальную переменную потока в контейнере contexts . Пока вы используете его из определенной точки входа (например, требуя, чтобы каждый сервлет поддерживал свой контекст или, возможно, добавляя фильтр сервлета), вы можете полагаться на этот контекст, когда он вам нужен.

CountDownLatch

CountDownLatch

Вспомогательное средство синхронизации, которое позволяет одному или нескольким потокам дождаться завершения набора операций в других потоках.

  1. CountDownLatch инициализируется с заданным подсчетом.
  2. Методы ожидания выполняются до тех пор, пока текущий счетчик не достигнет нуля из-за вызовов метода countDown() , после чего все ожидающие потоки освобождаются, и любые последующие вызовы ожидания возвращаются немедленно.
  3. Это феномен с одним выстрелом - счетчик не может быть сброшен. Если вам нужна версия, которая сбрасывает счетчик, рассмотрите возможность использования CyclicBarrier .

Ключевые методы:

public void await() throws InterruptedException

Заставляет текущий поток ждать, пока защелка не будет отсчитываться до нуля, если поток не будет прерван.

public void countDown()

Уменьшает количество защелок, освобождая все ожидающие потоки, если счетчик достигает нуля.

Пример:

import java.util.concurrent.*;

class DoSomethingInAThread implements Runnable {
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch) {
        this.latch = latch;
    } 
    public void run() {
        try {
            System.out.println("Do some thing");
            latch.countDown();
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try {
            int numberOfThreads = 5;
            if (args.length < 1) {
                System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
                return;
            }
            try {
                numberOfThreads = Integer.parseInt(args[0]);
            } catch(NumberFormatException ne) {
            
            }
            CountDownLatch latch = new CountDownLatch(numberOfThreads);
            for (int n = 0; n < numberOfThreads; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

выход:

java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads

Объяснение:

  1. CountDownLatch инициализируется счетчиком 5 в главной теме
  2. Основной поток ожидает с помощью метода wait await() .
  3. Создано пять экземпляров DoSomethingInAThread . Каждый экземпляр countDown() счетчик countDown() .
  4. Когда счетчик станет нулевым, основной поток возобновится

синхронизация

В Java существует встроенный механизм блокировки на уровне языка: synchronized блок, который может использовать любой объект Java как встроенную блокировку (т.е. каждый объект Java может иметь связанный с ним монитор).

Внутренние блокировки обеспечивают атомарность групп операторов. Чтобы понять, что это значит для нас, давайте посмотрим на пример, когда synchronized полезна:

private static int t = 0;
private static Object mutex = new Object();

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            synchronized (mutex) {
                t++;
                System.out.println(MessageFormat.format("t: {0}", t));
            }
        });
    }
    executorService.shutdown();
}

В этом случае, если бы не synchronized блок, было бы много проблем параллелизма. Первый из них был бы с оператором приращения post (он сам по себе не является атомом), а вторым будет то, что мы будем наблюдать значение t после того, как произвольное количество других потоков получило бы возможность его модифицировать. Однако, поскольку мы приобрели встроенный замок, здесь не будет никаких условий гонки, и выход будет содержать цифры от 1 до 100 в их обычном порядке.

Встроенные замки в Java являются мьютексы (т.е. взаимного исполнения замков). Взаимное исполнение означает, что, если один поток приобрел блокировку, второй будет вынужден дождаться, когда первый из них освободит его, прежде чем он сможет получить блокировку для себя. Примечание. Операция, которая может поместить поток в состояние ожидания (сна), называется блокировкой . Таким образом, приобретение блокировки - это операция блокировки.

Внутренние блокировки на Java являются реентерабельными . Это означает, что если поток пытается получить блокировку, которой он уже владеет, он не будет блокироваться, и он успешно его приобретет. Например, следующий код не будет блокироваться при вызове:

public void bar(){
    synchronized(this){
        ...
    }
}
public void foo(){
    synchronized(this){
        bar();
    }
}

Помимо synchronized блоков существуют также synchronized методы.

Следующие блоки кода практически эквивалентны (хотя байт-код кажется другим):

  1. synchronized блок на this :

    public void foo() {
        synchronized(this) {
            doStuff();
        }
    }
    
  2. synchronized метод:

     public synchronized void foo() {
         doStuff();
     }
    

Аналогично для static методов это:

class MyClass {
    ...
    public static void bar() {
        synchronized(MyClass.class) {
            doSomeOtherStuff();
        }
    }
}

имеет такой же эффект, как и этот:

class MyClass {
    ...
    public static synchronized void bar() {
        doSomeOtherStuff();
    }
}

Атомные операции

Атомная операция - это операция, которая выполняется «все сразу», без каких-либо шансов на то, что другие потоки будут наблюдать или изменять состояние во время выполнения атомной операции.

Давайте рассмотрим ПЯТЫЙ ПРИМЕР .

private static int t = 0;

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            t++;
            System.out.println(MessageFormat.format("t: {0}", t));
        });
    }
    executorService.shutdown();
}

В этом случае есть два вопроса. Первая проблема заключается в том, что оператор post increment не является атомарным. Он состоит из нескольких операций: получите значение, добавьте 1 к значению, установите значение. Вот почему, если мы запустим этот пример, вероятно, мы не увидим в выходном файле t: 100 - два потока могут одновременно получать значение, увеличивать его и устанавливать его: допустим, значение t равно 10, а два потоки увеличивают t. Оба потока устанавливают значение t равным 11, так как второй поток наблюдает значение t до того, как первый поток завершил его увеличение.

Вторая проблема заключается в том, как мы наблюдаем t. Когда мы печатаем значение t, это значение может быть уже изменено другим потоком после операции приращения потока.

Чтобы исправить эти проблемы, мы будем использовать java.util.concurrent.atomic.AtomicInteger , который имеет много атомных операций для нас.

private static AtomicInteger t = new AtomicInteger(0);

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            int currentT = t.incrementAndGet();
            System.out.println(MessageFormat.format("t: {0}", currentT));
        });
    }
    executorService.shutdown();
}

Метод incrementAndGet AtomicInteger атомарно увеличивает и возвращает новое значение, тем самым устраняя предыдущее состояние гонки. Обратите внимание, что в этом примере строки будут по-прежнему выходить из строя, потому что мы не прилагаем никаких усилий, чтобы упорядочить вызовы println и что это выходит за рамки этого примера, поскольку для этого потребуется синхронизация, и цель этого примера - показать, как использовать AtomicInteger для устранения условий гонки, касающихся состояния.

Создание базовой тупиковой системы

Тупик возникает, когда два конкурирующих действия ждут, пока другой закончит, и, таким образом, никогда не будет. В java есть один замок, связанный с каждым объектом. Чтобы избежать параллельной модификации, выполняемой несколькими потоками на одном объекте, мы можем использовать synchronized ключевое слово, но все идет по себестоимости. Использование synchronized ключевого слова ошибочно может привести к застрявшим системам, называемым системой с блокировкой.

Подумайте, что в 1 экземпляре работают 2 потока, Позволяет обрабатывать потоки как First и Second, и позволяет сказать, что у нас есть 2 ресурса R1 и R2. Сначала получает R1, а также R2 нуждается в его завершении, а Second приобретает R2 и нуждается в R1 для завершения.

так сказать, в момент времени t = 0,

Сначала R1, а второй - R2. теперь First ждет R2, а Second ждет R1. это ожидание неопределенное, и это приводит к тупиковой ситуации.

public class Example2 {
    
    public static void main(String[] args) throws InterruptedException {
        final DeadLock dl = new DeadLock();
        Thread t1 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                dl.methodA();
            }
        });
   
        Thread t2 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    dl.method2();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        t1.setName("First");
        t2.setName("Second");
        t1.start();
        t2.start();
    }
}

class DeadLock {
    
    Object mLock1 = new Object();
    Object mLock2 = new Object();
    

    public void methodA() {
        System.out.println("methodA wait for mLock1  " + Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("methodA mLock1 acquired   " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
                method2();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public void method2() throws InterruptedException {
        System.out.println("method2 wait for mLock2  " + Thread.currentThread().getName());
        synchronized (mLock2) {
            System.out.println("method2  mLock2 acquired   " + Thread.currentThread().getName());
            Thread.sleep(100);
            method3();
        }
    }
    public void method3() throws InterruptedException {
        System.out.println("method3  mLock1  "+ Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("method3   mLock1 acquired  " + Thread.currentThread().getName());
        }
    }
}

Вывод этой программы:

methodA wait for mLock1  First
method2 wait for mLock2  Second
method2  mLock2 acquired   Second
methodA mLock1 acquired   First
method3  mLock1  Second
method2 wait for mLock2  First

Приостановка выполнения

Thread.sleep заставляет текущий поток приостанавливать выполнение в течение определенного периода. Это эффективное средство обеспечения времени процессора для других потоков приложения или других приложений, которые могут выполняться в компьютерной системе. В классе Thread есть два перегруженных метода sleep .

Тот, который указывает время сна на миллисекунду

public static void sleep(long millis) throws InterruptedException

Тот, который указывает время сна на наносекунду

public static void sleep(long millis, int nanos)

Приостановка выполнения в течение 1 секунды

Thread.sleep(1000);

Важно отметить, что это намек на планировщик ядра операционной системы. Это может быть не обязательно точным, и некоторые реализации даже не учитывают наносекундный параметр (возможно округление до ближайшей миллисекунды).

Рекомендуется заключить вызов Thread.sleep в try / catch и catch InterruptedException .

Визуализация барьеров чтения / записи при использовании синхронизированных / нестабильных

Поскольку мы знаем, что мы должны использовать synchronized ключевое слово, чтобы выполнить выполнение метода или исключить блок. Но мало кто из нас может не знать об одном важном аспекте использования synchronized и volatile ключевого слова: помимо создания атома кода, он также обеспечивает барьер чтения / записи . Что это за барьер чтения / записи? Давайте обсудим это с помощью примера:

class Counter {

  private Integer count = 10;

  public synchronized void incrementCount() {
    count++;
  }

  public Integer getCount() {
    return count;
  }
}

Предположим, что поток A сначала вызывает incrementCount() затем другой поток B вызывает getCount() . В этом случае нет гарантии, что B увидит обновленное значение count . Он все еще может count равным 10 , даже возможно, что он никогда не видит обновленную ценность count .

Чтобы понять это поведение, нам нужно понять, как модель памяти Java интегрируется с аппаратной архитектурой. В Java каждый поток имеет собственный поток стека. Этот стек содержит: стек вызовов метода и локальную переменную, созданные в этом потоке. В многоядерной системе вполне возможно, что два потока одновременно работают в отдельных ядрах. В таком сценарии возможно, что часть стека потока находится внутри регистра / кеша ядра. Если внутри потока, объект получает доступ с использованием synchronized (или volatile ) ключевого слова, после synchronized блока этот поток синхронизирует его локальную копию этой переменной с основной памятью. Это создает барьер чтения / записи и гарантирует, что поток увидит последнее значение этого объекта.

Но в нашем случае, поскольку поток B не использовал синхронизированный доступ к count , он может ссылаться на значение count хранящегося в регистре, и никогда не может видеть обновления из потока A. Чтобы убедиться, что B видит последнее значение count, нам нужно сделать getCount() синхронизированы.

public synchronized Integer getCount() {
  return count;
}

Теперь , когда поток А делаются с обновлением count он разблокирует Counter экземпляр, в то же время создает барьер для записи и сбрасывает все изменения , сделанные внутри этого блока в основную память. Аналогично, когда поток B получает блокировку на одном экземпляре Counter , он входит в барьер чтения и считывает значение count из основной памяти и видит все обновления.

видимость

Тот же эффект видимости распространяется и на volatile чтение / запись. Все переменные, обновленные до записи в volatile будут сброшены в основную память, и все прочитанные после чтения volatile переменной будут записаны из основной памяти.

Создание экземпляра java.lang.Thread

Существует два основных подхода к созданию потока в Java. По сути, создание потока так же просто, как запись кода, который будет выполняться в нем. Эти два подхода отличаются тем, что вы определяете этот код.

В Java поток представлен объектом - экземпляром java.lang.Thread или его подкласса. Таким образом, первый подход заключается в создании этого подкласса и переопределении метода run () .

Примечание . Я использую Thread для ссылки на класс java.lang.Thread и поток для ссылки на логическую концепцию потоков.

class MyThread extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running!");
        }
    }
}

Теперь, поскольку мы уже определили исполняемый код, поток можно создать просто так:

MyThread t = new MyThread();

Класс Thread также содержит конструктор, принимающий строку, которая будет использоваться в качестве имени потока. Это может быть особенно полезно при отладке программы с несколькими потоками.

class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running! ");
        }
    }
}

MyThread t = new MyThread("Greeting Producer");

Второй подход - определить код с помощью java.lang.Runnable и его единственный метод run () . Затем класс Thread позволяет выполнить этот метод в отдельном потоке. Для этого создайте поток, используя конструктор, принимающий экземпляр интерфейса Runnable .

Thread t = new Thread(aRunnable);

Это может быть очень мощным в сочетании с ссылками на lambdas или methods (только Java 8):

Thread t = new Thread(operator::hardWork);

Вы также можете указать имя потока.

Thread t = new Thread(operator::hardWork, "Pi operator");

Практически говоря, вы можете использовать оба подхода без забот. Однако общая мудрость говорит об использовании последнего.


Для каждого из четырех упомянутых конструкторов существует также альтернатива, принимающая экземпляр java.lang.ThreadGroup в качестве первого параметра.

ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");

ThreadGroup представляет собой набор потоков. Вы можете добавить только тему на ThreadGroup с помощью Thread конструктор «s. ThreadGroup затем может быть использован для управления всем его Thread s вместе, а также резьба может получить информацию от своего ThreadGroup .

Итак, чтобы суманализировать, Thread может быть создан одним из этих публичных конструкторов:

Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)

Последний позволяет нам определить размер требуемого стека для нового потока.


Часто читаемость кода страдает при создании и настройке многих потоков с одинаковыми свойствами или из одного шаблона. Вот когда можно использовать java.util.concurrent.ThreadFactory . Этот интерфейс позволяет вам инкапсулировать процедуру создания потока через фабричный шаблон и его единственный метод newThread (Runnable) .

class WorkerFactory implements ThreadFactory {
    private int id = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "Worker " + id++);
    }
}

Потоки прерывания потока / остановки

Каждый поток Java имеет флаг прерывания, который изначально ошибочен. Прерывание потока, по сути, не более чем установка этого флага в true. Код, выполняющийся на этом потоке, может иногда проверять флаг и действовать на него. Код также может полностью игнорировать его. Но почему каждый поток имеет такой флаг? В конце концов, наличие булевого флага в потоке - это то, что мы можем просто организовать, если и когда нам это нужно. Ну, есть методы, которые ведут себя особым образом, когда поток, в котором они работают, прерывается. Эти методы называются методами блокировки. Это методы, которые помещают поток в состояние WAITING или TIMED_WAITING. Когда поток находится в этом состоянии, прерывая его, будет выведено прерывание Exception на прерванный поток, вместо того, чтобы флаг прерывания был установлен в true, и поток снова станет RUNNABLE. Код, который вызывает метод блокировки, вынужден иметь дело с InterruptedException, поскольку это проверенное исключение. Таким образом, и, следовательно, его имя, прерывание может иметь эффект прерывания WAIT, эффективно заканчивая его. Обратите внимание, что не все методы, которые каким-то образом ждут (например, блокирование IO), реагируют на прерывание таким образом, поскольку они не помещают поток в состояние ожидания. Наконец, поток, у которого установлен флаг прерывания, который вводит метод блокировки (т. Е. Пытается попасть в состояние ожидания), немедленно выдаст исключение InterruptedException, и флаг прерывания будет очищен.

Помимо этой механики, Java не назначает никакого специального семантического значения прерывания. Код может интерпретировать прерывание любым способом, который ему нравится. Но чаще всего прерывание используется для подачи сигнала в поток, который он должен прекратить работать в кратчайшие сроки. Но, как должно быть ясно из вышесказанного, для того, чтобы прекратить работу, отреагировать на это прерывание будет только код этого потока. Остановка потока - это сотрудничество. Когда поток прерывается, его код запуска может находиться на несколько уровней в стеке. Большая часть кода не вызывает метод блокировки и заканчивается достаточно своевременно, чтобы не задерживать остановку потока чрезмерно. Код, который должен в основном касаться реагирования на прерывание, - это код, который находится в задачах обработки цикла, пока их не осталось, или пока флаг не будет установлен, чтобы сигнализировать об этом, чтобы остановить этот цикл. Циклы, которые обрабатывают, возможно, бесконечные задачи (т. Е. Продолжают работать в принципе), должны проверять флаг прерывания, чтобы выйти из цикла. Для конечных циклов семантика может диктовать, что все задачи должны быть закончены до окончания или может быть уместно оставить некоторые необработанные задачи. Код, который вызывает методы блокировки, будет вынужден иметь дело с InterruptedException. Если это вообще возможно семантически, оно может просто распространять InterruptedException и объявлять его бросить. Таким образом, он становится методом блокировки в отношении своих вызывающих абонентов. Если он не может распространять исключение, он должен по крайней мере установить прерванный флаг, поэтому вызывающие выше столбцы также знают, что поток был прерван. В некоторых случаях метод должен продолжать ожидание независимо от InterruptedException, и в этом случае он должен задерживать установку прерванного флага до тех пор, пока он не будет завершен, это может включать настройку локальной переменной, которая должна быть проверена до выхода из метода затем прервите его поток.

Примеры :

Пример кода, который прекращает обработку задач при прерывании

class TaskHandler implements Runnable {
    
    private final BlockingQueue<Task> queue;

    TaskHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
            try {
                Task task = queue.take(); // blocking call, responsive to interruption
                handle(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
            }
        }
    }
    
    private void handle(Task task) {
        // actual handling
    }
}

Пример кода, который задерживает установку флага прерывания до полного завершения:

class MustFinishHandler implements Runnable {

    private final BlockingQueue<Task> queue;

    MustFinishHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean shouldInterrupt = false;
        
        while (true) {
            try {
                Task task = queue.take();
                if (task.isEndOfTasks()) {
                    if (shouldInterrupt) {
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                handle(task);
            } catch (InterruptedException e) {
                shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
            }
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Пример кода, который имеет фиксированный список задач, но может быть прекращен раньше, когда прерван

class GetAsFarAsPossible implements Runnable {

    private final List<Task> tasks = new ArrayList<>();

    @Override
    public void run() {
        for (Task task : tasks) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            handle(task);
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Несколько примеров производителей / потребителей с общей глобальной очередью

Ниже кода демонстрируется несколько программ Producer / Consumer. Как потоки Producer, так и Consumer затрагивают одну и ту же глобальную очередь.

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

выход:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

и так далее ................

Объяснение:

  1. sharedQueue , который является LinkedBlockingQueue , разделяется между всеми потоками Producer и Consumer.
  2. Нити производителя производят одно целое число каждые 200 миллисекунд непрерывно и присоединяют его к sharedQueue
  3. Consumer поток непрерывно потребляет целое число из sharedQueue .
  4. Эта программа реализована без явных synchronized или Lock конструкций. BlockingQueue - это ключ к его достижению.

Реализации BlockingQueue предназначены для использования в основном для очередей производителей-потребителей.

Реализации BlockingQueue являются потокобезопасными. Все методы очередей осуществляют их эффекты атомарно, используя внутренние блокировки или другие формы контроля параллелизма.

Эксклюзивная запись / параллельный доступ для чтения

Иногда требуется, чтобы процесс одновременно записывал и читал одни и те же «данные».

Интерфейс ReadWriteLock и его реализация ReentrantReadWriteLock позволяют получить шаблон доступа, который можно описать следующим образом:

  1. Может быть любое количество одновременных считывателей данных. Если есть хотя бы один доступ к читателю, доступ к записи невозможен.
  2. Данные могут быть не более одного отдельного автора. Если есть доступ к записи, то ни один читатель не может получить доступ к данным.

Реализация может выглядеть так:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {

// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();

// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;

/** This will write to the data, in an exclusive access */
public static void writeToData() {
    RW_LOCK.writeLock().lock();
    try {
        data++;
    } finally {
        RW_LOCK.writeLock().unlock();
    }
}

public static int readData() {
    RW_LOCK.readLock().lock();
    try {
        return data;
    } finally {
        RW_LOCK.readLock().unlock();
    }
}

}

ПРИМЕЧАНИЕ 1. Этот точный вариант использования имеет более чистое решение с использованием AtomicInteger , но здесь описывается шаблон доступа, который работает независимо от того, что данные здесь представляют собой целое число, которое как вариант Atomic.

ПРИМЕЧАНИЕ 2 : Блокировка считывающей части действительно необходима, хотя она может выглядеть не так, как у обычного читателя. Действительно, если вы не блокируете читателя, любое количество вещей может пойти не так, в том числе:

  1. Запись примитивных значений не гарантируется атомарным на всех JVM, поэтому читатель мог видеть, например, только 32 бита из 64-битной записи, если data были 64-битным длинным типом
  2. Видимость записи из потока, который не выполнял его, гарантируется JVM только в том случае, если мы устанавливаем Happen Before relationship между write и reads. Эта связь устанавливается, когда и читатели, и писатели используют свои соответствующие блокировки, но не иначе
Java SE 8

Если требуется более высокая производительность, при определенных типах использования существует более быстрый тип блокировки, называемый StampedLock , который, среди прочего, реализует оптимистичный режим блокировки. Этот замок работает совсем иначе, чем ReadWriteLock , и этот образец не может быть транспонирован.

Управляемый объект

Интерфейс Runnable определяет один метод, run() , предназначенный для содержания кода, выполняемого в потоке.

Объект Runnable передается конструктору Thread . И вызывается метод start() Thread.

пример

public class HelloRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("Hello from a thread");
    }

    public static void main(String[] args) {
        new Thread(new HelloRunnable()).start();
    }
}

Пример в Java8:

public static void main(String[] args) {
    Runnable r = () -> System.out.println("Hello world");
    new Thread(r).start();
}

Подпроцесс Runnable vs Thread

Runnable объекта более общее, потому что объект Runnable может подклассифицировать класс, отличный от Thread .

Подклассы Thread легче использовать в простых приложениях, но ограничены тем фактом, что ваш класс задачи должен быть потомком Thread .

Объект Runnable применим к API-интерфейсам управления потоками высокого уровня.

семафор

Семафор - это высокоуровневый синхронизатор, который поддерживает набор разрешений, которые могут быть получены и выпущены потоками. Семафор можно представить как счетчик разрешений, который будет уменьшаться при получении потока и увеличиваться при потоке. Если количество разрешений равно 0 когда поток пытается получить, то поток будет блокироваться до тех пор, пока разрешение не будет доступно (или пока поток не будет прерван).

Семафор инициализируется как:

Semaphore semaphore = new Semaphore(1); // The int value being the number of permits

Конструктор Семафор допускает дополнительный логический параметр для справедливости. Если установлено false, этот класс не дает никаких гарантий относительно порядка, в котором потоки приобретают разрешения. Когда справедливость установлена, семафор гарантирует, что потоки, вызывающие любой из методов получения, выбираются для получения разрешений в том порядке, в котором их обращение этих методов было обработано. Он объявляется следующим образом:

Semaphore semaphore = new Semaphore(1, true);

Теперь давайте рассмотрим пример из javadocs, где Семафор используется для контроля доступа к пулу элементов. Семафор используется в этом примере для обеспечения функциональности блокировки, чтобы гарантировать, что всегда будут получаться элементы, получаемые при getItem() .

class Pool {
    /*
     * Note that this DOES NOT bound the amount that may be released!
     * This is only a starting value for the Semaphore and has no other
     * significant meaning UNLESS you enforce this inside of the
     * getNextAvailableItem() and markAsUnused() methods
     */
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Obtains the next available item and reduces the permit count by 1. 
     * If there are no items available, block.
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Puts the item into the pool and add 1 permit.
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    private Object getNextAvailableItem() {
        // Implementation
    }

    private boolean markAsUnused(Object o) {
        // Implementation
    }
}

Добавьте два массива `int`, используя Threadpool

В Threadpool есть очередь задач, каждая из которых будет выполнена на одной из этих потоков.

В следующем примере показано, как добавить две массивы int с помощью Threadpool.

Java SE 8
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };

ExecutorService pool = Executors.newCachedThreadPool(); 

// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
    final int worker = i;
    pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}

// Wait for all Workers to finish:
try {
    // execute all submitted tasks
    pool.shutdown();
    // waits until all workers finish, or the timeout ends
    pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    pool.shutdownNow(); //kill thread
}

System.out.println(Arrays.toString(result));

Заметки:

  1. Этот пример является исключительно иллюстративным. На практике не будет никакого ускорения, если использовать потоки для этой задачи. Вероятно, замедление замедляется, поскольку накладные расходы на создание и планирование задач увеличивают время, затрачиваемое на выполнение задачи.

  2. Если вы использовали Java 7 и более ранние версии, вы должны использовать анонимные классы вместо lambdas для выполнения задач.

Получить статус всех потоков, запущенных вашей программой, за исключением системных потоков

Фрагмент кода:

import java.util.Set;

public class ThreadStatus {
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 5; i++){
            Thread t = new Thread(new MyThread());
            t.setName("MyThread:" + i);
            t.start();
        }
        int threadCount = 0;
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet) {
            if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
                System.out.println("Thread :" + t + ":" + "state:" + t.getState());
                ++threadCount;
            }
        }
        System.out.println("Thread count started by Main thread:" + threadCount);
    }
}

class MyThread implements Runnable {
    public void run() {
        try {
            Thread.sleep(2000);
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

Выход:

Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6

Объяснение:

Thread.getAllStackTraces().keySet() возвращает все Thread включая потоки приложений и потоки системы. Если вас интересует только статус Threads, запущенный вашим приложением, выполните итерацию набора Thread , проверив Thread Group определенного потока на ваш основной поток программы.

В отсутствии выше условия ThreadGroup программа возвращает статус ниже системных потоков:

Reference Handler
Signal Dispatcher
Attach Listener
Finalizer

Вызов и будущее

Хотя Runnable предоставляет средство для переноса кода, который должен выполняться в другом потоке, он имеет ограничение в том, что он не может вернуть результат выполнения. Единственный способ получить некоторое возвращаемое значение из выполнения Runnable - это присвоить результат переменной, доступной в области вне Runnable .

Callable была представлена ​​на Java 5 как одноранговое средство Runnable . Callable по сути Callable и та же, за исключением того, что вместо run метод call . Метод call имеет дополнительную возможность возвращать результат, а также разрешено выдавать проверенные исключения.

Результат отправки задания Callable доступен для использования через будущее

Future можно рассматривать как контейнер, который содержит результат вычисления Callable . Вычисление вызываемого может продолжаться в другом потоке, и любая попытка использовать результат Future будет блокировать и будет возвращать результат только после его появления.

Интерфейс с возможностью вызова

public interface Callable<V> {
    V call() throws Exception;
}

Будущее

interface Future<V> {
    V get();
    V get(long timeout, TimeUnit unit);
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}

Использование примера Callable and Future:

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newSingleThreadExecutor();
          
    System.out.println("Time At Task Submission : " + new Date());
    Future<String> result = es.submit(new ComplexCalculator());
    // the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
    System.out.println("Result of Complex Calculation is : " + result.get());
    System.out.println("Time At the Point of Printing the Result : " + new Date());
}

Наш Callable, который делает длительные вычисления

public class ComplexCalculator implements Callable<String> {

    @Override
    public String call() throws Exception {
        // just sleep for 10 secs to simulate a lengthy computation
        Thread.sleep(10000);            
        System.out.println("Result after a lengthy 10sec calculation");
        return "Complex Result"; // the result 
    }
}

Выход

Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016

Другие операции, разрешенные на будущее

Хотя get() - это метод для извлечения фактического результата. Будущее имеет положение

  • get(long timeout, TimeUnit unit) определяет максимальный период времени в течение текущего потока, который будет ждать результата;
  • Для отмены отмены вызова задачи cancel(mayInterruptIfRunning) . Флаг mayInterrupt указывает, что задача должна быть прервана, если она была запущена и работает прямо сейчас;
  • Чтобы проверить, завершена ли задача / завершена вызовом isDone() ;
  • Чтобы проверить, отменена ли длительная задача, isCancelled() .

Замки как средства синхронизации

До внедрения параллельного пакета Java 5 потоки были более низкими. Введение этого пакета обеспечило несколько вспомогательных программных средств / конструкций, поддерживающих более высокий уровень.

Замки - это механизмы синхронизации потоков, которые по существу служат той же цели, что и синхронизированные блоки или ключевые слова.

Внутренняя блокировка

int count = 0; // shared among multiple threads

public void doSomething() {
    synchronized(this) {
        ++count; // a non-atomic operation
    }
}

Синхронизация с использованием замков

int count = 0; // shared among multiple threads

Lock lockObj = new ReentrantLock();
public void doSomething() {
    try {
        lockObj.lock();
        ++count; // a non-atomic operation
    } finally {    
        lockObj.unlock(); // sure to release the lock without fail
    }
}

Блокировки также имеют функциональные возможности, которые встроенная блокировка не предлагает, например, блокировка, но остается реагирующей на прерывание или пытается заблокировать, а не блокировать, когда не удается.

Блокировка, реагирующая на прерывание

class Locky {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        try {
            try {
                lockObj.lockInterruptibly();
                ++count; // a non-atomic operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stopping
            }
        } finally {
            if (!Thread.currentThread().isInterrupted()) {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Делайте только то, что можно заблокировать

public class Locky2 {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        boolean locked = lockObj.tryLock(); // returns true upon successful lock
        if (locked) {
            try {
                ++count; // a non-atomic operation
            } finally {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Существует несколько вариантов блокировки. Для более подробной информации обратитесь к api docs здесь



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow