Apache RocketMQ med Spring Boot

1. Introduktion

I denne vejledning opretter vi en meddelelsesproducent og forbruger ved hjælp af Spring Boot og Apache RocketMQ, en open source-distribueret messaging- og streaming-dataplatform.

2. Afhængigheder

For Maven-projekter er vi nødt til at tilføje RocketMQ Spring Boot Starter-afhængighed:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Producere meddelelser

For vores eksempel opretter vi en grundlæggende meddelelsesproducent, der sender begivenheder, når brugeren tilføjer eller fjerner en vare fra indkøbskurven.

Lad os først oprette vores serverplacering og gruppenavn i vores application.properties:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = vogn-producent-gruppe

Bemærk, at hvis vi havde mere end en navneserver, kunne vi liste dem som vært: port; vært: port.

For at gøre det enkelt opretter vi nu en CommandLineRunner applikation og generere et par begivenheder under applikationsstart:

@SpringBootApplication offentlig klasse CartEventProducer implementerer CommandLineRunner {@Autowired private RocketMQTemplate raketMQTemplate; public static void main (String [] args) {SpringApplication.run (CartEventProducer.class, args); } offentlig ugyldig kørsel (String ... args) kaster Undtagelse {rocketMQTemplate.convertAndSend ("cart-item-add-topic", ny CartItemEvent ("cykel", 1)); rocketMQTemplate.convertAndSend ("cart-item-add-topic", nyt CartItemEvent ("computer", 2)); rocketMQTemplate.convertAndSend ("cart-item-fjernet-emne", nyt CartItemEvent ("cykel", 1)); }}

Det CartItemEvent består af kun to egenskaber - varens id og en mængde:

klasse CartItemEvent {private String itemId; privat int mængde; // konstruktør, getters og setters}

I ovenstående eksempel bruger vi convertAndSend () metode, en generisk metode defineret af AbstraktMeddelelseSendeskabelon abstrakt klasse for at sende vores vognbegivenheder. Det tager to parametre: En destination, som i vores tilfælde er et emnenavn og en beskednyttelast.

4. Beskedforbruger

At forbruge RocketMQ-meddelelser er så simpelt som at oprette en Spring-komponent, der er kommenteret med @RocketMQMessageListener og implementering af RocketMQListener grænseflade:

@SpringBootApplication offentlig klasse CartEventConsumer {public static void main (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") offentlig klasse CardItemAddConsumer implementerer RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "Tilføjer element: {}", addItemEvent); // yderligere logik}} @Service @RocketMQMessageListener (topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") offentlig klasse CardItemRemoveConsumer implementerer RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("Fjerner element: {}", removeItemEvent); // yderligere logik}}}

Vi er nødt til at oprette en separat komponent til ethvert beskedemne, vi lytter til. I hver af disse lyttere definerer vi navnet på emnet og forbrugergruppenavnet gennem @RocketMQMessageListener kommentar.

5. Synkron og asynkron transmission

I de foregående eksempler brugte vi convertAndSend metode til at sende vores beskeder. Vi har dog nogle andre muligheder.

Vi kunne for eksempel ringe syncSend som er forskellig fra convertAndSend fordi det vender tilbage SendResult objekt.

Den kan f.eks. Bruges til at kontrollere, om vores besked blev sendt med succes eller få dens id:

offentlig ugyldig kørsel (String ... args) kaster undtagelse {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", nyt CartItemEvent ("cykel", 1)); SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", ny CartItemEvent ("computer", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("cart-item-fjernet-emne", nyt CartItemEvent ("cykel", 1)); }

Synes godt om convertAndSend, denne metode returneres kun, når afsendelsesproceduren er afsluttet.

Vi bør bruge synkron transmission i tilfælde, der kræver høj pålidelighed, såsom vigtige meddelelsesmeddelelser eller SMS-beskeder.

På den anden side vil vi måske i stedet sende beskeden asynkront og blive underrettet, når afsendelsen er afsluttet.

Vi kan gøre dette med asyncSend, som tager en SendCallback som parameter og vender straks tilbage:

rocketMQTemplate.asyncSend ("cart-item-add-topic", nyt CartItemEvent ("bike", 1), nyt SendCallback () {@Override offentligt ugyldigt onSuccess (SendResult sendResult) {log.error ("Vellykket sendt kurvemne") ;} @Override offentligt ugyldigt påException (Throwable throwable) {log.error ("Undtagelse under afsendelse af kurvemne", throwable);}});

Vi bruger asynkron transmission i tilfælde, der kræver høj kapacitet.

Endelig kan vi bruge til scenarier, hvor vi har meget høje kapacitetskrav sendOneWay i stedet for asyncSend. sendOneWay er forskellig fra asyncSend ved at det ikke garanterer, at meddelelsen bliver sendt.

Envejstransmission kan også bruges til almindelige pålidelighedssager, såsom indsamling af logfiler.

6. Afsendelse af meddelelser i transaktion

RocketMQ giver os muligheden for at sende meddelelser inden for en transaktion. Vi kan gøre det ved at bruge sendInTransaction () metode:

MessageBuilder.withPayload (ny CartItemEvent ("cykel", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("test-transaktion", "emne-navn", msg, null);

Vi skal også implementere en RocketMQLocalTransactionListener grænseflade:

@RocketMQTransactionListener (txProducerGroup = "test-transaction") klasse TransactionListenerImpl implementerer RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) {// ... lokal transaktionsproces, returner ROLLBACKNOWT. } @ Override offentlig RocketMQLocalTransactionState checkLocalTransaction (Message msg) {// ... tjek transaktionsstatus og returner ROLLBACK, COMMIT eller Ukendt returner RocketMQLocalTransactionState.COMMIT; }}

I sendMessageInTransaction (), er den første parameter transaktionsnavnet. Det skal være det samme som @RocketMQTransactionListener'S medlemsfelt txProducerGroup.

7. Konfiguration af meddelelsesproducent

Vi kan også konfigurere aspekter af selve meddelelsesproducenten:

  • rocketmq.producer.send-besked-timeout: Meddelelsen send timeout i millisekunder - standardværdien er 3000
  • rocketmq.producer.compress-message-body-threshold: Tærskel, over hvilken RocketMQ komprimerer meddelelser - standardværdien er 1024.
  • rocketmq.producer.max-besked-størrelse: Den maksimale beskedstørrelse i byte - standardværdien er 4096.
  • rocketmq.producer.retry-times-when-send-async-failed: Det maksimale antal forsøg, der skal udføres internt i asynkron tilstand, før der sendes fejl - standardværdien er 2.
  • rocketmq.producer.retry-next-server: Indikerer, om der skal forsøges en anden mægler, når intern fejl mislykkes - standardværdien er falsk.
  • rocketmq.producer.retry-times-when-send-failed: Det maksimale antal forsøg, der skal udføres internt i asynkron tilstand, før der sendes fejl - standardværdien er 2.

8. Konklusion

I denne artikel har vi lært, hvordan man sender og bruger meddelelser ved hjælp af Apache RocketMQ og Spring Boot. Som altid er al kildekode tilgængelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found