Sök…


Introduktion

Samtidig beräkning är en form av beräkning där flera beräkningar utförs samtidigt istället för i följd. Java-språket är utformat för att stödja samtidig programmering genom användning av trådar. Objekt och resurser kan nås av flera trådar; varje tråd kan potentiellt komma åt alla objekt i programmet och programmeraren måste se till att läs- och skrivåtkomst till objekt är korrekt synkroniserad mellan trådarna.

Anmärkningar

Relaterade ämnen på StackOverflow:

Grundläggande multithreading

Om du har många uppgifter att utföra, och alla dessa uppgifter inte är beroende av resultatet av de föregående, kan du använda Multithreading för din dator för att utföra alla dessa uppgifter samtidigt med fler processorer om din dator kan. Detta kan göra att ditt program körs snabbare om du har några stora oberoende uppgifter.

class CountAndPrint implements Runnable {

    private final String name;

    CountAndPrint(String name) {
        this.name = name;
    }

    /** This is what a CountAndPrint will do */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            System.out.println(this.name + ": " + i);
        }
    }

    public static void main(String[] args) {
        // Launching 4 parallel threads
        for (int i = 1; i <= 4; i++) {
            // `start` method will call the `run` method 
            // of CountAndPrint in another thread
            new Thread(new CountAndPrint("Instance " + i)).start();
        }

        // Doing some others tasks in the main Thread
        for (int i = 0; i < 10000; i++) {
            System.out.println("Main: " + i);
        }
    }
}

Koden för CountAndPrint för de olika CountAndPrint instanserna körs i icke förutsägbar ordning. Ett utdrag av en exekvering kan se ut så här:

Instance 4: 1
Instance 2: 1
Instance 4: 2
Instance 1: 1
Instance 1: 2
Main: 1
Instance 4: 3
Main: 2
Instance 3: 1
Instance 4: 4
...

Producenter och konsumenter

Ett enkelt exempel på producent-konsumentproblemlösning. Observera att JDK-klasser ( AtomicBoolean och BlockingQueue ) används för synkronisering, vilket minskar risken för att skapa en ogiltig lösning. Konsultera Javadoc för olika typer av BlockingQueue ; att välja olika implementering kan förändra beteendet i detta exempel drastiskt (som DelayQueue eller Priority Queue ).

public class Producer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Producer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int producedCount = 0;
        try {
            while (true) {
                producedCount++;
                //put throws an InterruptedException when the thread is interrupted
                queue.put(new ProducedData());
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            producedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Produced " + producedCount + " objects");
    }
}

public class Consumer implements Runnable {

    private final BlockingQueue<ProducedData> queue;

    public Consumer(BlockingQueue<ProducedData> queue) {
        this.queue = queue;
    }

    public void run() {
        int consumedCount = 0;
        try {
            while (true) {
                //put throws an InterruptedException when the thread is interrupted
                ProducedData data = queue.poll(10, TimeUnit.MILLISECONDS);
                // process data
                consumedCount++;
            }
        } catch (InterruptedException e) {
            // the thread has been interrupted: cleanup and exit
            consumedCount--;
            //re-interrupt the thread in case the interrupt flag is needeed higher up
            Thread.currentThread().interrupt();
        }
        System.out.println("Consumed " + consumedCount + " objects");
    }
}


public class ProducerConsumerExample {
    static class ProducedData {    
        // empty data object
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducedData> queue = new ArrayBlockingQueue<ProducedData>(1000);
        // choice of queue determines the actual behavior: see various BlockingQueue implementations

        Thread producer = new Thread(new Producer(queue));
        Thread consumer = new Thread(new Consumer(queue));

        producer.start();
        consumer.start();

        Thread.sleep(1000);
        producer.interrupt();
        Thread.sleep(10);
        consumer.interrupt();
    }
}

Använda ThreadLocal

Ett användbart verktyg i Java Concurrency är ThreadLocal - detta gör att du kan ha en variabel som är unik för en viss tråd. Således, om samma kod körs i olika trådar, kommer dessa exekveringar inte att dela värdet, utan istället har varje tråd sin egen variabel som är lokal för tråden .

Till exempel används detta ofta för att fastställa sammanhanget (t.ex. behörighetsinformation) för hantering av en begäran i en servlet. Du kan göra något liknande:

private static final ThreadLocal<MyUserContext> contexts = new ThreadLocal<>();

public static MyUserContext getContext() {
    return contexts.get(); // get returns the variable unique to this thread
}

public void doGet(...) {
    MyUserContext context = magicGetContextFromRequest(request); 
    contexts.put(context); // save that context to our thread-local - other threads
                           // making this call don't overwrite ours
    try {
        // business logic
    } finally {
        contexts.remove(); // 'ensure' removal of thread-local variable
    }
}

Istället för att skicka MyUserContext till varje enskild metod kan du istället använda MyServlet.getContext() där du behöver det. Nu introducerar det naturligtvis en variabel som måste dokumenteras, men den är trådsäker, vilket eliminerar en hel del av nackdelarna med att använda en så högt omfattande variabel.

Den viktigaste fördelen här är att varje tråd har sin egen lokala lokala variabel i den contexts behållare. Så länge du använder den från en definierad startpunkt (som att kräva att varje servlet behåller sin kontext, eller kanske genom att lägga till ett servletfilter) kan du lita på att det här sammanhanget är där när du behöver det.

CountDownLatch

CountDownLatch

Ett synkroniseringshjälpmedel som gör att en eller flera trådar kan vänta tills en uppsättning operationer som utförs i andra trådar slutförs.

  1. En CountDownLatch initialiseras med ett givet antal.
  2. Väntarmetoderna blockerar tills det aktuella antalet når noll på grund av invokationer av countDown() , varefter alla väntetrådar släpps och eventuella efterföljande invokationer av väntar återgår omedelbart.
  3. Detta är ett fenomen med en skott - räkningen kan inte återställas. Om du behöver en version som återställer räkningen kan du överväga att använda en CyclicBarrier .

Viktiga metoder:

public void await() throws InterruptedException

Gör att den aktuella tråden väntar tills spärren har räknats ner till noll, såvida inte tråden avbryts.

public void countDown()

Minskar spärren på spärren och släpper alla väntetrådar om räkningen når noll.

Exempel:

import java.util.concurrent.*;

class DoSomethingInAThread implements Runnable {
    CountDownLatch latch;
    public DoSomethingInAThread(CountDownLatch latch) {
        this.latch = latch;
    } 
    public void run() {
        try {
            System.out.println("Do some thing");
            latch.countDown();
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

public class CountDownLatchDemo {
    public static void main(String[] args) {
        try {
            int numberOfThreads = 5;
            if (args.length < 1) {
                System.out.println("Usage: java CountDownLatchDemo numberOfThreads");
                return;
            }
            try {
                numberOfThreads = Integer.parseInt(args[0]);
            } catch(NumberFormatException ne) {
            
            }
            CountDownLatch latch = new CountDownLatch(numberOfThreads);
            for (int n = 0; n < numberOfThreads; n++) {
                Thread t = new Thread(new DoSomethingInAThread(latch));
                t.start();
            }
            latch.await();
            System.out.println("In Main thread after completion of " + numberOfThreads + " threads");
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

produktion:

java CountDownLatchDemo 5
Do some thing
Do some thing
Do some thing
Do some thing
Do some thing
In Main thread after completion of 5 threads

Förklaring:

  1. CountDownLatch initialiseras med en räknare på 5 i Main thread
  2. Huvudtråden väntar med metoden await() .
  3. Fem fall av DoSomethingInAThread har skapats. Varje instans minskade räknaren med countDown() -metoden.
  4. När räknaren blir noll kommer Huvudtråden att återupptas

Synkronisering

I Java finns det en inbyggd låsmekanism på språknivå: det synchronized blocket, som kan använda valfritt Java-objekt som ett inre lås (dvs varje Java-objekt kan ha en bildskärm associerad med det).

Intrinsiska lås ger atomicitet till grupper av uttalanden. För att förstå vad det betyder för oss, låt oss titta på ett exempel där synchronized är användbar:

private static int t = 0;
private static Object mutex = new Object();

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            synchronized (mutex) {
                t++;
                System.out.println(MessageFormat.format("t: {0}", t));
            }
        });
    }
    executorService.shutdown();
}

I det här fallet, om det inte vore för det synchronized blocket, skulle det ha varit flera problem med samtidighet inblandade. Den första skulle vara med postökningsoperatören (det är inte atomiskt i sig), och den andra skulle vara att vi skulle observera värdet på t efter att en godtycklig mängd andra trådar har haft chansen att ändra det. Men eftersom vi skaffade oss ett inneboende lås kommer det inte att finnas några tävlingsförhållanden här och utgången kommer att innehålla siffror från 1 till 100 i sin normala ordning.

Intrinsiska lås i Java är mutexer (dvs ömsesidig exekveringslås). Ömsesidig utförande innebär att om en tråd har förvärvat låset, kommer den andra att tvingas vänta på att den första släpper det innan den kan skaffa låset för sig själv. Obs: En operation som kan sätta tråden i väntetillstånd (viloläge) kallas en blockeringsoperation . Således är förvärv av ett lås en blockeringsoperation.

Intrinsiska lås i Java är reentranta . Detta innebär att om en tråd försöker skaffa ett lås som den redan äger kommer den inte att blockeras och den lyckas förvärva den. Till exempel kommer följande kod inte att blockeras när den anropas:

public void bar(){
    synchronized(this){
        ...
    }
}
public void foo(){
    synchronized(this){
        bar();
    }
}

Förutom synchronized block finns det också synchronized metoder.

Följande kodblock är praktiskt taget likvärdiga (även om bytekoden verkar vara annorlunda):

  1. synchronized block på this :

    public void foo() {
        synchronized(this) {
            doStuff();
        }
    }
    
  2. synchronized metod:

     public synchronized void foo() {
         doStuff();
     }
    

På samma sätt för static metoder:

class MyClass {
    ...
    public static void bar() {
        synchronized(MyClass.class) {
            doSomeOtherStuff();
        }
    }
}

har samma effekt som detta:

class MyClass {
    ...
    public static synchronized void bar() {
        doSomeOtherStuff();
    }
}

Atomoperationer

En atomoperation är en operation som utförs "på en gång", utan chans att andra trådar observerar eller modifierar tillståndet under atomoperationens genomförande.

Låt oss överväga ett DÅLIGT EXEMPEL .

private static int t = 0;

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            t++;
            System.out.println(MessageFormat.format("t: {0}", t));
        });
    }
    executorService.shutdown();
}

I det här fallet finns det två frågor. Den första frågan är att postökningsoperatören inte är atomisk. Det består av flera operationer: få värdet, lägg till 1 till värdet, ställ in värdet. Det är därför som om vi kör exemplet är det troligt att vi inte ser t: 100 i utgången - två trådar kan samtidigt få värdet, öka det och ställa in det: låt oss säga att värdet på t är 10 och två trådarna ökar t. Båda trådarna kommer att ställa in värdet på t till 11, eftersom den andra tråden observerar värdet på t innan den första tråden hade avslutat steget.

Den andra frågan handlar om hur vi observerar t. När vi skriver ut värdet på t kan värdet redan ha ändrats av en annan tråd efter den här trådens inkrement.

För att lösa dessa problem använder vi java.util.concurrent.atomic.AtomicInteger , som har många atomoperationer för oss att använda.

private static AtomicInteger t = new AtomicInteger(0);

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(400); // The high thread count is for demonstration purposes.
    for (int i = 0; i < 100; i++) {
        executorService.execute(() -> {
            int currentT = t.incrementAndGet();
            System.out.println(MessageFormat.format("t: {0}", currentT));
        });
    }
    executorService.shutdown();
}

Metoden incrementAndGet för AtomicInteger atomiskt och returnerar det nya värdet, vilket eliminerar det tidigare racetillståndet. Observera att i det här exemplet kommer linjerna fortfarande att fungera, eftersom vi inte gör några ansträngningar för att ordna println samtal och att detta faller utanför ramen för detta exempel, eftersom det kräver synkronisering och målet med detta exempel är att visa hur att använda AtomicInteger att eliminera AtomicInteger beträffande tillstånd.

Skapa grundläggande låst system

Ett dödläge inträffar när två konkurrerande åtgärder väntar på att den andra ska slutföras, och därmed aldrig gör någon heller. I java finns ett lås associerat med varje objekt. För att undvika samtidiga ändringar som gjorts av flera trådar på ett enda objekt kan vi använda synchronized nyckelord, men allt kostar. Om du använder felaktigt synchronized nyckelord kan det leda till fastade system som kallas för dödlåst system.

Tänk på att det finns 2 trådar som arbetar på en instans, låter oss ringa trådar som första och andra, och låt oss säga att vi har två resurser R1 och R2. Först förvärvar R1 och behöver också R2 för att fullfölja medan andra förvärvar R2 och behöver R1 för slutförande.

så säg vid tidpunkten t = 0,

Först har R1 och andra R2. nu väntar First på R2 medan Second väntar på R1. denna vänta är obestämd och detta leder till dödläge.

public class Example2 {
    
    public static void main(String[] args) throws InterruptedException {
        final DeadLock dl = new DeadLock();
        Thread t1 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                dl.methodA();
            }
        });
   
        Thread t2 = new Thread(new Runnable() {
    
            @Override
            public void run() {
                // TODO Auto-generated method stub
                try {
                    dl.method2();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        t1.setName("First");
        t2.setName("Second");
        t1.start();
        t2.start();
    }
}

class DeadLock {
    
    Object mLock1 = new Object();
    Object mLock2 = new Object();
    

    public void methodA() {
        System.out.println("methodA wait for mLock1  " + Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("methodA mLock1 acquired   " + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
                method2();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public void method2() throws InterruptedException {
        System.out.println("method2 wait for mLock2  " + Thread.currentThread().getName());
        synchronized (mLock2) {
            System.out.println("method2  mLock2 acquired   " + Thread.currentThread().getName());
            Thread.sleep(100);
            method3();
        }
    }
    public void method3() throws InterruptedException {
        System.out.println("method3  mLock1  "+ Thread.currentThread().getName());
        synchronized (mLock1) {
            System.out.println("method3   mLock1 acquired  " + Thread.currentThread().getName());
        }
    }
}

Output från detta program:

methodA wait for mLock1  First
method2 wait for mLock2  Second
method2  mLock2 acquired   Second
methodA mLock1 acquired   First
method3  mLock1  Second
method2 wait for mLock2  First

Pausar körningen

Thread.sleep får den aktuella tråden att avbryta körningen under en viss period. Detta är ett effektivt sätt att göra processortid tillgänglig för de andra trådarna i en applikation eller andra applikationer som kan köras på ett datorsystem. Det finns två överbelastade sleep i trådklassen.

En som specificerar sömntiden till millisekundet

public static void sleep(long millis) throws InterruptedException

En som specificerar vilotiden till nanosekundet

public static void sleep(long millis, int nanos)

Pausar körningen i 1 sekund

Thread.sleep(1000);

Det är viktigt att notera att detta är ett ledtråd till operativsystemets kärnans schemaläggare. Detta kanske inte nödvändigtvis är exakt, och vissa implementeringar beaktar inte ens nanosekundparametern (möjligen avrundning till närmaste millisekund).

Det rekommenderas att bifoga ett samtal till Thread.sleep i försök / fånga och fånga InterruptedException .

Visualisera läs- / skrivbarriärer medan du använder synkroniserad / flyktig

Som vi vet att vi bör använda synchronized nyckelord för att göra exekvering av en metod eller blockera. Men få av oss kanske inte är medvetna om en viktigare aspekt av att använda synchronized och volatile nyckelord: bortsett från att skapa en enhet av kodatomen ger det också läs / skrivbarriär . Vad är denna läs- / skrivbarriär? Låt oss diskutera detta med hjälp av ett exempel:

class Counter {

  private Integer count = 10;

  public synchronized void incrementCount() {
    count++;
  }

  public Integer getCount() {
    return count;
  }
}

Låt oss anta att en tråd A ringer incrementCount() först sedan en annan tråd B kallar getCount() . I detta scenario finns det ingen garanti för att B ser uppdaterat värde för count . Det kan fortfarande se count som 10 , även det är också möjligt att det aldrig ser uppdaterat värde för count någonsin.

För att förstå detta beteende måste vi förstå hur Java-minnesmodellen integreras med hårdvaruarkitektur. I Java har varje tråd sin egen trådstack. Denna stack innehåller: metodsamtalstack och lokal variabel som skapats i den tråden. I ett multikärnsystem är det mycket möjligt att två trådar körs samtidigt i separata kärnor. I ett sådant scenario är det möjligt att en del av en tråds stack ligger i register / cache i en kärna. Om det finns inuti en tråd, får du åtkomst till ett objekt med synchronized (eller volatile ) nyckelord, efter synchronized block som tråden synkroniserar är det den lokala kopian av den variabeln med huvudminnet. Detta skapar en läs / skrivbarriär och ser till att tråden ser det senaste värdet på objektet.

Men i vårt fall, eftersom tråd B inte har använt synkroniserad åtkomst till count , kan det hänvisa värdet på count lagrad i registret och kanske aldrig se uppdateringar från tråd A. För att se till att B ser det senaste räknarvärdet måste vi göra getCount() synkroniseras också.

public synchronized Integer getCount() {
  return count;
}

När tråd A är klar med uppdatering count den upp Counter instansen och skapar samtidigt skrivbarriär och spolar alla ändringar som görs i blocket till huvudminnet. På samma sätt när tråd B förvärvar lås i samma instans av Counter , kommer den in i läsbarriären och läser värdet på count från huvudminnet och ser alla uppdateringar.

synlighet

Samma synlighetseffekt gäller också för volatile läsningar / skrivningar. Alla variabler som uppdateras innan de skrivs till volatile spolas till huvudminnet och alla läsningar efter den volatile variabelläsningen kommer från huvudminnet.

Skapa en java.lang.Thread-instans

Det finns två huvudsakliga metoder för att skapa en tråd i Java. I själva verket är det lika enkelt att skapa en tråd som att skriva koden som kommer att köras i den. De två metoderna skiljer sig åt var du definierar den koden.

I Java representeras en tråd av ett objekt - en instans av java.lang.Thread eller dess underklass. Så den första metoden är att skapa den underklassen och åsidosätta metoden run () .

Obs! Jag använder tråd för att hänvisa till java.lang.Tread- klassen och tråd för att hänvisa till det logiska begreppet trådar.

class MyThread extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running!");
        }
    }
}

Eftersom vi redan har definierat koden som ska köras, kan tråden skapas helt enkelt som:

MyThread t = new MyThread();

Trådklassen innehåller också en konstruktör som accepterar en sträng, som kommer att användas som trådens namn. Detta kan vara särskilt användbart vid felsökning av ett flertrådsprogram.

class MyThread extends Thread {
    public MyThread(String name) {
        super(name);
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread running! ");
        }
    }
}

MyThread t = new MyThread("Greeting Producer");

Den andra metoden är att definiera koden med hjälp av java.lang.Runnable och dess enda metodkörning () . Trådklassen låter dig sedan köra den metoden i en separat tråd. För att uppnå detta skapar du tråden med hjälp av en konstruktör som accepterar en instans av grannskapet Runnable .

Thread t = new Thread(aRunnable);

Detta kan vara mycket kraftfullt i kombination med lambdas eller metodreferenser (endast Java 8):

Thread t = new Thread(operator::hardWork);

Du kan också ange trådens namn.

Thread t = new Thread(operator::hardWork, "Pi operator");

Praktiskt sett kan du använda båda metoderna utan bekymmer. Men den allmänna visdomen säger att använda den senare.


För alla fyra nämnda konstruktörer finns det också ett alternativ som accepterar en instans av java.lang.ThreadGroup som den första parametern.

ThreadGroup tg = new ThreadGroup("Operators");
Thread t = new Thread(tg, operator::hardWork, "PI operator");

ThreadGroup representerar en uppsättning trådar. Du kan bara lägga en Thread till en ThreadGroup med hjälp av en Thread är konstruktör. ThreadGroup kan sedan användas för att hantera alla sina trådar tillsammans, liksom trådarna kan få information från sin ThreadGroup .

Så för att sumarize kan tråden skapas med en av dessa offentliga konstruktörer:

Thread()
Thread(String name)
Thread(Runnable target)
Thread(Runnable target, String name)
Thread(ThreadGroup group, String name)
Thread(ThreadGroup group, Runnable target)
Thread(ThreadGroup group, Runnable target, String name)
Thread(ThreadGroup group, Runnable target, String name, long stackSize)

Den sista tillåter oss att definiera önskad stackstorlek för den nya tråden.


Ofta lider kodläsbarheten när du skapar och konfigurerar många trådar med samma egenskaper eller med samma mönster. Det är när java.util.concurrent.ThreadFactory kan användas. Det här gränssnittet gör att du kan kapa in proceduren för att skapa tråden genom fabriksmönstret och dess enda metod newThread (Runnable) .

class WorkerFactory implements ThreadFactory {
    private int id = 0;

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "Worker " + id++);
    }
}

Trådavbrott / stopp av trådar

Varje Java-tråd har en avbrottsflagg, som ursprungligen är falsk. Att avbryta en tråd är i huvudsak inget annat än att sätta den flaggan till true. Koden som körs på den tråden kan kontrollera flaggan ibland och agera på den. Koden kan också ignorera den helt. Men varför skulle varje tråd ha en sådan flagga? Att ha en boolesisk flagga på en tråd är ju något vi bara kan ordna oss, om och när vi behöver det. Tja, det finns metoder som uppför sig på ett speciellt sätt när tråden de kör på avbryts. Dessa metoder kallas blockeringsmetoder. Det här är metoder som sätter tråden i tillståndet WAITING eller TIMED_WAITING. När en tråd är i det här tillståndet, kommer den att avbryta den, att en InterruptException kastas på den avbrutna tråden, snarare än att avbrytningsflaggan är inställd på true, och tråden blir RUNNABLE igen. Kod som åberopar en blockeringsmetod tvingas hantera InterruptException, eftersom det är ett kontrollerat undantag. Så, och därmed namnet, kan ett avbrott ha effekt att avbryta ett väntande och effektivt avsluta det. Observera att inte alla metoder som på något sätt väntar (t.ex. blockerar IO) svarar på avbrott på det sättet, eftersom de inte sätter tråden i väntetillstånd. Slutligen kommer en tråd som har sin interrupt flagga inställd, som går in i en blockeringsmetod (dvs. försöker komma in i ett väntande tillstånd), omedelbart att kasta en InterruptException och interrupt flaggan kommer att rensas.

Förutom denna mekanik tilldelar Java inte någon speciell semantisk betydelse till avbrott. Koden är gratis att tolka ett avbrott på vilket sätt som helst. Men ofta används avbrott för att signalera till en tråd att den bör sluta springa så snart som möjligt. Men som det bör framgå av ovanstående är det upp till koden på den tråden att reagera på det avbrottet på lämpligt sätt för att sluta springa. Att stoppa en tråd är ett samarbete. När en tråd avbryts kan dess körkod vara flera nivåer djupt in i trappan. De flesta koder kallar inte en blockeringsmetod och avslutas tillräckligt snabbt för att inte fördröja trådens stopp för onödigt. Koden som oftast borde handla om att vara lyhörd för avbrott, är kod som finns i en loophanteringsuppgifter tills det inte finns några kvar, eller tills en flagga är inställd som signalerar den för att stoppa den slingan. Loopar som hanterar möjliga oändliga uppgifter (dvs de kör i princip) bör kontrollera avbrottsflaggan för att lämna slingan. För ändliga öglor kan semantiken diktera att alla uppgifter måste vara färdiga innan de avslutas, eller det kan vara lämpligt att lämna vissa uppgifter obehandlade. Kod som kallar blockeringsmetoder kommer att tvingas hantera InterruptException. Om det alls är semantiskt möjligt kan det helt enkelt sprida InterruptException och förklara att det kastas. Som sådan blir det en blockeringsmetod i sig för sina anropare. Om den inte kan sprida undantaget, bör den åtminstone ställa in den avbrutna flaggan, så att uppringare högre upp på bunten också vet att tråden avbröts. I vissa fall måste metoden fortsätta att vänta oavsett InterruptException, i vilket fall den måste försena inställningen av den avbrutna flaggan tills efter att den är klar att vänta, kan det innebära att en lokal variabel ställs in, som ska kontrolleras innan metoden lämnas till avbryt sedan tråden.

Exempel:

Exempel på kod som slutar hantera uppgifter vid avbrott

class TaskHandler implements Runnable {
    
    private final BlockingQueue<Task> queue;

    TaskHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) { // check for interrupt flag, exit loop when interrupted
            try {
                Task task = queue.take(); // blocking call, responsive to interruption
                handle(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cannot throw InterruptedException (due to Runnable interface restriction) so indicating interruption by setting the flag
            }
        }
    }
    
    private void handle(Task task) {
        // actual handling
    }
}

Exempel på kod som försenar inställningen av avbrottsflaggan tills den är helt klar:

class MustFinishHandler implements Runnable {

    private final BlockingQueue<Task> queue;

    MustFinishHandler(BlockingQueue<Task> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        boolean shouldInterrupt = false;
        
        while (true) {
            try {
                Task task = queue.take();
                if (task.isEndOfTasks()) {
                    if (shouldInterrupt) {
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                handle(task);
            } catch (InterruptedException e) {
                shouldInterrupt = true; // must finish, remember to set interrupt flag when we're done
            }
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Exempel på kod som har en fast lista med uppgifter men som kan sluta tidigt vid avbrott

class GetAsFarAsPossible implements Runnable {

    private final List<Task> tasks = new ArrayList<>();

    @Override
    public void run() {
        for (Task task : tasks) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            handle(task);
        }
    }

    private void handle(Task task) {
        // actual handling
    }
}

Flera tillverkare / konsumentexempel med delad global kö

Nedanstående kod visar flera producenter / konsumentprogram. Både producent- och konsumenttrådar delar samma globala kö.

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerWithES {
    public static void main(String args[]) {
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
         
        ExecutorService pes = Executors.newFixedThreadPool(2);
        ExecutorService ces = Executors.newFixedThreadPool(2);
          
        pes.submit(new Producer(sharedQueue, 1));
        pes.submit(new Producer(sharedQueue, 2));
        ces.submit(new Consumer(sharedQueue, 1));
        ces.submit(new Consumer(sharedQueue, 2));
         
        pes.shutdown();
        ces.shutdown();
    }
}

/* Different producers produces a stream of integers continuously to a shared queue, 
which is shared between all Producers and consumers */

class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random random = new Random();
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        // Producer produces a continuous stream of numbers for every 200 milli seconds
        while (true) {
            try {
                int number = random.nextInt(1000);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(200);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
/* Different consumers consume data from shared queue, which is shared by both producer and consumer threads */
class Consumer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        // Consumer consumes numbers generated from Producer threads continuously
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

produktion:

Produced:69:by thread:2
Produced:553:by thread:1
Consumed: 69:by thread:1
Consumed: 553:by thread:2
Produced:41:by thread:2
Produced:796:by thread:1
Consumed: 41:by thread:1
Consumed: 796:by thread:2
Produced:728:by thread:2
Consumed: 728:by thread:1

och så vidare ................

Förklaring:

  1. sharedQueue , som är en LinkedBlockingQueue delas mellan alla tillverkare- och konsumenttrådar.
  2. Producenttrådar producerar ett heltal för varje 200 milli sekunder kontinuerligt och lägger till det på sharedQueue
  3. Consumer konsumerar heltal från sharedQueue kontinuerligt.
  4. Detta program implementeras utan uttryckliga synchronized eller Lock . BlockingQueue är nyckeln till att uppnå det.

BlockingQueue-implementeringar är utformade för att främst användas för köer mellan producent och konsument.

BlockingQueue-implementeringar är tråd-säkra. Alla kömetoder uppnår sina effekter atomiskt med interna lås eller andra former av samtidighetskontroll.

Exklusivt skriv / samtidigt läsåtkomst

Ibland krävs det att en process samtidigt skriver och läser samma "data".

ReadWriteLock gränssnittet och dess ReentrantReadWriteLock implementering möjliggör ett åtkomstmönster som kan beskrivas enligt följande:

  1. Det kan finnas valfritt antal samtidiga läsare av data. Om åtminstone en läsartillgång beviljas är ingen åtkomst till författare möjlig.
  2. Det kan finnas högst en enda författare till data. Om det finns en åtkomst till författare kan ingen läsare få åtkomst till uppgifterna.

En implementering kan se ut som:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Sample {

// Our lock. The constructor allows a "fairness" setting, which guarantees the chronology of lock attributions.
protected static final ReadWriteLock RW_LOCK = new ReentrantReadWriteLock();

// This is a typical data that needs to be protected for concurrent access
protected static int data = 0;

/** This will write to the data, in an exclusive access */
public static void writeToData() {
    RW_LOCK.writeLock().lock();
    try {
        data++;
    } finally {
        RW_LOCK.writeLock().unlock();
    }
}

public static int readData() {
    RW_LOCK.readLock().lock();
    try {
        return data;
    } finally {
        RW_LOCK.readLock().unlock();
    }
}

}

OBS 1 : Det exakta användningsfallet har en renare lösning med AtomicInteger , men det som beskrivs här är ett åtkomstmönster som fungerar oavsett att data här är ett heltal som som en Atomic-variant.

OBS 2 : Låset på läsdelen behövs verkligen, även om det kanske inte ser så ut för den avslappnade läsaren. Om du inte låser dig på läsarsidan kan ett antal saker gå fel, bland vilka:

  1. Skrivningarna av primitiva värden garanteras inte vara atomiska på alla JVM, så att läsaren kunde se t.ex. bara 32 bitar av en 64 bitar skriva om data var en 64 bitars lång typ
  2. Synligheten för skrivningen från en tråd som inte utförde det garanteras av JVM endast om vi upprättar Happen Before-förhållandet mellan skrivarna och läsarna. Denna relation upprättas när både läsare och författare använder sina respektive lås, men inte på annat sätt
Java SE 8

Om högre prestanda krävs, under vissa typer av användning, finns det en snabbare låstyp, kallad StampedLock , som bland annat implementerar ett optimistiskt låsläge. Det här låset fungerar mycket annorlunda än ReadWriteLock , och det här exemplet kan inte transponeras.

Körbart objekt

Runnable gränssnittet definierar en enda metod, run() , som är avsedd att innehålla koden exekverad i tråden.

Det Runnable objektet skickas till Thread . Och Threads start() -metod kallas.

Exempel

public class HelloRunnable implements Runnable {

    @Override
    public void run() {
        System.out.println("Hello from a thread");
    }

    public static void main(String[] args) {
        new Thread(new HelloRunnable()).start();
    }
}

Exempel på Java8:

public static void main(String[] args) {
    Runnable r = () -> System.out.println("Hello world");
    new Thread(r).start();
}

Runnable vs Thread subclass

En Runnable object-anställning är mer allmän, eftersom det Runnable objektet kan underklassera en annan klass än Thread .

Thread är lättare att använda i enkla applikationer, men begränsas av det faktum att din uppgiftsklass måste vara en ättling till Thread .

Ett Runnable objekt är tillämpligt på API: er på hög nivå för trådhantering.

Semafor

En Semaphore är en synkroniserare på hög nivå som upprätthåller en uppsättning tillstånd som kan förvärvas och släppas av trådar. En semafor kan föreställas som en räknare för tillstånd som kommer att minskas när en tråd förvärvas och ökas när en tråd släpps. Om mängden tillstånd är 0 när en tråd försöker skaffa sig kommer tråden att blockeras tills ett tillstånd är tillgängligt (eller tills tråden avbryts).

En semafor initialiseras som:

Semaphore semaphore = new Semaphore(1); // The int value being the number of permits

Semaphore-konstruktören accepterar en extra boolesk parameter för rättvisa. När den är falsk, garanterar denna klass inga garantier för den ordning i vilka trådar får tillstånd. När rättvisa är satt, garanterar semaforen att trådar som åberopar någon av förvärvningsmetoderna väljs för att erhålla tillstånd i den ordning som deras åkallande av dessa metoder behandlades. Det förklaras på följande sätt:

Semaphore semaphore = new Semaphore(1, true);

Låt oss nu titta på ett exempel från javadocs, där Semaphore används för att kontrollera åtkomsten till en pool av artiklar. En semafor används i detta exempel för att tillhandahålla blockeringsfunktioner för att säkerställa att det alltid finns artiklar att erhålla när getItem() anropas.

class Pool {
    /*
     * Note that this DOES NOT bound the amount that may be released!
     * This is only a starting value for the Semaphore and has no other
     * significant meaning UNLESS you enforce this inside of the
     * getNextAvailableItem() and markAsUnused() methods
     */
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

    /**
     * Obtains the next available item and reduces the permit count by 1. 
     * If there are no items available, block.
     */
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }

    /**
     * Puts the item into the pool and add 1 permit.
     */
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }

    private Object getNextAvailableItem() {
        // Implementation
    }

    private boolean markAsUnused(Object o) {
        // Implementation
    }
}

Lägg till två int-matriser med hjälp av en Threadpool

En Threadpool har en kö med uppgifter, varav var och en kommer att köras på en av dessa trådar.

Följande exempel visar hur du lägger till två int arrays med hjälp av en Threadpool.

Java SE 8
int[] firstArray = { 2, 4, 6, 8 };
int[] secondArray = { 1, 3, 5, 7 };
int[] result = { 0, 0, 0, 0 };

ExecutorService pool = Executors.newCachedThreadPool(); 

// Setup the ThreadPool:
// for each element in the array, submit a worker to the pool that adds elements
for (int i = 0; i < result.length; i++) {
    final int worker = i;
    pool.submit(() -> result[worker] = firstArray[worker] + secondArray[worker] );
}

// Wait for all Workers to finish:
try {
    // execute all submitted tasks
    pool.shutdown();
    // waits until all workers finish, or the timeout ends
    pool.awaitTermination(12, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
    pool.shutdownNow(); //kill thread
}

System.out.println(Arrays.toString(result));

Anmärkningar:

  1. Detta exempel är rent illustrativt. I praktiken kommer det inte att finnas någon speedup genom att använda trådar för en så liten uppgift. En avmattning är sannolikt eftersom kostnaderna för skapande och schemaläggning av uppgifter kommer att svänga tiden det tar att köra en uppgift.

  2. Om du använde Java 7 och tidigare, skulle du använda anonyma klasser istället för lambdas för att implementera uppgifterna.

Få status för alla trådar startade av ditt program exklusive systemtrådar

Kodavsnitt:

import java.util.Set;

public class ThreadStatus {
    public static void main(String args[]) throws Exception {
        for (int i = 0; i < 5; i++){
            Thread t = new Thread(new MyThread());
            t.setName("MyThread:" + i);
            t.start();
        }
        int threadCount = 0;
        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
        for (Thread t : threadSet) {
            if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
                System.out.println("Thread :" + t + ":" + "state:" + t.getState());
                ++threadCount;
            }
        }
        System.out.println("Thread count started by Main thread:" + threadCount);
    }
}

class MyThread implements Runnable {
    public void run() {
        try {
            Thread.sleep(2000);
        } catch(Exception err) {
            err.printStackTrace();
        }
    }
}

Produktion:

Thread :Thread[MyThread:1,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:3,5,main]:state:TIMED_WAITING
Thread :Thread[main,5,main]:state:RUNNABLE
Thread :Thread[MyThread:4,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:0,5,main]:state:TIMED_WAITING
Thread :Thread[MyThread:2,5,main]:state:TIMED_WAITING
Thread count started by Main thread:6

Förklaring:

Thread.getAllStackTraces().keySet() returnerar all Thread s inklusive applikations gängor och system trådar. Om du bara är intresserad av status för trådar, startat av din applikation, uppdaterar du Thread genom att kontrollera trådgrupp för en viss tråd mot din huvudprogramtråd.

I avsaknad av ovanstående ThreadGroup-tillstånd returnerar programmet status under System Threads:

Reference Handler
Signal Dispatcher
Attach Listener
Finalizer

Callable och Future

Medan Runnable tillhandahåller ett sätt att linda in kod som ska köras i en annan tråd, har det en begränsning att den inte kan returnera ett resultat från exekveringen. Det enda sättet att få något returvärde från exekveringen av en Runnable är att tilldela resultatet till en variabel som är tillgänglig inom ett område utanför Runnable .

Callable introducerades i Java 5 som en kamrat till Runnable . Callable är i huvudsak densamma förutom att den har en call istället för att run . call har den extra kapaciteten att returnera ett resultat och får också kasta kontrollerade undantag.

Resultatet från en inlämningsbar uppgiftsinlämning finns tillgänglig för att tappa via en framtid

Future kan betraktas som en container slags som inrymmer resultatet av Callable beräkning. Beräkningen av den utrullningsbara kan fortsätta i en annan tråd, och alla försök att knacka på resultatet av en Future kommer att blockera och kommer bara att returnera resultatet när det är tillgängligt.

Kallbart gränssnitt

public interface Callable<V> {
    V call() throws Exception;
}

Framtida

interface Future<V> {
    V get();
    V get(long timeout, TimeUnit unit);
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
}

Med hjälp av Callable och Future-exempel:

public static void main(String[] args) throws Exception {
    ExecutorService es = Executors.newSingleThreadExecutor();
          
    System.out.println("Time At Task Submission : " + new Date());
    Future<String> result = es.submit(new ComplexCalculator());
    // the call to Future.get() blocks until the result is available.So we are in for about a 10 sec wait now
    System.out.println("Result of Complex Calculation is : " + result.get());
    System.out.println("Time At the Point of Printing the Result : " + new Date());
}

Vår Callable som gör en lång beräkning

public class ComplexCalculator implements Callable<String> {

    @Override
    public String call() throws Exception {
        // just sleep for 10 secs to simulate a lengthy computation
        Thread.sleep(10000);            
        System.out.println("Result after a lengthy 10sec calculation");
        return "Complex Result"; // the result 
    }
}

Produktion

Time At Task Submission : Thu Aug 04 15:05:15 EDT 2016
Result after a lengthy 10sec calculation
Result of Complex Calculation is : Complex Result
Time At the Point of Printing the Result : Thu Aug 04 15:05:25 EDT 2016

Andra åtgärder tillåtna på Future

Medan get() är metoden för att extrahera det verkliga resultatet Future har avsättning

  • get(long timeout, TimeUnit unit) definierar maximal tidsperiod under aktuell tråd kommer att vänta på ett resultat;
  • För att avbryta annulleringen av uppgiften cancel(mayInterruptIfRunning) . Flaggan mayInterrupt indikerar att uppgiften ska avbrytas om den startades och körs just nu;
  • För att kontrollera om uppgiften är klar / avslutad genom att ringa isDone() ;
  • För att kontrollera om den långa uppgiften har avbrutits isCancelled() .

Låser som synkroniseringshjälpmedel

Före Java 5: s samtidiga paketintroduktion var trådarna mer låg nivå. Införandet av detta paket gav flera samtidiga programmeringshjälp / konstruktioner på högre nivå.

Lås är trådsynkroniseringsmekanismer som i huvudsak tjänar samma syfte som synkroniserade block eller nyckelord.

Intrinsic Locking

int count = 0; // shared among multiple threads

public void doSomething() {
    synchronized(this) {
        ++count; // a non-atomic operation
    }
}

Synkronisering med lås

int count = 0; // shared among multiple threads

Lock lockObj = new ReentrantLock();
public void doSomething() {
    try {
        lockObj.lock();
        ++count; // a non-atomic operation
    } finally {    
        lockObj.unlock(); // sure to release the lock without fail
    }
}

Låsar har också funktionalitet tillgängligt som inre låsning inte erbjuder, till exempel låsning men förblir känslig för avbrott eller försök att låsa och inte blockera när det inte går att göra.

Låsning, lyhörd för avbrott

class Locky {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        try {
            try {
                lockObj.lockInterruptibly();
                ++count; // a non-atomic operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stopping
            }
        } finally {
            if (!Thread.currentThread().isInterrupted()) {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Gör bara något när du kan låsa

public class Locky2 {
    int count = 0; // shared among multiple threads

    Lock lockObj = new ReentrantLock();

    public void doSomething() {
        boolean locked = lockObj.tryLock(); // returns true upon successful lock
        if (locked) {
            try {
                ++count; // a non-atomic operation
            } finally {
                lockObj.unlock(); // sure to release the lock without fail
            }
        }
    }
}

Det finns flera varianter av lås tillgängliga. Mer information finns i api-dokumenten här



Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow