サーチ…


前書き

コンカレントコンピューティングは、いくつかの計算が逐次ではなく同時に実行されるコンピューティングの一形態です。 Java言語は、スレッドの使用による並行プログラミングをサポートするように設計されています。オブジェクトとリソースには複数のスレッドからアクセスできます。各スレッドはプログラム内の任意のオブジェクトに潜在的にアクセスする可能性があり、プログラマはスレッド間でオブジェクトへの読み書きアクセスが正しく同期されていることを保証する必要があります。

備考

StackOverflowに関する関連トピック:

基本的なマルチスレッド

実行するタスクが多数あり、これらのタスクがすべて前のタスクの結果に依存しない場合は、 マルチスレッドを使用して、コンピュータが可能であれば複数のプロセッサを使用してこれらのタスクをすべて同時に実行できます。大きな独立したタスクがあれば、これによりプログラムの実行が速くなります。

class CountAndPrint implements Runnable {

    private final String name;

    CountAndPrint(String name) {
        this.name = name;
    }

    /** This is what a CountAndPrint will do */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println(this.name + ": " + i);
        }
    }

    public static void main(String[] args) {
        // Launching 4 parallel threads
        for (int i = 1; i <= 4; i++) {
            // `start` method will call the `run` method 
            // of CountAndPrint in another thread
            new Thread(new CountAndPrint("Instance " + i)).start();
        }

        // Doing some others tasks in the main Thread
        for (int i = 0; i < 10000; i++) {
            System.out.println("Main: " + i);
        }
    }
}

さまざまなCountAndPrintインスタンスのrunメソッドのコードは、予測不可能な順序で実行されます。サンプル実行のスニペットは次のようになります。

Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...

プロデューサー/コンシューマー

生産者 - 消費者問題解決の簡単な例。 JDKクラス( AtomicBooleanBlockingQueue )は同期に使用されるため、無効なソリューションが作成される可能性が減ります。さまざまな種類のBlockingQueueについてJavadocに問い合わせてください。異なる実装を選択すると、 DelayQueuePriority Queueなどのこの例の動作が大幅に変更される可能性があります。

public class Producer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Producer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int producedCount = 0;
        try {
            while (true) {
                producedCount++;
                //put throws an InterruptedException when the thread is interrupted
                queue.put(new ProducedData());
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            producedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Produced " + producedCount + " objects");
    }
}

public class Consumer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Consumer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int consumedCount = 0;
        try {
            while (true) {
                //put throws an InterruptedException when the thread is interrupted
                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
                // process data
                consumedCount++;
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            consumedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed " + consumedCount + " objects");
    }
}


public class ProducerConsumerExample {
    static class ProducedData {    
        // empty data object
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
        // choice of queue determines the actual behavior: see various BlockingQueue implementations

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();

        Thread.sleep(1000);
        producer.interrupt();
        Thread.sleep(10);
        consumer.interrupt();
    }
}

ThreadLocalの使用

Java Concurrencyの便利なツールはThreadLocalです。これにより、特定のスレッドに固有の変数を持つことができます。したがって、同じコードが異なるスレッドで実行される場合、これらの実行は値を共有しませんが、スレッドごとにスレッド固有の変数を持ちます

たとえば、これはサーブレット内の要求を処理するコンテキスト(認証情報など)を確立するためによく使用されます。あなたは次のようなことをするかもしれません:

private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

public static MyUserContext getContext() {
    return contexts.get(); // get returns the variable unique to this thread
}

public void doGet(...) {
    MyUserContext context = magicGetContextFromRequest(request); 
    contexts.put(context); // save that context to our thread-local - other threads
                           // making this call don't overwrite ours
    try {
        // business logic
    } finally {
        contexts.remove(); // 'ensure' removal of thread-local variable
    }
}

今では、 MyUserContextをすべての単一のメソッドに渡す代わりに、必要に応じてMyServlet.getContext()使用することができます。今はもちろん、これは文書化する必要のある変数を紹介しますが、スレッドセーフであるため、このようなスコープの大きい変数を使用することには多くの短所がありません。

ここでの主な利点は、すべてのスレッドがそのcontextsコンテナに独自のスレッドローカル変数を持つことです。定義されたエントリポイントから(各サーブレットがコンテキストを維持するように、あるいはサーブレットフィルタを追加するように要求する場合など)必要に応じて、このコンテキストに依存することができます。

CountDownLatch

CountDownLatch

他のスレッドで実行されている一連の操作が完了するまで、1つまたは複数のスレッドを待機させる同期化補助機能。

  1. CountDownLatchは、指定されたカウントで初期化されます。
  2. awaitメソッドは、 countDown()メソッドの呼び出しによって現在のカウントがゼロになるまでブロックします。その後、待機中のすべてのスレッドが解放され、その後のすべてのwaitの呼び出しがすぐに戻ります。
  3. これはワンショット現象です。カウントをリセットすることはできません。カウントをリセットするバージョンが必要な場合は、 CyclicBarrier使用を検討してCyclicBarrier

主な方法:

public void await() throws InterruptedException

スレッドが中断されない限り、ラッチがゼロまでカウントダウンするまで現在のスレッドを待機させます。

public void countDown()

ラッチのカウントをデクリメントし、カウントがゼロになるとすべての待機スレッドを解放します。

例:

import java.util.concurrent.*;

class DoSomethingInAThread implements Runnable {
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch) {
        this.latch = latch;
    } 
    public void run() {
        try {
            System.out.println("Do some thing");
            latch.countDown();
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try {
            int numberOfThreads = 5;
            if (args.length < 1) {
                System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
                return;
            }
            try {
                numberOfThreads = Integer.parseInt(args[0]);
            } catch(NumberFormatException ne) {
            
            }
            CountDownLatch latch = new CountDownLatch(numberOfThreads);
            for (int n = 0; n < numberOfThreads; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

出力:

java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads

説明:

  1. CountDownLatchは、メインスレッドのカウンタが5で初期化されます
  2. メインスレッドはawait()メソッドを使用しawait()待機してawait()ます。
  3. DoSomethingInAThread 5つのインスタンスが作成されました。各インスタンスは、 countDown()メソッドをcountDown()てカウンタを減分しました。
  4. カウンタがゼロになると、メインスレッドが再開します

同期

Javaでは、言語レベルのロック機構が組み込まれています。 synchronizedブロックは、Javaオブジェクトを組み込みロックとして使用できます(つまり、すべてのJavaオブジェクトに関連付けられたモニターがある可能性があります)。

本質的なロックは、文のグループに原子性を提供します。それが何を意味するのかを理解するには、 synchronizedが役に立つ例を見てみましょう:

private static int t = 0;
private static Object mutex = new Object();

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            synchronized (mutex) {
                t++;
                System.out.println(MessageFormat.format("t: {0}", t));
            }
        });
    }
    executorService.shutdown();
}

この場合、 synchronizedブロックでなければ、複数の並行処理の問題が発生していました。最初のものはポストインクリメント演算子であり(それ自体はアトミックではありません)、2番目のものは、任意の量の他のスレッドがそれを修正する機会を得た後にtの値を観測することです。しかし、本質的なロックを獲得して以来、ここに競合状態はなく、出力には通常の順序で1〜100の数字が含まれます。

Javaの本質的なロックはmutex (つまり相互実行ロック)です。相互実行とは、一方のスレッドがロックを取得した場合、他方のスレッドがロックを獲得する前に最初のロックを解放するのを待つことを意味します。注:スレッドを待機(スリープ)状態にする可能性のある操作は、 ブロッキング操作と呼ばれます 。したがって、ロックの獲得はブロック動作である。

Javaの本質的なロックはリエントラントです。これは、スレッドがすでに所有しているロックを取得しようとすると、ブロックされず、正常に獲得されることを意味します。たとえば、次のコードは呼び出されたときにブロックされません

public void bar(){
    synchronized(this){
        ...
    }
}
public void foo(){
    synchronized(this){
        bar();
    }
}

synchronizedブロックの他に、 synchronized方法もあります。

次のコードブロックは、バイトコードが異なるように見えても、実質的に同等です。

  1. synchronizedのブロックthis

    public void foo() {
        synchronized(this) {
            doStuff();
        }
    }
    
  2. synchronizedメソッド:

     public synchronized void foo() {
         doStuff();
     }
    

staticメソッドの場合も同様です。

class MyClass {
    ...
    public static void bar() {
        synchronized(MyClass.class) {
            doSomeOtherStuff();
        }
    }
}

これと同じ効果があります:

class MyClass {
    ...
    public static synchronized void bar() {
        doSomeOtherStuff();
    }
}

原子操作

アトミック操作は、アトミック操作の実行中に他のスレッドが状態を観察または変更することなく、「一度に」実行される操作です。

悪い例を考えてみましょう。

private static int t = 0;

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            t++;
            System.out.println(MessageFormat.format("t: {0}", t));
        });
    }
    executorService.shutdown();
}

この場合、2つの問題があります。最初の問題は、ポストインクリメント演算子がアトミックではないことです。それは複数の操作で構成されています:値を取得し、値に1を加え、値を設定します。そのため、例を実行すると、出力にt: 100が表示されない可能性があります。つまり、2つのスレッドが同時に値を取得し、インクリメントして設定することができます。つまり、tの値を10とし、スレッドはtをインクリメントしています。両方のスレッドは、tの値を11に設定します。これは、2番目のスレッドが最初のスレッドのインクリメントを完了する前にtの値を監視するためです。

2番目の問題は、どのように我々がtを観察しているかである。 tの値を出力しているとき、値はこのスレッドのインクリメント操作の後に別のスレッドによって既に変更されている可能性があります。

これらの問題を解決するために、 java.util.concurrent.atomic.AtomicIntegerを使用します。これは、多くのアトミック操作が使用できるようになっています。

private static AtomicInteger t = new AtomicInteger(0);

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            int currentT = t.incrementAndGet();
            System.out.println(MessageFormat.format("t: {0}", currentT));
        });
    }
    executorService.shutdown();
}

AtomicIntegerincrementAndGetメソッドは、原子的に増分して新しい値を返すので、以前の競合状態は解消されます。この例では、 println呼び出しの順序を決めることがないため、これは同期を必要とするため、この例の範囲外であるため、行は順不同です。この例の目的は、 AtomicIntegerを使用して状態に関する競合状態を解消する

基本的なデッドロックシステムの作成

デッドロックは、2つの競合するアクションがもう一方のアクションを完了するのを待っているときに発生します。 javaでは、各オブジェクトに関連付けられたロックが1つあります。単一のオブジェクト上の複数のスレッドによって同時に行われる変更を避けるために、 synchronizedキーワードを使用することができますが、すべてはコストがかかります。 synchronizedキーワードを間違って使用すると、システムがデッドロックされた状態になることがあります。

2つのスレッドが1つのインスタンスで動作しているとみなし、スレッドを1番目と2番目とし、2つのリソースR1とR2があるとします。最初にR1を取得し、完了のためにR2も必要ですが、SecondはR2を取得し、完了するためにR1が必要です。

時刻t = 0で言うように、

最初にR1があり、2番目にR2があります。今すぐ最初にR2を待っています.2番目がR1を待っています。この待ち時間は不定で、デッドロックにつながります。

public class Example2 {
    
    public static void main(String[] args) throws InterruptedException {
        final DeadLock dl = new DeadLock();
        Thread t1 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                dl.methodA();
            }
        });
   
        Thread t2 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    dl.method2();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        t1.setName("First");
        t2.setName("Second");
        t1.start();
        t2.start();
    }
}

class DeadLock {
    
    Object mLock1 = new Object();
    Object mLock2 = new Object();
    

    public void methodA() {
        System.out.println("methodA wait for mLock1  " + Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("methodA mLock1 acquired   " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
                method2();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public void method2() throws InterruptedException {
        System.out.println("method2 wait for mLock2  " + Thread.currentThread().getName());
        synchronized (mLock2) {
            System.out.println("method2  mLock2 acquired   " + Thread.currentThread().getName());
            Thread.sleep(100);
            method3();
        }
    }
    public void method3() throws InterruptedException {
        System.out.println("method3  mLock1  "+ Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("method3   mLock1 acquired  " + Thread.currentThread().getName());
        }
    }
}

このプログラムの出力:

methodA wait for mLock1  First
method2 wait for mLock2  Second
method2  mLock2 acquired   Second
methodA mLock1 acquired   First
method3  mLock1  Second
method2 wait for mLock2  First

実行を一時停止する

Thread.sleepは、現在のスレッドを指定された期間実行を中断させます。これは、アプリケーションまたはコンピュータシステム上で実行されている他のアプリケーションの他のスレッドがプロセッサ時間を利用できるようにする効率的な方法です。 Threadクラスには、2つのオーバーロードされたsleepメソッドがあります。

スリープ時間をミリ秒に指定するもの

public static void sleep(long millis) throws InterruptedException

スリープ時間をナノ秒に指定するもの

public static void sleep(long millis, int nanos)

実行を1秒間中断する

Thread.sleep(1000);

これは、オペレーティングシステムのカーネルのスケジューラに対するヒントであることに注意することが重要です。これは必ずしも正確ではないかもしれませんし、実装によってはナノ秒のパラメータ(おそらく最も近いミリ秒に丸められる)を考慮しないものもあります。

try / catchにThread.sleep呼び出しを囲み、 InterruptedExceptionを捕捉することをお勧めします。

同期/揮発性を使用しているときの読み書き障壁の可視化

私たちが知っているように、 synchronizedキーワードを使ってメソッドやブロックを排他的に実行する必要があります。しかし、私たちのいくつかは、使用してのもう一つの重要な側面を認識しないかもしれませんsynchronizedvolatileキーワードを: 別にコード原子の単位を作るから、それはまた、読み取り/書き込みバリアを提供します 。この読み取り/書き込みバリアとは何ですか?例を使ってこれについて議論しましょう:

class Counter {

  private Integer count = 10;

  public synchronized void incrementCount() {
    count++;
  }

  public Integer getCount() {
    return count;
  }
}

スレッドAが最初にincrementCount()呼び出し、次に別のスレッドBgetCount()呼び出すとしましょう。このシナリオでは、Bにcount更新値が表示されるという保証はありません。それでもcount10と表示されることがありますが、 count更新された値が決して見られない可能性もありcount

この動作を理解するには、Javaメモリモデルがハードウェアアーキテクチャとどのように統合されるかを理解する必要があります。 Javaでは、各スレッドには独自のスレッドスタックがあります。このスタックには、メソッド呼び出しスタックとそのスレッドで作成されたローカル変数が含まれています。マルチコアシステムでは、2つのスレッドが別々のコアで同時に実行されている可能性があります。このようなシナリオでは、スレッドのスタックの一部がコアのレジスタ/キャッシュ内にある可能性があります。スレッド内であれば、オブジェクトはsynchronized (またはvolatile )キーワードを使用してアクセスされ、 synchronizedブロックの後にスレッドはその変数のローカルコピーをメインメモリと同期させます。これにより、読み取り/書き込みバリアが作成され、スレッドがそのオブジェクトの最新の値を参照していることを確認します。

しかし、スレッドBがに同期アクセスを使用していないので、我々の場合には、 count 、それはの値参照のうえれるかもしれないcountレジスタに格納されているのとBは、我々は確認する必要があり、カウントの最新の値が見えることを確認するために、スレッドAから更新を見ることはないかもしれgetCount()も同様に同期します。

public synchronized Integer getCount() {
  return count;
}

スレッドAが更新countで終了すると、 Counterインスタンスのロックが解除され、同時に書き込みバリアが作成され、そのブロック内で実行されたすべての変更がメインメモリにフラッシュされます。同様に、スレッドBがCounterの同じインスタンスでロックを取得すると、スレッドBは読み取りバリアに入り、メインメモリからcount値を読み取り、すべての更新を表示します。

可視性

同じ可視性の効果は、 volatile読み取り/書き込みにも適用されます。 volatileに書き込む前に更新されたすべての変数はメインメモリにフラッシュされ、 volatile変数の読み込み後のすべての読み取りはメインメモリからのものになります。

java.lang.Threadインスタンスの作成

Javaでスレッドを作成する主な2つの方法があります。本質的には、スレッドを作成することは、実行されるコードを書くことと同じくらい簡単です。 2つのアプローチは、そのコードを定義する場所が異なります。

Javaでは、スレッドはオブジェクト( java.lang.Threadまたはそのサブクラスのインスタンス)によって表されます 。だから最初のアプローチは、そのサブクラスを作成し、 run()メソッドをオーバーライドすることです。

スレッドを使用して、 java.lang.Threadクラスとスレッドを参照し、スレッドの論理概念を参照します。

class MyThread extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running!");
        }
    }
}

実行するコードをすでに定義しているので、スレッドは次のように単純に作成できます。

MyThread t = new MyThread();

Threadクラスには、スレッドの名前として使用される文字列を受け入れるコンストラクタも含まれています。これはマルチスレッドプログラムをデバッグする際に特に役立ちます。

class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running! ");
        }
    }
}

MyThread t = new MyThread("Greeting Producer");

2番目の方法は、 java.lang.Runnableとその唯一のメソッドrun()を使用してコードを定義することです。 Threadクラスを使用すると、別のスレッドでそのメソッドを実行できます。これを実現するには、 Runnableインタフェースのインスタンスを受け入れるコンストラクタを使用してスレッドを作成します。

Thread t = new Thread(aRunnable);

これは、lambdaまたはメソッド参照(Java 8のみ)と組み合わせると非常に強力になります。

Thread t = new Thread(operator::hardWork);

スレッドの名前も指定できます。

Thread t = new Thread(operator::hardWork, "Pi operator");

実用的に言えば、両方のアプローチを心配することなく使用できます。しかし、 一般的な知恵は、後者を使用すると言います。


前述の4つのコンストラクタのそれぞれについて、最初のパラメータとしてjava.lang.ThreadGroupのインスタンスを受け入れる代替手段もあります。

ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");

ThreadGroupは一連のスレッドを表します。 スレッドのコンストラクタを使用してスレッド グループに スレッドを追加することができます。 ThreadGroupを使用して、すべてのThreadを一緒に管理することができます。また、 ThreadThreadGroupから情報を得ることができます。

したがって、 スレッドをsumarizeするには、次のいずれかのパブリックコンストラクタを使用してThreadを作成します。

Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)

最後のものは、新しいスレッドの望ましいスタックサイズを定義することができます。


多くの場合、同じプロパティまたは同じパターンのスレッドを作成して構成すると、コードの可読性が損なわれることがあります。これは、 java.util.concurrent.ThreadFactoryを使用できるときです。このインタフェースを使用すると、ファクトリパターンとその唯一のメソッドnewThread(Runnable)を通じてスレッドを作成するプロシージャをカプセル化できます。

class WorkerFactory implements ThreadFactory {
    private int id = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "Worker " + id++);
    }
}

スレッド中断/中断スレッド

各Javaスレッドには割り込みフラグがあり、最初はfalseです。スレッドを中断することは、本質的に、そのフラグをtrueに設定することだけです。そのスレッドで実行されているコードは、時々フラグをチェックしてそのフラグを処理することができます。コードは完全に無視することもできます。しかし、なぜそれぞれのスレッドはそのようなフラグを持っていますか?結局のところ、スレッド上にブール値のフラグを設定することは、必要な時にいつでも自分自身を整理することができます。さて、実行中のスレッドが中断されたときに特別な振る舞いをするメソッドがあります。これらのメソッドはブロッキングメソッドと呼ばれます。これらは、スレッドをWAITINGまたはTIMED_WAITING状態にするメソッドです。スレッドがこの状態にあるときに割り込みを行うと、割り込みフラグがtrueに設定されているのではなく、中断されたスレッドでInterruptedExceptionがスローされ、スレッドは再びRUNNABLEになります。ブロッキングメソッドを呼び出すコードは、チェック例外であるため、InterruptedExceptionを処理する必要があります。だから、その名前、割り込みは効果的にそれを終了する、WAITを中断する効果を持つことができます。何らかの形で待機しているメソッド(例えばIOのブロッキング)がスレッドを待機状態にしないので、そのような方法で割り込みに応答するわけではないことに注意してください。最後に、ブロッキングメソッドに入る(つまり、待機状態に入る)割り込みフラグが設定されたスレッドはすぐにInterruptedExceptionをスローし、割り込みフラグはクリアされます。

これらの機構以外にも、Javaは割り込みに特別な意味的意味を割り当てません。コードは、好きなように割り込みを自由に解釈することができます。しかし、ほとんどの場合、中断はスレッドに信号を送るために使用され、最も早い時点で実行を停止する必要があります。しかし、上記から明らかなように、実行を停止するために、その割り込みに適切に反応するのはそのスレッド上のコードです。スレッドを停止することはコラボレーションです。スレッドが中断されると、実行中のコードはスタックトレースの深さまで数レベルになる可能性があります。ほとんどのコードではブロッキングメソッドが呼び出されず、スレッドの停止を過度に遅らせることができないほどタイムリーに終了します。ほとんどの場合、割り込みに応答することに関わるコードは、ループが残っていない限り、またはループが停止するようにフラグが設定されるまで、ループ処理中のコードです。おそらく無限のタスクを処理するループ(つまり、原則的に実行し続けるループ)は、ループを終了するために割り込みフラグをチェックする必要があります。有限ループの場合、セマンティクスは、すべてのタスクを終了する前に終了しなければならないか、またはいくつかのタスクを未処理のままにすることが適切である可能性があります。ブロッキングメソッドを呼び出すコードは、InterruptedExceptionを処理するように強制されます。意味的に可能であれば、単にInterruptedExceptionを伝播し、それをスローすると宣言できます。このように、それは呼び出し側に関してブロッキングメソッドそのものになります。例外を伝播できない場合は、最低限割り込みフラグを設定する必要があります。したがって、呼び出し側はスレッドを中断したことをスタックに通知します。場合によっては、InterruptedExceptionに関係なくメソッドを待機させる必要があります。この場合、メソッドが終了するまで割り込みフラグの設定を遅らせる必要があります。ローカル変数を設定する必要があります。そのスレッドを中断する。

例:

中断されたタスクの処理を停止するコードの例

class TaskHandler implements Runnable {
    
    private final BlockingQueue<Task> queue;

    TaskHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
            try {
                Task task = queue.take(); // blocking call, responsive to interruption
                handle(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
            }
        }
    }
    
    private void handle(Task task) {
        // actual handling
    }
}

完全に完了するまで割り込みフラグの設定を遅らせるコードの例:

class MustFinishHandler implements Runnable {

    private final BlockingQueue<Task> queue;

    MustFinishHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean shouldInterrupt = false;
        
        while (true) {
            try {
                Task task = queue.take();
                if (task.isEndOfTasks()) {
                    if (shouldInterrupt) {
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                handle(task);
            } catch (InterruptedException e) {
                shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
            }
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

タスクの固定リストを持つが、中断されると早期に終了するコードの例

class GetAsFarAsPossible implements Runnable {

    private final List<Task> tasks = new ArrayList<>();

    @Override
    public void run() {
        for (Task task : tasks) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            handle(task);
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

共有グローバルキューを持つ複数のプロデューサ/コンシューマの例

以下のコードは、複数のプロデューサー/コンシューマープログラムを紹介しています。プロデューサスレッドとコンシューマスレッドは同じグローバルキューを共有します。

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

出力:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

等々 ................

説明:

  1. sharedQueueは、 LinkedBlockingQueueあり、すべてのプロデューサとコンシューマのスレッドで共有されます。
  2. プロデューサスレッドは、200ミリ秒ごとに1つの整数を連続的にsharedQueueし、 sharedQueue追加します
  3. ConsumerスレッドはsharedQueueから整数を連続的にConsumerます。
  4. このプログラムは、明示的にsynchronizedされた構造体またはLock構造体なしで実装されます。 BlockingQueueはそれを達成するための鍵です。

BlockingQueue実装は、主にプロデューサ/コンシューマキューで使用するように設計されています。

BlockingQueueの実装はスレッドセーフです。すべてのキューイング方法は、内部ロックまたは他の形式の同時実行制御を使用して、その効果をアトミックに実現します。

排他的書込み/同時読出しアクセス

あるプロセスが同じ "データ"を同時に読み書きする必要があることがあります。

ReadWriteLockインタフェースとそのReentrantReadWriteLock実装では、次のように記述できるアクセスパターンを使用できます。

  1. 任意の数のデータを同時に読み取ることができます。少なくとも1つのリーダアクセスが許可されている場合、ライタアクセスは不可能です。
  2. 多くの場合、1つのライターがデータにアクセスできます。ライターアクセスが許可されている場合、リーダーはデータにアクセスできません。

実装は次のようになります。

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {

// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();

// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;

/** This will write to the data, in an exclusive access */
public static void writeToData() {
    RW_LOCK.writeLock().lock();
    try {
        data++;
    } finally {
        RW_LOCK.writeLock().unlock();
    }
}

public static int readData() {
    RW_LOCK.readLock().lock();
    try {
        return data;
    } finally {
        RW_LOCK.readLock().unlock();
    }
}

}

注1 :この正確なユースケースは、 AtomicIntegerを使用したよりクリーンなソリューションをAtomicIntegerいますが、ここで説明するのはアクセスパターンです。これは、ここでのデータが整数型であるという事実に関係なく機能します。

注記2 :読書部のロックは本当に必要ですが、カジュアルリーダーには見えないかもしれません。実際、あなたが読者側でロックしていなければ、いくつかのことが間違っている可能性があります:

  1. プリミティブ値の書き込みは、すべてのJVMでアトミックであることが保証されていないため、 dataが64ビットのlong型の場合は、64ビット書き込みの32ビットのみが読み取られdata
  2. それを実行しなかったスレッドからの書き込みの可視性は、書き込みと読み取りの間にHappen Beforeの関係を確立した場合にのみ、JVMによって保証されます。この関係は、リーダとライタの両方がそれぞれのロックを使用する場合に確立されますが、そうでなければ
Java SE 8

より高いパフォーマンスが必要な場合、特定のタイプの使用法の下では、 StampedLockと呼ばれるより速いロックタイプがあります。これは、 StampedLock 、オプティミスティックロックモードを実装します。このロックはReadWriteLockとは非常に異なる働きをしますが、このサンプルは転置型ではありません。

実行可能オブジェクト

Runnableインタフェースはスレッド内で実行されるコードを格納するための単一のメソッドrun()定義します。

RunnableオブジェクトはThreadコンストラクタに渡されます。スレッドのstart()メソッドが呼び出されます。

public class HelloRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("Hello from a thread");
    }

    public static void main(String[] args) {
        new Thread(new HelloRunnable()).start();
    }
}

Java8の例:

public static void main(String[] args) {
    Runnable r = () -> System.out.println("Hello world");
    new Thread(r).start();
}

実行可能スレッドとスレッドサブクラス

Runnableため、オブジェクトの雇用は、より一般的であるRunnableオブジェクト以外のクラスのサブクラスできるThread

Threadサブクラス化は単純なアプリケーションでは使いやすくなりますが、タスククラスがThread子孫でなければならないというThreadます。

Runnableオブジェクトは、高レベルのスレッド管理APIに適用できます。

セマフォ

セマフォは、スレッドによって取得および解放される一連の許可を維持する高水準シンクロナイザです。セマフォは、スレッドが取得するときに減少し、スレッドが解放されたときに増加する許可のカウンタとして想像することができます。スレッドが取得を試みるときに許可の量が0場合、許可が利用可能になるまで(またはスレッドが中断されるまで)スレッドはブロックされます。

セマフォは次のように初期化されます。

Semaphore semaphore = new Semaphore(1); // The int value being the number of permits

セマフォー・コンストラクターは、公平性のために追加のブール・パラメーターを受け入れます。 falseに設定すると、このクラスはスレッドが許可を取得する順序を保証しません。フェアネスがtrueに設定されている場合、セマフォは、acquireメソッドのいずれかを呼び出すスレッドが、それらのメソッドの呼び出しが処理された順序で許可を得るために選択されることを保証します。それは次のように宣言されます:

Semaphore semaphore = new Semaphore(1, true);

次に、javadocの例を見てみましょう。セマフォを使って項目のプールへのアクセスを制御します。この例では、セマフォを使用して、 getItem()が呼び出されたときに取得される項目が常に存在することを保証するために、ブロック機能を提供しています。

class Pool {
    /*
     * Note that this DOES NOT bound the amount that may be released!
     * This is only a starting value for the Semaphore and has no other
     * significant meaning UNLESS you enforce this inside of the
     * getNextAvailableItem() and markAsUnused() methods
     */
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Obtains the next available item and reduces the permit count by 1. 
     * If there are no items available, block.
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Puts the item into the pool and add 1 permit.
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    private Object getNextAvailableItem() {
        // Implementation
    }

    private boolean markAsUnused(Object o) {
        // Implementation
    }
}

スレッドプールを使って2つの `int`配列を追加する

スレッドプールにはタスクのキューがあり、そのうちそれぞれがこれらのスレッドで実行されます。

次の例は、スレッドプールを使用して2つのint配列を追加する方法を示しています。

Java SE 8
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };

ExecutorService pool = Executors.newCachedThreadPool(); 

// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
    final int worker = i;
    pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}

// Wait for all Workers to finish:
try {
    // execute all submitted tasks
    pool.shutdown();
    // waits until all workers finish, or the timeout ends
    pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    pool.shutdownNow(); //kill thread
}

System.out.println(Arrays.toString(result));

ノート:

  1. この例は純粋に例示的なものである。実際には、この小さなタスクにスレッドを使用することでスピードアップはありません。タスクの作成とスケジューリングのオーバーヘッドは、タスクを実行するのに費やされる時間を浪費するため、減速が予想されます。

  2. Java 7以前を使用していた場合は、タスクを実装するためにlambdaの代わりに匿名のクラスを使用します。

システムスレッド以外の、プログラムによって開始されたすべてのスレッドのステータスを取得する

コードスニペット:

import java.util.Set;

public class ThreadStatus {
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 5; i++){
            Thread t = new Thread(new MyThread());
            t.setName("MyThread:" + i);
            t.start();
        }
        int threadCount = 0;
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet) {
            if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
                System.out.println("Thread :" + t + ":" + "state:" + t.getState());
                ++threadCount;
            }
        }
        System.out.println("Thread count started by Main thread:" + threadCount);
    }
}

class MyThread implements Runnable {
    public void run() {
        try {
            Thread.sleep(2000);
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

出力:

Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6

説明:

Thread.getAllStackTraces().keySet()は、アプリケーションスレッドとシステムスレッドを含むすべてのThread返します。アプリケーションが開始したスレッドの状態だけに興味がある場合は、特定のスレッドのスレッドグループをメインプログラムスレッドと照合してThreadセットを繰り返します。

上記のスレッドグループ条件がない場合、プログラムは以下のシステムスレッドのステータスを返します。

Reference Handler
Signal Dispatcher
Attach Listener
Finalizer

呼び出し可能と未来

Runnableは、別のスレッドで実行されるコードをラップする手段を提供しますが、実行結果を返すことができないという制限があります。 Runnableの実行から何らかの戻り値を得る唯一の方法は、 Runnable外のスコープでアクセス可能な変数に結果を代入することです。

CallableはJava 5でRunnableピアとして導入されました。 Callableは本質的に同じですが、 runする代わりにcallメソッドを持っている点が異なりruncallメソッドは、結果を返す追加の機能を持ち、チェック例外をスローすることもできます。

Callableタスクの提出の結果は、 Futureを介してタップすることができます

Futureは、 Callable計算の結果を格納するソートのコンテナと考えることができます。呼び出し可能関数の計算は別のスレッドで実行でき、 Futureの結果をタップしようとするとブロックされ、利用可能になった時点で結果が返されます。

呼び出し可能なインタフェース

public interface Callable<V> {
    V call() throws Exception;
}

未来

interface Future<V> {
    V get();
    V get(long timeout, TimeUnit unit);
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}

Callableと今後の使用例:

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newSingleThreadExecutor();
          
    System.out.println("Time At Task Submission : " + new Date());
    Future<String> result = es.submit(new ComplexCalculator());
    // the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
    System.out.println("Result of Complex Calculation is : " + result.get());
    System.out.println("Time At the Point of Printing the Result : " + new Date());
}

長い計算をするCallable

public class ComplexCalculator implements Callable<String> {

    @Override
    public String call() throws Exception {
        // just sleep for 10 secs to simulate a lengthy computation
        Thread.sleep(10000);            
        System.out.println("Result after a lengthy 10sec calculation");
        return "Complex Result"; // the result 
    }
}

出力

Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016

未来に許される他の操作

get()は実際の結果を抽出するメソッドですが、将来はプロビジョニングがあります

  • get(long timeout, TimeUnit unit)は、現在のスレッドが結果を待つ最長期間を定義します。
  • タスクコールcancel(mayInterruptIfRunning)をキャンセルします。フラグmayInterruptは、タスクが開始され、今実行されている場合に中断されるべきであることを示す。
  • isDone()呼び出すことによって、タスクが完了/終了したかどうかをチェックします。
  • 長いタスクがキャンセルされたかどうかを確認するには、 isCancelled()ます。

同期補助としてのロック

Java 5の並行パッケージ導入以前は、スレッディングが低レベルでした。このパッケージの導入により、いくつかのより高いレベルの並行プログラミング補助/構成が提供されました。

ロックは、基本的に同期ブロックまたはキーワードと同じ目的を果たすスレッド同期メカニズムです。

組み込みロック

int count = 0; // shared among multiple threads

public void doSomething() {
    synchronized(this) {
        ++count; // a non-atomic operation
    }
}

ロックを使用した同期

int count = 0; // shared among multiple threads

Lock lockObj = new ReentrantLock();
public void doSomething() {
    try {
        lockObj.lock();
        ++count; // a non-atomic operation
    } finally {    
        lockObj.unlock(); // sure to release the lock without fail
    }
}

また、ロックは、ロックなどの内在的なロックが提供しないが、中断に応答したままであるか、ロックしようとする機能を有し、不可能な場合にはブロックしない。

中断に応答してロックする

class Locky {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        try {
            try {
                lockObj.lockInterruptibly();
                ++count; // a non-atomic operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stopping
            }
        } finally {
            if (!Thread.currentThread().isInterrupted()) {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

ロックできるときにのみ何かをする

public class Locky2 {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        boolean locked = lockObj.tryLock(); // returns true upon successful lock
        if (locked) {
            try {
                ++count; // a non-atomic operation
            } finally {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

利用可能なロックにはいくつかのバリエーションがあります。詳細については、 こちらのAPIドキュメントを参照してください。



Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow