RabbitMQ-meddelelse afsendes med foråret AMQP

1. Introduktion

I denne vejledning undersøger vi begrebet fanout og emneudveksling med Spring AMQP og RabbitMQ.

På et højt niveau fanout udvekslinger vilje udsende den samme besked til alle bundne køer, mens emneudveksling brug en routing-nøgle til videregivelse af beskeder til en bestemt bundet kø eller køer.

Før læsning af Messaging With Spring anbefales AMQP til denne tutorial.

2. Opsætning af en Fanout Exchange

Lad os oprette en fanout-udveksling med to køer bundet til den. Når vi sender en besked til denne udveksling, modtager begge køer beskeden. Vores fanout-udveksling ignorerer enhver routingnøgle, der er inkluderet i meddelelsen.

Spring AMQP giver os mulighed for at samle alle erklæringer om køer, udvekslinger og bindinger i en Erklærbare objekt:

@Bean public Declarables fanoutBindings () {Queue fanoutQueue1 = new Queue ("fanout.queue1", false); Kø fanoutQueue2 = ny kø ("fanout.queue2", falsk); FanoutExchange fanoutExchange = ny FanoutExchange ("fanout.exchange"); returner nye Declarables (fanoutQueue1, fanoutQueue2, fanoutExchange, bind (fanoutQueue1) .to (fanoutExchange), BindingBuilder.bind (fanoutQueue2) .to (fanoutExchange)); }

3. Opsætning af en emneudveksling

Nu opretter vi også en emneudveksling med to køer, hver med et andet bindingsmønster:

@Bean public Declarables topicBindings () {Kø topicQueue1 = ny kø (topicQueue1navn, falsk); Kø topicQueue2 = ny kø (topicQueue2Name, false); TopicExchange topicExchange = ny TopicExchange (topicExchangeName); returner nye Declarables (topicQueue1, topicQueue2, topicExchange, BindingBuilder .bind (topicQueue1) .to (topicExchange) .with ("*. important. *"), BindingBuilder .bind (topicQueue2) .to (topicExchange) .with ("#. fejl")); }

En emneudveksling giver os mulighed for at binde køer til det med forskellige nøglemønstre. Dette er meget fleksibelt og giver os mulighed for at binde flere køer med det samme mønster eller endda flere mønstre til den samme kø.

Når meddelelsens routingnøgle matcher mønsteret, placeres den i køen. Hvis en kø har flere bindinger, der matcher meddelelsens routingnøgle, placeres kun en kopi af meddelelsen i køen.

Vores bindingsmønstre kan bruge en stjerne (“*”) til at matche et ord i en bestemt position eller et pundtegn (“#”) for at matche nul eller flere ord.

Så vores emneKø1 modtager beskeder, der har rutetaster, der har et tre-ords mønster, hvor mellemordet er "vigtigt" - for eksempel: “Bruger.important.error” eller “Blog.important.notification”.

Og vores emneKø2 vil modtage meddelelser, der har rutetaster, der ender med ordfejlen; matchende eksempler er "fejl", “Bruger.important.error” eller “Blog.post.save.error”.

4. Opsætning af en producent

Vi bruger convertAndSend metode til RabbitTemplate at sende vores eksempelbeskeder:

 Strengmeddelelse = "nyttelast udsendes"; return args -> {rabbitTemplate.convertAndSend (FANOUT_EXCHANGE_NAME, "", "fanout" + meddelelse); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_WARN, "emne vigtigt advarsel" + besked); rabbitTemplate.convertAndSend (TOPIC_EXCHANGE_NAME, ROUTING_KEY_USER_IMPORTANT_ERROR, "emne vigtig fejl" + besked); };

Det RabbitTemplate giver mange overbelastede convertAndSend () metoder til forskellige udvekslingstyper.

Når vi sender en besked til en fanout-udveksling, ignoreres routing-nøglen, og meddelelsen sendes til alle bundne køer.

Når vi sender en besked til emneudvekslingen, skal vi videregive en routingnøgle. Baseret på denne routing-nøgle vil meddelelsen blive leveret til bestemte køer.

5. Konfiguration af forbrugere

Endelig lad os oprette fire forbrugere - en for hver kø - til at hente de producerede meddelelser:

 @RabbitListener (køer {{FANOUT_QUEUE_1_NAME}) offentlig ugyldig modtageMessageFromFanout1 (strengbesked) {System.out.println ("Modtaget fanout 1 besked:" + besked); } @RabbitListener (køer = {FANOUT_QUEUE_2_NAME}) offentlig ugyldig modtageMessageFromFanout2 (strengbesked) {System.out.println ("Modtaget fanout 2-meddelelse:" + besked); } @RabbitListener (køer {{TOPIC_QUEUE_1_NAME}) offentlig ugyldig modtageMessageFromTopic1 (strengmeddelelse) {System.out.println ("Modtaget emne 1 (" + BINDING_PATTERN_IMPORTANT + ") besked:" + besked); } @RabbitListener (køer {{TOPIC_QUEUE_2_NAME}) offentlig ugyldig modtageMessageFromTopic2 (strengmeddelelse) {System.out.println ("Modtaget emne 2 (" + BINDING_PATTERN_ERROR + ") besked:" + besked); }

Vi konfigurerer forbrugere ved hjælp af @RabbitListener kommentar. Det eneste argument, der sendes her, er køernes navn. Forbrugerne er ikke opmærksomme her på udveksling eller routingnøgler.

6. Kørsel af eksemplet

Vores prøveprojekt er en Spring Boot-applikation, og det initialiserer applikationen sammen med en forbindelse til RabbitMQ og opretter alle køer, udvekslinger og bindinger.

Som standard forventer vores applikation en RabbitMQ-instans, der kører på localhost på port 5672. Vi kan ændre dette og andre standardindstillinger i ansøgning.yaml.

Vores projekt udsætter HTTP-slutpunkt på URI - /udsende - der accepterer POST'er med en besked i anmodningsorganet.

Når vi sender en anmodning til denne URI med body “Test”, skal vi se noget lignende i output:

Modtaget fanout 1-meddelelse: fanout-nyttelast udsendes Modtaget emne 1 (* .vigtig. *) Besked: emne vigtigt advarsel nyttelast udsendes Modtaget emne 2 (# .fejl) besked: emne vigtig fejl nyttelast udsendes Modtaget fanout 2-besked: fanout nyttelast udsendes Modtaget emne 1 (* .vigtig. *) besked: emne vigtig fejl nyttelast udsendes

Den rækkefølge, hvor vi vil se disse meddelelser, er naturligvis ikke garanteret.

7. Konklusion

I denne hurtige vejledning dækkede vi fanout og emneudvekslinger med Spring AMQP og RabbitMQ.

Den komplette kildekode og alle kodestykker til denne tutorial er tilgængelige på GitHub-arkivet.