RxJava 2 - Flydbar

1. Introduktion

RxJava er en Reactive Extensions Java-implementering, der giver os mulighed for at skrive begivenhedsdrevne og asynkrone applikationer. Flere oplysninger om, hvordan du bruger RxJava, kan findes i vores intro-artikel her.

RxJava 2 blev omskrevet fra bunden, hvilket bragte flere nye funktioner; hvoraf nogle blev oprettet som et svar på problemer, der eksisterede i den tidligere version af rammen.

En af disse funktioner er io.reactivex.Flowable.

2. Observerbar vs.. Flydbar

I den forrige version af RxJava var der kun en basisklasse til håndtering af modtryksbevidste og ikke-modtryksbevidste kilder - Observerbar.

RxJava 2 introducerede en klar skelnen mellem disse to slags kilder - modtryksbevidste kilder er nu repræsenteret ved hjælp af en dedikeret klasse - Flydbar.

Observerbar kilder understøtter ikke modtryk. Derfor bør vi bruge det til kilder, som vi kun spiser og ikke kan påvirke.

Også, hvis vi har at gøre med et stort antal elementer, kan der opstå to mulige scenarier forbundet med modtryk afhængigt af typen af Observerbar.

I tilfælde af at bruge en såkaldt kold Observerbar“, begivenheder udsendes dovent, så vi er sikre på at oversvømme en observatør.

Når du bruger en hed Observerbardette vil dog fortsat udsende begivenheder, selvom forbrugeren ikke kan følge med.

3. Oprettelse af en Flydbar

Der er forskellige måder at oprette en Flydbar. Bekvemt for os ligner disse metoder de samme metoder som i Observerbar i den første version af RxJava.

3.1. Enkel Flydbar

Vi kan oprette en Flydbar bruger lige() metode på samme måde som vi kunne med Observerbar:

Flowable integerFlowable = Flowable.just (1, 2, 3, 4);

Selvom du bruger lige() er ret simpelt, det er ikke særlig almindeligt at oprette en Flydbar fra statiske data, og det bruges til testformål.

3.2. Flydbar fra Observerbar

Når vi har en Observerbar vi kan let omdanne det til Flydbar bruger toFlowable () metode:

Observable integerObservable = Observable.just (1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable (ModtrykStrategi.BUFFER);

Bemærk, at for at være i stand til at udføre konverteringen, er vi nødt til at berige Observerbar med en Modtrykstrategi. Vi beskriver tilgængelige strategier i det næste afsnit.

3.3. Flydbar fra FlowableOnSubscribe

RxJava 2 introducerede en funktionel grænseflade FlowableOnSubscribe, som repræsenterer en Flydbar der begynder at udsende begivenheder, efter at forbrugeren abonnerer på den.

På grund af dette vil alle klienter modtage det samme sæt begivenheder, hvilket gør FlowableOnSubscribe modtrykssikkert.

Når vi har FlowableOnSubscribe vi kan bruge den til at oprette Flydbar:

FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext (1); Flowable integerFlowable = Flowable .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);

Dokumentationen beskriver mange flere metoder til oprettelse Flydbar.

4. FlydbarModtrykstrategi

Nogle metoder som toFlowable () eller skab() tage en Modtrykstrategi som et argument.

Det Modtrykstrategi er en optælling, der definerer det modtryksadfærd, som vi vil anvende på vores Flydbar.

Det kan cache eller slippe begivenheder eller slet ikke implementere nogen adfærd. I sidste ende er vi ansvarlige for at definere det ved hjælp af modtryksoperatorer.

Modtrykstrategi ligner Modtrykstilstand til stede i den tidligere version af RxJava.

Der er fem forskellige strategier tilgængelige i RxJava 2.

4.1. Buffer

Hvis vi bruger Modtrykstrategi.BUFFER, kilden vil buffere alle begivenhederne, indtil abonnenten kan forbruge dem:

offentlig ugyldig thenAllValuesAreBufferedAndReceived () {List testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); Observable observerable = Observable.fromIterable (testList); TestSubscriber testSubscriber = observerbar .toFlowable (ModtrykStrategi.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); Liste modtagetInter = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt). Boxed () .collect (Collectors.toList ()); assertEquals (testList, receivedInts); }

Det svarer til at påberåbe sig onBackpressureBuffer () metode til Flydbar, men det tillader ikke at definere en bufferstørrelse eller handlingen onOverflow eksplicit.

4.2. Dråbe

Vi kan bruge ModtrykStrategi.DROP at kassere de begivenheder, der ikke kan forbruges i stedet for at buffere dem.

Igen ligner dette brugen onBackpressureDrop()Flydbar:

public void whenDropStrategyUsed_thenOnBackpressureDropped () {Observable observerable = Observable.fromIterable (testList); TestSubscriber testSubscriber = observerbar .toFlowable (ModtrykStrategi.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Liste modtagetInter = testSubscriber.getEvents () .get (0) .stream () .mapToInt (objekt -> (int) objekt). Boxed () .collect (Collectors.toList ()); assertThat (receivedInts.size () <testList.size ()); assertThat (! receivedInts.contains (100000)); }

4.3. Seneste

Bruger ModtrykStrategi.LATEST vil tvinge kilden til kun at holde de seneste begivenheder og overskrive dermed tidligere værdier, hvis forbrugeren ikke kan følge med:

public void whenLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = observerbar .toFlowable (ModtrykStrategi.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Liste receivedInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) objekt). Boxed () .collect (Collectors.toList ()); assertThat (receivedInts.size () <testList.size ()); assertThat (receivedInts.contains (100000)); }

Modtrykstrategi.LATEST og modtrykstrategi.DROP ser meget ens ud, når vi ser på koden.

Imidlertid, ModtrykStrategi.LATEST vil overskrive elementer, som vores abonnent ikke kan håndtere, og kun beholde de nyeste, deraf navnet.

ModtrykStrategy.DROP, på den anden side vil kassere elementer, der ikke kan håndteres. Dette betyder, at de nyeste elementer ikke nødvendigvis udsendes.

4.4. Fejl

Når vi bruger ModtrykStrategy.ERROR, det siger vi simpelthen vi forventer ikke, at der opstår modtryk. Derfor er en ManglerBackpressureException skal kastes, hvis forbrugeren ikke kan følge med kilden:

offentlig ugyldig nårErrorStrategyUsed_thenExceptionIsThrown () {Observable observable = Observable.range (1, 100000); TestSubscriber-abonnent = observerbar .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

4.5. Mangler

Hvis vi bruger Modtrykstrategi. MISSING, kilden vil skubbe elementer uden at kassere eller buffere.

Nedstrøms vil være nødt til at håndtere overløb i dette tilfælde:

offentligt ugyldigt nårMissingStrategyUsed_thenException () {Observable observerable = Observable.range (1, 100000); TestSubscriber-abonnent = observerbar .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }

I vores test undtager vi det ManglerbackpressureException for begge FEJL og MANGLER strategier. Da de begge vil kaste en sådan undtagelse, når kildens interne buffer er overfyldt.

Det er dog værd at bemærke, at begge har et andet formål.

Vi skal bruge den førstnævnte, når vi slet ikke forventer modtryk, og vi ønsker, at kilden skal kaste en undtagelse, hvis det sker.

Den sidstnævnte kunne bruges, hvis vi ikke ønsker at specificere en standardadfærd ved oprettelsen af Flydbar. Og vi skal bruge modtryksoperatører til at definere det senere.

5. Resume

I denne vejledning har vi præsenteret den nye klasse, der blev introduceret i RxJava 2 hedder Flydbar.

For at finde mere information om Flydbar sig selv og dets API kan vi henvise til dokumentationen.

Som altid kan alle kodeeksempler findes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found