Planlægninger i RxJava

1. Oversigt

I denne artikel vil vi fokusere på forskellige typer Planlægning som vi skal bruge til at skrive multithreading-programmer baseret på RxJava Observable abonnerer På og observer på metoder.

Planlægning give mulighed for at specificere, hvor og sandsynligt hvornår de opgaver, der er relateret til driften af ​​et Observerbar kæde.

Vi kan få en Planlægning fra de fabriksmetoder, der er beskrevet i klassen Planlægning.

2. Standard trådadfærd

Som standard,Rx er enkelt gevind hvilket indebærer, at en Observerbar og den kæde af operatører, som vi kan anvende på den, underretter sine observatører om den samme tråd, som dens abonner () metode kaldes.

Det observer på og subscribeTil metoder tager som argument a Planlægning, som, som navnet antyder, er et værktøj, som vi kan bruge til planlægning af individuelle handlinger.

Vi opretter vores implementering af en Planlægning ved hjælp af skabArbejder metode, som returnerer en Planlægning. Arbejder. EN arbejder accepterer handlinger og udfører dem sekventielt på en enkelt tråd.

På en måde, en arbejder er en Scheduler selv, men vi vil ikke henvise til det som en Planlægning for at undgå forvirring.

2.1. Planlægning af en handling

Vi kan planlægge et job på enhver Planlægning ved at oprette en ny arbejder og planlægning af nogle handlinger:

Scheduler scheduler = Schedulers.immediate (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> resultat + = "handling"); Assert.assertTrue (result.equals ("handling"));

Handlingen sættes derefter i kø på den tråd, som medarbejderen er tildelt.

2.2. Annullering af en handling

Planlægning. Arbejder strækker sig Abonnement. Ringer til opsige abonnement metode på en arbejder vil medføre, at køen tømmes, og alle afventende opgaver annulleres. Vi kan se det ved eksempel:

Scheduler scheduler = Schedulers.newThread (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> result + = "Second_Action"); Assert.assertTrue (result.equals ("First_Action"));

Den anden opgave udføres aldrig, fordi den før den annullerede hele operationen. Handlinger, der var i færd med at blive udført, afbrydes.

3. Planlægere. Ny tråd

Denne planlægning starter simpelthen en ny tråd, hver gang den anmodes om via subscribeOn () eller observer på ().

Det er næppe nogensinde et godt valg, ikke kun på grund af den ventetid, der er involveret, når du starter en tråd, men også fordi denne tråd ikke genbruges:

Observable.just ("Hello") .observeOn (Schedulers.newThread ()) .doOnNext (s -> result2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()). Abonnement (s - > resultat1 + = Thread.currentThread (). getName ()); Tråd. Søvn (500); Assert.assertTrue (result1.equals ("RxNewThreadScheduler-1")); Assert.assertTrue (result2.equals ("RxNewThreadScheduler-2"));

Når Arbejder er gjort, tråden afsluttes simpelthen. Det her Planlægning kan kun bruges, når opgaver er grovkornede: det tager meget tid at gennemføre, men der er meget få af dem, så det er usandsynligt, at tråde genbruges overhovedet.

Scheduler scheduler = Schedulers.newThread (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Tråd. Søvn (3000); Assert.assertTrue (result.equals ("RxNewThreadScheduler-1_Start_End_worker_"));

Da vi planlagde arbejder på en Ny trådTidsplanlægning, vi så, at arbejdstageren var bundet til en bestemt tråd.

4. Tidsplaner. Øjeblikkelig

Planlægning. Øjeblikkelig er en speciel planlægning, der påkalder en opgave inden for klienttråden på en blokerende måde snarere end asynkront og vender tilbage, når handlingen er afsluttet:

Scheduler scheduler = Schedulers.immediate (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> result + = "_worker_"); result + = "_End";} ); Tråd. Søvn (500); Assert.assertTrue (result.equals ("main_Start_worker__End"));

Faktisk abonnerer du på en Observerbar via øjeblikkelig planlægning har typisk den samme effekt som ikke at abonnere på noget bestemt Scheduler overhovedet:

Observable.just ("Hello") .subscribeOn (Schedulers.immediate ()) .subscribe (s -> result + = Thread.currentThread (). GetName ()); Tråd. Søvn (500); Assert.assertTrue (result.equals ("main"));

5. Schedulers. Trampolin

Det trampolinPlanlægning ligner meget umiddelbar fordi det også planlægger opgaver i samme tråd og effektivt blokerer.

Den kommende opgave udføres dog, når alle tidligere planlagte opgaver er afsluttet:

Observable.just (2, 4, 6, 8) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> result + = "" + i); Observable.just (1, 3, 5, 7, 9) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> result + = "" + i); Tråd. Søvn (500); Assert.assertTrue (result.equals ("246813579"));

Umiddelbar påberåber sig en given opgave med det samme, mens trampolin venter på, at den aktuelle opgave er færdig.

Det trampolin'S arbejder udfører hver opgave på den tråd, der planlagde den første opgave. Det første opkald til tidsplan blokerer, indtil køen tømmes:

Planlægningsplanlægger = Schedulers.trampoline (); Scheduler.Worker worker = scheduler.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "Start"; worker.schedule (() -> {result + = "_middleStart"; worker.schedule (() -> resultat + = "_arbejder_"); resultat + = "_middleEnd";}); resultat + = "_mainEnd";}); Tråd. Søvn (500); Assert.assertTrue (result .equals ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Schedulers. Fra

Planlægning er internt mere komplekse end Eksekutører fra java.util.concurrent - så der var behov for en separat abstraktion.

Men fordi de konceptuelt er ret ens, er der ikke overraskende en indpakning, der kan dreje Eksekutor ind i Planlægning bruger fra fabriksmetode:

private ThreadFactory threadFactory (streng mønster) {returner ny ThreadFactoryBuilder () .setNameFormat (mønster) .build (); } @Test offentligt ugyldigt givetExecutors_whenSchedulerFrom_thenReturnElements () kaster InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d")); Scheduler schedulerA = Schedulers.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); Scheduler schedulerB = Schedulers.from (poolB); Observable observerable = Observable.create (subscriber -> {subscriber.onNext ("Alfa"); subscriber.onNext ("Beta"); subscriber.onCompleted ();}) ;; observerbar .subscribeOn (schedulerA) .subscribeOn (schedulerB) .subscribe (x -> result + = Thread.currentThread (). getName () + x + "_", Throwable :: printStackTrace, () -> result + = "_Completed "); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

Planlægning B. bruges i en kort periode, men det planlægger næppe en ny handling planlægger A., der gør alt arbejdet. Således flere subscribeOn-metoder ignoreres ikke kun, men introducerer også en lille overhead.

7. Schedulers.io

Det her Planlægning svarer til newThread bortset fra det faktum, at allerede startede tråde genbruges og muligvis kan håndtere fremtidige anmodninger.

Denne implementering fungerer på samme måde som ThreadPoolExecutor fra java.util.concurrent med en ubegrænset pool af tråde. Hver gang en ny arbejder anmodes om, enten startes en ny tråd (og senere holdes inaktiv i et stykke tid), eller den inaktive genbruges:

Observable.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> result + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxIoScheduler-2"));

Vi skal være forsigtige med ubegrænsede ressourcer af enhver art - i tilfælde af langsomme eller ikke-reagerende eksterne afhængigheder som webservices, ioplanlægning kan starte et enormt antal tråde, hvilket fører til, at vores helt egen applikation ikke reagerer.

I praksis følgende Schedulers.io er næsten altid et bedre valg.

8. Planlægning. Beregning

Beregning Scheduler som standard begrænser antallet af tråde, der kører parallelt med værdien af tilgængeligProcessorer (), som det findes i Runtime.getRuntime () utility klasse.

Så vi skal bruge en beregningsplanlægning når opgaver er helt CPU-bundne det vil sige, de kræver beregningskraft og har ingen blokeringskode.

Det bruger en ubegrænset kø foran hver tråd, så hvis opgaven er planlagt, men alle kerner er optaget, vil den stå i kø. Køen lige før hver tråd vil dog fortsætte med at vokse:

Observable.just ("computation") .subscribeOn (Schedulers.computation ()) .subscribe (i -> result + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxComputationScheduler-1"));

Hvis vi af en eller anden grund har brug for et andet antal tråde end standard, kan vi altid bruge rx.scheduler.max-beregningstråde systemegenskab.

Ved at tage færre tråde kan vi sikre, at der altid er en eller flere CPU-kerner inaktive, og endda under tung belastning, beregning thread pool mætter ikke serveren. Det er simpelthen ikke muligt at have flere beregningstråde end kerner.

9. Schedulers.test

Det her Planlægning bruges kun til testformål, og vi ser det aldrig i produktionskoden. Dens største fordel er evnen til at fremme uret ved at simulere den tid, der passerer vilkårligt:

Listebogstaver = Arrays.asList ("A", "B", "C"); TestScheduler scheduler = Schedulers.test (); TestSubscriber-abonnent = ny TestSubscriber (); Observable tick = Observable .interval (1, TimeUnit.SECONDS, scheduler); Observable.from (letters) .zipWith (tick, (string, index) -> index + "-" + string) .subscribeOn (scheduler) .subscribe (subscriber); subscriber.assertNoValues ​​(); subscriber.assertNotCompleted (); scheduler.advanceTimeBy (1, TimeUnit.SECONDS); subscriber.assertNoErrors (); subscriber.assertValueCount (1); subscriber.assertValues ​​("0-A"); scheduler.advanceTimeTo (3, TimeUnit.SECONDS); subscriber.assertCompleted (); subscriber.assertNoErrors (); subscriber.assertValueCount (3); assertThat (subscriber.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C"));

10. Standardplanlægning

Nogle Observerbar operatører i RxJava har alternative formularer, der giver os mulighed for at indstille hvilke Planlægning operatøren vil bruge til sin drift. Andre opererer ikke på noget særligt Planlægning eller betjene en bestemt standard Planlægning.

F.eks forsinke operatøren tager opstrøms begivenheder og skubber dem nedstrøms efter et givet tidspunkt. Det kan tydeligvis ikke indeholde den originale tråd i den periode, så den skal bruge en anden Planlægning:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); Scheduler schedulerA = Schedulers.from (poolA); Observable.just ('A', 'B'). Delay (1, TimeUnit.SECONDS, schedulerA). Abonner (i -> resultat + = Thread.currentThread (). GetName () + i + ""); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched1-A Sched1-B"));

Uden at levere en brugerdefineret planlægger A., alle operatører nedenfor forsinke ville bruge beregningsplanlægning.

Andre vigtige operatører, der understøtter brugerdefineret Planlægning er buffer, interval, rækkevidde, timer, springe, tage, tiden er gåetog flere andre. Hvis vi ikke giver en Planlægning til sådanne operatører beregning scheduler bruges, hvilket i de fleste tilfælde er en sikker standard.

11. Konklusion

I virkelig reaktive applikationer, hvor alle langvarige operationer er asynkrone, meget få tråde og dermed Planlægning er nødvendige.

Mastering planlæggere er afgørende for at skrive skalerbar og sikker kode ved hjælp af RxJava. Forskellen på subscribeTil og observer på er især vigtigt under høj belastning, hvor hver opgave skal udføres nøjagtigt, når vi forventer.

Sidst men ikke mindst skal vi være sikre på det Planlægning brugt nedstrøms kan følge med den lo-annonce, der genereres af Planlægning upstrea m. For mere information er der denne artikel om modtryk.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-projekt, så det skal være let at importere og køre, som det er.


$config[zx-auto] not found$config[zx-overlay] not found