Introduktion til Apache Kafka med Spring

Udholdenhedstop

Jeg har lige annonceret det nye Lær foråret kursus med fokus på det grundlæggende i Spring 5 og Spring Boot 2:

>> KONTROLLER KURSEN

1. Oversigt

Apache Kafka er et distribueret og fejltolerant strømbehandlingssystem.

I denne artikel dækker vi forårssupport til Kafka og niveauet af abstraktioner, det giver over native Kafka Java-klient-API'er.

Spring Kafka bringer den enkle og typiske Spring skabelon programmeringsmodel med en KafkaTemplate og meddelelsesdrevne POJO'er via @KafkaListener kommentar.

2. Installation og opsætning

For at downloade og installere Kafka henvises til den officielle guide her.

Vi skal også tilføje forårskafka afhængighed af vores pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.RELEASE 

Den seneste version af denne artefakt kan findes her.

Vores eksempelapplikation vil være en Spring Boot-applikation.

Denne artikel antager, at serveren startes ved hjælp af standardkonfigurationen, og at ingen serverporte ændres.

3. Konfiguration af emner

Tidligere har vi kørt kommandolinjeværktøjer til at oprette emner i Kafka som:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikationsfaktor 1 - partitioner 1 \ --topic mytopic

Men med introduktionen af AdminClient i Kafka kan vi nu oprette emner programmatisk.

Vi er nødt til at tilføje KafkaAdmin Springbønne, som automatisk tilføjer emner for alle bønner af typen Nyt emne:

@Configuration offentlig klasse KafkaTopicConfig {@Value (værdi = "$ {kafka.bootstrapAddress}") privat String bootstrapAddress; @Bean offentlig KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); returner nyt KafkaAdmin (configs); } @Bean public NewTopic topic1 () {return new NewTopic ("baeldung", 1, (short) 1); }}

4. Producere meddelelser

For at oprette meddelelser skal vi først konfigurere en ProducentFabrik som sætter strategien for oprettelse af Kafka Producent tilfælde.

Så har vi brug for en KafkaTemplate der indpakker en Producent forekomst og giver bekvemme metoder til at sende meddelelser til Kafka-emner.

Producent forekomster er trådsikre, og derfor vil brugen af ​​en enkelt forekomst i en applikationskontekst give højere ydeevne. Følgelig, KakfaTemplate forekomster er også trådsikre, og brug af en forekomst anbefales.

4.1. Producentkonfiguration

@Configuration public class KafkaProducerConfig {@Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); returner nyt DefaultKafkaProducerFactory (configProps); } @Bean offentlig KafkaTemplate kafkaTemplate () {returner ny KafkaTemplate (producerFactory ()); }}

4.2. Udgivelsesbeskeder

Vi kan sende beskeder ved hjælp af KafkaTemplate klasse:

@Autowired privat KafkaTemplate kafkaTemplate; public void sendMessage (String msg) {kafkaTemplate.send (topicName, msg); }

Det sende API returnerer en ListenableFuture objekt. Hvis vi vil blokere sendetråden og få resultatet om den sendte besked, kan vi ringe til API af ListenableFuture objekt. Tråden venter på resultatet, men det sænker producenten.

Kafka er en hurtig stream-behandlingsplatform. Så det er en bedre idé at håndtere resultaterne asynkront, så de efterfølgende meddelelser ikke venter på resultatet af den forrige besked. Vi kan gøre dette gennem en tilbagekaldelse:

public void sendMessage (streng besked) {ListenableFuture fremtid = kafkaTemplate.send (topicName, besked); future.addCallback (ny ListenableFutureCallback() {@Override public void onSuccess (SendResult result) {System.out.println ("Sendt besked = [" + besked + "] med forskydning = [" + resultat.getRecordMetadata (). Forskydning () + "]") ; } @Override public void onFailure (Throwable ex) {System.out.println ("Kan ikke sende besked = [" + besked + "] på grund af:" + ex.getMessage ()); }}); }

5. Forbrugende meddelelser

5.1. Forbrugerkonfiguration

For at forbruge meddelelser skal vi konfigurere en Forbrugerfabrik og en KafkaListenerContainerFactory. Når disse bønner er tilgængelige på Spring Bean-fabrikken, kan POJO-baserede forbrugere konfigureres ved hjælp af @KafkaListener kommentar.

@EnableKafka kommentar kræves i konfigurationsklassen for at muliggøre detektion af @KafkaListener kommentar på forårshåndterede bønner:

@EnableKafka @Configuration offentlig klasse KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Map rekvisitter = ny HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); returner nye DefaultKafkaConsumerFactory (rekvisitter); } @Bean offentlig ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory fabrik = ny ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (forbrugerfabrik ()); retur fabrik; }}

5.2. Forbruger beskeder

@KafkaListener (topics = "topicName", groupId = "foo") offentlig ugyldig listenGroupFoo (strengbesked) {System.out.println ("Modtaget besked i gruppefo:" + besked); }

Flere lyttere kan implementeres til et emne, hver med en anden gruppe-id. Desuden kan en forbruger lytte efter beskeder fra forskellige emner:

@KafkaListener (topics = "topic1, topic2", groupId = "foo")

Spring understøtter også hentning af en eller flere beskedoverskrifter ved hjælp af @Header kommentar i lytteren:

@KafkaListener (topics = "topicName") public void listenWithHeaders (@Payload String message, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println ("Modtaget meddelelse:" + meddelelse "+" fra partition: "+ partition);}

5.3. Forbrugende meddelelser fra en bestemt partition

Som du måske har bemærket, havde vi oprettet emnet baeldung med kun en partition. Men for et emne med flere partitioner, a @KafkaListener kan eksplicit abonnere på en bestemt partition af et emne med en indledende forskydning:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") offentlig ugyldig listenToPartition (@Payload String besked, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {System.out.println ("Modtaget meddelelse:" + meddelelse "+" fra partition: "+ partition) ;}

Siden den initialOffset er sendt til 0 i denne lytter, vil alle de tidligere forbrugte meddelelser fra partitioner 0 og tre blive genbrugt hver gang denne lytter initialiseres. Hvis det ikke er nødvendigt at indstille forskydningen, kan vi bruge skillevægge ejendom af @TopicPartition kommentar for kun at indstille partitionerne uden forskydning:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitioner = {"0", "1"}))

5.4. Tilføjelse af beskedfilter for lyttere

Lyttere kan konfigureres til at forbruge bestemte typer meddelelser ved at tilføje et brugerdefineret filter. Dette kan gøres ved at indstille en RecordFilterStrategy til KafkaListenerContainerFactory:

@Bean offentlig ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory fabrik = ny ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (forbrugerfabrik ()); factory.setRecordFilterStrategy (record -> record.value (). indeholder ("Verden")); retur fabrik; }

En lytter kan derefter konfigureres til at bruge denne containerfabrik:

@KafkaListener (topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") offentlig ugyldig listenWithFilter (strengmeddelelse) {System.out.println ("Modtaget meddelelse i filtreret lytter:" + meddelelse); }

I denne lytter, alle meddelelser, der matcher filteret, kasseres.

6. Brugerdefinerede meddelelsesomformere

Indtil videre har vi kun dækket afsendelse og modtagelse af strenge som beskeder. Vi kan dog også sende og modtage brugerdefinerede Java-objekter. Dette kræver konfiguration af passende serializer i ProducentFabrik og deserializer i Forbrugerfabrik.

Lad os se på en simpel bønneklasse, som vi sender som beskeder:

offentlig klasse hilsen {privat streng msg; privat strengnavn; // standard getters, setters og constructor}

6.1. Producerer brugerdefinerede meddelelser

I dette eksempel vil vi bruge JsonSerializer. Lad os se på koden til ProducentFabrik og KafkaTemplate:

@Bean offentlig ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); returner nyt DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate greetingKafkaTemplate () {returner ny KafkaTemplate (greetingProducerFactory ()); }

Denne nye KafkaTemplate kan bruges til at sende Hilsen besked:

kafkaTemplate.send (topicName, ny hilsen ("Hej", "Verden"));

6.2. Forbruger brugerdefinerede meddelelser

Lad os på samme måde ændre Forbrugerfabrik og KafkaListenerContainerFactory at deserialisere hilsenbeskeden korrekt:

@Bean public ConsumerFactory greetingConsumerFactory () {// ... returner nyt DefaultKafkaConsumerFactory (rekvisitter, ny StringDeserializer (), ny JsonDeserializer (Greeting.class)); } @Bean offentlig ConcurrentKafkaListenerContainerFactory hilsenKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory fabrik = ny ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (hilsenConsumerFactory ()); retur fabrik; }

Spring-kafka JSON serializer og deserializer bruger Jackson-biblioteket, som også er en valgfri mavenafhængighed til spring-kafka-projektet. Så lad os tilføje det til vores pom.xml:

 com.fasterxml.jackson.core jackson-databind 2.9.7 

I stedet for at bruge den nyeste version af Jackson, anbefales det at bruge den version, der føjes til pom.xml af forårskafka.

Endelig er vi nødt til at skrive en lytter til at forbruge Hilsen Beskeder:

@KafkaListener (topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") offentlig ugyldig greetingListener (hilsenhilsen) {// proceshilsen}

7. Konklusion

I denne artikel dækkede vi det grundlæggende i Spring Support til Apache Kafka. Vi kiggede kort på de klasser, der bruges til at sende og modtage meddelelser.

Komplet kildekode til denne artikel kan findes på GitHub. Inden du udfører koden, skal du sørge for, at Kafka-serveren kører, og at emnerne oprettes manuelt.

Persistens bund

Jeg har lige annonceret det nye Lær foråret kursus med fokus på det grundlæggende i Spring 5 og Spring Boot 2:

>> KONTROLLER KURSEN