Test af Kafka og Spring Boot

1. Oversigt

Apache Kafka er et kraftfuldt, distribueret, fejltolerant strømbehandlingssystem. I en tidligere tutorial lærte vi at arbejde med Spring og Kafka.

I denne vejledning vi bygger videre på den forrige og lærer at skrive pålidelige, selvstændige integrationstest, der ikke er afhængige af, at en ekstern Kafka-server kører.

Først starter vi, men ser på, hvordan man bruger og konfigurerer en indlejret forekomst af Kafka. Så ser vi, hvordan vi kan bruge de populære testcontainere fra vores tests.

2. Afhængigheder

Selvfølgelig skal vi tilføje standarden forårskafka afhængighed af vores pom.xml:

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Så har vi brug for yderligere to afhængigheder specifikt til vores tests. Først tilføjer vi spring-kafka-test artefakt:

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE test 

Og endelig tilføjer vi Testcontainers Kafka-afhængighed, som også er tilgængelig over på Maven Central:

 org.testcontainers kafka 1.15.0 test 

Nu hvor vi har konfigureret alle de nødvendige afhængigheder, kan vi skrive en simpel Spring Boot-applikation ved hjælp af Kafka.

3. En simpel applikation fra Kafka Producent og Forbruger

Gennem hele denne tutorial vil vores test fokusere på en simpel producent-forbruger Spring Boot Kafka-applikation.

Lad os starte med at definere vores applikationsindgangspunkt:

@SpringBootApplication @EnableAutoConfiguration offentlig klasse KafkaProducerConsumerApplication {public static void main (String [] args) {SpringApplication.run (KafkaProducerConsumerApplication.class, args); }}

Som vi kan se, er dette en standard Spring Boot-applikation. Hvor det er muligt, vil vi gøre brug af standardkonfigurationsværdier. Med dette i tankerne gør vi brug af @EnableAutoConfiguration kommentar til automatisk konfiguration af vores ansøgning.

3.1. Producentopsætning

Lad os derefter overveje en producentbønne, som vi bruger til at sende meddelelser til et givet Kafka-emne:

@Komponent offentlig klasse KafkaProducer {privat statisk endelig Logger LOGGER = LoggerFactory.getLogger (KafkaProducer.class); @Autowired privat KafkaTemplate kafkaTemplate; offentlig tomrumsudsendelse (strengemne, strengnyttelast) {LOGGER.info ("sending nyttelast =" {} "til topic =" {} "", nyttelast, emne); kafkaTemplate.send (emne, nyttelast); }}

Vores KafkaProducer bønne defineret ovenfor er kun en indpakning omkring KafkaTemplate klasse. Denne klasse giver trådsikre operationer på højt niveau, såsom at sende data til det angivne emne, hvilket er nøjagtigt hvad vi gør i vores sende metode.

3.2. Forbrugeropsætning

På samme måde definerer vi nu en simpel forbrugerbønne, der lytter til et Kafka-emne og modtager meddelelser:

@Komponent offentlig klasse KafkaConsumer {privat statisk endelig Logger LOGGER = LoggerFactory.getLogger (KafkaConsumer.class); privat CountDownLatch-lås = ny CountDownLatch (1); privat streng nyttelast = null; @KafkaListener (topics = "$ {test.topic}") offentlig ugyldig modtagelse (ConsumerRecord consumerRecord) {LOGGER.info ("modtaget nyttelast =" {} "", consumerRecord.toString ()); setPayload (consumerRecord.toString ()); latch.countDown (); } offentlig CountDownLatch getLatch () {returlås; } offentlig streng getPayload () {retur nyttelast; }}

Vores enkle forbruger bruger @KafkaListener kommentar på modtage metode til at lytte til beskeder om et givet emne. Vi ser senere, hvordan vi konfigurerer test.topic fra vores tests.

Desuden gemmer modtagemetoden beskedindholdet i vores bønne og reducerer antallet af låsen variabel. Denne variabel er et simpelt trådsikkert tællerfelt, som vi bruger senere fra vores tests for at sikre, at vi med succes modtog en besked.

Nu hvor vi har vores enkle Kafka-applikation ved hjælp af Spring Boot implementeret, lad os se, hvordan vi kan skrive integrationstest.

4. Et ord om testning

Generelt, når vi skriver rene integrationstest, bør vi ikke stole på eksterne tjenester, som vi muligvis ikke er i stand til at kontrollere eller måske pludselig holder op med at arbejde. Dette kan have negative virkninger på vores testresultater.

Tilsvarende, hvis vi er afhængige af en ekstern tjeneste, i dette tilfælde en kørende Kafka-mægler, vil vi sandsynligvis ikke være i stand til at oprette den, kontrollere den og rive den ned, som vi ønsker fra vores tests.

4.1. Applikationsegenskaber

Vi skal bruge et meget let sæt applikationskonfigurationsegenskaber fra vores tests. Vi definerer disse egenskaber i vores src / test / resources / application.yml fil:

forår: kafka: forbruger: auto-offset-reset: tidligste gruppe-id: baeldung test: emne: embedded-test-topic

Dette er det mindste sæt egenskaber, som vi har brug for, når vi arbejder med en indlejret forekomst af Kafka eller en lokal mægler.

De fleste af disse er selvforklarende, men den, vi skal fremhæve af særlig betydning, er forbrugernes ejendom auto-offset-reset: tidligst. Denne egenskab sikrer, at vores forbrugergruppe får de meddelelser, vi sender, fordi containeren muligvis starter, når afsendelsen er afsluttet.

Derudover konfigurerer vi en emneegenskab med værdien embedded-test-topic, hvilket er emnet, vi bruger fra vores tests.

5. Test ved hjælp af Embedded Kafka

I dette afsnit vil vi se på, hvordan du bruger en Kafka-instans i hukommelsen til at køre vores tests mod. Dette er også kendt som Embedded Kafka.

Afhængigheden spring-kafka-test vi tilføjede tidligere indeholder nogle nyttige hjælpeprogrammer til at hjælpe med at teste vores applikation. Mest især indeholder den EmbeddedKafkaBroker klasse.

Med det i tankerne, lad os gå videre og skrive vores første integrationstest:

@SpringBootTest @DirtiesContext @EmbeddedKafka (partitioner = 1, brokerProperties = {"lyttere = PLAINTEXT: // localhost: 9092", "port = 9092"}) klasse EmbeddedKafkaIntegrationTest {@Autowired private KafkaConsumer consumer; @Autowired privat KafkaProducer-producent; @Value ("$ {test.topic}") privat strengemne; @Test offentligt ugyldigt givetEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived () kaster undtagelse {producer.send (emne, "Afsendelse med egen enkel KafkaProducer"); consumer.getLatch (). afventer (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containString ("embedded-test-topic")); }}

Lad os gå gennem de vigtigste dele af vores test. Først begynder vi med at dekorere vores testklasse med to smukke standard-forårskommentarer:

  • Det @SpringBootTest annotering vil sikre, at vores test bootstraps Spring Application-konteksten
  • Vi bruger også @DirtiesContext kommentar, som vil sikre, at denne kontekst renses og nulstilles mellem forskellige tests

Her kommer den afgørende del, vi bruger @EmbeddedKafka kommentar til at indsprøjte en forekomst af en EmbeddedKafkaBroker ind i vores tests. Der er desuden flere egenskaber til rådighed, som vi kan bruge til at konfigurere den indlejrede Kafka-knude:

  • skillevægge - dette er antallet af partitioner, der bruges pr. Emne. For at holde tingene pæne og enkle, ønsker vi kun, at en skal bruges fra vores tests
  • mæglerEjendomme - yderligere ejendomme til Kafka-mægleren. Igen holder vi tingene enkle og angiver en lytter i almindelig tekst og et portnummer

Dernæst kobler vi automatisk til vores forbruger og producent klasser og konfigurere et emne til at bruge værdien fra vores application.properties.

Til det sidste stykke puslespil vi sender simpelthen en besked til vores testemne og verificerer, at beskeden er modtaget og indeholder navnet på vores testemne.

Når vi kører vores test, ser vi blandt de detaljerede forårsproduktioner:

... 12: 45: 35.099 [main] INFO cbkafka.embedded.KafkaProducer - sender nyttelast = "Sender med vores egen enkle KafkaProducer" til topic = "embedded-test-topic" ... 12: 45: 35.103 [org .springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] INFO cbkafka.embedded.KafkaConsumer - received payload = 'ConsumerRecord (topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialiseret nøglestørrelse = -1, seriel værdi størrelse = 41, headers = RecordHeaders (headers = [], isReadOnly = false),  nøgle = null, værdi = Sender med vores egen enkle KafkaProducer) ' 

Dette bekræfter, at vores test fungerer korrekt. Fantastisk! Vi har nu en måde at skrive selvstændige, uafhængige integrationstest ved hjælp af en in-memory Kafka-mægler.

6. Test af Kafka med testcontainere

Nogle gange kan vi se små forskelle mellem en reel ekstern tjeneste vs. en indlejret forekomst i hukommelsen af ​​en tjeneste, der er specifikt leveret til testformål. Selvom det er usandsynligt, kan det også være, at den port, der blev brugt fra vores test, muligvis var besat og forårsagede en fejl.

Med dette i tankerne vil vi i dette afsnit se en variation på vores tidligere tilgang til test ved hjælp af Testcontainers-rammen. Vi vil se, hvordan vi kan instantiere og administrere en ekstern Apache Kafka-mægler, der er vært inde i en Docker-container fra vores integrationstest.

Lad os definere en anden integrationstest, der vil være meget lig den, vi så i det foregående afsnit:

@RunWith (SpringRunner.class) @Import (com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest (classes = KafkaProducerConsumerApplication.class) @ContainConTaCaTaCaTaCaTaCaTaCaTaCaTaCaTaCaTaCaTaCaTaCaTaTa .parse ("confluentinc / cp-kafka: 5.4.3")); @Autowired privat KafkaConsumer forbruger; @Autowired privat KafkaProducer-producent; @Value ("$ {test.topic}") privat strengemne; @Test offentligt ugyldigt givetKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived () kaster undtagelse {producer.send (emne, "Afsendelse med egen controller"); consumer.getLatch (). afventer (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containString ("embedded-test-topic")); }}

Lad os se på forskellene denne gang. Vi erklærer kafka felt, som er en standard JUnit @ClassRule. Dette felt er en forekomst af Kafka Container klasse, der forbereder og styrer livscyklussen for vores container, der kører Kafka.

For at undgå havnekollisioner tildeler Testcontainers et portnummer dynamisk, når vores dockercontainer starter. Af denne grund leverer vi en tilpasset forbruger- og producentfabrikskonfiguration ved hjælp af klassen KafkaTestContainersConfiguration:

@Bean offentligt kort consumerConfigs () {Kortrekvisitter = nyt HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "tidligst"); props.put (ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // flere standardkonfigurationsreturrekvisitter; } @Bean offentlig ProducerFactory producerFactory () {Map configProps = ny HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); // mere standardkonfiguration returnerer ny DefaultKafkaProducerFactory (configProps); }

Vi henviser derefter til denne konfiguration via @Importere kommentar i begyndelsen af ​​vores test.

Årsagen til dette er, at vi har brug for en måde at injicere serveradressen i vores applikation, som som tidligere nævnt genereres dynamisk. Vi opnår dette ved at kalde getBootstrapServers () metode, som returnerer bootstrap-serverplaceringen:

bootstrap.servers = [PLAINTEXT: // localhost: 32789]

Nu når vi kører vores test, skal vi se, at Testcontainers gør flere ting:

  • Kontrollerer vores lokale Docker-opsætning.
  • Trækker i confluentinc / cp-kafka: 5.4.3 dockerbillede, hvis det er nødvendigt
  • Starter en ny container og venter på, at den er klar
  • Til sidst lukker og sletter containeren, når vores test er afsluttet

Igen bekræftes dette ved at inspicere testoutputtet:

13: 33: 10.396 [main] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - Oprettelse af container til billede: confluentinc / cp-kafka: 5.4.3 13: 33: 10.454 [main] INFO 🐳 [confluentinc / cp -kafka: 5.4.3] - Start beholder med ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13: 33: 10,785 [main] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - container confluentinc / cp-kafka: 5.4.3 starter: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! En fungerende integrationstest ved hjælp af en Kafka docker container.

7. Konklusion

I denne artikel har vi lært om et par tilgange til test af Kafka-applikationer med Spring Boot. I den første tilgang så vi, hvordan man konfigurerer og bruger en lokal Kafka-mægler i hukommelsen.

Så så vi, hvordan man bruger Testcontainere til at oprette en ekstern Kafka-mægler, der kører inde i en dockercontainer fra vores tests.

Som altid er artiklens fulde kildekode tilgængelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found