Introduktion til Project Reactor Bus

1. Oversigt

I denne hurtige artikel introducerer vi reaktorbussen ved at oprette et virkeligt scenarie for en reaktiv, hændelsesdrevet applikation.

2. Grundlæggende om Project Reactor

2.1. Hvorfor reaktor?

Moderne applikationer skal håndtere et stort antal samtidige anmodninger og behandle en betydelig mængde data. Standard, blokeringskode er ikke længere tilstrækkelig til at opfylde disse krav.

Det reaktive designmønster er et begivenhedsbaseret arkitektonisk tilgang til asynkron håndtering af et stort antal samtidige serviceanmodninger kommer fra enkelt- eller flere servicearbejdere.

Project Reactor er baseret på dette mønster og har et klart og ambitiøst mål om at opbygge ikke-blokerende, reaktive applikationer på JVM.

2.2. Eksempel på scenarier

Før vi kommer i gang, er der et par interessante scenarier, hvor det at give mening at bruge den reaktive arkitektoniske stil, bare for at få en idé om, hvor vi kan anvende den:

  • Meddelelsestjenester til en stor online shoppingplatform som Amazon
  • Kæmpe transaktionsbehandlingstjenester til banksektoren
  • Lagre handelsvirksomheder, hvor aktiekurser ændrer sig samtidigt

3. Maven-afhængigheder

Lad os begynde at bruge Project Reactor Bus ved at tilføje følgende afhængighed i vores pom.xml:

 io.projektreaktor reaktor-bus 2.0.8.FRIGIVELSE 

Vi kan kontrollere den nyeste version af reaktor-bus i Maven Central.

4. Opbygning af en demo-applikation

For bedre at forstå fordelene ved den reaktorbaserede tilgang, lad os se på et praktisk eksempel.

Vi bygger en simpel applikation, der er ansvarlig for at sende meddelelser til brugerne af en online shoppingplatform. For eksempel, hvis en bruger placerer en ny ordre, sender appen en ordrebekræftelse via e-mail eller SMS.

En typisk synkron implementering vil naturligvis være begrænset af e-mail- eller SMS-tjenestens gennemløb. Derfor vil trafikstigninger som f.eks. Helligdage generelt være problematiske.

Med en reaktiv tilgang kan vi designe vores system til at være mere fleksibelt og tilpasse sig bedre til fejl eller timeouts, der kan forekomme i de eksterne systemer, såsom gateway-servere.

Lad os se på applikationen - startende med de mere traditionelle aspekter og videre til de mere reaktive konstruktioner.

4.1. Enkel POJO

Lad os først oprette en POJO-klasse til at repræsentere underretningsdataene:

offentlig klasse NotificationData {privat lang id; privat strengnavn; privat streng e-mail; private String mobile; // getter og setter metoder}

4.2. Servicelaget

Lad os nu definere et simpelt servicelag:

offentlig grænseflade NotificationService {void initiateNotification (NotificationData notificationData) kaster InterruptedException; }

Og implementeringen, der simulerer en langvarig operation:

@Service public class NotificationServiceimpl implementerer NotificationService {@Override public void initiateNotification (NotificationData notificationData) kaster InterruptedException {System.out.println ("Notification service started for" + "Notification ID:" + notificationData.getId ()); Tråd. Søvn (5000); System.out.println ("Notifikationstjeneste afsluttet for" + "Meddelelses-id:" + notificationData.getId ()); }}

Bemærk at for at illustrere et virkeligt scenarie med at sende meddelelser via en SMS eller e-mail-gateway introducerer vi med vilje en forsinkelse på fem sekunder i initiateNotification metode med Tråd. Søvn (5000).

Derfor, når en tråd rammer tjenesten, blokeres den i fem sekunder.

4.3. Forbrugeren

Lad os nu springe ind i de mere reaktive aspekter af vores applikation og implementere en forbruger - som vi derefter kortlægger til reaktorhændelsesbussen:

@Service offentlig klasse NotificationConsumer implementerer forbruger {@Autowired private NotificationService notificationService; @Override offentlig ugyldig accept (begivenhed notifikationsdataEvent) {NotificationData notifikationsdata = notifikationsdataEvent.getData (); prøv {notificationService.initiateNotification (notificationData); } fange (InterruptedException e) {// ignorere}}}

Som vi kan se, implementerer forbrugeren, vi oprettede Forbruger interface. Den vigtigste logik ligger i acceptere metode.

Dette er en lignende tilgang, som vi kan møde i en typisk Spring lytter-implementering.

4.4. Controlleren

Endelig, nu hvor vi er i stand til at forbruge begivenhederne, lad os også generere dem.

Vi skal gøre det i en simpel controller:

@Controller offentlig klasse NotificationController {@Autowired privat EventBus eventBus; @GetMapping ("/ startNotification / {param}") offentlig ugyldig startNotification (@PathVariable Integer param) {for (int i = 0; i <param; i ++) {NotificationData data = new NotificationData (); data.setId (i); eventBus.notify ("notificationConsumer", Event.wrap (data)); System.out.println ("Notifikation" + i + ": meddelelsesopgave sendt med succes"); }}}

Dette er ret selvforklarende - vi udsender begivenheder gennem EventBus her.

For eksempel, hvis en klient rammer webadressen med en param-værdi på ti, sendes ti begivenheder gennem hændelsesbussen.

4.5. Java Config

Lad os nu sætte alt sammen og oprette en simpel Spring Boot-applikation.

Først skal vi konfigurere EventBus og Miljø bønner:

@Configuration public class Config {@Bean public Environment env () {return Environment.initializeIfEmpty (). AssignErrorJournal (); } @Bean public EventBus createEventBus (Environment env) {return EventBus.create (env, Environment.THREAD_POOL); }}

I vores tilfælde vi instantierer EventBus med en standard trådpulje, der er tilgængelig i miljøet.

Alternativt kan vi bruge en tilpasset Afsender eksempel:

EventBus evBus = EventBus.create (env, Environment.newDispatcher (REACTOR_CAPACITY, REACTOR_CONSUMERS_COUNT, DispatcherType.THREAD_POOL_EXECUTOR));

Nu er vi klar til at oprette en hovedapplikationskode:

importer statisk reaktor.bus.selector.Selectors. $; @SpringBootApplication public class NotificationApplication implementerer CommandLineRunner {@Autowired private EventBus eventBus; @Autowired private NotificationConsumer notificationConsumer; @ Override public void run (String ... args) kaster undtagelse {eventBus.on ($ ("notificationConsumer"), notificationConsumer); } offentlig statisk ugyldig hoved (String [] args) {SpringApplication.run (NotificationApplication.class, args); }}

I vores løb metode vi registrerer notifikation Forbruger udløses, når meddelelsen matcher en given vælger.

Læg mærke til, hvordan vi bruger den statiske import af $ attribut for at oprette en Vælger objekt.

5. Test applikationen

Lad os nu oprette en test for at se vores Notifikation Ansøgning i aktion:

@RunWith (SpringRunner.class) @SpringBootTest (webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class NotificationApplicationIntegrationTest {@LocalServerPort private int-port; @Test offentlig ugyldighed givetAppStarted_whenNotificationTasksSubmitted_thenProcessed () {RestTemplate restTemplate = ny RestTemplate (); restTemplate.getForObject ("// localhost:" + port + "/ startNotification / 10", String.class); }}

Som vi kan se, så snart anmodningen er udført, alle ti opgaver sendes med det samme uden at skabe nogen blokering. Og når de er indsendt, behandles meddelelsesbegivenhederne parallelt.

Meddelelse 0: underretningsopgave sendt korrekt Meddelelse 1: underretningsopgave sendt korrekt Underretning 2: underretningsopgave sendt korrekt Underretning 3: underretningsopgave sendt korrekt Underretning 4: underretningsopgave sendt korrekt Notifikation 5: underretningsopgave sendt korrekt Underretning 6: underretningsopgave sendt korrekt Underretning 7: notifikationsopgave sendt med succes Notifikation 8: notifikationsopgave sendt korrekt Notifikation 9: notifikationsopgave sendt korrekt Meddelelsestjeneste startet for Meddelelses-id: 1 Meddelelsestjeneste startet for Meddelelses-id: 2 Meddelelsestjeneste startet for Meddelelses-id: 3 Meddelelsestjeneste startet for Meddelelses-id : 0 Meddelelsestjeneste afsluttet for Meddelelses-id: 1 Meddelelsestjeneste afsluttet for Meddelelses-id: 0 Meddelelsestjeneste startet for Meddelelses-id: 4 Meddelelsestjeneste afsluttet for Meddelelses-id: 3 Meddelelsestjeneste afsluttet for Meddelelses-id: 2 Meddelelsestjeneste startet for Meddelelses-id: 6 Meddelelsestjeneste startet for Meddelelses-id: 5 Meddelelsestjeneste startet for Meddelelses-id: 7 Meddelelsestjeneste afsluttet for Meddelelses-id: 4 Meddelelsestjeneste startet for Meddelelses-id: 8 Meddelelsestjeneste afsluttet for Meddelelses-id: 6 Meddelelsestjeneste afsluttet for Meddelelses-id: 5 Meddelelsestjeneste startet for Meddelelses-id: 9 Meddelelsestjeneste afsluttet for Meddelelses-id: 7 Meddelelsestjeneste afsluttet for Meddelelses-id: 8 Meddelelsestjeneste afsluttet for Meddelelses-id: 9

Det er vigtigt at huske på det i vores scenario er der ikke behov for at behandle disse begivenheder i en bestemt rækkefølge.

6. Konklusion

I denne hurtige vejledning Vi har oprettet en simpel begivenhedsdrevet applikation. Vi har også set, hvordan man begynder at skrive en mere reaktiv og ikke-blokerende kode.

Imidlertid, dette scenario skraber bare overfladen af ​​motivet og repræsenterer bare en god base for at begynde at eksperimentere med det reaktive paradigme.

Som altid er kildekoden tilgængelig på GitHub.