Vejledning til Fork / Join Framework i Java

1. Oversigt

Gaffel / sammenføjningsrammen blev præsenteret i Java 7. Det giver værktøjer, der hjælper med at fremskynde parallel behandling ved at forsøge at bruge alle tilgængelige processorkerner - hvilket er opnået gennem en opdeling og erobring tilgang.

I praksis betyder det, at rammen første “gafler”, rekursivt opdele opgaven i mindre uafhængige underopgaver, indtil de er enkle nok til at blive udført asynkront.

Efter det, "join" -delen begynder, hvor resultaterne af alle underopgaver sammenføjes rekursivt til et enkelt resultat, eller i tilfælde af en opgave, der returnerer ugyldig, venter programmet simpelthen, indtil hver delopgave udføres.

For at give effektiv parallel udførelse bruger fork / join-rammen en pulje af tråde kaldet ForkJoinPool, der administrerer arbejdstråde af typen ForkJoinWorkerThread.

2. ForkJoinPool

Det ForkJoinPool er hjertet i rammen. Det er en implementering af ExecutorService der administrerer arbejdertråde og giver os værktøjer til at få oplysninger om trådpuljens tilstand og ydeevne.

Arbejdstråde kan kun udføre en opgave ad gangen, men ForkJoinPool opretter ikke en separat tråd for hver enkelt underopgave. I stedet har hver tråd i puljen sin egen kø med dobbelt ende (eller deque, udtalt dæk) som gemmer opgaver.

Denne arkitektur er afgørende for at afbalancere trådens arbejdsbyrde ved hjælp af arbejde-stjæle algoritme.

2.1. Arbejd stjæle algoritme

Kort sagt - gratis tråde prøver at "stjæle" arbejde fra deques af travle tråde.

Som standard får en arbejdstråd opgaver fra lederen af ​​sin egen deque. Når den er tom, tager tråden en opgave fra halen til en anden optaget tråds hale eller fra den globale adgangskø, da det er her, de største stykker arbejde sandsynligvis vil blive placeret.

Denne tilgang minimerer muligheden for, at tråde konkurrerer om opgaver. Det reducerer også antallet af gange, tråden bliver nødt til at gå på udkig efter arbejde, da det fungerer på de største tilgængelige klumper af arbejde først.

2.2. ForkJoinPool Instantiering

I Java 8, den mest bekvemme måde at få adgang til forekomsten af ForkJoinPool er at bruge sin statiske metode commonPool (). Som navnet antyder, vil dette give en henvisning til den fælles pool, som er en standard trådpool for hver ForkJoinTask.

Ifølge Oracle's dokumentation reducerer brugen af ​​den foruddefinerede fælles pool ressourceforbruget, da dette afskrækker oprettelsen af ​​en separat trådpool pr. Opgave.

ForkJoinPool commonPool = ForkJoinPool.commonPool ();

Den samme adfærd kan opnås i Java 7 ved at oprette en ForkJoinPool og tildele det til en offentlig statisk felt i en hjælpeklasse:

offentlig statisk ForkJoinPool forkJoinPool = ny ForkJoinPool (2);

Nu er det let tilgængeligt:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Med ForkJoinPool's konstruktører, er det muligt at oprette en brugerdefineret trådpulje med et specifikt niveau af parallelisme, trådfabrik og undtagelsesbehandler. I eksemplet ovenfor har poolen et parallelitetsniveau på 2. Dette betyder, at poolen vil bruge 2 processorkerner.

3. ForkJoinTask

ForkJoinTask er basistypen for opgaver, der udføres indeni ForkJoinPool. I praksis bør en af ​​dens to underklasser udvides: Rekursiv handling til ugyldig opgaver og Rekursiv opgave til opgaver, der returnerer en værdi.De har begge en abstrakt metode beregne () hvor opgavens logik er defineret.

3.1. RecursiveAction - et eksempel

I eksemplet nedenfor er den enhed, der skal behandles, repræsenteret af a Snor hedder arbejdsbyrde. Til demonstrationsformål er opgaven en meningsløs opgave: den bogstaver simpelthen sin input og logger den.

For at demonstrere rammens gaffeladfærd, eksemplet deler opgaven, hvis arbejdsbyrde.længde () er større end en specificeret tærskelbruger createSubtask () metode.

Strengen er rekursivt opdelt i understrenge, hvilket skaber CustomRecursiveTask forekomster, der er baseret på disse underlag.

Som et resultat returnerer metoden a Liste.

Listen sendes til ForkJoinPool bruger påkaldeAlle () metode:

offentlig klasse CustomRecursiveAction udvider RecursiveAction {private String workload = ""; privat statisk endelig int THRESHOLD = 4; privat statisk loggerlogger = Logger.getAnonymousLogger (); public CustomRecursiveAction (String workload) {this.workload = workload; } @ Override beskyttet ugyldig beregning () {if (workload.length ()> THRESHOLD) {ForkJoinTask.invokeAll (createSubtasks ()); } andet {behandling (arbejdsbelastning); }} privat liste createSubtasks () {List subtasks = new ArrayList (); String partOne = workload.substring (0, workload.length () / 2); String partTwo = workload.substring (workload.length () / 2, workload.length ()); subtasks.add (ny CustomRecursiveAction (partOne)); subtasks.add (ny CustomRecursiveAction (partTwo)); returnere underopgaver } privat ugyldig behandling (String-arbejde) {String-resultat = work.toUpperCase (); logger.info ("Dette resultat - (" + resultat + ") - blev behandlet af" + Thread.currentThread (). getName ()); }}

Dette mønster kan bruges til at udvikle dit eget Rekursiv handling klasser. For at gøre dette skal du oprette et objekt, der repræsenterer den samlede mængde arbejde, vælge en passende tærskel, definere en metode til at opdele arbejdet og definere en metode til at udføre arbejdet.

3.2. Rekursiv opgave

For opgaver, der returnerer en værdi, er logikken her ens, bortset fra at resultatet for hver delopgave er samlet i et enkelt resultat:

offentlig klasse CustomRecursiveTask udvider RecursiveTask {private int [] arr; privat statisk endelig int THRESHOLD = 20; offentlig CustomRecursiveTask (int [] arr) {this.arr = arr; } @ Override-beskyttet heltal beregning () {if (arr.length> THRESHOLD) {return ForkJoinTask.invokeAll (createSubtasks ()) .stream () .mapToInt (ForkJoinTask :: join) .sum (); } andet {returbehandling (arr); }} privat samling createSubtasks () {List divisionTasks = new ArrayList (); divisionTasks.add (ny CustomRecursiveTask (Arrays.copyOfRange (arr, 0, arr. længde / 2))); divisionTasks.add (ny CustomRecursiveTask (Arrays.copyOfRange (arr, arr. længde / 2, arr. længde))); returnerede delte opgaver } privat Heltalsbehandling (int [] arr) {return Arrays.stream (arr) .filter (a -> a> 10 && a a * 10) .sum (); }}

I dette eksempel er arbejdet repræsenteret af en matrix gemt i arr felt i CustomRecursiveTask klasse. Det createSubtasks () metode opdeler rekursivt opgaven i mindre stykker arbejde, indtil hvert stykke er mindre end tærsklen. Derefter påkaldeAlle () metoden sender underopgaverne til den fælles pool og returnerer en liste over Fremtid.

For at udløse eksekvering, tilslutte() metode kaldes for hver delopgave.

I dette eksempel opnås dette ved hjælp af Java 8'er Stream API; det sum() metode bruges som en repræsentation af at kombinere underresultater i det endelige resultat.

4. Afsendelse af opgaver til ForkJoinPool

For at indsende opgaver til trådpuljen kan få tilgange bruges.

Det Indsend() eller udføre ()metode (deres brugstilfælde er de samme):

forkJoinPool.execute (customRecursiveTask); int resultat = customRecursiveTask.join ();

Det påberåbe sig ()metode forkaster opgaven og venter på resultatet og behøver ikke nogen manuel sammenføjning:

int resultat = forkJoinPool.invoke (customRecursiveTask);

Det påkaldeAlle () metode er den mest bekvemme måde at indsende en sekvens af ForkJoinTasks til ForkJoinPool. Det tager opgaver som parametre (to opgaver, var args eller en samling), gafler returnerer derefter en samling af Fremtid genstande i den rækkefølge, de blev produceret i.

Alternativt kan du bruge separat gaffel() og tilslutte() metoder. Det gaffel() metoden sender en opgave til en pool, men den udløser ikke dens udførelse. Det tilslutte() metoden skal bruges til dette formål. I tilfælde af Rekursiv handling, det tilslutte() returnerer intet andet end nul; til Rekursiv opgave, det returnerer resultatet af opgavens udførelse:

customRecursiveTaskFirst.fork (); resultat = customRecursiveTaskLast.join ();

I vores Rekursiv opgave eksempel brugte vi påkaldeAlle () metode til at sende en sekvens af underopgaver til puljen. Det samme job kan udføres med gaffel() og tilslutte(), selvom dette har konsekvenser for rækkefølgen af ​​resultaterne.

For at undgå forvirring er det generelt en god ide at bruge påkaldeAlle () metode til at sende mere end en opgave til ForkJoinPool.

5. Konklusioner

Brug af fork / join-rammen kan fremskynde behandlingen af ​​store opgaver, men for at opnå dette resultat bør nogle retningslinjer følges:

  • Brug så få trådpuljer som muligt - i de fleste tilfælde er den bedste beslutning at bruge en trådpulje pr. Applikation eller system
  • Brug standard fælles trådpulje, hvis ingen specifik indstilling er nødvendig
  • Brug en rimelig tærskel til opdeling ForkJoinTask ind i underopgaver
  • Undgå blokering i dinForkJoinTasks

Eksemplerne brugt i denne artikel er tilgængelige i det linkede GitHub-arkiv.


$config[zx-auto] not found$config[zx-overlay] not found