Introduktion til RxJava

1. Oversigt

I denne artikel vil vi fokusere på at bruge Reactive Extensions (Rx) i Java til at komponere og forbruge sekvenser af data.

Med et overblik kan API'en ligne Java 8 Streams, men faktisk er den meget mere fleksibel og flydende, hvilket gør det til et kraftigt programmeringsparadigme.

Hvis du vil læse mere om RxJava, skal du tjekke denne opskrivning.

2. Opsætning

For at bruge RxJava i vores Maven-projekt skal vi tilføje følgende afhængighed til vores pom.xml:

 io.reactivex rxjava $ {rx.java.version} 

Eller til et Gradle-projekt:

kompiler 'io.reactivex.rxjava: rxjava: x.y.z'

3. Funktionelle reaktive begreber

På den ene side, funktionel programmering er processen med at opbygge software ved at komponere rene funktioner, undgå delt tilstand, mutable data og bivirkninger.

På den anden side, reaktiv programmering er et asynkront programmeringsparadigme, der beskæftiger sig med datastrømme og forplantning af ændringer.

Sammen, funktionel reaktiv programmering danner en kombination af funktionelle og reaktive teknikker, der kan repræsentere en elegant tilgang til begivenhedsdrevet programmering - med værdier, der ændrer sig over tid, og hvor forbrugeren reagerer på dataene, når de kommer ind.

Denne teknologi samler forskellige implementeringer af dens kerneprincipper, nogle forfattere kom med et dokument, der definerer det fælles ordforråd til beskrivelse af den nye type applikationer.

3.1. Reaktivt manifest

Reactive Manifesto er et online-dokument, der opstiller en høj standard for applikationer inden for softwareudviklingsbranchen. Kort sagt, reaktive systemer er:

  • Responsive - systemer skal reagere rettidigt
  • Message Driven - systemer skal bruge asynkronisering af meddelelse mellem komponenter for at sikre løs kobling
  • Elastik - systemer skal forblive lydhøre under høj belastning
  • Modstandsdygtige - systemer skal forblive lydhøre, når nogle komponenter fejler

4. Observerbare

Der er to nøgletyper at forstå, når du arbejder med Rx:

  • Observerbar repræsenterer ethvert objekt, der kan hente data fra en datakilde, og hvis tilstand kan være af interesse på en måde, som andre objekter kan registrere en interesse
  • En observatør er ethvert objekt, der ønsker at blive underrettet, når tilstanden for et andet objekt ændres

En observatør abonnerer på en Observerbar sekvens. Sekvensen sender elementer til observatør en ad gangen.

Det observatør håndterer hver enkelt, inden den næste behandles. Hvis mange begivenheder kommer asynkront, skal de gemmes i kø eller droppes.

I Rx, en observatør vil aldrig blive ringet op med en vare, der ikke er i orden, eller kaldes tilbage, før tilbagekaldet er returneret til det forrige element.

4.1. Typer af Observerbar

Der er to typer:

  • Ikke-blokering - asynkron udførelse understøttes og har tilladelse til at afmelde ethvert tidspunkt i begivenhedsstrømmen. På denne artikel vil vi mest fokusere på denne type type
  • Blokering - alle onNext observatøropkald vil være synkrone, og det er ikke muligt at afmelde midt i en begivenhedsstrøm. Vi kan altid konvertere en Observerbar ind i en Blokering observerbarved hjælp af metoden toBlocking:
BlockingObservable blockingObservable = observerbar.toBlocking ();

4.2. Operatører

En operatør er en funktion, der tager en Observabel (kilden) som sit første argument og returnerer et andet Observerbar (destinationen). Derefter vil det for hvert element, som den observerbare kilde udsender, anvende en funktion på det element og derefter udsende resultatet på destinationen Observerbar.

Operatører kan lænkes sammen for at skabe komplekse datastrømme, der filtrerer begivenhed baseret på bestemte kriterier. Flere operatører kan anvendes på det samme observerbar.

Det er ikke svært at komme i en situation, hvor en Observerbar udsender varer hurtigere end en operatør eller observatør kan forbruge dem. Du kan læse mere om modtryk her.

4.3. Opret observerbar

Den grundlæggende operatør lige producerer en Observerbar der udsender en enkelt generisk forekomst inden afslutningen, String "Hej". Når vi ønsker at få oplysninger ud af en Observerbar, implementerer vi en observatør interface og derefter ringe abonnere på det ønskede Observerbar:

Observable observerable = Observable.just ("Hello"); observerbar. abonner (s -> resultat = s); assertTrue (result.equals ("Hello"));

4.4. OnNext, OnError, og OnFuldført

Der er tre metoder på observatør interface, som vi vil vide om:

  1. OnNext kaldes på vores observatør hver gang en ny begivenhed offentliggøres i vedlagte Observerbar. Dette er metoden, hvor vi udfører nogle handlinger på hver begivenhed
  2. OnFuldført kaldes når rækkefølgen af ​​begivenheder forbundet med en Observerbar er komplet, hvilket indikerer, at vi ikke skulle forvente mere onNext opfordrer vores observatør
  3. OnError kaldes, når en uhåndteret undtagelse kastes under RxJava rammekode eller vores hændelseshåndteringskode

Returneringsværdien for Observerbareabonnere metoden er en abonnere grænseflade:

Streng [] bogstaver = {"a", "b", "c", "d", "e", "f", "g"}; Observerbar observerbar = Observerbar.fra (bogstaver); observerbar.subscribe (i -> resultat + = i, // OnNext Throwable :: printStackTrace, // OnError () -> result + = "_Completed" // OnCompleted); assertTrue (result.equals ("abcdefg_Completed"));

5. Observerbare transformationer og betingede operatører

5.1. Kort

Men operatør transformerer genstande udsendt af en Observerbar ved at anvende en funktion til hvert emne.

Lad os antage, at der er en erklæret række strenge, der indeholder nogle bogstaver fra alfabetet, og vi vil udskrive dem i hovedstilstand:

Observable.from (letters) .map (String :: toUpperCase). Abonner (letter -> resultat + = bogstav); assertTrue (result.equals ("ABCDEFG"));

FlatMap kan bruges til at flade Observerbare når vi ender med indlejrede Observerbare.

Flere detaljer om forskellen mellem kort og flatMap kan findes her.

Forudsat at vi har en metode, der returnerer en Observerbar fra en liste over strenge. Nu udskriver vi for hver streng fra en ny Observerbar listen over titler baseret på hvad Abonnent ser:

Observable getTitle () {return Observable.from (titleList); } Observable.just ("book1", "book2") .flatMap (s -> getTitle ()). Abonner (l -> resultat + = l); assertTrue (result.equals ("titletitle"));

5.2. Scanning

Det scanningsoperatør aanvender en funktion til hvert emne, der udsendes af en Observerbar sekventielt og udsender hver efterfølgende værdi.

Det giver os mulighed for at videreføre tilstand fra begivenhed til begivenhed:

Streng [] bogstaver = {"a", "b", "c"}; Observable.from (letters) .scan (new StringBuilder (), StringBuilder :: append). Abonner (total -> resultat + = total.toString ()); assertTrue (result.equals ("aababc"));

5.3. GroupBy

Gruppér efter giver os mulighed for at klassificere begivenhederne i input Observerbar i outputkategorier.

Lad os antage, at vi oprettede en række heltal fra 0 til 10 og derefter anvende gruppere efter der vil opdele dem i kategorierne også selvom og ulige:

Observable.from (numbers) .groupBy (i -> 0 == (i% 2)? "EVEN": "ODD"). Abonner (group -> group.subscribe ((number) -> {if (group.getKey) () .toString (). er lig med ("EVEN")) {EVEN [0] + = nummer;} ellers {ODD [0] + = nummer;}})); assertTrue (EVEN [0] .equals ("0246810")); assertTrue (ODD [0] .equals ("13579"));

5.4. Filter

Operatøren filter udsender kun disse emner fra en observerbar der passerer en predikat prøve.

Så lad os filtrere i et heltal array for de ulige tal:

Observable.from (numbers) .filter (i -> (i% 2 == 1)). Abonner (i -> result + = i); assertTrue (result.equals ("13579"));

5.5. Betingede operatører

StandardIfEmpty udsender element fra kilden Observerbareller et standardelement, hvis kilden Observerbar er tom:

Observable.empty () .defaultIfEmpty ("Observable is tom"). Abonner (s -> resultat + = s); assertTrue (result.equals ("Observerbar er tom"));

Den følgende kode udsender det første bogstav i alfabetet 'en' fordi arrayet bogstaver er ikke tom, og det er hvad den indeholder i første position:

Observable.from (letters) .defaultIfEmpty ("Observable is empty"). First (). Abonner (s -> resultat + = s); assertTrue (result.equals ("a"));

Tag et stykke tid operatøren kasserer genstande, der udsendes af en Observerbar efter at en bestemt tilstand bliver falsk:

Observable.from (numbers) .takeWhile (i -> i sum [0] + = s); assertTrue (sum [0] == 10);

Selvfølgelig er der flere andre operatører, der kunne dække vores behov som Indeholder, SkipWhile, SkipUntil, TakeUntil, etc.

6. Observerbare, der kan forbindes

EN Tilslutningsbar Observerbar ligner en almindelig Observerbar, bortset fra at det ikke begynder at udsende varer, når det abonnerer på, men kun når Opret forbindelse operatøren anvendes på det.

På denne måde kan vi vente på, at alle tilsigtede observatører abonnerer på Observerbar før Observerbar begynder at sende emner:

Streng [] resultat = {""}; ConnectableObservable connectable = Observable.interval (200, TimeUnit.MILLISECONDS) .publish (); connectable.subscribe (i -> resultat [0] + = i); assertFalse (resultat [0] .equals ("01")); connectable.connect (); Tråd. Søvn (500); assertTrue (resultat [0] .equals ("01"));

7. Single

Enkelt er som en Observerbar som i stedet for at udsende en række værdier udsender en værdi eller en fejlmeddelelse.

Med denne datakilde kan vi kun bruge to metoder til at abonnere:

  • OnSuccess returnerer a Enkelt der også kalder en metode, vi specificerer
  • OnError returnerer også a Enkelt der straks underretter abonnenterne om en fejl
Streng [] resultat = {""}; Single single = Observable.just ("Hello") .toSingle () .doOnSuccess (i -> result [0] + = i) .doOnError (error -> {throw new RuntimeException (error.getMessage ());}); single.subscribe (); assertTrue (resultat [0] .equals ("Hej"));

8. Emner

EN Emne er samtidig to elementer, a abonnent og en observerbar. Som abonnent kan et emne bruges til at offentliggøre begivenheder, der kommer fra mere end en observerbar.

Og fordi det også kan observeres, kan begivenhederne fra flere abonnenter genudsendes som dens begivenheder til enhver, der observerer det.

I det næste eksempel ser vi på, hvordan observatørerne kan se de begivenheder, der opstår, efter at de abonnerer:

Heltalsabonnent1 = 0; Heltalsabonnent2 = 0; Observer getFirstObserver () {returner ny Observer () {@Override public void onNext (Integer value) {subscriber1 + = value; } @ Overstyr offentlig ugyldighed onError (kan kastes) {System.out.println ("fejl"); } @ Overstyr offentlig ugyldighed onCompleted () {System.out.println ("Abonnent1 afsluttet"); }}; } Observer getSecondObserver () {return new Observer () {@Override public void onNext (Integer value) {subscriber2 + = value; } @ Overstyr offentlig ugyldighed onError (kan kastes) {System.out.println ("fejl"); } @ Overstyr offentlig tomrum påFuldført () {System.out.println ("Abonnent2 afsluttet"); }}; } PublishSubject subject = PublishSubject.create (); subject.subscribe (getFirstObserver ()); subject.onNext (1); subject.onNext (2); subject.onNext (3); subject.subscribe (getSecondObserver ()); subject.onNext (4); subject.onCompleted (); assertTrue (subscriber1 + subscriber2 == 14)

9. Ressourcestyring

Ved brug af Betjening giver os mulighed for at knytte ressourcer, såsom en JDBC-databaseforbindelse, en netværksforbindelse eller åbne filer til vores observerbare.

Her præsenterer vi i kommentarer de trin, vi skal gøre for at nå dette mål, og også et eksempel på implementering:

Streng [] resultat = {""}; Observerbare værdier = Observable.using (() -> "MyResource", r -> {return Observable.create (o -> {for (Character c: r.toCharArray ()) {o.onNext (c);} o. onCompleted ();});}, r -> System.out.println ("Disposed:" + r)); værdier. abonner (v -> resultat [0] + = v, e -> resultat [0] + = e); assertTrue (resultat [0] .equals ("MyResource"));

10. Konklusion

I denne artikel har vi talt om, hvordan man bruger RxJava-biblioteket, og også hvordan man udforsker dets vigtigste funktioner.

Den fulde kildekode til projektet inklusive alle de kodeeksempler, der bruges her, kan findes på Github.