Kombination af observerbare ting i RxJava

1. Introduktion

I denne hurtige vejledning diskuterer vi forskellige måder at kombinere på Observerbare i RxJava.

Hvis du er ny hos RxJava, skal du helt sikkert tjekke denne introduktionsvejledning først.

Lad os nu hoppe lige ind.

2. Observerbare

Observerbar sekvenser eller simpelthen Observerbare, er repræsentationer af asynkrone datastrømme.

Disse er baseret på observatørmønsteret, hvor et objekt kaldet en Observerabonnerer på emner, der udsendes af en Observerbar.

Abonnementet blokeres ikke som Observer står til at reagere på uanset hvad Observerbar vil udsende i fremtiden. Dette letter igen samtidighed.

Her er en simpel demonstration i RxJava:

Observerbar .fra (ny streng [] {"John", "Doe"}). Abonner (navn -> System.out.println ("Hej" + navn))

3. Kombination af observerbare ting

Når du programmerer ved hjælp af en reaktiv ramme, er det en almindelig brugssag at kombinere forskellige Observerbare.

I en webapplikation skal vi f.eks. Muligvis få to sæt asynkrone datastrømme, der er uafhængige af hinanden.

I stedet for at vente på, at den forrige stream er færdig, inden vi anmoder om den næste stream, kan vi ringe begge på samme tid og abonnere på de kombinerede streams.

I dette afsnit diskuterer vi nogle af de forskellige måder, vi kan kombinere flere på Observerbare i RxJava og de forskellige brugssager, som hver metode gælder for.

3.1. Fusionere

Vi kan bruge fusionere operatør til at kombinere output fra flere Observerbare så de opfører sig som en:

@Test offentlig ugyldighed givenTwoObservables_whenMerged_shouldEmitCombinedResults () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.merge (Observable.from (new String [] {"Hello", "World"}), Observable.from (new String [] {"Jeg elsker", "RxJava"})). Abonner (testSubscriber); testSubscriber.assertValues ​​("Hej", "Verden", "Jeg elsker", "RxJava"); }

3.2. MergeDelayError

Det mergeDelayError metoden er den samme som fusionere ved at det kombinerer flere Observerbare ind i en, men hvis der opstår fejl under fusionen, tillader det fejlfri genstande at fortsætte, inden fejlene udbredes:

@Test offentligt ugyldigt givetMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.mergeDelayError (Observable.from (new String [] {"hallo", "world"}), Observable.error (new RuntimeException ("Some exception")), Observable.from (new String [] {"rxjava"} )) .subscribe (testSubscriber); testSubscriber.assertValues ​​("hej", "verden", "rxjava"); testSubscriber.assertError (RuntimeException.class); }

Ovenstående eksempel udsender alle fejlfri værdier:

hej verden rxjava

Bemærk, at hvis vi bruger fusionere i stedet for mergeDelayError, det Snorrxjava ” vil ikke blive udsendt fordi fusionere stopper straks datastrømmen fra Observerbare når der opstår en fejl.

3.3. Lynlås

Det lynlås udvidelsesmetode samler to værdisekvenser som par:

@Test offentlig ugyldighed givenTwoObservables_whenZipped_thenReturnCombinedResults () {List zippedStrings = ny ArrayList (); Observable.zip (Observable.from (new String [] {"Simple", "Moderate", "Complex"}), Observable.from (new String [] {"Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + "" + str2). abonner (zippedStrings :: tilføj); assertThat (zippedStrings) .isNotEmpty (); assertThat (zippedStrings.size ()). er EqualTo (3); assertThat (zippedStrings) .contains ("Simple Solutions", "Moderate Success", "Complex Hierarchy"); }

3.4. Lynlås med interval

I dette eksempel vil vi zip en stream med interval som faktisk vil forsinke udsendelsen af ​​elementer i den første strøm:

@Test offentligt ugyldigt givetAStream_whenZippedWithInterval_shouldDelayStreamEmmission () {TestSubscriber testSubscriber = new TestSubscriber (); Observable data = Observable.just ("one", "two", "three", "four", "five"); Observerbart interval = Observable.interval (1L, TimeUnit.SECONDS); Observerbar .zip (data, interval, (strData, tick) -> String.format ("[% d] =% s", tick, strData)) .toBlocking (). Subscribe (testSubscriber); testSubscriber.assertCompleted (); testSubscriber.assertValueCount (5); testSubscriber.assertValues ​​("[0] = en", "[1] = to", "[2] = tre", "[3] = fire", "[4] = fem"); }

4. Resume

I denne artikel har vi set et par af metoderne til at kombinere Observerbare med RxJava. Du kan lære om andre metoder som f.eks combineLatest, tilslutte, groupTilmeld, switchOnNexti den officielle RxJava-dokumentation.

Som altid er kildekoden til denne artikel tilgængelig i vores GitHub repo.


$config[zx-auto] not found$config[zx-overlay] not found