Forskellen mellem RxJava API og Java 9 Flow API

1. Introduktion

Java Flow API blev introduceret i Java 9 som en implementering af Reactive Stream Specification.

I denne vejledning undersøger vi først reaktive strømme. Derefter lærer vi om dets forhold til RxJava og Flow API.

2. Hvad er reaktive streams?

Reactive Manifesto introducerede Reactive Streams for at specificere en standard for asynkron strømbehandling med ikke-blokerende modtryk.

Omfanget af specifikationen for reaktiv strøm er at definere et minimalt sæt grænseflader for at nå disse mål:

  • org.reactivestreams.Publisher er en dataudbyder, der offentliggør data til abonnenterne baseret på deres efterspørgsel

  • org.reactivestreams.Subscriber er forbruger af data - den kan modtage data efter abonnement på en udgiver

  • org.reactivestreams. abonnement oprettes, når en udgiver accepterer en abonnent

  • org.reactivestreams.Processor er både abonnent og udgiver - abonnerer på udgiver, behandler dataene og sender derefter de behandlede data til abonnenten

Flow API stammer fra specifikationen. RxJava går forud for det, men siden 2.0 har RxJava også understøttet spec.

Vi går dybt ind i begge dele, men lad os først se en praktisk brugssag.

3. Brug sag

Til denne vejledning bruger vi en live stream-videotjeneste som vores brugssag.

En live stream video, i modsætning til on-demand video streaming, afhænger ikke af forbrugeren. Derfor udgiver serveren strømmen i sit eget tempo, og det er forbrugerens ansvar at tilpasse sig.

I den mest enkle form består vores model af en videostream-udgiver og en videoafspiller som abonnent.

Lad os implementere VideoFrame som vores dataelement:

offentlig klasse VideoFrame {privat langt nummer; // yderligere datafelter // konstruktør, getters, setters}

Lad os så gennemgå vores Flow API og RxJava-implementeringer en efter en.

4. Implementering med Flow API

Flow API'erne i JDK 9 svarer til specifikationen for reaktive strømme. Hvis applikationen oprindeligt anmoder om N-elementer med Flow API, skubber udgiveren højst N-poster til abonnenten.

Flow API-grænsefladerne er alle i java.util.concurrent.Flow interface. De svarer semantisk til deres respektive kolleger fra Reactive Streams.

Lad os implementere VideoStreamServer som udgiver af VideoFrame.

offentlig klasse VideoStreamServer udvider SubmissionPublisher {public VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

Vi har udvidet vores VideoStreamServer fra IndsendelseUdgiver i stedet for direkte at implementere Flow :: Udgiver. IndsendelseUdgiver er JDK implementering af Flow :: Udgiver til asynkron kommunikation med abonnenter, så det lader vores VideoStreamServer at udsende i sit eget tempo.

Det er også nyttigt til modtryk og bufferhåndtering, for hvornår SubmissionPublisher :: abonner kaldes, det skaber en forekomst af BufferedSubscription, og tilføjer derefter det nye abonnement til sin abonnementskæde. BufferedSubscription kan buffer udstedte varer op til SubmissionPublisher # maxBufferCapacity.

Lad os nu definere Videoafspiller, der forbruger en strøm af VideoFrame. Derfor skal den implementeres Flow :: Abonnent.

offentlig klasse VideoPlayer implementerer Flow.Subscriber {Flow.Subscription abonnement = null; @Override offentligt ugyldigt onSubscribe (Flow.Subscription abonnement) {this.subscription = abonnement; abonnement. anmodning (1); } @Override offentligt ugyldigt onNext (VideoFrame-element) {log.info ("play # {}", item.getNumber ()); abonnement. anmodning (1); } @ Override offentlig ugyldighed onError (Throwable throwable) {log.error ("Der er en fejl i videostreaming: {}", throwable.getMessage ()); } @ Overstyr offentlig ugyldighed onComplete () {log.error ("Videoen er afsluttet"); }}

Videoafspiller abonnerer på VideoStreamServer, derefter efter et vellykket abonnement Videoafspiller::onTilmeld metode kaldes, og den anmoder om en ramme. Videoafspiller:: onNext modtager rammen og anmoder om en ny. Antallet af de ønskede rammer afhænger af brugssagen og Abonnent implementeringer.

Lad os endelig sammensætte tingene:

VideoStreamServer streamServer = ny VideoStreamServer (); streamServer.subscribe (nyt VideoPlayer ()); // indsend videorammer ScheduledExecutorService eksekutor = Executors.newScheduledThreadPool (1); AtomicLong frameNumber = ny AtomicLong (); executor.scheduleWithFixedDelay (() -> {streamServer.offer (new VideoFrame (frameNumber.getAndIncrement ()), (subscriber, videoFrame) -> {subscriber.onError (new RuntimeException ("Frame #" + videoFrame.getNumber () + " faldt på grund af modtryk ")); return true;});}, 0, 1, TimeUnit.MILLISECONDS); søvn (1000);

5. Implementering med RxJava

RxJava er en Java-implementering af ReactiveX. Projektet ReactiveX (eller Reactive Extensions) sigter mod at give et reaktivt programmeringskoncept. Det er en kombination af Observer-mønster, Iterator-mønster og funktionel programmering.

Den seneste store version til RxJava er 3.x. RxJava understøtter Reactive Streams siden version 2.x med dens Flydbar baseklasse, men det er et mere betydningsfuldt sæt end Reactive Streams med flere baseklasser som Flydbar, Observerbar, Enkelt, Kompletabel.

Flydbar da komponent for reaktiv strømoverensstemmelse er en strøm på 0 til N emner med modtrykshåndtering. Flydbar strækker sig Forlægger fra Reactive Streams. Derfor accepterer mange RxJava-operatører Forlægger direkte og tillade direkte interoperation med andre implementeringer af Reactive Streams.

Lad os nu lave vores videostreamgenerator, som er en uendelig doven stream:

Stream videoStream = Stream.iterate (ny VideoFrame (0), videoFrame -> {// dvale i 1 ms; returner ny VideoFrame (videoFrame.getNumber () + 1);});

Derefter definerer vi en Flydbar eksempel for at generere rammer på en separat tråd:

Flowable .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

Det er vigtigt at bemærke, at en uendelig strøm er nok for os, men hvis vi har brug for en mere fleksibel måde at generere vores strøm på, Flydbar. Opret er et godt valg.

Flowable .create (new FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Override public void subscribe (@NonNull FlowableEmitter emitter) {while (true) {emitter.onNext (new VideoFrame (frame.incrementAndGet ())) / / sov i 1 ms for at simulere forsinkelse}}}, / * Indstil modtryksstrategi her * /)

Derefter abonnerer VideoPlayer på næste trin på dette Flowable og observerer emner på en separat tråd.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ())). abonner (item -> {log.info ("play #" + item.getNumber ()); // dvale i 30 ms for at simulere rammevisning}) ;

Og endelig konfigurerer vi strategien for modtryk. Hvis vi vil stoppe videoen i tilfælde af ramtab, skal vi derfor bruge den ModtrykOverflowStrategy :: FEJL når bufferen er fuld.

Flydbar .fraStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .onBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.from (Executors.newS) > {log.info ("play #" + item.getNumber ()); // dvale i 30 ms for at simulere rammevisning});

6. Sammenligning af RxJava og Flow API

Selv i disse to enkle implementeringer kan vi se, hvordan RxJavas API er rig, især til bufferhåndtering, fejlhåndtering og modtryksstrategi. Det giver os flere muligheder og færre linjer med kode med sin flydende API. Lad os nu overveje mere komplicerede sager.

Antag, at vores afspiller ikke kan vise videorammer uden en codec. Derfor med Flow API skal vi implementere en Processor for at simulere codec og sidde mellem server og afspiller. Med RxJava kan vi gøre det med Flydbar :: flatMap eller Flydbar :: kort.

Eller lad os forestille os, at vores afspiller også sender live oversættelseslyd, så vi er nødt til at kombinere strømme af video og lyd fra separate udgivere. Med RxJava kan vi bruge Flowable :: combineLatest, men med Flow API er det ikke en let opgave.

Selvom det er muligt at skrive en brugerdefineret Processor der abonnerer på begge streams og sender de kombinerede data til vores Videoafspiller. Implementeringen er dog hovedpine.

7. Hvorfor Flow API?

På dette tidspunkt har vi muligvis et spørgsmål, hvad er filosofien bag Flow API?

Hvis vi søger efter Flow API-anvendelser i JDK, kan vi finde noget i java.net.http og jdk.internal.net.http.

Desuden kan vi finde adaptere i reaktorprojektet eller den reaktive strømpakke. For eksempel, org.reactivestreams.FlowAdapters har metoder til konvertering af Flow API-grænseflader til Reactive Stream-enheder og omvendt. Derfor hjælper det interoperabiliteten mellem Flow API og biblioteker med understøttelse af reaktiv strøm.

Alle disse fakta hjælper os med at forstå formålet med Flow API: Det blev oprettet for at være en gruppe af reaktive specifikationsgrænseflader i JDK uden relæ til tredjeparter. Desuden forventer Java, at Flow API accepteres som standardgrænseflader til reaktiv specifikation og skal bruges i JDK eller andre Java-baserede biblioteker, der implementerer den reaktive specifikation for mellemværker og hjælpeprogrammer.

8. Konklusioner

I denne vejledning har vi en introduktion til Reactive Stream Specification, Flow API og RxJava.

Desuden har vi set et praktisk eksempel på Flow API og RxJava-implementeringer til en live videostream.

Men alle aspekter af Flow API og RxJava kan lide Flow :: Processor, Flydbar :: kort og Flydbar :: flatMap eller modtryksstrategier er ikke dækket her.

Som altid finder du selvstudiets komplette kode over på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found