Vejledning til Java Parallel Collectors Library

1. Introduktion

Parallel-samlere er et lille bibliotek, der giver et sæt Java Stream API-samlere, der muliggør parallel behandling - samtidig med at de vigtigste mangler ved standard Parallel Streams omgås.

2. Maven-afhængigheder

Hvis vi vil begynde at bruge biblioteket, skal vi tilføje en enkelt post i Maven's pom.xml fil:

 com.pivovarit parallel-samlere 1.1.0 

Eller en enkelt linje i Gradles build-fil:

kompilér 'com.pivovarit: parallel-samlere: 1.1.0'

Den nyeste version kan findes på Maven Central.

3. Parallelle strømme forbehold

Parallelle streams var et af Java 8s højdepunkter, men de viste sig kun at være anvendelige til tung CPU-behandling.

Årsagen til dette var det faktum, at Parallelle streams blev internt bakket op af en JVM-delt delt ForkJoinPool, som gav begrænset parallelitet og blev brugt af alle Parallel Streams, der kører på en enkelt JVM-forekomst.

Forestil dig for eksempel, at vi har en liste over id'er, og vi vil bruge dem til at hente en liste over brugere, og at denne handling er dyr.

Vi kunne bruge Parallel Streams til det:

Liste-id'er = Arrays.asList (1, 2, 3); Listeresultater = ids.parallelStream () .map (i -> fetchById (i)) // hver operation tager et sekund .collect (Collectors.toList ()); System.out.println (resultater); // [bruger-1, bruger-2, bruger-3]

Og faktisk kan vi se, at der er en mærkbar hastighed. Men det bliver problematisk, hvis vi begynder at køre flere parallelle blokeringsoperationer ... parallelt. Dette kan hurtigt mætte poolen og resultere i potentielt enorme ventetid. Derfor er det vigtigt at oprette skotter ved at oprette separate trådpuljer - for at forhindre, at ikke-relaterede opgaver påvirker hinandens udførelse.

For at give en brugerdefineret ForkJoinPool For eksempel kunne vi udnytte det trick, der er beskrevet her, men denne tilgang var afhængig af et udokumenteret hack og var defekt indtil JDK10. Vi kan læse mere i selve udgaven - [JDK8190974].

4. Parallelle samlere i aktion

Parallelle samlere er, som navnet antyder, bare standard Stream API-samlere, der tillader udførelse af yderligere operationer parallelt kl indsamle() fase.

ParallelCollectors (som spejler Samlere klasse) klasse er en facade, der giver adgang til hele bibliotekets funktionalitet.

Hvis vi ville foretage ovenstående eksempel, kunne vi bare skrive:

ExecutorService eksekutor = Executors.newFixedThreadPool (10); Liste-id'er = Arrays.asList (1, 2, 3); Fuldført resultater = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), eksekutor, 4)); System.out.println (results.join ()); // [bruger-1, bruger-2, bruger-3]

Resultatet er det samme, dog vi var i stand til at levere vores brugerdefinerede trådpulje, specificere vores tilpassede parallelismeniveau, og resultatet ankom pakket ind i en Fuldført forekomst uden at blokere den aktuelle tråd.

Standard Parallel Streams kunne derimod ikke opnå nogen af ​​dem.

4.1. ParallelCollectors.parallelToList / ToSet ()

Så intuitivt som det bliver, hvis vi vil behandle en Strøm parallelt og indsamle resultater i en Liste eller Sæt, kan vi simpelthen bruge ParallelCollectors.parallelToList eller parallelToSet:

Liste-id'er = Arrays.asList (1, 2, 3); Listeresultater = ids.stream () .collect (parallelToList (i -> fetchById (i), eksekutor, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

Hvis vi ønsker at indsamle Strøm elementer i en Kort eksempel, ligesom med Stream API, er vi nødt til at levere to mappers:

Liste-id'er = Arrays.asList (1, 2, 3); Kortresultater = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), eksekutor, 4)). Sammenføjning (); // {1 = bruger-1, 2 = bruger-2, 3 = bruger-3}

Vi kan også levere en brugerdefineret Kort eksempel Leverandør:

Kortresultater = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, executor, 4)) .join (); 

Og en tilpasset strategi til konfliktløsning:

Liste-id'er = Arrays.asList (1, 2, 3); Kortresultater = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, executor, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

På samme måde som ovenstående kan vi passere vores skik Indsamlingsleverandør hvis vi ønsker at opnå resultater pakket i vores brugerdefinerede container:

Listeresultater = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, executor, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

Hvis ovenstående ikke er nok, kan vi faktisk få en Strøm eksempel og fortsæt brugerdefineret behandling der:

Kort resultater = ids.stream () .collect (parallelToStream (i -> fetchById (i), eksekutor, 4)). derefter anvende (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .tilslutte();

4.5. ParallelCollectors.parallel ()

Denne giver os mulighed for at streame resultater i afslutningsrækkefølge:

ids.stream () .collect (parallel (i -> fetchByIdWithRandomDelay (i), eksekutor, 4)) .forEach (System.out :: println); // bruger-1 // bruger-3 // bruger-2 

I dette tilfælde kan vi forvente, at samleren returnerer forskellige resultater hver gang, da vi introducerede en tilfældig behandlingsforsinkelse.

4.6. ParallelCollectors.parallelOrdered ()

Denne facilitet tillader streaming-resultater ligesom ovenstående, men opretholder den oprindelige rækkefølge:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), eksekutor, 4)) .forEach (System.out :: println); // bruger-1 // bruger-2 // bruger-3 

I dette tilfælde opretholder samleren altid ordren, men kan være langsommere end ovenstående.

5. Begrænsninger

I skrivende stund parallel-samlere fungerer ikke med uendelige streams selvom der bruges kortslutningsoperationer - det er en designbegrænsning pålagt af Stream API-interner. Kort fortalt, Strøms behandler samlere som ikke-kortslutningsoperationer, så strømmen skal behandle alle opstrømselementer, inden den afsluttes.

Den anden begrænsning er den kortslutningsoperationer afbryder ikke de resterende opgaver efter kortslutning.

6. Konklusion

Vi så, hvordan parallelsamlerbiblioteket giver os mulighed for at udføre parallel behandling ved hjælp af brugerdefineret Java Stream API Samlere og CompletableFutures at bruge brugerdefinerede trådpuljer, parallelisme og ikke-blokerende stil af CompletableFutures.

Som altid er kodeuddrag tilgængelige på GitHub.

For yderligere læsning, se parallel-samlerbiblioteket på GitHub, forfatterens blog og forfatterens Twitter-konto.