Oversigt over java.util.concurrent

1. Oversigt

Det java.util.concurrent pakken indeholder værktøjer til oprettelse af samtidige applikationer.

I denne artikel vil vi lave en oversigt over hele pakken.

2. Hovedkomponenter

Det java.util.concurrent indeholder alt for mange funktioner til at diskutere i en enkelt opskrivning. I denne artikel vil vi hovedsageligt fokusere på nogle af de mest nyttige hjælpeprogrammer fra denne pakke som:

  • Eksekutor
  • ExecutorService
  • ScheduledExecutorService
  • Fremtid
  • CountDownLatch
  • Cyklisk barriere
  • Semafor
  • Trådfabrik
  • BlockingQueue
  • DelayQueue
  • Låse
  • Phaser

Du kan også finde mange dedikerede artikler til individuelle klasser her.

2.1. Eksekutor

Eksekutor er en grænseflade, der repræsenterer et objekt, der udfører leverede opgaver.

Det afhænger af den specifikke implementering (hvorfra indkaldelsen startes), hvis opgaven skal køres på en ny eller aktuel tråd. Derfor kan vi ved hjælp af denne grænseflade afkoble strømmen til udførelse af opgave fra den aktuelle udførelsesmekanisme.

Et punkt at bemærke her er, at Eksekutor kræver ikke strengt, at udførelsen af ​​opgaven skal være asynkron. I det enkleste tilfælde kan en eksekutor påberåbe sig den indsendte opgave med det samme i påkaldningstråden.

Vi er nødt til at oprette en invoker for at oprette eksekutorinstansen:

public class Invoker implementerer Executor {@Override public void execute (Runnable r) {r.run (); }}

Nu kan vi bruge denne påkalder til at udføre opgaven.

public void execute () {Executor executor = new Invoker (); executor.execute (() -> {// opgave, der skal udføres}); }

Punkt at bemærke her er, at hvis eksekutøren ikke kan acceptere opgaven til udførelse, vil den kaste RejectedExecutionException.

2.2. ExecutorService

ExecutorService er en komplet løsning til asynkron behandling. Den administrerer en kø i hukommelsen og planlægger indsendte opgaver baseret på trådtilgængelighed.

At bruge ExecutorService, vi er nødt til at oprette en Kan køres klasse.

offentlig klasse Opgave implementerer Runnable {@Override public void run () {// task details}}

Nu kan vi oprette ExecutorService eksempel og tildel denne opgave. På tidspunktet for oprettelsen skal vi specificere trådpuljestørrelsen.

ExecutorService eksekutor = Executors.newFixedThreadPool (10);

Hvis vi vil oprette en enkelt tråd ExecutorService eksempel kan vi bruge newSingleThreadExecutor (ThreadFactory threadFactory) for at oprette forekomsten.

Når eksekutøren er oprettet, kan vi bruge den til at indsende opgaven.

public void execute () {executor.submit (new Task ()); }

Vi kan også oprette Kan køres eksempel, mens opgaven sendes.

executor.submit (() -> {ny opgave ();});

Det leveres også med to out-of-the-box-afslutningsmetoder. Den første er lukke ned(); det venter, indtil alle de indsendte opgaver er udført. Den anden metode er shutdownNow () which afslutter straks alle afventende / udførende opgaver.

Der er også en anden metode awaitTermination (lang timeout, TimeUnit enhed) som kraftigt blokerer, indtil alle opgaver er afsluttet, efter at en nedlukningshændelse er udløst eller eksekvering-timeout opstod, eller selve udførelsestråden afbrydes

prøv {executor.awaitTermination (20l, TimeUnit.NANOSECONDS); } fange (InterruptedException e) {e.printStackTrace (); }

2.3. ScheduledExecutorService

ScheduledExecutorService er en lignende grænseflade til ExecutorService, men det kan udføre opgaver med jævne mellemrum.

Executor og ExecutorService'S metoder er planlagt på stedet uden at indføre nogen kunstig forsinkelse. Nul eller en negativ værdi betyder, at anmodningen skal udføres med det samme.

Vi kan bruge begge dele Kan køres og Kan kaldes interface til at definere opgaven.

public void execute () {ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor (); Fremtidens fremtid = executorService.schedule (() -> {// ... return "Hello world";}, 1, TimeUnit.SECONDS); ScheduledFuture planningFuture = executorService.schedule (() -> {// ...}, 1, TimeUnit.SECONDS); executorService.shutdown (); }

ScheduledExecutorService kan også planlægge opgaven efter en vis fast forsinkelse:

executorService.scheduleAtFixedRate (() -> {// ...}, 1, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay (() -> {// ...}, 1, 10, TimeUnit.SECONDS);

Her, den schedAtFixedRate (kommando, der kan køres, lang initialDelay, lang periode, TimeUnit-enhed) metoden opretter og udfører en periodisk handling, der påberåbes først efter den angivne indledende forsinkelse og derefter med den givne periode, indtil serviceinstansen lukker ned.

Det schedWithFixedDelay (Kommando, der kan køres, lang initialDelay, lang forsinkelse, TimeUnit-enhed) metode opretter og udfører en periodisk handling, der påberåbes først efter den angivne indledende forsinkelse og gentagne gange med den givne forsinkelse mellem afslutningen af ​​den udførende og påkaldelsen af ​​den næste.

2.4. Fremtid

Fremtid bruges til at repræsentere resultatet af en asynkron operation. Den leveres med metoder til at kontrollere, om den asynkrone operation er afsluttet eller ej, få det beregnede resultat osv.

Hvad mere er, den annuller (boolsk mayInterruptIfRunning) API annullerer handlingen og frigiver den udførende tråd. Hvis værdien af mayInterruptIfRunning er sandt, vil den tråd, der udfører opgaven, blive afsluttet med det samme.

Ellers får igangværende opgaver tilladelse til at udføre.

Vi kan bruge nedenstående kodestykke til at oprette en fremtidig forekomst:

public void invoke () {ExecutorService executorService = Executors.newFixedThreadPool (10); Fremtidig fremtid = executorService.submit (() -> {// ... Thread.sleep (10000l); returner "Hej verden";}); }

Vi kan bruge følgende kodestykke til at kontrollere, om det fremtidige resultat er klar og hente data, hvis beregningen er udført:

hvis (future.isDone () &&! future.isCancelled ()) {prøv {str = future.get (); } fange (InterruptedException | ExecutionException e) {e.printStackTrace (); }}

Vi kan også specificere en timeout for en given operation. Hvis opgaven tager mere end denne gang, a TimeoutException kastes:

prøv {future.get (10, TimeUnit.SECONDS); } fange (InterruptedException | ExecutionException | TimeoutException e) {e.printStackTrace (); }

2.5. CountDownLatch

CountDownLatch (introduceret i JDK 5) er en hjælpeklasse, der blokerer et sæt tråde, indtil en eller anden handling er afsluttet.

EN CountDownLatch initialiseres med en tæller (heltal type); denne tæller mindskes, da de afhængige tråde fuldfører udførelsen. Men når tælleren når nul, frigøres andre tråde.

Du kan lære mere om CountDownLatch her.

2.6. Cyklisk barriere

Cyklisk barriere fungerer næsten det samme som CountDownLatch bortset fra at vi kan genbruge det. I modsætning til CountDownLatch, det giver flere tråde mulighed for at vente på hinanden ved hjælp af vente() metode (kendt som barrierebetingelse), før den sidste opgave påberåbes.

Vi er nødt til at oprette en Kan køres opgaveinstans for at indlede barrierebetingelsen:

offentlig klasse Opgaver implementerer Runnable {private CyclicBarrier barriere; offentlig opgave (CyclicBarrier-barriere) {this.barrier = barriere; } @ Override public void run () {prøv {LOG.info (Thread.currentThread (). GetName () + "venter"); barrier.await (); LOG.info (Thread.currentThread (). GetName () + "frigives"); } fange (InterruptedException | BrokenBarrierException e) {e.printStackTrace (); }}}

Nu kan vi påberåbe os nogle tråde for at kæmpe for barrierebetingelsen:

offentlig ugyldig start () {CyclicBarrier cyclicBarrier = new CyclicBarrier (3, () -> {// ... LOG.info ("Alle tidligere opgaver er afsluttet");}); Tråd t1 = ny tråd (ny opgave (cyklisk barriere), "T1"); Tråd t2 = ny tråd (ny opgave (cyklisk barriere), "T2"); Tråd t3 = ny tråd (ny opgave (cyklisk barriere), "T3"); hvis (! cyclicBarrier.isBroken ()) {t1.start (); t2.start (); t3.start (); }}

Her, den Er ødelagt() metode kontrollerer, om nogen af ​​trådene blev afbrudt i løbet af udførelsestiden. Vi skal altid udføre denne kontrol, før vi udfører den aktuelle proces.

2.7. Semafor

Det Semafor bruges til at blokere trådniveauadgang til en del af den fysiske eller logiske ressource. En semafor indeholder et sæt tilladelser; hver gang en tråd forsøger at komme ind i det kritiske afsnit, skal den kontrollere semaforen, om der er en tilladelse eller ej.

Hvis en tilladelse ikke er tilgængelig (via tryAcquire ()), er tråden ikke tilladt at springe ind i det kritiske afsnit; men hvis tilladelsen er tilgængelig, gives adgangen, og tilladelsestælleren falder.

Når den udførende tråd frigiver den kritiske sektion, stiger tilladelsestælleren igen (udført af frigøre() metode).

Vi kan specificere en timeout for at få adgang ved at bruge tryAcquire (lang timeout, TimeUnit enhed) metode.

Vi kan også kontrollere antallet af tilgængelige tilladelser eller antallet af tråde, der venter på at erhverve semaforen.

Følgende kodestykke kan bruges til at implementere en semafor:

statisk semafor semafor = ny semafor (10); offentlig ugyldig udfør () kaster InterruptedException {LOG.info ("Tilgængelig tilladelse:" + semaphore.availablePermits ()); LOG.info ("Antal tråde, der venter på at erhverve:" + semaphore.getQueueLength ()); hvis (semaphore.tryAcquire ()) {prøv {// ...} endelig {semaphore.release (); }}}

Vi kan implementere en Mutex som datastruktur ved hjælp af Semafor. Flere detaljer om dette kan findes her.

2.8. Trådfabrik

Som navnet antyder, Trådfabrik fungerer som en tråd (ikke-eksisterende) pool, der skaber en ny tråd efter behov. Det eliminerer behovet for en masse kedelpladekodning til implementering af effektive tråddannelsesmekanismer.

Vi kan definere en Trådfabrik:

offentlig klasse BaeldungThreadFactory implementerer ThreadFactory {private int threadId; privat strengnavn; offentlig BaeldungThreadFactory (strengnavn) {threadId = 1; dette.navn = navn; } @ Override public Tråd newThread (Runnable r) {Thread t = new Thread (r, name + "-Thread_" + threadId); LOG.info ("oprettet ny tråd med id:" + threadId + "og navn:" + t.getName ()); threadId ++; returnere t; }}

Vi kan bruge dette newThread (Runnable r) metode til at oprette en ny tråd ved kørsel:

BaeldungThreadFactory-fabrik = ny BaeldungThreadFactory ("BaeldungThreadFactory"); for (int i = 0; i <10; i ++) {Thread t = factory.newThread (new Task ()); t.start (); }

2.9. BlockingQueue

I asynkron programmering er producent-forbruger mønster et af de mest almindelige integrationsmønstre. Det java.util.concurrent pakken leveres med en datastruktur kendt som BlockingQueue - hvilket kan være meget nyttigt i disse asynkroniseringsscenarier.

Flere oplysninger og et eksempel på dette findes her.

2.10. DelayQueue

DelayQueue er en uendelig størrelse blokerende kø med elementer, hvor et element kun kan trækkes, hvis dets udløbstid (kendt som brugerdefineret forsinkelse) er afsluttet. Derfor er det øverste element (hoved) vil have den største mængde forsinkelse, og den polles sidst.

Flere oplysninger og et arbejdseksempel herom findes her.

2.11. Låse

Ikke overraskende, Låse er et værktøj til at blokere for andre tråde fra at få adgang til et bestemt kodesegment bortset fra den tråd, der udfører den i øjeblikket.

Hovedforskellen mellem en lås og en synkroniseret blok er, at synkroniseret blok er fuldt indeholdt i en metode; dog kan vi have Lock API's lås () og låse op () i separate metoder.

Flere oplysninger og et eksempel på dette findes her.

2.12. Phaser

Phaser er en mere fleksibel løsning end Cyklisk barriere og CountDownLatch - bruges til at fungere som en genanvendelig barriere, hvor det dynamiske antal tråde skal vente, før den fortsætter udførelsen. Vi kan koordinere flere udførelsesfaser ved at genbruge en Phaser eksempel for hver programfase.

Flere oplysninger og et eksempel på dette findes her.

3. Konklusion

I denne oversigtsartikel på højt niveau har vi fokuseret på de forskellige tilgængelige hjælpeprogrammer java.util.concurrent pakke.

Som altid er den fulde kildekode tilgængelig på GitHub.