Opbygning af en datarørledning med Flink og Kafka

1. Oversigt

Apache Flink er en stream-behandlingsramme, der let kan bruges med Java. Apache Kafka er et distribueret stream-behandlingssystem, der understøtter høj fejltolerance.

I denne vejledning vil vi se på, hvordan man bygger en datarørledning ved hjælp af disse to teknologier.

2. Installation

For at installere og konfigurere Apache Kafka henvises til den officielle guide. Efter installationen kan vi bruge følgende kommandoer til at oprette de nye emner, der kaldes flink_input og flink_output:

 bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikationsfaktor 1 - partitioner 1 \ --topic flink_output bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replikationsfaktor 1 - partitioner 1 \ --emne flink_input

Af hensyn til denne vejledning bruger vi standardkonfiguration og standardporte til Apache Kafka.

3. Flink-brug

Apache Flink tillader en realtids strømbehandlingsteknologi. Rammen tillader brug af flere tredjepartssystemer som streamkilder eller dræn.

I Flink - der findes forskellige stik:

  • Apache Kafka (kilde / vask)
  • Apache Cassandra (vask)
  • Amazon Kinesis Streams (kilde / vask)
  • Elastiksøgning (vask)
  • Hadoop FileSystem (vask)
  • RabbitMQ (kilde / vask)
  • Apache NiFi (kilde / vask)
  • API til Twitter-streaming (kilde)

For at tilføje Flink til vores projekt skal vi medtage følgende Maven-afhængigheder:

 org.apache.flink flink-core 1.5.0 org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

Tilføjelse af disse afhængigheder giver os mulighed for at forbruge og producere til og fra Kafka-emner. Du kan finde den aktuelle version af Flink på Maven Central.

4. Kafka-strengforbruger

For at forbruge data fra Kafka med Flink er vi nødt til at angive et emne og en Kafka-adresse. Vi skal også angive et gruppe-id, som bruges til at holde forskydninger, så vi ikke altid læser hele dataene fra starten.

Lad os oprette en statisk metode, der gør oprettelsen af FlinkKafkaConsumer lettere:

offentlig statisk FlinkKafkaConsumer011 createStringConsumerForTopic (String topic, String kafkaAddress, String kafkaGroup) {Properties rekvisitter = nye egenskaber (); props.setProperty ("bootstrap.servers", kafkaAddress); props.setProperty ("group.id", kafkaGroup); FlinkKafkaConsumer011 forbruger = ny FlinkKafkaConsumer011 (emne, ny SimpleStringSchema (), rekvisitter); returforbruger; }

Denne metode tager en emne, kafkaAdresse, og kafkaGruppe og skaber FlinkKafkaConsumer der vil forbruge data fra det givne emne som en Snor siden vi har brugt SimpleStringSchema at afkode data.

Nummeret 011 i klassens navn henviser til Kafka-versionen.

5. Kafka String Producer

For at producere data til Kafka er vi nødt til at angive Kafka-adresse og emne, som vi vil bruge. Igen kan vi oprette en statisk metode, der hjælper os med at skabe producenter til forskellige emner:

offentlig statisk FlinkKafkaProducer011 createStringProducer (strengemne, streng kafkaAddress) {returner ny FlinkKafkaProducer011 (kafkaAddress, emne, nyt SimpleStringSchema ()); }

Denne metode tager kun emne og kafkaAdresse som argumenter, da der ikke er behov for at angive gruppe-id, når vi producerer til Kafka-emne.

6. String Stream Processing

Når vi har en fuldt fungerende forbruger og producent, kan vi prøve at behandle data fra Kafka og derefter gemme vores resultater tilbage til Kafka. Den fulde liste over funktioner, der kan bruges til streambehandling kan findes her.

I dette eksempel vil vi kapitalisere ord i hver Kafka-post og derefter skrive det tilbage til Kafka.

Til dette formål er vi nødt til at oprette en brugerdefineret MapFunction:

offentlig klasse WordsCapitalizer implementerer MapFunction {@Override public String map (String s) {return s.toUpperCase (); }}

Efter oprettelse af funktionen kan vi bruge den til streambehandling:

public static void capitalize () {String inputTopic = "flink_input"; Streng outputTopic = "flink_output"; String consumerGroup = "baeldung"; Strengadresse = "localhost: 9092"; StreamExecutionEnvironment-miljø = StreamExecutionEnvironment .getExecutionEnvironment (); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic (inputTopic, address, consumerGroup); DataStream stringInputStream = miljø .addSource (flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer (outputTopic, adresse); stringInputStream .map (new WordsCapitalizer ()) .addSink (flinkKafkaProducer); }

Ansøgningen læser data fra flink_input emne, udfør operationer på stream og gem derefter resultaterne i flink_output emne i Kafka.

Vi har set, hvordan vi skal håndtere strenge ved hjælp af Flink og Kafka. Men ofte er det nødvendigt at udføre operationer på brugerdefinerede objekter. Vi får se, hvordan du gør dette i de næste kapitler.

7. Deserialisering af brugerdefineret objekt

Følgende klasse repræsenterer en simpel besked med information om afsender og modtager:

@JsonSerialize offentlig klasse InputMessage {Stringafsender; Stringmodtager LocalDateTime sentAt; String besked; }

Tidligere brugte vi SimpleStringSchema at deserialisere beskeder fra Kafka, men nu vi ønsker at deserialisere data direkte til brugerdefinerede objekter.

For at gøre dette har vi brug for en brugerdefineret Deserialisering Skema:

offentlig klasse InputMessageDeserializationSchema implementerer DeserializationSchema {statisk ObjectMapper objectMapper = ny ObjectMapper () .registerModule (ny JavaTimeModule ()); @ Override offentlige InputMessage deserialize (byte [] bytes) kaster IOException {return objectMapper.readValue (bytes, InputMessage.class); } @Override offentlig boolsk isEndOfStream (InputMessage inputMessage) {return false; } @ Override offentlig TypeInformation getProducedType () {return TypeInformation.of (InputMessage.class); }}

Vi antager her, at meddelelserne holdes som JSON i Kafka.

Da vi har et felt af typen LocalDateTime, skal vi specificere JavaTimeModule, som tager sig af kortlægning LocalDateTime modsætter sig JSON.

Flink-skemaer kan ikke have felter, der ikke kan serienummeres fordi alle operatører (som skemaer eller funktioner) serialiseres i starten af ​​jobbet.

Der er lignende problemer i Apache Spark. En af de kendte rettelser til dette problem er at initialisere felter som statisk, som vi gjorde med ObjectMapper over. Det er ikke den smukkeste løsning, men det er relativt simpelt og gør jobbet.

Metoden isEndOfStream kan bruges til det særlige tilfælde, når stream kun skal behandles, indtil nogle specifikke data er modtaget. Men det er ikke nødvendigt i vores tilfælde.

8. Serialisering af brugerdefineret objekt

Lad os nu antage, at vi ønsker, at vores system skal have mulighed for at oprette en sikkerhedskopi af meddelelser. Vi ønsker, at processen skal være automatisk, og hver sikkerhedskopi skal bestå af beskeder, der sendes i løbet af en hel dag.

En backup-besked skal også have en unik id tildelt.

Til dette formål kan vi oprette følgende klasse:

public class Backup {@JsonProperty ("inputMessages") Liste inputMessages; @JsonProperty ("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty ("uuid") UUID uuid; public Backup (List inputMessages, LocalDateTime backupTimestamp) {this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID (); }}

Vær opmærksom på, at UUID-genereringsmekanismen ikke er perfekt, da den tillader duplikater. Dette er dog nok til omfanget af dette eksempel.

Vi vil redde vores Backup objekt som JSON over for Kafka, så vi er nødt til at skabe vores SerialiseringSkema:

offentlig klasse BackupSerializationSchema implementerer SerializationSchema {ObjectMapper objectMapper; Logger logger = LoggerFactory.getLogger (BackupSerializationSchema.class); @ Overstyr offentlig byte [] serialiser (Backup backupMessage) {if (objectMapper == null) {objectMapper = new ObjectMapper () .registerModule (new JavaTimeModule ()); } prøv {return objectMapper.writeValueAsString (backupMessage) .getBytes (); } fange (com.fasterxml.jackson.core.JsonProcessingException e) {logger.error ("Mislykkedes at parse JSON", e); } returner ny byte [0]; }}

9. Tidsstempelbeskeder

Da vi ønsker at oprette en sikkerhedskopi til alle meddelelser hver dag, skal meddelelser have et tidsstempel.

Flink giver de tre forskellige tidsegenskaber EventTime, ProcessingTime, og Indtagelsestid.

I vores tilfælde er vi nødt til at bruge det tidspunkt, hvor beskeden er sendt, så vi bruger det EventTime.

At bruge EventTimevi har brug for en TidsstempelTildeler som udtrækker tidsstempler fra vores inputdata:

offentlig klasse InputMessageTimestampAssigner implementerer AssignerWithPunctuatedWatermarks {@Override public long extractTimestamp (InputMessage element, long previousElementTimestamp) {ZoneId zoneId = ZoneId.systemDefault (); return element.getSentAt (). atZone (zoneId) .toEpochSecond () * 1000; } @Nullable @Override public Watermark checkAndGetNextWatermark (InputMessage lastElement, long extractedTimestamp) {returner nyt vandmærke (extractedTimestamp - 1500); }}

Vi er nødt til at omdanne vores LocalDateTime til EpochSecond da dette er det format, der forventes af Flink. Efter tildeling af tidsstempler bruger alle tidsbaserede operationer tid fra sentAt felt til drift.

Da Flink forventer, at tidsstemplerne skal være i millisekunder og toEpochSecond () returnerer tid i sekunder, vi havde brug for til at gange den med 1000, så Flink opretter vinduer korrekt.

Flink definerer begrebet a Vandmærke. Vandmærker er nyttige i tilfælde af data, der ikke kommer i den rækkefølge, de blev sendt. Et vandmærke definerer den maksimale forsinkelse, der er tilladt for elementer, der skal behandles.

Elementer, der har tidsstempler lavere end vandmærket, behandles slet ikke.

10. Oprettelse af tidsvinduer

For at sikre, at vores sikkerhedskopi kun samler meddelelser, der er sendt i løbet af en dag, kan vi bruge timeWindowAll metode på strømmen, som deler meddelelser i windows.

Vi skal dog stadig samle meddelelser fra hvert vindue og returnere dem som Backup.

For at gøre dette har vi brug for en brugerdefineret Samlet funktion:

offentlig klasse BackupAggregator implementerer AggregateFunction {@Override public List createAccumulator () {returner ny ArrayList (); } @Override public List add (InputMessage inputMessage, List inputMessages) {inputMessages.add (inputMessage); return inputMessages; } @Override public Backup getResult (List inputMessages) {returner ny Backup (inputMessages, LocalDateTime.now ()); } @ Override public List merge (List inputMessages, List acc1) {inputMessages.addAll (acc1); return inputMessages; }}

11. Samlede sikkerhedskopier

Efter at have tildelt ordentlige tidsstempler og implementeret vores Samlet funktion, kan vi endelig tage vores Kafka-input og behandle det:

offentlig statisk ugyldighed createBackup () kaster undtagelse {String inputTopic = "flink_input"; Streng outputTopic = "flink_output"; String consumerGroup = "baeldung"; String kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment-miljø = StreamExecutionEnvironment.getExecutionEnvironment (); miljø.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer (inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest (); flinkKafkaConsumer.assignTimestampsAndWatermarks (new InputMessageTimestampAssigner ()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer (outputTopic, kafkaAddress); DataStream inputMessagesStream = miljø.addSource (flinkKafkaConsumer); inputMessagesStream .timeWindowAll (Time.hours (24)) .aggregate (new BackupAggregator ()) .addSink (flinkKafkaProducer); miljø. udfør (); }

12. Konklusion

I denne artikel har vi præsenteret, hvordan man opretter en simpel datarørledning med Apache Flink og Apache Kafka.

Som altid kan koden findes på Github.


$config[zx-auto] not found$config[zx-overlay] not found