Reaktive systemer i Java

1. Introduktion

I denne vejledning forstår vi det grundlæggende i oprettelse af reaktive systemer i Java ved hjælp af Spring og andre værktøjer og rammer.

I processen vil vi diskutere, hvordan reaktiv programmering bare er en driver til oprettelse af et reaktivt system. Dette vil hjælpe os med at forstå begrundelsen for at skabe reaktive systemer og forskellige specifikationer, biblioteker og standarder, det har inspireret undervejs.

2. Hvad er reaktive systemer?

I løbet af de sidste par årtier har teknologilandskabet set flere forstyrrelser, der har ført til en fuldstændig transformation af den måde, vi ser værdi på teknologi på. Computerverdenen før Internettet kunne aldrig have forestillet sig de måder og midler, hvorpå den vil ændre vores nuværende dag.

Med internettets rækkevidde for masserne og den stadigt udviklende oplevelse, det lover, skal applikationsarkitekter være på tæerne for at imødekomme deres behov.

Grundlæggende betyder det, at vi aldrig kan designe en applikation, som vi plejede tidligere. EN meget responsiv applikation er ikke længere en luksus, men en nødvendighed.

Også dette står over for tilfældige fejl og uforudsigelig belastning. Timens behov er ikke kun at få det rigtige resultat, men få det hurtigt! Det er ret vigtigt at køre de fantastiske brugeroplevelser, vi lover at levere.

Dette er det, der skaber behovet for en arkitektonisk stil, der kan give os reaktive systemer.

2.1. Reaktivt manifest

Tilbage i 2013, et team af udviklere, ledet af Jonas Boner, kom sammen for at definere et sæt kerneprincipper i et dokument kendt som det reaktive manifest. Dette er grundlaget for en arkitekturstil til at skabe reaktive systemer. Siden da har dette manifest samlet stor interesse fra udviklerfællesskabet.

Grundlæggende foreskriver dette dokument opskriften på, at et reaktivt system skal være fleksibelt, løst koblet og skalerbart. Dette gør sådanne systemer lette at udvikle, tolerante over for fejl og vigtigst af alt meget lydhør, understøttelsen af ​​utrolige brugeroplevelser.

Så hvad er denne hemmelige opskrift? Nå, det er næppe nogen hemmelighed! Manifestet definerer de grundlæggende egenskaber eller principper for et reaktivt system:

  • Lydhør: Et reaktivt system skal give en hurtig og ensartet responstid og dermed en ensartet servicekvalitet
  • Robust: Et reaktivt system skal forblive lydhørt i tilfælde af tilfældige fejl gennem replikering og isolering
  • Elastisk: Et sådant system skal forblive lydhørt under uforudsigelige arbejdsbelastninger gennem omkostningseffektiv skalerbarhed
  • Meddelelsesdrevet: Det skal stole på asynkron meddelelse, der går mellem systemkomponenter

Disse principper lyder enkle og fornuftige, men er ikke altid lettere at implementere i kompleks virksomhedsarkitektur. I denne vejledning udvikler vi et eksempelsystem i Java med disse principper i tankerne!

3. Hvad er reaktiv programmering?

Før vi fortsætter, er det vigtigt at forstå forskellen mellem reaktiv programmering og reaktive systemer. Vi bruger begge disse udtryk ganske ofte og misforstår let den ene for den anden. Som vi har set tidligere, er reaktive systemer et resultat af en bestemt arkitektonisk stil.

I modsætning, reaktiv programmering er et programmeringsparadigme, hvor fokus er på at udvikle asynkrone og ikke-blokerende komponenter. Kernen i reaktiv programmering er en datastrøm, som vi kan observere og reagere på, også anvende modtryk. Dette fører til ikke-blokerende udførelse og dermed til bedre skalerbarhed med færre udførelsestråde.

Nu betyder det ikke, at reaktive systemer og reaktiv programmering udelukker hinanden. Faktisk er reaktiv programmering et vigtigt skridt i retning af at realisere et reaktivt system, men det er ikke alt!

3.1. Reaktive streams

Reactive Streams er et fællesskabsinitiativ, der startede tilbage i 2013 til giver en standard til asynkron strømbehandling med ikke-blokerende modtryk. Målet her var at definere et sæt grænseflader, metoder og protokoller, der kan beskrive de nødvendige operationer og enheder.

Siden da er der opstået flere implementeringer på flere programmeringssprog, som er i overensstemmelse med specifikationen for reaktive strømme. Disse inkluderer Akka Streams, Ratpack og Vert.x for at nævne nogle få.

3.2. Reaktive biblioteker til Java

Et af de oprindelige mål bag de reaktive streams var til sidst at blive inkluderet som et officielt Java-standardbibliotek. Som et resultat svarer specifikationen til reaktive strømme semantisk til Java Flow-biblioteket, introduceret i Java 9.

Derudover er der et par populære valg til at implementere reaktiv programmering i Java:

  • Reaktive udvidelser: Populært kendt som ReactiveX, de giver API til asynkron programmering med observerbare streams. Disse er tilgængelige til flere programmeringssprog og platforme, herunder Java, hvor det er kendt som RxJava
  • Projektreaktor: Dette er et andet reaktivt bibliotek, grundlæggende baseret på specifikationen for reaktive strømme, der er målrettet mod at opbygge ikke-applikationer på JVM. Det tilfældigvis også er grundlaget for den reaktive stak i forårets økosystem

4. En simpel applikation

Med henblik på denne tutorial udvikler vi en enkel applikation baseret på mikrotjenestearkitektur med en minimal frontend. Applikationsarkitekturen skal have nok elementer til at skabe et reaktivt system.

Til vores anvendelse vedtager vi end-to-end reaktiv programmering og andre mønstre og værktøjer til at udføre de grundlæggende egenskaber ved et reaktivt system.

4.1. Arkitektur

Vi begynder med at definere en simpel applikationsarkitektur, som ikke nødvendigvis udviser egenskaberne ved reaktive systemer. Derefter foretager vi de nødvendige ændringer for at opnå disse egenskaber en efter en.

Så lad os først starte med at definere en simpel arkitektur:

Dette er en ganske simpel arkitektur, der har en masse mikrotjenester, der letter en handelssag, hvor vi kan afgive en ordre. Det har også en frontend for brugeroplevelse, og al kommunikation sker som REST over HTTP. Desuden administrerer hver mikroservice deres data i individuelle databaser, en praksis kendt som database pr. Tjeneste.

Vi fortsætter med at oprette denne enkle applikation i de følgende underafsnit. Dette vil være vores base for at forstå fejlene i denne arkitektur og måder og midler til at vedtage principper og praksis, så vi kan omdanne dette til et reaktivt system.

4.3. Inventory Microservice

Beholdning mikroservice vil være ansvarlig for styring af en liste over produkter og deres nuværende lager. Det gør det også muligt at ændre lagerbeholdningen, når ordrer behandles. Vi bruger Spring Boot med MongoDB til at udvikle denne service.

Lad os begynde med at definere en controller til at udsætte nogle slutpunkter:

@GetMapping offentlig liste getAllProducts () {returner produktService.getProducts (); } @PostMapping public Order processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping public Order revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

og en service til indkapsling af vores forretningslogik:

@Transactional public Order handleOrder (Order order) {order.getLineItems () .forEach (l -> {Product> p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Kunne ikke finde produktet: "+ l.getProductId ())), hvis (p.getStock ()> = l.getQuantity ()) {p.setStock (p.getStock () - l.getQuantity ()); productRepository.save ( p);} ellers {kast ny RuntimeException ("Produktet er udsolgt:" + l.getProductId ());}}); returner order.setOrderStatus (OrderStatus.SUCCESS); } @Transactional public Order revertOrder (Order order) {order.getLineItems () .forEach (l -> {Product p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Kunne ikke finde produktet: "+ l.getProductId ())); p.setStock (p.getStock () + l.getQuantity ()); productRepository.save (p);}); returner order.setOrderStatus (OrderStatus.SUCCESS); }

Bemærk, at vi er vedvarende enheder inden for en transaktion, som sikrer, at ingen inkonsekvent tilstand resulterer i tilfælde af undtagelser.

Bortset fra disse bliver vi også nødt til at definere domæneenhederne, lagergrænsefladen og en masse konfigurationsklasser, der er nødvendige for at alt fungerer korrekt.

Men da disse for det meste er kedelplade, undgår vi at gå igennem dem, og de kan henvises til i GitHub-arkivet, der er angivet i sidste afsnit af denne artikel.

4.4. Forsendelsesmikroservice

Forsendelsesmikroservicen vil heller ikke være meget forskellig. Det her vil blive ansvarlig for at kontrollere, om en forsendelse kan genereres til ordren og opret en, hvis det er muligt.

Som før definerer vi en controller, der skal eksponere vores slutpunkter, faktisk kun et enkelt slutpunkt:

@PostMapping offentlig ordreproces (@RequestBody ordreordre) {retur shippingService.handleOrder (ordre); }

og en tjeneste til indkapsling af forretningslogikken relateret til ordreforsendelse:

public Order handleOrder (Order order) {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime.now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now () .plusDays (1); } ellers {kast ny RuntimeException ("Den aktuelle tid er uden grænser for at afgive ordre."); } shipmentRepository.save (ny forsendelse () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate)); returner order.setShippingDate (shippingDate) .setOrderStatus (OrderStatus.SUCCESS); }

Vores enkle forsendelsestjeneste kontrollerer bare det gyldige tidsvindue for at afgive ordrer. Vi undgår at diskutere resten af ​​kogepladekoden som før.

4.5. Bestil Microservice

Endelig definerer vi en ordre-mikroservice, som vil være ansvarlig for at skabe en ny ordre bortset fra andre ting. Interessant nok vil den også spille som en orkestratorservice, hvor den kommunikerer med lagertjenesten og forsendelsestjenesten for ordren.

Lad os definere vores controller med de krævede slutpunkter:

@PostMapping offentlig ordre oprette (@RequestBody ordreordre) {Bestil processedOrder = orderService.createOrder (ordre); hvis (OrderStatus.FAILURE.equals (processingOrder.getOrderStatus ())) {smid ny RuntimeException ("Ordrebehandling mislykkedes, prøv igen senere."); } returnerer processeret ordre; } @GetMapping offentlig liste getAll () {returner orderService.getOrders (); }

Og en tjeneste til indkapsling af forretningslogikken relateret til ordrer:

public Order createOrder (Order order) {boolsk succes = sand; Ordre gemtOrder = orderRepository.save (ordre); Bestil inventarResponse = null; prøv {inventoryResponse = restTemplate.postForObject (lagerServiceUrl, ordre, ordre.klasse); } fange (Undtagelse ex) {success = false; } Bestil shippingResponse = null; prøv {shippingResponse = restTemplate.postForObject (shippingServiceUrl, order, Order.class); } catch (Exception ex) {success = false; HttpEntity deleteRequest = ny HttpEntity (rækkefølge); ResponseEntity deleteResponse = restTemplate.exchange (lagerServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class); } hvis (succes) {savedOrder.setOrderStatus (OrderStatus.SUCCESS); savedOrder.setShippingDate (shippingResponse.getShippingDate ()); } andet {savedOrder.setOrderStatus (OrderStatus.FAILURE); } returner orderRepository.save (gemtOrder); } offentlig liste getOrders () {return orderRepository.findAll (); }

Håndteringen af ​​ordrer, hvor vi orkestrerer opkald til lager- og forsendelsestjenester, er langt fra ideel. Distribueret transaktioner med flere mikrotjenester er et komplekst emne i sig selv og uden for omfanget af denne tutorial.

Vi vil dog se senere i denne vejledning, hvordan et reaktivt system til en vis grad kan undgå behovet for distribuerede transaktioner.

Som før gennemgår vi ikke resten af ​​kogepladekoden. Dette kan der imidlertid henvises til i GitHub-repoen.

4.6. Front-end

Lad os også tilføje en brugergrænseflade for at gøre diskussionen komplet. Brugergrænsefladen vil være baseret på Angular og vil være en enkel applikation med en enkelt side.

Det bliver vi nødt til Opret en enkel komponent i Angular til håndtering af oprettelse og hentning af ordrer. Af særlig betydning er den del, hvor vi kalder vores API for at oprette ordren:

createOrder () {let headers = new HttpHeaders ({'Content-Type': 'application / json'}); lad optioner = {headers: headers} this.http.post ('// localhost: 8080 / api / orders', this.form.value, options). abonner ((respons) => {this.response = respons}, (fejl) => {dette.fejl = fejl})}

Ovenstående kodestykke forventer, at ordredata fanges i en form og er tilgængelige inden for komponentens anvendelsesområde. Angular tilbyder fantastisk support til at skabe enkle til komplekse former ved hjælp af reaktive og skabelonstyrede former.

Også vigtigt er den del, hvor vi får tidligere oprettede ordrer:

getOrders () {this.previousOrders = this.http.get ('' // localhost: 8080 / api / orders '')}

Bemærk, at det kantede HTTP-modul er asynkron i naturen og returnerer derfor RxJS Observerbars. Vi kan håndtere svaret efter vores opfattelse ved at føre dem gennem et asynkronrør:

Dine ordrer placeret indtil videre:

  • Ordre-id: {{order.id}}, ordrestatus: {{order.orderStatus}}, ordrebesked: {{order.responseMessage}}

Naturligvis vil Angular kræve skabeloner, stilarter og konfigurationer for at fungere, men disse kan henvises til i GitHub-arkivet. Bemærk, at vi har samlet alt i en enkelt komponent her, hvilket ideelt set ikke er noget, vi skal gøre.

Men for denne vejledning er disse bekymringer ikke i omfang.

4.7. Implementering af applikationen

Nu hvor vi har oprettet alle individuelle dele af applikationen, hvordan skal vi gå hen og implementere dem? Nå, vi kan altid gøre dette manuelt. Men vi skal være forsigtige med at det snart kan blive kedeligt.

Til denne vejledning bruger vi Docker Compose til opbyg og implementer vores applikation på en Docker-maskine. Dette kræver, at vi tilføjer en standard Dockerfil i hver tjeneste og opretter en Docker Compose-fil til hele applikationen.

Lad os se, hvordan dette docker-compose.yml fil ser ud:

version: '3' tjenester: frontend: build: ./frontend porte: - "80:80" ordreservice: build: ./ordre- service porte: - "8080: 8080" beholdningstjeneste: build: ./inventory -serviceporte: - "8081: 8081" forsendelsestjeneste: build: ./forsendelse-service-porte: - "8082: 8082"

Dette er en ret standard definition af tjenester i Docker Compose og kræver ingen særlig opmærksomhed.

4.8. Problemer med denne arkitektur

Nu hvor vi har en simpel applikation på plads med flere tjenester, der interagerer med hinanden, kan vi diskutere problemerne i denne arkitektur. Der er, hvad vi vil prøve at adressere i de følgende sektioner og til sidst komme til den tilstand, hvor vi ville have omdannet vores applikation til et reaktivt system!

Selvom denne applikation langt fra er en produktionskvalitet, og der er flere problemer, vil vi fokusere på de spørgsmål, der vedrører motivationen til reaktive systemer:

  • Fejl i enten lagertjeneste eller forsendelsestjeneste kan have en kaskadevirkning
  • Opkald til eksterne systemer og database er alle blokerende
  • Implementeringen kan ikke håndtere fejl og svingende belastninger automatisk

5. Reaktiv programmering

Blokering af opkald i ethvert program ofte resultere i kritiske ressourcer, der bare venter på, at ting skal ske. Disse inkluderer databaseopkald, opkald til webtjenester og filsystemopkald. Hvis vi kan frigøre udførelsestråde fra denne ventetid og give en mekanisme til at cirkelere tilbage, når resultaterne er tilgængelige, vil det give meget bedre ressourceudnyttelse.

Dette er, hvad vedtagelsen af ​​det reaktive programmeringsparadigme gør for os. Selvom det er muligt at skifte til et reaktivt bibliotek for mange af disse opkald, er det muligvis ikke muligt for alt. For os gør heldigvis Spring det meget nemmere at bruge reaktiv programmering med MongoDB og REST API'er:

Spring Data Mongo har support til reaktiv adgang via MongoDB Reactive Streams Java Driver. Det giver ReactiveMongoTemplate og ReactiveMongoRepository, som begge har omfattende kortlægningsfunktionalitet.

Spring WebFlux leverer den reaktive stak-webramme til Spring, der muliggør ikke-blokerende kode og reaktivt streams-modtryk. Det udnytter reaktoren som sit reaktive bibliotek. Yderligere giver det Webklient til udførelse af HTTP-anmodninger med reaktive streams-modtryk. Det bruger Reactor Netty som HTTP-klientbibliotek.

5.1. Beholdningstjeneste

Vi begynder med at ændre vores slutpunkter for at udsende reaktive udgivere:

@GetMapping offentlig Flux getAllProducts () {returner produktService.getProducts (); }
@PostMapping offentlig Mono processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping offentlig Mono revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

Vi bliver naturligvis også nødt til at foretage de nødvendige ændringer i tjenesten:

@Transactional public Mono handleOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order. getLineItems (). stream () .filter (l -> l.getProductId (). er lig med (p.getId ())) .findAny (). get () .getQuantity (); hvis (p.getStock ()> = q) {p.setStock (p.getStock () - q); returner productRepository.save (p);} ellers {return Mono.error (ny RuntimeException ("Produktet er udsolgt:" + p.getId ()) );}}). derefter (Mono.just (order.setOrderStatus ("SUCCESS"))); } @Transactional public Mono revertOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order .getLineItems (). stream () .filter (l -> l.getProductId (). er lig med (p.getId ())) .findAny (). get () .getQuantity (); p.setStock (p.getStock ( ) + q); returner produktRepository.save (p);}). derefter (Mono.just (order.setOrderStatus ("SUCCESS"))); }

5.2. Forsendelsesservice

På samme måde ændrer vi slutpunktet på vores forsendelsestjeneste:

@PostMapping offentlig monoproces (@RequestBody ordreordre) {return shippingService.handleOrder (ordre); }

Og tilsvarende ændringer i tjenesten for at udnytte reaktiv programmering:

offentlig monohåndteringOrder (ordreordre) {returner Mono.just (ordre) .flatMap (o -> {LocalDate shippingDate = null; hvis (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime .now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now (). plusDays (1);} ellers {returner Mono.error (ny RuntimeException ("Den aktuelle tid er slukket) grænserne for at afgive ordre. "));} returnere forsendelseRepository.save (ny forsendelse () .setAddress (ordre.getShippingAddress ()) .setShippingDate (forsendelsesdato))}} .map (s -> ordre.setShippingDate (s. getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS)); }

5.3. Bestil service

Vi bliver nødt til at foretage lignende ændringer i slutpunkterne for ordretjenesten:

@PostMapping offentlig mono oprette (@RequestBody ordre ordre) {return orderService.createOrder (ordre) .flatMap (o -> {hvis (OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return Mono.error (ny RuntimeException ( "Ordrebehandlingen mislykkedes, prøv igen senere." + O.getResponseMessage ()));} ellers {returner Mono.just (o);}}); } @GetMapping public Flux getAll () {return orderService.getOrders (); }

Ændringerne i service vil være mere involverede, da vi bliver nødt til at gøre brug af foråret Webklient at påberåbe sig beholdningen og sende reaktive slutpunkter:

public Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .flatMap (o -> {return webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue) (o)) .exchange ();}) .onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return webClient.method (HttpMethod.POST) .uri (shippingServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();} else { returner Mono.just (o);}}) .onErrorResume (err -> {return webClient.method (HttpMethod.POST) .uri (inventoryServiceUrl) .body (BodyInserters.fromValue (order)) .hent () .bodyToMono (Order .class) .map (o -> o.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .map (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ( ))) {return order.setShippingDate (o.getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS);} ellers {return order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (o.getResponseMessage ()); }}) .flatMap (orderRepository :: gem); } public Flux getOrders () {return orderRepository.findAll (); }

Det her form for orkestrering med reaktive API'er er ingen nem øvelse og er ofte udsat for fejl såvel som svært at debugge. Vi får se, hvordan dette kan forenkles i det næste afsnit.

5.4. Front-end

Nu hvor vores API'er er i stand til at streame begivenheder, når de opstår, er det helt naturligt, at vi også kan udnytte det i vores front-end. Heldigvis understøtter Angular EventSource, grænsefladen til server-sendte begivenheder.

Lad os se, hvordan vi kan trække og behandle alle vores tidligere ordrer som en strøm af begivenheder:

getOrderStream () {return Observable.create ((observer) => {let eventSource = new EventSource ('// localhost: 8080 / api / orders') eventSource.onmessage = (event) => {let json = JSON.parse ( event.data) this.orders.push (json) this._zone.run (() => {observer.next (this.orders)})} eventSource.onerror = (error) => {if (eventSource.readyState = == 0) {eventSource.close () this._zone.run (() => {observer.complete ()})} ellers {this._zone.run (() => {observer.error ('EventSource error: '+ fejl)})}}})}

6. Beskedstyret arkitektur

Det første problem, vi skal løse, er relateret til service-til-service-kommunikation. Lige nu, disse kommunikationer er synkrone, hvilket giver flere problemer. Disse inkluderer kaskadefejl, kompleks orkestrering og distribuerede transaktioner for at nævne nogle få.

En åbenbar måde at løse dette problem på er at gøre disse kommunikationer asynkrone. EN meddelelsesmægler til at lette al service-til-service-kommunikation kan gøre tricket for os. Vi bruger Kafka som vores meddelelsesmægler og Spring for Kafka til at producere og forbruge meddelelser:

Vi bruger et enkelt emne til at producere og forbruge ordrebeskeder med forskellige ordrestatusser, som tjenester kan reagere på.

Lad os se, hvordan hver tjeneste skal ændres.

6.1. Beholdningstjeneste

Lad os begynde med at definere meddelelsesproducenten til vores lagertjeneste:

@Autowired privat KafkaTemplate kafkaTemplate; public void sendMessage (ordreordre) {this.kafkaTemplate.send ("ordrer", ordre); }

Derefter bliver vi nødt til at definere en beskedforbruger til lagertjeneste til at reagere på forskellige meddelelser om emnet:

@KafkaListener (topics = "ordrer", groupId = "lager") offentligt ugyldigt forbrug (ordreordre) kaster IOException {hvis (OrderStatus.RESERVE_INVENTORY.equals (order.getOrderStatus ())) {productService.handleOrder (ordre) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_SUCCESS));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_FAILURE) (setRespørgsmål) (sæt) Svar). }). abonner (); } andet hvis (OrderStatus.REVERT_INVENTORY.equals (order.getOrderStatus ())) {productService.revertOrder (order) .doOnSuccess (o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_S) e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_FAILURE) .setResponseMessage (e.getMessage ()));}). subscribe (); }}

Dette betyder også, at vi sikkert kan slippe nogle af de overflødige slutpunkter fra vores controller nu. Disse ændringer er tilstrækkelige til at opnå asynkron kommunikation i vores applikation.

6.2. Forsendelsesservice

Ændringerne i skibsfart svarer relativt til, hvad vi gjorde tidligere med lagertjenesten. Beskedproducenten er den samme, og meddelelsesforbrugeren er specifik for forsendelseslogikken:

@KafkaListener (topics = "ordrer", groupId = "forsendelse") offentligt ugyldigt forbrug (ordreordre) kaster IOException {hvis (OrderStatus.PREPARE_SHIPPING.equals (order.getOrderStatus ())) {shippingService.handleOrder (ordre) .doOnSuccess () o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPPING_SUCCESS) .setShippingDate (o.getShippingDate ()));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatUSHandling) (e.getMessage ()))}}. abonner (); }}

Vi kan sikkert slippe alle slutpunkter i vores controller nu, da vi ikke længere har brug for dem.

6.3. Bestil service

Ændringerne i ordretjenesten vil være lidt mere involverede, da det var her, vi lavede al orkestrering tidligere.

Ikke desto mindre forbliver meddelelsesproducenten uændret, og beskedforbrugeren tager på ordreservicespecifik logik:

@KafkaListener (topics = "orders", groupId = "orders") offentligt ugyldigt forbrug (Order order) kaster IOException {if (OrderStatus.INITIATION_SUCCESS.equals (order.getOrderStatus ())) {orderRepository.findById (order.getId () ) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.RESERVE_INVENTORY)); returner o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) : gem). abonner (); } ellers hvis ("INVENTORY-SUCCESS" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.PREPARE_SHIPPING)) ; returner o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: gem). abonner (); } ellers hvis ("SHIPPING-FAILURE" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.REVERT_INVENTORY)) ; returner o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: gem). abonner (); } andet {orderRepository.findById (order.getId ()) .map (o -> {return o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save ). abonner (); }}

Det forbruger her reagerer blot på ordrebeskeder med forskellige ordrestatusser. Dette er det, der giver os koreografien mellem forskellige tjenester.

Endelig bliver vores ordretjeneste også nødt til at ændre for at understøtte denne koreografi:

public Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.INITIATION_SUCCESS)); return o;}). onErrorResume (err -> {returner Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (orderRepository :: save); }

Bemærk, at dette er langt enklere end den service, vi var nødt til at skrive med reaktive slutpunkter i det sidste afsnit. Asynkron koreografi resulterer ofte i langt enklere kode, selvom det koster en eventuel konsistens og kompleks fejlretning og overvågning. Som vi måske gætter på, får vores front-end ikke længere den endelige status for ordren med det samme.

7. Containerorkestreringstjeneste

Det sidste stykke af puslespillet, som vi vil løse, er relateret til implementering.

Det, vi ønsker i applikationen, er rigelig redundans og en tendens til at skalere op eller ned afhængigt af behovet automatisk.

Vi har allerede opnået containerisering af tjenester gennem Docker og styrer afhængigheder mellem dem gennem Docker Compose. Selvom disse er fantastiske værktøjer i sig selv, hjælper de os ikke med at opnå det, vi ønsker.

Derfor vi har brug for en container orkestreringstjeneste, der kan tage sig af redundans og skalerbarhed i vores applikation. Mens der er flere muligheder, inkluderer en af ​​de populære Kubernetes. Kubernetes giver os en cloud-leverandør-agnostisk måde at opnå stærkt skalerbare implementeringer af containeriserede arbejdsbelastninger på.

Kubernetes indpakker containere som Docker i Pods, som er den mindste implementeringsenhed. Desuden kan vi bruge Deployment til at beskrive den ønskede tilstand erklærende.

Implementering opretter ReplicaSets, som internt er ansvarlig for at bringe bælgene op. Vi kan beskrive et minimum antal identiske bælg, der skal køre til enhver tid. Dette giver redundans og dermed høj tilgængelighed.

Lad os se, hvordan vi kan definere en Kubernetes-implementering til vores applikationer:

apiVersion: apps / v1 type: Implementeringsmetadata: navn: lager-implementeringsspecifikation: replikaer: 3 vælger: matchLabels: navn: lager-distribution-skabelon: metadata: labels: navn: lager-distribution spec: containere: - navn: lagerbillede: lager-service-async: nyeste porte: - containerPort: 8081 --- apiVersion: apps / v1 type: Implementeringsmetadata: navn: forsendelse-implementeringsspecifikation: replikaer: 3 vælger: matchLabels: navn: skibsforsendelses-skabelon: metadata: labels : navn: shipping-deployment spec: containere: - navn: shipping image: shipping-service-async: seneste porte: - containerPort: 8082 --- apiVersion: apps / v1 type: Implementeringsmetadata: navn: order-deployment spec: replikaer : 3 vælger: matchLabels: navn: ordre-implementeringsskabelon: metadata: labels: navn: ordre-distribution spec: containere: - navn: ordrebillede: ordre-service-async: seneste porte: - containerPort: 8080

Her erklærer vi vores implementering for at opretholde tre identiske repliker af bælg til enhver tid. Selvom dette er en god måde at tilføje redundans på, er det muligvis ikke tilstrækkeligt til forskellige belastninger. Kubernetes leverer en anden ressource kendt som den vandrette pod autoscaler, som kan skaler antallet af bælg i en implementering baseret på observerede metrics som CPU-udnyttelse.

Bemærk, at vi lige har dækket aspekterne til skalerbarhed af applikationen, der hostes i en Kubernetes-klynge. Dette betyder ikke nødvendigvis, at selve den underliggende klynge er skalerbar. Oprettelse af en høj tilgængelighed Kubernetes-klynge er en ikke-triviel opgave og uden for omfanget af denne selvstudie.

8. Resulterende reaktivt system

Nu hvor vi har foretaget flere forbedringer i vores arkitektur, er det måske tid til at evaluere dette i forhold til definitionen af ​​et reaktivt system. Vi holder evalueringen i forhold til de fire egenskaber ved et reaktivt system, som vi diskuterede tidligere i vejledningen:

  • Lydhør: Vedtagelsen af ​​det reaktive programmeringsparadigme skal hjælpe os med at opnå ikke-blokering fra ende til ende og dermed en responsiv applikation
  • Robust: Kubernetes-implementering med ReplicaSet af det ønskede antal bælg skal give modstandsdygtighed mod tilfældige fejl
  • Elastisk: Kubernetes klynge og ressourcer skal give os den nødvendige støtte til at være elastiske over for uforudsigelige belastninger
  • Meddelelsesdrevet: At have al service-til-service-kommunikation håndteret asynkront gennem en Kafka-mægler, bør hjælpe os her

Selvom dette ser ret lovende ud, er det langt fra overstået. For at være ærlig, søgen efter et virkelig reaktivt system bør være en kontinuerlig øvelse af forbedringer. Vi kan aldrig forhindre alt, hvad der kan mislykkes, i en meget kompleks infrastruktur, hvor vores applikation kun er en lille del.

Et reaktivt system vil således kræve pålidelighed fra alle dele, der udgør helheden. Lige fra det fysiske netværk til infrastrukturtjenester som DNS, skal de alle falde i kø for at hjælpe os med at nå det endelige mål.

Ofte er det muligvis ikke muligt for os at administrere og yde de nødvendige garantier for alle disse dele. Og det er her en administreret skyinfrastruktur hjælper med at lindre vores smerte. Vi kan vælge mellem en række tjenester som IaaS (Infeastrure-as-a-Service), BaaS (Backend-as-a-Service) og PaaS (Platform-as-a-Service) for at delegere ansvaret til eksterne parter. Dette giver os ansvaret for vores ansøgning så vidt muligt.

9. Konklusion

I denne vejledning gennemgik vi det grundlæggende i reaktive systemer, og hvordan sammenlignes det med reaktiv programmering. Vi oprettede en simpel applikation med flere mikrotjenester og fremhævede de problemer, vi har til hensigt at løse med et reaktivt system.

Desuden gik vi videre med at introducere reaktiv programmering, meddelelsesbaseret arkitektur og containerorkestreringstjeneste i arkitekturen for at realisere et reaktivt system.

Endelig diskuterede vi den resulterende arkitektur, og hvordan den forbliver en rejse mod det reaktive system! Denne vejledning introducerer os ikke til alle de værktøjer, rammer eller mønstre, der kan hjælpe os med at skabe et reaktivt system, men det introducerer os til rejsen.

Som normalt kan kildekoden til denne artikel findes på GitHub.