Introduktion til KafkaStreams i Java

1. Oversigt

I denne artikel ser vi på KafkaStreams bibliotek.

KafkaStreams er konstrueret af skaberne af Apache Kafka. Det primære mål med dette stykke software er at give programmører mulighed for at oprette effektive streaming-applikationer i realtid, der kan fungere som Microservices.

KafkaStreams gør det muligt for os at forbruge fra Kafka-emner, analysere eller transformere data og potentielt sende dem til et andet Kafka-emne.

At demonstrere KafkaStreams, Vi opretter en enkel applikation, der læser sætninger fra et emne, tæller forekomster af ord og udskriver antallet pr. ord.

Vigtigt at bemærke er, at KafkaStreams biblioteket er ikke reaktivt og understøtter ikke asynkroniseringshandlinger og modtrykshåndtering.

2. Maven-afhængighed

For at begynde at skrive Stream-behandlingslogik ved hjælp af KafkaStreams, vi er nødt til at tilføje en afhængighed af kafka-vandløb og kafka-klienter:

 org.apache.kafka kafka-streams 1.0.0 org.apache.kafka kafka-clients 1.0.0 

Vi skal også have Apache Kafka installeret og startet, fordi vi bruger et Kafka-emne. Dette emne vil være datakilden til vores streamingjob.

Vi kan downloade Kafka og andre krævede afhængigheder fra det officielle websted.

3. Konfiguration af KafkaStreams-input

Den første ting, vi gør, er definitionen af ​​input Kafka-emnet.

Vi kan bruge Sammenflydende værktøj, som vi downloadede - det indeholder en Kafka Server. Den indeholder også kafka-konsol-producent som vi kan bruge til at offentliggøre beskeder til Kafka.

Lad os køre vores Kafka-klynge for at komme i gang:

./konfluent start

Når Kafka starter, kan vi definere vores datakilde og navn på vores applikation ved hjælp af APPLICATION_ID_CONFIG:

String inputTopic = "inputTopic";
Egenskaber streamsConfiguration = nye egenskaber (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

En vigtig konfigurationsparameter er BOOTSTRAP_SERVER_CONFIG. Dette er URL'en til vores lokale Kafka-forekomst, som vi lige startede:

private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Dernæst skal vi videregive typen af ​​nøgle og værdi af meddelelser, der forbruges fra inputTopic:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

Stream-behandling er ofte stateful. Når vi vil gemme mellemresultater, skal vi specificere STATE_DIR_CONFIG parameter.

I vores test bruger vi et lokalt filsystem:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. Opbygning af en streamingtopologi

Når vi har defineret vores inputemne, kan vi oprette en streamingtopologi - det er en definition af, hvordan begivenheder skal håndteres og transformeres.

I vores eksempel vil vi gerne implementere en ordtæller. For hver sætning sendt til inputTopic, vi vil opdele det i ord og beregne forekomsten af ​​hvert ord.

Vi kan bruge en forekomst af KStreamsBuilder klasse for at begynde at konstruere vores topologi:

KStreamBuilder builder = ny KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Mønster mønster = Mønster.kompil ("\ W +", Mønster.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(værdi -> Arrays.asList (mønster.split (value.toLowerCase ()))) .groupBy ((nøgle, ord) -> word) .count ();

For at implementere ordtælling skal vi først dele værdierne ved hjælp af regulært udtryk.

Opdelingsmetoden returnerer en matrix. Vi bruger flatMapValues ​​() at flade det ud. Ellers ville vi ende med en liste over arrays, og det ville være ubelejligt at skrive kode ved hjælp af en sådan struktur.

Endelig samler vi værdierne for hvert ord og kalder det tælle() der beregner forekomster af et bestemt ord.

5. Håndteringsresultater

Vi har allerede beregnet antallet af ord for vores inputbeskeder. Lad os nu udskrive resultaterne på standardoutput ved hjælp af for hver() metode:

wordCounts .foreach ((w, c) -> System.out.println ("word:" + w + "->" + c));

Ved produktion kan et sådant streamingjob ofte offentliggøre output til et andet Kafka-emne.

Vi kunne gøre dette ved hjælp af til () metode:

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

Det Serde klasse giver os forudkonfigurerede serializers til Java-typer, der vil blive brugt til at serialisere objekter til en række bytes. Arten af ​​bytes sendes derefter til Kafka-emnet.

Vi bruger Snor som en nøgle til vores emne og Lang som en værdi for det faktiske antal. Det til() metode gemmer de resulterende data til outputTopic.

6. Start af KafkaStream Job

Indtil dette punkt byggede vi en topologi, der kan udføres. Jobbet er dog ikke startet endnu.

Vi er nødt til at starte vores job eksplicit ved at ringe til Start() metode til KafkaStreams eksempel:

KafkaStreams streams = nye KafkaStreams (builder, streamsConfiguration); streams.start (); Tråd. Søvn (30000); streams.close ();

Bemærk, at vi venter 30 sekunder på, at jobbet er færdigt. I et virkeligt scenarie kører dette job hele tiden og behandler begivenheder fra Kafka, når de ankommer.

Vi kan teste vores job ved at offentliggøre nogle begivenheder til vores Kafka-emne.

Lad os starte en kafka-konsol-producent og manuelt sende nogle begivenheder til vores inputTopic:

./kafka-console-producer --topic inputTopic --broker-list localhost: 9092> "dette er en pony"> "dette er en hest og pony" 

På denne måde offentliggjorde vi to begivenheder til Kafka. Vores ansøgning vil forbruge disse begivenheder og udskrive følgende output:

ord: -> 1 ord: dette -> 1 ord: er -> 1 ord: a -> 1 ord: pony -> 1 ord: -> 2 ord: dette -> 2 ord: er -> 2 ord: a - > 2 ord: hest -> 1 ord: og -> 1 ord: pony -> 2

Vi kan se, at da den første besked ankom, ordet pony opstod kun én gang. Men da vi sendte den anden besked, ordet pony skete for anden gang udskrivning: “ord: pony -> 2 ″.

6. Konklusion

Denne artikel diskuterer, hvordan man opretter en primær streambehandlingsapplikation ved hjælp af Apache Kafka som datakilde og KafkaStreams bibliotek som streambehandlingsbiblioteket.

Alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-projekt, så det skal være let at importere og køre som det er.