Guide til Akka-vandløb

1. Oversigt

I denne artikel vil vi se på akka-streams bibliotek, der er bygget oven på Akka-skuespillerammen, der overholder manifestet for reaktive strømme. Akka Streams API giver os mulighed for nemt at komponere datatransformationsstrømme fra uafhængige trin.

Desuden udføres al behandling på en reaktiv, ikke-blokerende og asynkron måde.

2. Maven-afhængigheder

For at komme i gang skal vi tilføje akka-stream og akka-stream-testkit biblioteker i vores pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2 com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. Akka Streams API

For at arbejde med Akka Streams skal vi være opmærksomme på de centrale API-koncepter:

  • Kilde - indgangspunktet til behandling i akka-stream bibliotek - vi kan oprette en forekomst af denne klasse fra flere kilder; for eksempel kan vi bruge enkelt() metode, hvis vi vil oprette en Kilde fra en enkelt Snor, eller vi kan oprette en Kilde fra en Iterabel af elementer
  • Flyde - den vigtigste byggesten - hver Flyde forekomst har en indgang og en udgangsværdi
  • Materializer - mede kan bruge en, hvis vi vil have vores Flyde at have nogle bivirkninger som at logge eller gemme resultater; oftest passerer vi Anvendes ikke alias som en Materializer at betegne, at vores Flyde bør ikke have nogen bivirkninger
  • Håndvask drift - når vi bygger en Flyde, den udføres først, før vi registrerer en Håndvask operation på det - det er en terminaloperation, der udløser alle beregninger i hele Flyde

4. Oprettelse Strømme i Akka Streams

Lad os starte med at opbygge et simpelt eksempel, hvor vi viser, hvordan vi gør det oprette og kombinere flere Flydes - at behandle en strøm af heltal og beregne det gennemsnitlige bevægelsesvindue af heltalspar fra strømmen.

Vi analyserer en semikolon-afgrænset Snor af heltal som input for at skabe vores akka-stream Kilde for eksemplet.

4.1. Brug af en Flyde til Parse Input

Lad os først oprette en DataImporter klasse, der tager en instans af ActorSystem som vi senere vil bruge til at skabe vores Flyde:

offentlig klasse DataImporter {privat ActorSystem actorSystem; // standardkonstruktører, getters ...}

Lad os derefter oprette en parseLine metode, der genererer en Liste af Heltal fra vores afgrænsede input Snor. Husk, at vi kun bruger Java Stream API her til parsing:

privat liste parseLine (String line) {String [] fields = line.split (";"); returnere Arrays.stream (felter) .map (Heltal :: parseInt) .collect (Collectors.toList ()); }

Vores første Flyde vil gælde parseLine til vores input for at oprette en Flyde med inputtype Snor og output type Heltal:

private Flow parseContent () {return Flow.of (String.class) .mapConcat (dette :: parseLine); }

Når vi kalder parseLine () metode ved compileren, at argumentet for den lambda-funktion vil være a Snor - samme som inputtypen til vores Flyde.

Bemærk, at vi bruger mapConcat () metode - svarende til Java 8 flatMap () metode - fordi vi vil flade Liste af Heltal returneret af parseLine () ind i en Flyde af Heltal så efterfølgende trin i vores behandling ikke behøver at håndtere Liste.

4.2. Brug af en Flyde at udføre beregninger

På dette tidspunkt har vi vores Flyde af parsede heltal. Nu skal vi implementere logik, der grupperer alle inputelementer i par og beregner et gennemsnit af disse par.

Nu skal vi lave en Flyde af Heltals og gruppere dem ved hjælp af grupperet () metode.

Dernæst vil vi beregne et gennemsnit.

Da vi ikke er interesseret i rækkefølgen, hvor disse gennemsnit behandles, kan vi har gennemsnit beregnet parallelt ved hjælp af flere tråde ved hjælp af mapAsyncUnordered () metode, sender antallet af tråde som et argument til denne metode.

Handlingen, der vil blive bestået som lambda til Flyde skal returnere en Fuldført fordi den handling beregnes asynkront i den separate tråd:

private Flow computeAverage () {return Flow.of (Integer.class) .grouped (2) .mapAsyncUnordered (8, integers -> CompletableFuture.supplyAsync (() -> integers.stream () .mapToDouble (v -> v). gennemsnit () .ellerElse (-1,0))); }

Vi beregner gennemsnit i otte parallelle tråde. Bemærk, at vi bruger Java 8 Stream API til beregning af et gennemsnit.

4.3. Komponere flere Strømme ind i en single Flyde

Det Flyde API er en flydende abstraktion, der giver os mulighed for det komponere flere Flyde tilfælde for at nå vores endelige behandlingsmål. Vi kan have granulære strømme, hvor man f.eks. Analyserer JSON, en anden transformerer, og en anden indsamler nogle statistikker.

Sådan granularitet hjælper os med at oprette mere testbar kode, fordi vi kan teste hvert behandlingstrin uafhængigt.

Vi oprettede to strømme over, der kan fungere uafhængigt af hinanden. Nu vil vi komponere dem sammen.

Først vil vi analysere vores input Snorog derefter vil vi beregne et gennemsnit på en strøm af elementer.

Vi kan komponere vores strømme ved hjælp af via () metode:

Flow calcAnage () {return Flow.of (String.class) .via (parseContent ()) .via (computeAverage ()); }

Vi oprettede en Flyde har input type Snor og to andre strømmer efter det. Det parseContent ()Flyde tager en Snor indtaster og returnerer et Heltal som output. Det computeAverage () Flow tager det Heltal og beregner et gennemsnitligt afkast Dobbelt som output-typen.

5. Tilføjelse Håndvask til Flyde

Som vi nævnte, til dette punkt hele Flyde udføres endnu ikke, fordi den er doven. For at starte udførelse af Flyde vi er nødt til at definere en Håndvask. Det Håndvask operation kan f.eks. gemme data i en database eller sende resultater til en ekstern webservice.

Antag, at vi har en Gennemsnitligt arkiv klasse med følgende Gemme() metode, der skriver resultater til vores database:

CompletionStage save (Dobbelt gennemsnit) {return CompletableFuture.supplyAsync (() -> {// skriv til gennemsnittet for returnering af database;}); }

Nu vil vi oprette en Håndvask operation, der bruger denne metode til at gemme resultaterne af vores Flyde forarbejdning. At skabe vores Håndvask, vi skal først lave en Flyde der tager et resultat af vores behandling som inputtypen. Dernæst vil vi gemme alle vores resultater i databasen.

Igen er vi ligeglade med at bestille elementerne, så vi kan udføre Gemme() operationer parallelt bruger mapAsyncUnordered () metode.

At oprette en Håndvask fra Flyde vi er nødt til at kalde toMat () med Sink.ignore () som et første argument og Hold til højre() som det andet, fordi vi vil returnere en status for behandlingen:

privat vask storeAverages () {return Flow.of (Double.class) .mapAsyncUnordered (4, averageRepository :: save) .toMat (Sink.ignore (), Keep.right ()); }

6. Definition af en kilde til Flyde

Den sidste ting, vi skal gøre, er at lave en Kilde fra input Snor. Vi kan anvende en beregne gennemsnit ()Flyde til denne kilde ved hjælp af via () metode.

Derefter for at tilføje Håndvask til behandlingen skal vi kalde runWith () metode og bestå storeAverages () Vask som vi lige har oprettet:

CompletionStage calcAAgageForContent (String content) {return Source.single (content) .via (calcateAverage ()) .runWith (storeAverages (), ActorMaterializer.create (actorSystem)) .whenComplete ((d, e) -> {if (d! = null) {System.out.println ("Import afsluttet");} ellers {e.printStackTrace ();}}); }

Bemærk, at når behandlingen er afsluttet, tilføjer vi nårFuldfør () tilbagekald, hvor vi kan udføre nogle handlinger afhængigt af resultatet af behandlingen.

7. Testning Akka Streams

Vi kan teste vores behandling ved hjælp af akka-stream-testkit.

Den bedste måde at teste den faktiske logik på behandlingen er at teste alt Flyde logik og brug TestSink for at udløse beregningen og hævde resultaterne.

I vores test skaber vi Flyde at vi vil teste, og derefter opretter vi en Kilde fra testinputindholdet:

@Test offentlig ugyldighed givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults () {// givet Flow testet = ny DataImporter (actorSystem) .calculateAverage (); Strengindgang = "1; 9; 11; 0"; // når Source flow = Source.single (input) .via (testet); // derefter flow .runWith (TestSink.probe (actorSystem), ActorMaterializer.create (actorSystem)) .request (4) .expectNextUnordered (5d, 5.5); }

Vi kontrollerer, at vi forventer fire inputargumenter, og to resultater, der er gennemsnit, kan komme i en hvilken som helst rækkefølge, fordi vores behandling sker på den asynkrone og parallelle måde.

8. Konklusion

I denne artikel så vi på akka-stream bibliotek.

Vi definerede en proces, der kombinerer flere Strømme at beregne glidende gennemsnit af elementer. Derefter definerede vi en Kilde det er et indgangspunkt for strømbehandlingen og en Håndvask der udløser den faktiske behandling.

Endelig skrev vi en test til vores behandling ved hjælp af akka-stream-testkit.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-projekt, så det skal være let at importere og køre, som det er.


$config[zx-auto] not found$config[zx-overlay] not found