Introduktion til Spring Cloud Stream

1. Oversigt

Spring Cloud Stream er en ramme bygget oven på Spring Boot og Spring Integration hjælper med at skabe hændelsesdrevne eller meddelelsesdrevne mikrotjenester.

I denne artikel introducerer vi koncepter og konstruktioner af Spring Cloud Stream med nogle enkle eksempler.

2. Maven-afhængigheder

For at komme i gang skal vi tilføje Spring Cloud Starter Stream med mægleren RabbitMQ Maven-afhængighed som messaging-middleware til vores pom.xml:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.RELEASE 

Og vi tilføjer modulafhængigheden fra Maven Central for også at muliggøre JUnit-support:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE test 

3. Hovedkoncepter

Mikrotjenestearkitektur følger princippet om “smarte slutpunkter og dumme rør”. Kommunikation mellem slutpunkter er drevet af messaging-middleware parter som RabbitMQ eller Apache Kafka. Tjenester kommunikerer ved at offentliggøre domænehændelser via disse slutpunkter eller kanaler.

Lad os gå igennem de koncepter, der udgør Spring Cloud Stream-rammen sammen med de essentielle paradigmer, som vi skal være opmærksomme på for at opbygge meddelelsesdrevne tjenester.

3.1. Konstruktioner

Lad os se på en simpel tjeneste i Spring Cloud Stream, der lytter til input bindende og sender et svar til produktion bindende:

@SpringBootApplication @EnableBinding (Processor.class) offentlig klasse MyLoggerServiceApplication {public static void main (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) offentlig LogMessage berigLogMessage (LogMessage log) {returner ny LogMessage (String.format ("[1]:% s", log.getMessage ())); }}

Annotationen @EnableBinding konfigurerer applikationen til at binde kanalerne INDGANG og PRODUKTION defineret i grænsefladen Processor. Begge kanaler er bindinger, der kan konfigureres til at bruge en konkret messaging-middleware eller bindemiddel.

Lad os se på definitionen af ​​alle disse begreber:

  • Bindinger - en samling af grænseflader, der identificerer input- og outputkanaler erklærende
  • Ringbind - implementering af messaging-middleware såsom Kafka eller RabbitMQ
  • Kanal - repræsenterer kommunikationsrøret mellem messaging-middleware og applikationen
  • StreamListeners - beskedhåndteringsmetoder i bønner, der automatisk påberåbes på en meddelelse fra kanalen efter MessageConverter gør serialisering / deserialisering mellem middleware-specifikke begivenheder og domæneobjekttyper / POJO'er
  • Messalvie Skemaer - bruges til serialisering og deserialisering af meddelelser, disse skemaer kan læses statisk fra et sted eller indlæses dynamisk, hvilket understøtter udviklingen af ​​domæneobjekttyper

3.2. Kommunikationsmønstre

Beskeder, der er udpeget til destinationer, leveres af Publicer-abonner meddelelsesmønster. Forlag kategoriserer beskeder i emner, hver identificeret ved et navn. Abonnenter udtrykker interesse for et eller flere emner. Middleware filtrerer beskederne og leverer de interessante emner til abonnenterne.

Nu kunne abonnenterne grupperes. EN forbrugergruppe er et sæt abonnenter eller forbrugere, identificeret ved en gruppe-id, inden for hvilke meddelelser fra et emne eller emnets partition leveres på en belastningsafbalanceret måde.

4. Programmeringsmodel

Dette afsnit beskriver det grundlæggende ved opbygning af Spring Cloud Stream-applikationer.

4.1. Funktionel testning

Teststøtten er en bindemiddelimplementering, der gør det muligt at interagere med kanalerne og inspicere meddelelser.

Lad os sende en besked til ovenstående berigLogMessage service og kontroller, om svaret indeholder teksten “[1]: “ i begyndelsen af ​​meddelelsen:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (classes = MyLoggerServiceApplication.class) @DirtiesContext public class MyLoggerApplicationTests {@Autowired private Processor pipe; @Autowired privat MessageCollector messageCollector; @Test offentlig ugyldig nårSendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (ny LogMessage ("Dette er min besked")) .build ()); Objektnyttelast = messageCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: Dette er min besked", payload.toString ()); }}

4.2. Brugerdefinerede kanaler

I ovenstående eksempel brugte vi Processor interface leveret af Spring Cloud, som kun har en indgang og en udgangskanal.

Hvis vi har brug for noget andet, som en input og to outputkanaler, kan vi oprette en brugerdefineret processor:

offentlig grænseflade MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @ Output MessageChannel anotherOutput (); }

Foråret vil give os den rette implementering af denne grænseflade. Kanalnavnene kan indstilles ved hjælp af annoteringer som i @Output (“myOutput”).

Ellers bruger Spring metodenavnene som kanalnavne. Derfor har vi tre kanaler kaldet myInput, minOutputog en anden output.

Lad os forestille os, at vi vil dirigere beskederne til en output, hvis værdien er mindre end 10, og til en anden output er værdien større end eller lig med 10:

@Autowired privat MyProcessor-processor; @StreamListener (MyProcessor.INPUT) public void routeValues ​​(Integer val) {if (val <10) {processor.anOutput (). Send (message (val)); } ellers {processor.anotherOutput (). send (besked (val)); }} privat statisk endelig Beskedmeddelelse (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. Betinget afsendelse

Bruger @StreamListener kommentar, det kan vi også filtrer de meddelelser, vi forventer hos forbrugeren ved hjælp af enhver betingelse, som vi definerer med SpEL-udtryk.

Som et eksempel kunne vi bruge betinget forsendelse som en anden tilgang til at dirigere meddelelser til forskellige udgange:

@Autowired privat MyProcessor-processor; @StreamListener (target = MyProcessor.INPUT, condition = "nyttelast = 10") offentlig ugyldig routeValuesToAnotherOutput (Integer val) {processor.anotherOutput (). Send (meddelelse (val)); }

Den eneste begrænsning af denne tilgang er, at disse metoder ikke må returnere en værdi.

5. Opsætning

Lad os oprette applikationen, der behandler meddelelsen fra RabbitMQ-mægleren.

5.1. Bindemiddelkonfiguration

Vi kan konfigurere vores applikation til at bruge standardbinderimplementeringen via META-INF / fjederbindere:

kanin: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Eller vi kan tilføje bindemiddelbiblioteket til RabbitMQ til klassestien ved at inkludere denne afhængighed:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 1.3.0.RELEASE 

Hvis der ikke leveres bindemiddelimplementering, bruger Spring direkte kommunikation mellem kanalerne.

5.2. RabbitMQ-konfiguration

For at konfigurere eksemplet i afsnit 3.1 til at bruge RabbitMQ-bindemidlet skal vi opdatere ansøgning.yml placeret på src / main / ressourcer:

forår: sky: stream: bindinger: input: destination: kø.log.messages bindemiddel: local_rabbit output: destination: kø.pretty.log.messages bindemiddel: local_rabbit bindemidler: local_rabbit: type: kaninmiljø: forår: rabbitmq: vært: port : 5672 brugernavn: adgangskode: virtual-host: /

Det input binding bruger den kaldte udveksling kø.log.messages, og produktion binding bruger udvekslingen kø.pretty.log.messages. Begge bindinger bruger det kaldte bindemiddel local_rabbit.

Bemærk, at vi ikke behøver at oprette RabbitMQ-udvekslinger eller køer på forhånd. Når du kører applikationen, begge børser oprettes automatisk.

For at teste applikationen kan vi bruge RabbitMQ-administrationswebstedet til at offentliggøre en besked. I Udgiv besked udvekslingspanel kø.log.messages, skal vi indtaste anmodningen i JSON-format.

5.3. Tilpasning af meddelelseskonvertering

Spring Cloud Stream giver os mulighed for at anvende meddelelseskonvertering til bestemte indholdstyper. I ovenstående eksempel ønsker vi at give almindelig tekst i stedet for at bruge JSON-format.

For at gøre dette skal vi anvende en tilpasset transformation til LogMessage ved hjælp af en MessageConverter:

@SpringBootApplication @EnableBinding (Processor.class) offentlig klasse MyLoggerServiceApplication {// ... @Bean public MessageConverter providesTextPlainMessageConverter () {returner ny TextPlainMessageConverter (); } // ...}
offentlig klasse TextPlainMessageConverter udvider AbstractMessageConverter {public TextPlainMessageConverter () {super (ny MimeType ("tekst", "almindelig")); } @ Override beskyttede boolske understøttelser (Class clazz) {return (LogMessage.class == clazz); } @ Override beskyttet Object convertFromInternal (Message message, Class targetClass, Object conversionHint) {Object payload = message.getPayload (); Strengtekst = nyttelastforekomst af streng? (Streng) nyttelast: ny streng ((byte []) nyttelast); returner ny LogMessage (tekst); }}

Efter anvendelse af disse ændringer, gå tilbage til Udgiv besked panel, hvis vi indstiller overskriften “contentTypes" til "tekst / almindelig”Og nyttelasten til“Hej Verden“, Det skal fungere som før.

5.4. Forbrugergrupper

Når du kører flere forekomster af vores applikation, hver gang der er en ny besked i en inputkanal, underrettes alle abonnenter.

Det meste af tiden skal vi kun behandle beskeden en gang. Spring Cloud Stream implementerer denne adfærd via forbrugergrupper.

For at muliggøre denne adfærd kan hver forbrugerbinding bruge spring.cloud.stream.bindings..group egenskab for at specificere et gruppenavn:

spring: cloud: stream: bindings: input: destination: queue.log.messages binder: local_rabbit group: logMessageConsumers ...

6. Beskedstyrede mikrotjenester

I dette afsnit introducerer vi alle de nødvendige funktioner til at køre vores Spring Cloud Stream-applikationer i en mikroservicekontekst.

6.1. Opskalere

Når flere applikationer kører, er det vigtigt at sikre, at dataene opdeles ordentligt på tværs af forbrugere. For at gøre dette tilbyder Spring Cloud Stream to egenskaber:

  • spring.cloud.stream.instanceCount - antal kørende applikationer
  • spring.cloud.stream.instanceIndex - indeks over den aktuelle applikation

For eksempel, hvis vi har implementeret to forekomster af ovenstående MyLoggerServiceApplication ansøgning, ejendommen spring.cloud.stream.instanceCount skal være 2 for begge applikationer og ejendommen spring.cloud.stream.instanceIndex skal være henholdsvis 0 og 1.

Disse egenskaber indstilles automatisk, hvis vi distribuerer Spring Cloud Stream-applikationer ved hjælp af Spring Data Flow som beskrevet i denne artikel.

6.2. Partitionering

Domænehændelserne kunne være Partitioneret Beskeder. Dette hjælper, når vi er skalering af lagring og forbedring af applikationsydelse.

Domænehændelsen har normalt en partitionsnøgle, så den ender i den samme partition med relaterede meddelelser.

Lad os sige, at vi ønsker, at logmeddelelserne skal partitioneres med det første bogstav i meddelelsen, som ville være partitionsnøglen, og grupperes i to partitioner.

Der ville være en partition for de logbeskeder, der starter med ER og en anden partition til N-Z. Dette kan konfigureres ved hjælp af to egenskaber:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - udtrykket for at opdele nyttelastene
  • spring.cloud.stream.bindings.output.producer.partitionCount - antallet af grupper

Undertiden er udtrykket for partition for komplekst til at skrive det på kun én linje. I disse tilfælde kan vi skrive vores brugerdefinerede partitionsstrategi ved hjælp af ejendommen spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Sundhedsindikator

I en mikroservicekontekst er det også nødvendigt registrere, når en tjeneste er nede eller begynder at mislykkes. Spring Cloud Stream leverer ejendommen management.health.binders.enabled for at aktivere sundhedsindikatorer for bindemidler.

Når du kører applikationen, kan vi forespørge om sundhedsstatus på //:/sundhed.

7. Konklusion

I denne vejledning præsenterede vi hovedbegreberne i Spring Cloud Stream og viste, hvordan man bruger det gennem nogle enkle eksempler over RabbitMQ. Mere info om Spring Cloud Stream kan findes her.

Kildekoden til denne artikel kan findes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found