Java 9 Reactive Streams

1. Oversigt

I denne artikel ser vi på Java 9 Reactive Streams. Kort sagt, vi kan bruge Flyde klasse, som omslutter de primære byggesten til opbygning af reaktiv strømbehandlingslogik.

Reaktive streams er en standard til asynkron strømbehandling med ikke-blokerende modtryk. Denne specifikation er defineret i Reaktivt manifest, og der er forskellige implementeringer af det, for eksempel RxJava eller Akka-streams.

2. Oversigt over reaktivt API

At bygge en Flyde, kan vi bruge tre hovedabstraktioner og komponere dem i asynkron behandlingslogik.

Hver Flyde har brug for at behandle begivenheder, der offentliggøres af en forlagsinstans; det Forlægger har en metode - abonner ().

Hvis nogen af ​​abonnenterne ønsker at modtage begivenheder, der er offentliggjort af det, skal de abonnere på det givne Forlægger.

Modtageren af ​​meddelelser skal implementere Abonnent interface. Typisk er dette slutningen for enhver Flyde behandling, fordi forekomsten af ​​den ikke sender meddelelser videre.

Vi kan tænke over Abonnent som en Håndvask. Dette har fire metoder, der skal tilsidesættes - onSubscribe (), onNext (), onError (), og onComplete (). Vi ser på dem i det næste afsnit.

Hvis vi vil omdanne indgående besked og videregive den videre til den næste Abonnent, vi er nødt til at gennemføre Processor interface. Dette fungerer både som en Abonnent fordi det modtager beskeder, og som Forlægger fordi det behandler disse meddelelser og sender dem til videre behandling.

3. Udgivelse og forbrug af meddelelser

Lad os sige, at vi vil skabe et simpelt Flyde, hvor vi har en Forlægger udgivelse af beskeder og en simpel Abonnent forbrugende beskeder, når de ankommer - en ad gangen.

Lad os oprette en EndSubscriber klasse. Vi er nødt til at gennemføre Abonnent interface. Dernæst tilsidesætter vi de nødvendige metoder.

Det onSubscribe () metode kaldes inden behandlingen starter. Forekomsten af Abonnement overføres som argumentet. Det er en klasse, der bruges til at styre strømmen af ​​meddelelser imellem Abonnent og Forlægger:

offentlig klasse EndSubscriber implementerer abonnent {private abonnementsabonnement; public List consumedElements = new LinkedList (); @Override public void onSubscribe (Abonnementsabonnement) {this.subscription = abonnement; abonnement. anmodning (1); }}

Vi initialiserede også en tom Liste af consumedElements det vil blive brugt i testene.

Nu skal vi implementere de resterende metoder fra Abonnent interface. Hovedmetoden her er onNext () - dette kaldes når som helst Forlægger offentliggør en ny besked:

@ Overstyr offentlig ugyldighed onNext (T-element) {System.out.println ("Got:" + item); abonnement. anmodning (1); }

Bemærk, at da vi startede abonnementet i onSubscribe () metode, og når vi behandlede en besked, skal vi ringe til anmodning() metode til Abonnement for at signalere, at strømmen Abonnent er klar til at forbruge flere beskeder.

Endelig skal vi gennemføre onError () - som kaldes, når der kommer en undtagelse i behandlingen såvel som onComplete () - kaldes, når Forlægger er lukket:

@Override offentlig ugyldighed onError (Throwable t) {t.printStackTrace (); } @ Overstyr offentlig ugyldighed onComplete () {System.out.println ("Udført"); }

Lad os skrive en test til behandlingen Flyde. Vi bruger IndsendelseUdgiver klasse - en konstruktion fra java.util.concurrent - som implementerer Forlægger interface.

Vi sender N elementer til Forlægger - som vores EndSubscriber modtager:

@Test offentlig ugyldigt nårSubscribeToIt_thenShouldConsumeAll () kaster InterruptedException {// givet SubmissionPublisher udgiver = ny SubmissionPublisher (); EndSubscriber-abonnent = ny EndSubscriber (); publisher.subscribe (abonnent); Listeelementer = List.of ("1", "x", "2", "x", "3", "x"); // når assertThat (publisher.getNumberOfSubscribers ()). er EqualTo (1); items.forEach (udgiver :: indsende); publisher.close (); // derefter afvente (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (items)); }

Bemærk, at vi ringer til tæt() metode i tilfælde af EndSubscriber. Det vil påberåbe sig onComplete () tilbagekald nedenunder på hver Abonnent af det givne Forlægger.

At køre programmet vil producere følgende output:

Fik: 1 Fik: x Fik: 2 Fik: x Fik: 3 Fik: x Udført

4. Transformation af beskeder

Lad os sige, at vi ønsker at opbygge lignende logik mellem a Forlægger og en Abonnent, men også anvende en vis transformation.

Vi opretter TransformProcessor klasse, der implementerer Processor og strækker sig IndsendelseUdgiver - da dette vil være begge dele Publisher og Sabonnent.

Vi passerer i en Fungere der vil omdanne indgange til udgange:

offentlig klasse TransformProcessor udvider SubmissionPublisher implementerer Flow.Processor {privat funktion funktion; privat Flow.Abonnement abonnement; offentlig TransformProcessor (Funktionsfunktion) {super (); denne. funktion = funktion; } @ Overstyr offentlig ugyldighed onSubscribe (Flow.Subscription abonnement) {this.subscription = abonnement; abonnement. anmodning (1); } @ Overstyr offentlig ugyldighed påNæste (T-element) {indsend (funktion.apply (vare)); abonnement. anmodning (1); } @ Override offentlig ugyldighed onError (Throwable t) {t.printStackTrace (); } @ Overstyr offentlig ugyldighed onComplete () {luk (); }}

Lad os nu skriv en hurtig test med en behandlingsstrøm, hvori Forlægger udgiver Snor elementer.

Vores TransformProcessor vil blive analyseret Snor som Heltal - hvilket betyder, at der skal ske en konvertering her:

@Test offentligt ugyldigt nårSubscribeAndTransformElements_thenShouldConsumeAll () kaster InterruptedException {// givet SubmissionPublisher udgiver = ny SubmissionPublisher (); TransformProcessor transformProcessor = ny TransformProcessor (Heltal :: parseInt); EndSubscriber-abonnent = ny EndSubscriber (); Listeelementer = List.of ("1", "2", "3"); Liste expectResult = List.of (1, 2, 3); // når publisher.subscribe (transformProcessor); transformProcessor.subscribe (abonnent); items.forEach (udgiver :: indsende); publisher.close (); // afventer derefter (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (expectedResult)); }

Bemærk, at ringe til tæt() metode på basen Forlægger vil forårsage onComplete () metode til TransformProcessor at påberåbes.

Husk, at alle udgivere i behandlingskæden skal lukkes på denne måde.

5. Styring af efterspørgsel efter meddelelser ved hjælp af Abonnement

Lad os sige, at vi kun vil forbruge det første element fra abonnementet, anvende noget logik og afslutte behandlingen. Vi kan bruge anmodning() metode til at opnå dette.

Lad os ændre vores EndSubscriber kun at forbruge N antal beskeder. Vi sender dette nummer som nummeret howMuchMessagesConsume konstruktør argument:

offentlig klasse EndSubscriber implementerer abonnent {private AtomicInteger howMuchMessagesConsume; privat abonnement; public List consumedElements = new LinkedList (); public EndSubscriber (Integer howMuchMessagesConsume) {this.howMuchMessagesConsume = new AtomicInteger (howMuchMessagesConsume); } @ Overstyr offentlig ugyldighed onSubscribe (abonnementsabonnement) {this.subscription = abonnement; abonnement. anmodning (1); } @Override public void onNext (T item) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Got:" + element); consumedElements.add (vare); hvis (howMuchMessagesConsume.get ()> 0) {abonnement.forespørgsel (1); }} // ...}

Vi kan anmode om elementer, så længe vi vil.

Lad os skrive en test, hvor vi kun vil forbruge et element fra det givne Abonnement:

@Test offentlig ugyldig nårRequestForOnlyOneElement_thenShouldConsumeOne () kaster InterruptedException {// givet SubmissionPublisher udgiver = ny SubmissionPublisher (); EndSubscriber-abonnent = ny EndSubscriber (1); publisher.subscribe (abonnent); Listeelementer = List.of ("1", "x", "2", "x", "3", "x"); Liste forventet = List.of ("1"); // når assertThat (publisher.getNumberOfSubscribers ()). er EqualTo (1); items.forEach (udgiver :: indsende); publisher.close (); // afventer derefter (). atMost (1000, TimeUnit.MILLISECONDS) .tiltil (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (forventet)); }

Selvom forlægger udgiver seks elementer, vores EndSubscriber vil kun forbruge et element, fordi det kun signalerer efterspørgsel efter behandling af det ene element.

Ved hjælp af anmodning() metode til Abonnement, vi kan implementere en mere sofistikeret modtryksmekanisme til at kontrollere hastigheden af ​​meddelelsesforbruget.

6. Konklusion

I denne artikel kiggede vi på Java 9 Reactive Streams.

Vi så, hvordan man opretter en behandling Flyde bestående af en Forlægger og en Abonnent. Vi skabte en mere kompleks behandlingsstrøm med transformation af elementer ved hjælp af Processorer.

Endelig brugte vi Abonnement at kontrollere efterspørgslen efter elementer af Abonnent.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-projekt, så det skal være let at importere og køre, som det er.


$config[zx-auto] not found$config[zx-overlay] not found