Introduktion til trådpuljer i Java

1. Introduktion

Denne artikel er et kig på trådpuljer i Java - startende med de forskellige implementeringer i standard Java-biblioteket og derefter på Googles Guava-bibliotek.

2. Trådpuljen

I Java kortlægges tråde til tråde på systemniveau, der er operativsystemets ressourcer. Hvis du opretter tråde ukontrollabelt, kan du løbe tør for disse ressourcer hurtigt.

Kontekstskiftet mellem tråde udføres også af operativsystemet - for at efterligne parallelitet. En forenklet opfattelse er, at jo flere tråde du gyder, jo mindre tid bruger hver tråd på at arbejde faktisk.

Thread Pool-mønsteret hjælper med at spare ressourcer i en multitrådet applikation og også til at indeholde parallelismen i visse foruddefinerede grænser.

Når du bruger en trådpulje, skal du skriv din samtidige kode i form af parallelle opgaver, og send dem til udførelse til en forekomst af en trådpulje. Denne instans styrer flere genbrugte tråde til udførelse af disse opgaver.

Mønsteret giver dig mulighed for at styr antallet af tråde, som applikationen opretter, deres livscyklus, samt at planlægge opgavernes udførelse og holde indgående opgaver i en kø.

3. Trådpuljer i Java

3.1. Eksekutører, Eksekutor og ExecutorService

Det Eksekutører hjælperklasse indeholder flere metoder til oprettelse af forudkonfigurerede forekomster af trådpuljer til dig. Disse klasser er et godt sted at starte med - brug det, hvis du ikke behøver at anvende brugerdefineret finjustering.

Det Eksekutor og ExecutorService grænseflader bruges til at arbejde med forskellige thread pool-implementeringer i Java. Normalt skal du holde din kode afkoblet fra den faktiske implementering af trådpuljen og brug disse grænseflader i hele din applikation.

Det Eksekutor interface har en enkelt udføre metode til at indsende Kan køres instanser til udførelse.

Her er et hurtigt eksempel hvordan du kan bruge Eksekutører API til at erhverve en Eksekutor forekomst understøttet af en enkelt trådpulje og en ubegrænset kø til udførelse af opgaver sekventielt. Her udfører vi en enkelt opgave, der blot udskriver “Hej Verden”På skærmen. Opgaven sendes som en lambda (en Java 8-funktion), som det antages at være Kan køres.

Executor executor = Executors.newSingleThreadExecutor (); executor.execute (() -> System.out.println ("Hello World"));

Det ExecutorService interface indeholder et stort antal metoder til kontrol af fremskridtene i opgaverne og styring af afslutningen af ​​tjenesten. Ved hjælp af denne grænseflade kan du indsende opgaverne til udførelse og også kontrollere udførelsen ved hjælp af den returnerede Fremtid eksempel.

I det følgende eksempel, opretter vi en ExecutorService, indsend en opgave, og brug derefter den returnerede Fremtid'S metode til at vente, indtil den indsendte opgave er færdig, og værdien returneres:

ExecutorService executorService = Executors.newFixedThreadPool (10); Fremtidig fremtid = executorService.submit (() -> "Hej verden"); // nogle operationer String result = future.get ();

I et virkeligt scenarie vil du naturligvis ikke ringe future.get () med det samme, men udsæt kalde det, indtil du rent faktisk har brug for værdien af ​​beregningen.

Det Indsend metoden er overbelastet for at tage enten Kan køres eller Kan kaldes begge er funktionelle grænseflader og kan sendes som lambdas (startende med Java 8).

Kan køres'S enkelt metode kaster ikke en undtagelse og returnerer ikke en værdi. Det Kan kaldes interface kan være mere praktisk, da det giver os mulighed for at kaste en undtagelse og returnere en værdi.

Endelig - for at lade kompilatoren udlede Kan kaldes type, skal du blot returnere en værdi fra lambda.

For flere eksempler på brug af ExecutorService interface og futures, se på "En guide til Java ExecutorService".

3.2. ThreadPoolExecutor

Det ThreadPoolExecutor er en udvidelig trådpoolimplementering med mange parametre og kroge til finjustering.

De vigtigste konfigurationsparametre, som vi diskuterer her, er: corePoolSize, maximumPoolSizeog keepAliveTime.

Puljen består af et fast antal kernetråde, der holdes inde hele tiden, og nogle overdrevne tråde, der muligvis er gyde og derefter afsluttes, når de ikke længere er nødvendige. Det corePoolSize parameter er antallet af kernetråde, der bliver instantieret og opbevaret i puljen. Når en ny opgave kommer ind, hvis alle kernetråde er optaget, og den interne kø er fuld, får puljen lov til at vokse op til maximumPoolSize.

Det keepAliveTime parameter er det tidsinterval, for hvilket de overdrevne tråde (instantieret ud over corePoolSize) får lov til at eksistere i inaktiv tilstand. Som standard er ThreadPoolExecutor betragter kun tråde, der ikke er kerner, til fjernelse. For at anvende den samme fjernelsespolitik på kernetråde kan vi bruge allowCoreThreadTimeOut (sand) metode.

Disse parametre dækker en lang række brugssager, men de mest typiske konfigurationer er foruddefineret i Eksekutører statiske metoder.

For eksempel, newFixedThreadPool metode skaber en ThreadPoolExecutor med lige corePoolSize og maximumPoolSize parameterværdier og et nul keepAliveTime. Dette betyder, at antallet af tråde i denne trådpulje altid er det samme:

ThreadPoolExecutor eksekutor = (ThreadPoolExecutor) Executors.newFixedThreadPool (2); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); assertEquals (2, executor.getPoolSize ()); assertEquals (1, executor.getQueue (). størrelse ());

I eksemplet ovenfor instantierer vi a ThreadPoolExecutor med et fast trådantal på 2. Dette betyder, at hvis antallet af samtidigt kørende opgaver til enhver tid er mindre eller lig med to, så bliver de udført med det samme. Ellers, nogle af disse opgaver kan placeres i en kø for at vente på deres tur.

Vi skabte tre Kan kaldes opgaver, der efterligner tungt arbejde ved at sove i 1000 millisekunder. De første to opgaver udføres på én gang, og den tredje skal vente i køen. Vi kan bekræfte det ved at ringe til getPoolSize () og getQueue (). størrelse () metoder straks efter indsendelse af opgaverne.

En anden forudkonfigureret ThreadPoolExecutor kan oprettes med Executors.newCachedThreadPool () metode. Denne metode modtager slet ikke et antal tråde. Det corePoolSize er faktisk indstillet til 0, og maximumPoolSize er indstillet til Heltal.MAX_VALUE i dette tilfælde. Det keepAliveTime er 60 sekunder for denne.

Disse parameterværdier betyder det Den cachelagrede trådpulje kan vokse uden grænser for at imødekomme et antal indsendte opgaver. Men når trådene ikke længere er nødvendige, bortskaffes de efter 60 sekunders inaktivitet. En typisk brugssag er, når du har mange kortvarige opgaver i din applikation.

ThreadPoolExecutor eksekutor = (ThreadPoolExecutor) Executors.newCachedThreadPool (); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); assertEquals (3, executor.getPoolSize ()); assertEquals (0, executor.getQueue (). størrelse ());

Køstørrelsen i eksemplet ovenfor vil altid være nul, fordi internt a SynchronousQueue eksempel anvendes. I en SynchronousQueue, par af indsæt og fjerne operationer forekommer altid samtidigt, så køen faktisk aldrig indeholder noget.

Det Executors.newSingleThreadExecutor () API opretter en anden typisk form for ThreadPoolExecutor indeholdende en enkelt tråd. Den enkelte trådudførelse er ideel til at oprette en begivenhedssløjfe. Det corePoolSize og maximumPoolSize parametre er lig med 1, og keepAliveTime er nul.

Opgaver i ovenstående eksempel udføres sekventielt, så flagværdien vil være 2 efter opgavens afslutning:

AtomicInteger-tæller = nyt AtomicInteger (); ExecutorService eksekutor = Executors.newSingleThreadExecutor (); executor.submit (() -> {counter.set (1);}); executor.submit (() -> {counter.compareAndSet (1, 2);});

Derudover dette ThreadPoolExecutor er dekoreret med en uforanderlig indpakning, så den kan ikke omkonfigureres efter oprettelsen. Bemærk, at også dette er grunden til, at vi ikke kan kaste det til en ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

Det ScheduledThreadPoolExecutor udvider ThreadPoolExecutor klasse og implementerer også ScheduledExecutorService interface med flere yderligere metoder:

  • tidsplan metode tillader at udføre en opgave en gang efter en specificeret forsinkelse;
  • scheduleAtFixedRate metode tillader at udføre en opgave efter en specificeret indledende forsinkelse og derefter udføre den gentagne gange med en bestemt periode; det periode argumentet er tiden målt mellem starttidspunktet for opgaverne, så udførelsesgraden er fast;
  • scheduleWithFixedDelay metoden ligner scheduleAtFixedRate ved at den gentagne gange udfører den givne opgave, men den angivne forsinkelse er målt mellem slutningen af ​​den forrige opgave og starten af ​​den næste; udførelsesgraden kan variere afhængigt af den tid det tager at udføre en given opgave.

Det Executors.newScheduledThreadPool () metode bruges typisk til at oprette en ScheduledThreadPoolExecutor med et givet corePoolSize, ubegrænset maximumPoolSize og nul keepAliveTime. Sådan planlægger du en opgave til udførelse i 500 millisekunder:

ScheduledExecutorService executor = Executors.newScheduledThreadPool (5); executor.schedule (() -> {System.out.println ("Hello World");}, 500, TimeUnit.MILLISECONDS);

Den følgende kode viser, hvordan man udfører en opgave efter 500 millisekunders forsinkelse og gentager den derefter hver 100 millisekund. Efter planlægning af opgaven venter vi, indtil den udløses tre gange ved hjælp af CountDownLatch låse, annuller det derefter ved hjælp af Future.cancel () metode.

CountDownLatch-lås = ny CountDownLatch (3); ScheduledExecutorService executor = Executors.newScheduledThreadPool (5); ScheduledFuture future = executor.scheduleAtFixedRate (() -> {System.out.println ("Hello World"); lock.countDown ();}, 500, 100, TimeUnit.MILLISECONDS); lock.await (1000, TimeUnit.MILLISECONDS); future.cancel (sand);

3.4. ForkJoinPool

ForkJoinPool er den centrale del af fork / join framework introduceret i Java 7. Det løser et almindeligt problem med gydning af flere opgaver i rekursive algoritmer. Brug af en simpel ThreadPoolExecutor, du løber tør for tråde hurtigt, da hver opgave eller underopgave kræver sin egen tråd for at køre.

I en fork / join ramme, kan enhver opgave gyde (gaffel) et antal underopgaver, og vent på, at de er afsluttet ved hjælp af tilslutte metode. Fordelen ved fork / join rammen er, at det opretter ikke en ny tråd til hver opgave eller underopgaveimplementerer i stedet Work Stealing-algoritmen. Denne ramme er grundigt beskrevet i artiklen “Guide to the Fork / Join Framework in Java”

Lad os se på et simpelt eksempel på brug ForkJoinPool at krydse et knudetræ og beregne summen af ​​alle bladværdier. Her er en simpel implementering af et træ bestående af en node, en int værdi og et sæt underordnede noder:

statisk klasse TreeNode {int-værdi; Sæt børn; TreeNode (int-værdi, TreeNode ... børn) {this.value = værdi; this.children = Sets.newHashSet (børn); }}

Hvis vi nu vil sammenfatte alle værdier i et træ parallelt, skal vi implementere en Rekursiv opgave interface. Hver opgave modtager sin egen node og tilføjer sin værdi til summen af ​​værdierne for dens børn. For at beregne summen af børn værdier, opgaveimplementering gør følgende:

  • strømmer børn sæt,
  • kort over denne strøm, hvilket skaber en ny CountingTask for hvert element,
  • udfører hver delopgave ved at gafle den,
  • indsamler resultaterne ved at ringe til tilslutte metode til hver forked opgave,
  • opsummerer resultaterne ved hjælp af Collectors.summingInt samler.
offentlig statisk klasse CountingTask udvider RecursiveTask {private final TreeNode node; public CountingTask (TreeNode node) {this.node = node; } @ Override beskyttet heltal beregning () {return node.value + node.children.stream () .map (childNode -> new CountingTask (childNode) .fork ()) .collect (Collectors.summingInt (ForkJoinTask :: join)) ; }}

Koden til at køre beregningen på et faktisk træ er meget enkel:

TreeNode-træ = nyt TreeNode (5, nyt TreeNode (3), nyt TreeNode (2, nyt TreeNode (2), nyt TreeNode (8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool (); int sum = forkJoinPool.invoke (ny CountingTask (træ));

4. Thread Pools implementering i Guava

Guava er et populært Google-bibliotek med hjælpeprogrammer. Det har mange nyttige samtidighedsklasser, herunder flere praktiske implementeringer af ExecutorService. Implementeringsklasser er ikke tilgængelige til direkte instantiering eller underklassering, så det eneste indgangspunkt for oprettelse af deres forekomster er Mere eksekutører hjælperklasse.

4.1. Tilføjelse af Guava som en Maven-afhængighed

Føj følgende afhængighed til din Maven pom-fil for at inkludere Guava-biblioteket i dit projekt. Du kan finde den nyeste version af Guava-biblioteket i Maven Central-arkivet:

 com.google.guava guava 19.0 

4.2. Direkte eksekutør og direkte eksekutortjeneste

Nogle gange vil du udføre opgaven enten i den aktuelle tråd eller i en trådpulje afhængigt af nogle forhold. Du foretrækker at bruge en enkelt Eksekutor interface og bare skifte implementeringen. Selvom det ikke er så svært at komme med en implementering af Eksekutor eller ExecutorService der udfører opgaverne i den aktuelle tråd, kræver det stadig at skrive noget kedelpladekode.

Med glæde leverer Guava foruddefinerede forekomster til os.

Her er et eksempel der demonstrerer udførelsen af ​​en opgave i samme tråd. Selvom den medfølgende opgave sover i 500 millisekunder, er den blokerer den aktuelle tråd, og resultatet er tilgængeligt umiddelbart efter udføre opkaldet er afsluttet:

Executor executor = MoreExecutors.directExecutor (); AtomicBoolean udført = ny AtomicBoolean (); executor.execute (() -> {prøv {Thread.sleep (500);} fange (InterruptedException e) {e.printStackTrace ();} executed.set (true);}); assertTrue (executed.get ());

Instansen returneret af directExecutor () metoden er faktisk en statisk singleton, så brug af denne metode giver overhovedet ingen omkostninger ved oprettelse af objekt.

Du foretrækker denne metode frem for MoreExecutors.newDirectExecutorService () fordi den API skaber en fuldgyldig implementering af eksekutortjeneste ved hvert opkald.

4.3. Afslutte eksekutortjenester

Et andet almindeligt problem er nedlukning af den virtuelle maskine mens en trådpul stadig kører sine opgaver. Selv med en annulleringsmekanisme på plads er der ingen garanti for, at opgaverne opfører sig pænt og stopper deres arbejde, når eksekutortjenesten lukker ned. Dette kan få JVM til at hænge på ubestemt tid, mens opgaverne fortsætter med at udføre deres arbejde.

For at løse dette problem introducerer Guava en familie af spændende eksekutortjenester. De er baseret på dæmontråde, der afsluttes sammen med JVM.

Disse tjenester tilføjer også en lukningskrog med Runtime.getRuntime (). AddShutdownHook () metode og forhindre, at den virtuelle maskine afsluttes i en konfigureret tid, før den opgiver hang-opgaver.

I det følgende eksempel sender vi den opgave, der indeholder en uendelig løkke, men vi bruger en spændende eksekutortjeneste med en konfigureret tid på 100 millisekunder til at vente på opgaverne efter VM-afslutning. Uden exitingExecutorService på plads, ville denne opgave få VM til at hænge på ubestemt tid:

ThreadPoolExecutor eksekutor = (ThreadPoolExecutor) Executors.newFixedThreadPool (5); ExecutorService executorService = MoreExecutors.getExitingExecutorService (eksekutor, 100, TimeUnit.MILLISECONDS); executorService.submit (() -> {while (true) {}});

4.4. Lytter dekoratører

Lytter dekoratører giver dig mulighed for at pakke ExecutorService og modtage ListenableFuture forekomster ved indsendelse af opgave i stedet for enkle Fremtid tilfælde. Det ListenableFuture interface udvides Fremtid og har en enkelt yderligere metode addListener. Denne metode giver mulighed for at tilføje en lytter, der kaldes til fremtidig afslutning.

Du vil sjældent ønske at bruge ListenableFuture.addListener () metode direkte, men det er det afgørende for de fleste af hjælpemetoderne i Fremtid utility klasse. For eksempel med Futures.allAsList () metode kan du kombinere flere ListenableFuture forekomster i en enkelt ListenableFuture der fuldføres efter en vellykket afslutning af alle futures kombineret:

ExecutorService executorService = Executors.newCachedThreadPool (); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator (executorService); ListenableFuture future1 = listeningExecutorService.submit (() -> "Hej"); ListenableFuture future2 = listeningExecutorService.submit (() -> "Verden"); Stringhilsen = Futures.allAsList (future1, future2) .get () .stream () .collect (Collectors.joining ("")); assertEquals ("Hello World", hilsen);

5. Konklusion

I denne artikel har vi diskuteret Thread Pool-mønsteret og dets implementeringer i standard Java-biblioteket og i Googles Guava-bibliotek.

Kildekoden til artiklen er tilgængelig på GitHub.