Håndtering af modtryk med RxJava

1. Oversigt

I denne artikel vil vi se på, hvordan RxJava-biblioteket hjælper os med at håndtere modtryk.

Kort sagt - RxJava bruger et koncept med reaktive strømme ved at introducere Observerbare, til hvilken en eller flere Observatører kan abonnere på. At håndtere muligvis uendelige vandløb er meget udfordrende, da vi har brug for et modtryksproblem.

Det er ikke svært at komme i en situation, hvor en Observerbar udsender varer hurtigere, end en abonnent kan forbruge dem. Vi vil se på de forskellige løsninger på problemet med voksende buffer af uforbrugte varer.

2. Varmt Observerbare Versus Cold Observerbare

Lad os først oprette en simpel forbrugerfunktion, der vil blive brugt som forbruger af elementer fra Observerbare som vi vil definere senere:

public class ComputeFunction {public static void compute (Integer v) {try {System.out.println ("compute integer v:" + v); Tråd. Søvn (1000); } fange (InterruptedException e) {e.printStackTrace (); }}}

Vores beregne () funktion er simpelthen at udskrive argumentet. Det vigtige at bemærke her er en påkaldelse af en Thread.sleep (1000) metode - vi gør det for at efterligne en langvarig opgave, der vil forårsage Observerbar at fylde op med ting hurtigere Observer kan forbruge dem.

Vi har to typer af Observable - Hot og Kold - der er helt forskellige, når det gælder håndtering af modtryk.

2.1. Kold Observerbare

En kold Observerbar udsender en bestemt række af emner, men kan begynde at udsende denne sekvens, når den er Observer finder det praktisk, og uanset hvilket tempo Observer ønsker uden at forstyrre sekvensens integritet. Kold Observerbar leverer varer på en doven måde.

Det Observer tager kun elementer, når den er klar til at behandle den vare, og emner ikke behøver at blive bufret i en Observerbar fordi de bliver anmodet om i en pull-mode.

For eksempel, hvis du opretter en Observerbar baseret på et statisk udvalg af elementer fra en til en million, det Observerbar vil udsende den samme rækkefølge af emner, uanset hvor ofte disse emner observeres:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()). Abonner (ComputeFunction :: compute);

Når vi starter vores program, beregnes varerne af Observer doven og vil blive anmodet om på en pull-måde. Det Schedulers.computation () metode betyder, at vi vil køre vores Observer inden for en beregningstrådspul i RxJava.

Output af et program vil bestå af et resultat af en beregne () metode påkrævet for en efter en vare fra en Observerbar:

beregne heltal v: 1 beregne heltal v: 2 beregne heltal v: 3 beregne heltal v: 4 ...

Kold Observerbare behøver ikke at have nogen form for et modtryk, fordi de arbejder i en pull-mode. Eksempler på genstande, der udsendes af forkølelse Observerbar kan omfatte resultaterne af en databaseforespørgsel, filhentning eller webanmodning.

2.2. Hed Observerbare

En varm Observerbar begynder at generere genstande og udsender dem straks, når de oprettes. Det er i modstrid med forkølelse Observerbare træk model for behandling. Hed Observerbar udsender varer i sit eget tempo, og det er op til sine observatører at følge med.

Når Observer er ikke i stand til at forbruge varer så hurtigt som de produceres af en Observerbar de skal bufres eller håndteres på en anden måde, da de fylder hukommelsen og endelig forårsager OutOfMemoryException.

Lad os overveje et eksempel på varmt Observerbar, der producerer 1 million varer til en slutforbruger, der behandler disse varer. Når en beregne () metode i Observer tager noget tid at behandle hver vare, Observerbar begynder at fylde en hukommelse med emner, hvilket får et program til at mislykkes:

PublishSubject source = PublishSubject.create (); source.observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (kilde :: onNext); 

At køre programmet mislykkes med en ManglerBackpressureException fordi vi ikke definerede en måde at håndtere overproduktion på Observerbar.

Eksempler på genstande, der udsendes af en hot Observerbar kan omfatte begivenheder med mus og tastatur, systembegivenheder eller aktiekurser.

3. Buffering af overproduktion Observerbar

Den første måde at håndtere overproduktion på Observerbar er at definere en slags buffer for elementer, der ikke kan behandles af en Observer.

Vi kan gøre det ved at kalde en buffer () metode:

PublishSubject source = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace); 

Definition af en buffer med størrelsen 1024 vil give en Observer noget tid til at indhente en overproduktionskilde. Bufferen gemmer varer, der endnu ikke blev behandlet.

Vi kan øge en bufferstørrelse for at have nok plads til producerede værdier.

Bemærk dog, at generelt, dette kan kun være en midlertidig løsning da overløb stadig kan ske, hvis kilden overproducerer den forudsagte bufferstørrelse.

4. Batching af udsendte genstande

Vi kan batchproducere varer i vinduer af N-elementer.

Hvornår Observerbar producerer elementer hurtigere end Observer kan behandle dem, kan vi afhjælpe dette ved at gruppere producerede elementer sammen og sende en batch af elementer til Observer der er i stand til at behandle en samling af elementer i stedet for element en efter en:

PublishSubject source = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace); 

Ved brug af vindue() metode med argument 500, vil fortælle Observerbar for at gruppere elementer i 500-batches. Denne teknik kan reducere et problem med overproduktion Observerbar hvornår Observer er i stand til at behandle en batch af elementer hurtigere sammenlignet med behandling af elementer en efter en.

5. Springe over elementer

Hvis nogle af de værdier, der produceres af Observerbar kan ignoreres sikkert, kan vi bruge prøveudtagningen inden for en bestemt tids- og reguleringsoperatører.

Metoderne prøve() og throttleFirst () tager varighed som parameter:

  • Srigelig() metode ser periodisk på rækkefølgen af ​​elementer og udsender det sidste element, der blev produceret inden for den varighed, der er angivet som en parameter
  • Det throttleFirst () metoden udsender det første element, der blev produceret efter den varighed, der er angivet som parameter

Varigheden er et tidspunkt, hvorefter et bestemt element vælges fra sekvensen af ​​producerede elementer. Vi kan specificere en strategi til håndtering af modtryk ved at springe over elementer:

PublishSubject source = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()). abonner (ComputeFunction :: compute, Throwable :: printStackTrace);

Vi specificerede, at strategien for at springe over elementer vil være en prøve() metode. Vi ønsker en prøve af en sekvens med en varighed på 100 millisekunder. Dette element vil blive udsendt til Observer.

Husk dog, at disse operatører kun reducerer værdien for modtagelse af værdi i downstream Observer og således kan de stadig føre til ManglerBackpressureException.

6. Håndtering af påfyldning Observerbar Buffer

Hvis vores strategier for prøveudtagning eller batching af elementer ikke hjælper med at udfylde en buffer, vi er nødt til at implementere en strategi til håndtering af sager, når en buffer fyldes op.

Vi skal bruge en onBackpressureBuffer () metode til at forhindre BufferOverflowException.

Det onBackpressureBuffer () metoden tager tre argumenter: en kapacitet på en Observerbar buffer, en metode, der påberåbes, når en buffer fyldes op, og en strategi til håndtering af elementer, der skal kasseres fra en buffer. Strategier for overløb er i en Modtryk Overløb klasse.

Der er 4 typer handlinger, der kan udføres, når bufferen fyldes op:

  • ON_OVERFLOW_ERROR - dette er standardadfærdssignalering a BufferOverflowException når bufferen er fuld
  • ON_OVERFLOW_DEFAULT - i øjeblikket er det det samme som ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - hvis et overløb ville ske, ignoreres den aktuelle værdi simpelthen, og kun de gamle værdier leveres en gang nedstrøms Observer anmodninger
  • ON_OVERFLOW_DROP_OLDEST - slipper det ældste element i bufferen og tilføjer den aktuelle værdi til det

Lad os se, hvordan vi specificerer denne strategi:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()). Abonner (e -> {}, Throwable :: printStackTrace) 

Her er vores strategi til håndtering af den overfyldte buffer at slippe det ældste element i en buffer og tilføje det nyeste element produceret af en Observerbar.

Bemærk, at de to sidste strategier forårsager en diskontinuitet i strømmen, da de udelader elementer. Derudover signalerer de ikke BufferOverflowException.

7. Slip af alle overproducerede elementer

Hver gang nedstrøms Observer er ikke klar til at modtage et element, kan vi bruge et onBackpressureDrop () metode til at droppe elementet fra sekvensen.

Vi kan tænke på denne metode som en onBackpressureBuffer () metode med en bufferkapacitet indstillet til nul med en strategi ON_OVERFLOW_DROP_LATEST.

Denne operatør er nyttig, når vi sikkert kan ignorere værdier fra en kilde Observerbar (såsom musebevægelser eller aktuelle GPS-placeringssignaler), da der senere vil være mere opdaterede værdier:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute). Abonner (v -> {}, Throwable :: printStackTrace);

Metoden onBackpressureDrop () eliminerer et problem med overproduktion Observerbar men skal bruges med forsigtighed.

8. Konklusion

I denne artikel så vi på et problem med overproduktion Observerbar og måder at håndtere et modtryk på. Vi så på strategier for buffering, batching og springe over elementer, når Observer er ikke i stand til at forbruge elementer så hurtigt som de produceres af en Observerbar.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-projekt, så det skal være let at importere og køre, som det er.


$config[zx-auto] not found$config[zx-overlay] not found