Opbygning af en datarørledning med Kafka, Spark Streaming og Cassandra

1. Oversigt

Apache Kafka er en skalerbar, høj ydeevne, platform med lav latenstid tillader læsning og skrivning af datastrømme som et messaging-system. Vi kan starte med Kafka i Java ret let.

Spark Streaming er en del af Apache Spark-platformen muliggør skalerbar, høj kapacitet, fejltolerant behandling af datastrømme. Selvom det er skrevet i Scala, tilbyder Spark Java API'er at arbejde med.

Apache Cassandra er en distribueret NoSQL-datalager med bred søjle. Flere detaljer om Cassandra er tilgængelige i vores tidligere artikel.

I denne vejledning kombinerer vi disse for at oprette en meget skalerbar og fejletolerant datarørledning til en datastream i realtid.

2. Installationer

For at starte skal vi bruge Kafka, Spark og Cassandra installeret lokalt på vores maskine for at køre applikationen. Vi får se, hvordan vi udvikler en datarørledning ved hjælp af disse platforme, når vi går videre.

Imidlertid forlader vi alle standardkonfigurationer inklusive porte til alle installationer, som hjælper med at få tutorial til at køre problemfrit.

2.1. Kafka

Installation af Kafka på vores lokale maskine er ret ligetil og kan findes som en del af den officielle dokumentation. Vi bruger 2.1.0-udgivelsen af ​​Kafka.

Ud over, Kafka kræver, at Apache Zookeeper kører men med henblik på denne tutorial udnytter vi den enkelt node Zookeeper-forekomst pakket med Kafka.

Når det er lykkedes os at starte Zookeeper og Kafka lokalt efter den officielle guide, kan vi fortsætte med at oprette vores emne, der hedder "meddelelser":

 $ KAFKA_HOME $ \ bin \ windows \ kafka-topics.bat --create \ --zookeeper localhost: 2181 \ --replikationsfaktor 1 - partitioner 1 \ - emnebeskeder

Bemærk, at ovenstående script er til Windows-platform, men der er også lignende scripts tilgængelige for Unix-lignende platforme.

2.2. Gnist

Spark bruger Hadoop's klientbiblioteker til HDFS og YARN. Følgelig, det kan være meget vanskeligt at samle de kompatible versioner af alle disse. Imidlertid kommer den officielle download af Spark færdigpakket med populære versioner af Hadoop. Til denne vejledning bruger vi version 2.3.0-pakke "forudbygget til Apache Hadoop 2.7 og nyere".

Når den rigtige pakke med Spark er pakket ud, kan de tilgængelige scripts bruges til at indsende ansøgninger. Vi ser dette senere, når vi udvikler vores applikation i Spring Boot.

2.3. Cassandra

DataStax stiller en community-udgave af Cassandra til rådighed til forskellige platforme inklusive Windows. Vi kan downloade og installere dette på vores lokale maskine meget let efter den officielle dokumentation. Vi bruger version 3.9.0.

Når det er lykkedes os at installere og starte Cassandra på vores lokale maskine, kan vi fortsætte med at oprette vores nøgleområde og tabel. Dette kan gøres ved hjælp af CQL Shell, der leveres med vores installation:

OPRET KEYSPACE-ordforråd MED REPLIKATION = {'class': 'SimpleStrategy', 'replication_factor': 1}; BRUG ordforråd; Opret TABELord (ordtekst PRIMÆR NØGLE, tæl int);

Bemærk, at vi har oprettet et navneområde, der hedder ordforråd og et bord deri kaldes ord med to kolonner, ordog tælle.

3. Afhængigheder

Vi kan integrere Kafka og Spark afhængigheder i vores applikation gennem Maven. Vi trækker disse afhængigheder fra Maven Central:

  • Core Spark
  • SQL Spark
  • Streaming Spark
  • Streaming Kafka Spark
  • Cassandra Spark
  • Cassandra Java Spark

Og vi kan tilføje dem til vores pom i overensstemmelse hermed:

 org.apache.spark gnist-core_2.11 2.3.0 leveret org.apache.spark gnist-sql_2.11 2.3.0 leveret org.apache.spark gnist-streaming_2.11 2.3.0 forudsat org.apache.spark gnist-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark gnist-cassandra-connector_2.11 2.3.0 com.datastax.spark gnist-cassandra-connector-java_2.11 1.5.2 

Bemærk, at nogle af disse afhængigheder er markeret som stillet til rådighed i omfang. Dette skyldes, at disse vil blive gjort tilgængelige af Spark-installationen, hvor vi indsender ansøgningen om udførelse ved hjælp af gnist-indsend.

4. Spark Streaming - Kafka-integrationsstrategier

På dette tidspunkt er det umagen værd at tale kort om integrationsstrategierne for Spark og Kafka.

Kafka introducerede ny forbruger-API mellem version 0.8 og 0.10. Derfor er de tilsvarende Spark Streaming-pakker tilgængelige for begge mæglerversioner. Det er vigtigt at vælge den rigtige pakke afhængigt af den tilgængelige mægler og de ønskede funktioner.

4.1. Spark Streaming Kafka 0.8

0.8-versionen er den stabile integrations-API med muligheder for at bruge den modtagerbaserede eller den direkte tilgang. Vi går ikke nærmere ind på detaljerne i disse tilgange, som vi kan finde i den officielle dokumentation. Et vigtigt punkt at bemærke her er, at denne pakke er kompatibel med Kafka Broker-versioner 0.8.2.1 eller højere.

4.2. Spark Streaming Kafka 0.10

Denne er i øjeblikket i en eksperimentel tilstand og er kun kompatibel med Kafka Broker-versioner 0.10.0 eller højere. Denne pakke tilbyder kun den direkte tilgang, der nu bruger den nye Kafka forbruger-API. Vi kan finde flere detaljer om dette i den officielle dokumentation. Vigtigere er det ikke bagudkompatibel med ældre Kafka Broker-versioner.

Bemærk, at til denne tutorial bruger vi 0.10-pakken. Den afhængighed, der er nævnt i det foregående afsnit, refererer kun til dette.

5. Udvikling af en datarørledning

Vi opretter en simpel applikation i Java ved hjælp af Spark, der integreres med det Kafka-emne, vi oprettede tidligere. Ansøgningen læser meddelelserne som sendt og tæller hyppigheden af ​​ord i hver besked. Dette vil derefter blive opdateret i Cassandra-tabellen, som vi oprettede tidligere.

Lad os hurtigt visualisere, hvordan dataene flyder:

5.1. Kom JavaStreamingContext

For det første begynder vi med at initialisere JavaStreamingContext hvilket er indgangspunktet for alle Spark Streaming-applikationer:

SparkConf sparkConf = ny SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = ny JavaStreamingContext (sparkConf, Durations.seconds (1));

5.2. Kom DStream fra Kafka

Nu kan vi oprette forbindelse til Kafka-emnet fra JavaStreamingContext:

Kort kafkaParams = nyt HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("værdi.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "seneste"); kafkaParams.put ("aktiver.auto.forpligtelse", falsk); Samlingsemner = Arrays.asList ("beskeder"); JavaInputDStream beskeder = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Abonner (emner, kafkaParams));

Bemærk, at vi skal levere deserializers til nøgle og værdi her. For almindelige datatyper som Snor, deserializer er tilgængelig som standard. Men hvis vi ønsker at hente brugerdefinerede datatyper, bliver vi nødt til at levere brugerdefinerede deserializers.

Her har vi opnået JavaInputDStream som er en implementering af diskretiserede streams eller DStreams, den grundlæggende abstraktion leveret af Spark Streaming. Internt er DStreams intet andet end en kontinuerlig række RDD'er.

5.3. Behandling opnået DStream

Vi udfører nu en række operationer på JavaInputDStream for at få ordfrekvenser i meddelelserne:

JavaPairDStream-resultater = meddelelser .mapToPair (post -> ny Tuple2 (record.key (), record.value ())); JavaDStream linjer = resultater .map (tuple2 -> tuple2._2 ()); JavaDStream-ord = linjer .flatMap (x -> Arrays.asList (x.split ("\ s +")). Iterator ()); JavaPairDStream wordCounts = ord .mapToPair (s -> ny Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);

5.4. Vedvarende behandlet DStream ind i Cassandra

Endelig kan vi gentage det behandlede JavaPairDStream at indsætte dem i vores Cassandra-bord:

wordCounts.foreachRDD (javaRdd -> {Map wordCountMap = javaRdd.collectAsMap (); for (String key: wordCountMap.keySet ()) {List wordList = Arrays.asList (new Word (key, wordCountMap.get (key))); JavaRDD rdd = streamingContext.sparkContext (). Parallelize (wordList); javaFunctions (rdd) .writerBuilder ("ordforråd", "ord", mapToRow (Word.class)). SaveToCassandra ();}});

5.5. Kørsel af applikationen

Da dette er en stream-behandlingsapplikation, vil vi gerne holde dette kørende:

streamingContext.start (); streamingContext.awaitTermination ();

6. Udnyttelse af kontrolpunkter

I en stream-behandlingsapplikation Det er ofte nyttigt at bevare tilstanden mellem batcher af data, der behandles.

For eksempel er vi i vores tidligere forsøg kun i stand til at gemme den aktuelle frekvens af ordene. Hvad hvis vi vil gemme den kumulative frekvens i stedet? Spark Streaming gør det muligt gennem et koncept kaldet checkpoints.

Vi ændrer nu den rørledning, vi oprettede tidligere for at udnytte kontrolpunkter:

Bemærk, at vi kun bruger kontrolpunkter til databehandlingssessionen. Dette giver ikke fejltolerance. Dog kan kontrolpunkt også bruges til fejltolerance.

Der er et par ændringer, vi bliver nødt til at foretage i vores ansøgning for at udnytte kontrolpunkter. Dette inkluderer levering af JavaStreamingContext med et kontrolpunkt:

streamingContext.checkpoint ("./. checkpoint");

Her bruger vi det lokale filsystem til at gemme kontrolpunkter. For robusthed skal dette dog opbevares et sted som HDFS, S3 eller Kafka. Mere om dette findes i den officielle dokumentation.

Derefter bliver vi nødt til at hente kontrolpunktet og oprette et kumulativt antal ord, mens vi behandler hver partition ved hjælp af en kortlægningsfunktion:

JavaMapWithStateDStream cumulativeWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); Tuple2 output = new Tuple2 (ord, sum); state.update (sum); return output;}));

Når vi har fået det samlede antal ord, kan vi fortsætte med at gentage og gemme dem i Cassandra som før.

Bemærk, at mens datakontrolpunkt er nyttigt til stateful behandling, det kommer med en latensomkostning. Derfor er det nødvendigt at bruge dette klogt sammen med et optimalt kontrolpunktinterval.

7. Forståelse af forskydninger

Hvis vi husker nogle af de Kafka-parametre, vi har indstillet tidligere:

kafkaParams.put ("auto.offset.reset", "seneste"); kafkaParams.put ("aktivér.auto.forpligtelse", falsk);

Disse betyder dybest set det Vi ønsker ikke automatisk at forpligte sig til offset og vil gerne vælge den seneste offset hver gang en forbrugergruppe initialiseres. Derfor vil vores ansøgning kun være i stand til at forbruge meddelelser, der er sendt i den periode, den kører.

Hvis vi ønsker at forbruge alle meddelelser, der er sendt, uanset om applikationen kørte eller ej, og også vil holde styr på de beskeder, der allerede er sendt, vi bliver nødt til at konfigurere forskydningen korrekt sammen med at gemme forskydningstilstanden, selvom dette er lidt uden for omfanget af denne vejledning.

Dette er også en måde, hvorpå Spark Streaming tilbyder et bestemt niveau af garanti som "nøjagtigt en gang". Dette betyder grundlæggende, at hver besked, der sendes på Kafka-emnet, kun behandles nøjagtigt én gang af Spark Streaming.

8. Implementering af applikation

Vi kan implementere vores applikation ved hjælp af Spark-send-scriptet som leveres fyldt med gnistinstallationen:

$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies .krukke

Bemærk, at krukken, vi opretter ved hjælp af Maven, skal indeholde de afhængigheder, der ikke er markeret som stillet til rådighed i omfang.

Når vi først har indsendt denne ansøgning og sendt nogle meddelelser i det Kafka-emne, vi oprettede tidligere, skulle vi se de kumulative ordoptællinger, der blev sendt i Cassandra-tabellen, som vi oprettede tidligere.

9. Konklusion

For at opsummere lærte vi i denne vejledning, hvordan man opretter en simpel datapipeline ved hjælp af Kafka, Spark Streaming og Cassandra. Vi lærte også, hvordan man udnytter kontrolpunkter i Spark Streaming for at opretholde tilstand mellem batches.

Som altid er koden til eksemplerne tilgængelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found