Introduktion til reaktorkerne

1. Introduktion

Reactor Core er et Java 8-bibliotek, der implementerer den reaktive programmeringsmodel. Den er bygget oven på Reactive Streams Specification, en standard til opbygning af reaktive applikationer.

Fra baggrunden af ​​ikke-reaktiv Java-udvikling kan det at gå reaktivt være en ganske stejl indlæringskurve. Dette bliver mere udfordrende, når man sammenligner det med Java 8 Strøm API, da de kunne forveksles med at være de samme abstraktioner på højt niveau.

I denne artikel vil vi forsøge at afmystificere dette paradigme. Vi tager små skridt gennem Reactor, indtil vi har bygget et billede af, hvordan man komponerer reaktiv kode, der lægger grundlaget for mere avancerede artikler, der kommer i en senere serie.

2. Specifikation for reaktive strømme

Inden vi ser på Reactor, skal vi se på specifikationen for Reactive Streams. Dette er, hvad Reactor implementerer, og det lægger grunden til biblioteket.

I det væsentlige er reaktive strømme en specifikation til asynkron strømbehandling.

Med andre ord et system, hvor mange begivenheder produceres og forbruges asynkront. Tænk på en strøm af tusindvis af lageropdateringer pr. Sekund, der kommer ind i en økonomisk applikation, og at den skal svare på disse opdateringer rettidigt.

Et af hovedmålene med dette er at løse problemet med modtryk. Hvis vi har en producent, der udsender begivenheder til en forbruger hurtigere, end den kan behandle dem, så vil forbrugeren til sidst blive overvældet af begivenheder og løbe tør for systemressourcer.

Modtryk betyder, at vores forbruger skal kunne fortælle producenten, hvor mange data der skal sendes for at forhindre dette, og det er det, der er beskrevet i specifikationen.

3. Maven-afhængigheder

Lad os tilføje vores Maven-afhængigheder, inden vi kommer i gang:

 io.projectreactor reactor-core 3.3.9.RELEASE ch.qos.logback logback-classic 1.1.3 

Vi tilføjer også Logback som en afhængighed. Dette skyldes, at vi logger udgangen fra Reactor for bedre at forstå datastrømmen.

4. Producering af en datastrøm

For at en applikation skal være reaktiv, er den første ting, den skal være i stand til at producere en datastrøm.

Dette kan være noget som eksemplet på lageropdatering, som vi gav tidligere. Uden disse data ville vi ikke have noget at reagere på, hvorfor dette er et logisk første skridt.

Reactive Core giver os to datatyper, der gør det muligt for os at gøre dette.

4.1. Strøm

Den første måde at gøre dette på er med Strøm. Det er en strøm, der kan udsende 0..n elementer. Lad os prøve at oprette en enkel:

Flux just = Flux.just (1, 2, 3, 4);

I dette tilfælde har vi en statisk strøm af fire elementer.

4.2. Mono

Den anden måde at gøre dette på er med en Mono, som er en strøm af 0..1 elementer. Lad os prøve at starte en:

Mono bare = Mono.just (1);

Dette ser ud og opfører sig næsten nøjagtigt det samme som Strøm, kun denne gang er vi begrænset til ikke mere end et element.

4.3. Hvorfor ikke kun flux?

Før du eksperimenterer videre, er det værd at fremhæve, hvorfor vi har disse to datatyper.

Først skal det bemærkes, at begge a Strøm og Mono er implementeringer af de reaktive streams Forlægger interface. Begge klasser er i overensstemmelse med specifikationen, og vi kunne bruge denne grænseflade i deres sted:

Udgiver bare = Mono.just ("foo");

Men virkelig, at kende denne kardinalitet er nyttigt. Dette skyldes, at nogle få operationer kun giver mening for en af ​​de to typer, og fordi det kan være mere udtryksfuldt (forestil dig findOne () i et arkiv).

5. Abonnere på en stream

Nu har vi et overblik på højt niveau af, hvordan vi producerer en datastrøm, vi er nødt til at abonnere på den, for at den kan udsende elementerne.

5.1. Samler elementer

Lad os bruge abonner () metode til at samle alle elementerne i en stream:

Listeelementer = ny ArrayList (); Flux.just (1, 2, 3, 4) .log (). Abonner (elementer :: tilføj); assertThat (elementer) .containsExactly (1, 2, 3, 4);

Dataene begynder ikke at flyde, før vi abonnerer. Bemærk, at vi også har tilføjet nogle logfiler, dette vil være nyttigt, når vi ser på, hvad der sker bag kulisserne.

5.2. Elementernes strømning

Når vi logger på plads, kan vi bruge det til at visualisere, hvordan dataene strømmer gennem vores strøm:

20: 25: 19.550 [hoved] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | anmodning (ubegrænset) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (1) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (4) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onComplete ()

Først og fremmest kører alt på hovedtråden. Lad os ikke gå i detaljer om dette, da vi ser nærmere på samtidighed senere i denne artikel. Det gør tingene dog enkle, da vi kan håndtere alt i orden.

Lad os nu gennemgå den rækkefølge, som vi har logget en efter en:

  1. onSubscribe () - Dette kaldes, når vi abonnerer på vores stream
  2. anmodning (ubegrænset) - Når vi ringer abonnere, bag kulisserne skaber vi en Abonnement. Dette abonnement anmoder om elementer fra strømmen. I dette tilfælde er det som standard ubegrænset, hvilket betyder, at det anmoder om hvert eneste tilgængelige element
  3. onNext () - Dette kaldes på hvert enkelt element
  4. onComplete () - Dette kaldes sidste efter modtagelse af det sidste element. Der er faktisk en onError () som også kaldes, hvis der er en undtagelse, men i dette tilfælde er der ikke

Dette er strømmen lagt i Abonnent interface som en del af Reactive Streams Specification, og i virkeligheden er det, der er blevet instantieret bag kulisserne i vores opfordring til onSubscribe (). Det er en nyttig metode, men for bedre at forstå, hvad der sker, lad os give en Abonnent grænseflade direkte:

Flux.just (1, 2, 3, 4) .log (). Abonner (ny abonnent () {@ Override offentlig ugyldighed onSubscribe (abonnement s) {s.forespørgsel (Long.MAX_VALUE);} @ Overstyr offentlig ugyldighed påNæste ( Heltals heltal) {elements.add (heltal);} @ Override offentligt ugyldigt onError (Throwable t) {} @ Overstyr offentlig tomrum onComplete () {}});

Vi kan se, at hvert muligt trin i ovenstående flyder til en metode i Abonnent implementering. Det sker bare, at Strøm har givet os en hjælpemetode til at reducere denne detaljerethed.

5.3. Sammenligning med Java 8 Strømme

Det ser stadig ud til, at vi har noget synonymt med en Java 8 Strøm laver indsamling:

Liste indsamlet = Stream.of (1, 2, 3, 4) .collect (toList ());

Kun vi ikke.

Kerneforskellen er, at Reactive er en push-model, mens Java 8 Strømme er en pull-model. I en reaktiv tilgang er begivenhederne det skubbet til abonnenterne, når de kommer ind.

Den næste ting at bemærke er en Strømme terminaloperatør er netop det, terminal, trækker alle data og returnerer et resultat. Med Reactive kunne vi få en uendelig strøm, der kommer ind fra en ekstern ressource, med flere abonnenter tilknyttet og fjernet på ad hoc-basis. Vi kan også gøre ting som at kombinere vandløb, gasspjæld og anvende modtryk, som vi næste dækker.

6. Modtryk

Den næste ting, vi skal overveje, er modtryk. I vores eksempel beder abonnenten producenten om at skubbe hvert enkelt element på én gang. Dette kan ende med at blive overvældende for abonnenten og forbruge alle sine ressourcer.

Modtryk er, når en downstream kan fortælle en upstream at sende det færre data for at forhindre, at det bliver overvældet.

Vi kan ændre vores Abonnent implementering for at anvende modtryk. Lad os fortælle opstrøms kun at sende to elementer ad gangen ved hjælp af anmodning():

Flux.just (1, 2, 3, 4) .log (). Abonner (ny abonnent () {privat abonnement s; int onNextAmount; @ Overstyr offentlig ugyldighed onSubscribe (abonnement s) {this.s = s; s.anmodning (2);} @Override public void onNext (Integer integer) {elements.add (integer); onNextAmount ++; if (onNextAmount% 2 == 0) {s.request (2);}} @ Override public void onError (Throwable (Throwable) t) {} @Override offentligt ugyldigt onComplete () {}});

Hvis vi nu kører vores kode igen, ser vi anmodning (2) kaldes efterfulgt af to onNext () kalder derefter anmodning (2) igen.

23: 31: 15.395 [hoved] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 23: 31: 15.397 [main] INFO reactor.Flux.Array.1 - | anmodning (2) 23: 31: 15.397 [main] INFO reactor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | anmodning (2) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | anmodning (2) 23: 31: 15.398 [main] INFO reactor.Flux.Array.1 - | onComplete ()

I det væsentlige er dette reaktivt træk-modtryk. Vi anmoder opstrøms om kun at skubbe en vis mængde elementer, og kun når vi er klar.

Hvis vi forestiller os, at vi blev streamet tweets fra twitter, ville det være op til opstrøms at beslutte, hvad vi skulle gøre. Hvis tweets kom ind, men der ikke er nogen anmodninger fra downstream, kan upstream nedbringe varer, gemme dem i en buffer eller en anden strategi.

7. Drift på en stream

Vi kan også udføre operationer på dataene i vores stream og reagere på begivenheder, som vi finder det passende.

7.1. Kortlægning af data i en strøm

En simpel operation, som vi kan udføre, er at anvende en transformation. Lad os i dette tilfælde bare fordoble alle tallene i vores strøm:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2). Abonner (elementer :: tilføj);

kort() vil blive anvendt når onNext () Hedder.

7.2. Kombinerer to streams

Vi kan derefter gøre tingene mere interessante ved at kombinere en anden strøm med denne. Lad os prøve dette ved hjælp af lynlås () fungere:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (one, two) -> String.format ("First Flux:% d, Second Flux:% d", one, two)). Abonner (elementer :: tilføj); assertThat (elementer). indeholder Nøjagtigt ("First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3 ");

Her skaber vi en anden Strøm der fortsætter med at øges med en og streame den sammen med vores originale. Vi kan se, hvordan disse fungerer sammen ved at inspicere logfilerne:

20: 04: 38.064 [main] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 04: 38.065 [main] INFO reactor.Flux.Array.1 - | onNext (1) 20: 04: 38.066 [main] INFO reaktor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38.066 [main] INFO reactor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [main] INFO reaktor.Flux.Range.2 - | onNext (1) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [main] INFO reaktor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 04: 38.067 [main] INFO reaktor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onComplete () 20: 04: 38.067 [main] INFO reactor.Flux.Array.1 - | annuller () 20: 04: 38.067 [main] INFO reactor.Flux.Range.2 - | afbestille()

Bemærk, hvordan vi nu har et abonnement pr Strøm. Det onNext () opkald er også vekslede, så indekset for hvert element i strømmen vil matche, når vi anvender lynlås () fungere.

8. Hot Streams

I øjeblikket har vi primært fokuseret på kolde strømme. Disse er statiske streams med fast længde, der er lette at håndtere. En mere realistisk brugstilfælde for reaktiv kan være noget, der sker uendeligt.

For eksempel kunne vi have en strøm af musebevægelser, der konstant skal reageres på eller en twitter-feed. Disse typer af streams kaldes hot streams, da de altid kører og kan abonneres på når som helst og mangler starten på dataene.

8.1. Oprettelse af en Tilslutningsflux

En måde at oprette en varm strøm på er ved at konvertere en kold strøm til en. Lad os oprette en Strøm der varer evigt og udsender resultaterne til konsollen, som ville simulere en uendelig strøm af data, der kommer fra en ekstern ressource:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

Ved at ringe offentliggøre() vi får en Tilslutningsflux. Dette betyder at kalde abonner () vil ikke få det til at begynde at udsende, så vi kan tilføje flere abonnementer:

publish.subscribe (System.out :: println); publish.subscribe (System.out :: println);

Hvis vi prøver at køre denne kode, sker der intet. Først når vi ringer Opret forbindelse(), at den Strøm begynder at udsende:

publish.connect ();

8.2. Begrænsning

Hvis vi kører vores kode, bliver vores konsol overvældet af logning. Dette simulerer en situation, hvor for mange data videregives til vores forbrugere. Lad os prøve at omgå dette med gasregulering:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}). Prøve (ofSeconds (2)) .publish ();

Her har vi introduceret en prøve() metode med et interval på to sekunder. Nu skubbes værdier kun til vores abonnent hvert andet sekund, hvilket betyder, at konsollen vil være meget mindre hektisk.

Der er selvfølgelig flere strategier for at reducere mængden af ​​data, der sendes nedstrøms, såsom vinduesvindue og buffering, men de vil blive udeladt af denne artikel.

9. Samtidighed

Alle vores eksempler ovenfor er i øjeblikket kørt på hovedtråden. Vi kan dog kontrollere, hvilken tråd vores kode kører på, hvis vi ønsker det. Det Planlægning interface giver en abstraktion omkring asynkron kode, som mange implementeringer giver os. Lad os prøve at abonnere på en anden tråd til main:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()) .subscribe (elementer :: tilføj);

Det Parallel scheduler får vores abonnement til at blive kørt på en anden tråd, som vi kan bevise ved at se på logfilerne. Vi ser, at den første post kommer fra vigtigste tråd og Flux kører i en anden tråd kaldet parallel-1.

20:03:27.505 [hoved] DEBUG reactor.util.Loggers $ LoggerFactory - Brug af Slf4j-logningsramme 20: 03: 27.529 [parallel-1] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 03: 27.531 [parallel-1] INFO reaktor.Flux.Array.1 - | anmodning (ubegrænset) 20: 03: 27.531 [parallel-1] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 03: 27.531 [parallel-1] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 03: 27.531 [parallel-1] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [parallel-1] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 03: 27.531 [parallel-1] INFO reaktor.Flux.Array.1 - | onComplete ()

Samtidighed er mere interessant end dette, og det vil være værd at udforske det i en anden artikel.

10. Konklusion

I denne artikel har vi givet et højt niveau, en-til-ende-oversigt over Reactive Core. Vi har forklaret, hvordan vi kan udgive og abonnere på streams, anvende modtryk, operere på streams og også håndtere data asynkront. Dette skulle forhåbentlig lægge et fundament for os til at skrive reaktive applikationer.

Senere artikler i denne serie vil dække mere avanceret samtidighed og andre reaktive koncepter. Der er også en anden artikel, der dækker Reactor with Spring.

Kildekoden til vores applikation er tilgængelig på over på GitHub; dette er et Maven-projekt, som skal kunne køre som det er.