Introduktion til Kafka-stik

1. Oversigt

Apache Kafka® er en distribueret streamingplatform. I en tidligere vejledning diskuterede vi, hvordan Kafka-forbrugere og producenter implementeres ved hjælp af Spring.

I denne vejledning lærer vi, hvordan du bruger Kafka-stik.

Vi kigger på:

  • Forskellige typer Kafka-stik
  • Funktioner og tilstande til Kafka Connect
  • Stikkonfiguration ved hjælp af egenskabsfiler såvel som REST API

2. Grundlæggende om Kafka Connect og Kafka Connectors

Kafka Connect er en ramme til at forbinde Kafka med eksterne systemer såsom databaser, nøgleværdilagre, søgeindekser og filsystemer ved hjælp af såkaldte Stik.

Kafka-stik er brugsklare komponenter, som kan hjælpe os at importere data fra eksterne systemer til Kafka-emner og eksportere data fra Kafka-emner til eksterne systemer. Vi kan bruge eksisterende stikimplementeringer til almindelige datakilder og dræn eller implementere vores egne stik.

EN kildestik indsamler data fra et system. Kildesystemer kan være hele databaser, streamtabeller eller meddelelsesmæglere. Et kildekonnektor kunne også indsamle metrics fra applikationsservere til Kafka-emner, hvilket gør dataene tilgængelige til streambehandling med lav latenstid.

EN vaskestik leverer data fra Kafka-emner til andre systemer, som kan være indekser som Elasticsearch, batchsystemer som Hadoop eller enhver form for database.

Nogle stik vedligeholdes af samfundet, mens andre understøttes af Confluent eller dets partnere. Virkelig kan vi finde stik til de mest populære systemer som S3, JDBC og Cassandra, for bare at nævne nogle få.

3. Funktioner

Kafka Connect funktioner inkluderer:

  • En ramme til at forbinde eksterne systemer med Kafka - det forenkler udvikling, implementering og styring af stik
  • Distribueret og enkeltstående tilstande - det hjælper os med at implementere store klynger ved at udnytte den distribuerede karakter af Kafka såvel som opsætninger til udvikling, test og mindre produktionsinstallationer
  • REST-interface - vi kan administrere stik ved hjælp af en REST API
  • Automatisk offsetstyring - Kafka Connect hjælper os med at håndtere offset-forpligtelsesproces, hvilket sparer os besværet med at implementere denne fejlbehæftede del af connectorudvikling manuelt
  • Distribueret og skalerbart som standard - Kafka Connect bruger den eksisterende gruppestyringsprotokol; vi kan tilføje flere arbejdere til at opskalere en Kafka Connect-klynge
  • Streaming og batchintegration - Kafka Connect er en ideel løsning til at bygge bro over streaming- og batchdatasystemer i forbindelse med Kafkas eksisterende kapaciteter
  • Transformationer - disse gør det muligt for os at foretage enkle og lette ændringer til individuelle meddelelser

4. Opsætning

I stedet for at bruge den almindelige Kafka-distribution downloader vi Confluent Platform, en Kafka-distribution leveret af Confluent, Inc., firmaet bag Kafka. Confluent Platform leveres med nogle ekstra værktøjer og klienter sammenlignet med almindelig Kafka samt nogle ekstra forudbyggede stik.

For vores sag er Open Source-udgaven tilstrækkelig, som kan findes på Confluents websted.

5. Hurtig start Kafka Connect

For det første vil vi diskutere Kafka Connect-princippet, ved hjælp af de mest basale stik, som er filen kilde stik og filen håndvask stik.

Bekvemt leveres Confluent Platform med begge disse stik samt referencekonfigurationer.

5.1. Source Connector Configuration

For kildestikket er referencekonfigurationen tilgængelig på $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

navn = lokal-fil-kilde connector.class = FileStreamSource opgaver.max = 1 emne = connect-test fil = test.txt

Denne konfiguration har nogle egenskaber, der er almindelige for alle kildekonnektorer:

  • navn er et brugerdefineret navn til forbindelsesinstansen
  • stik. klasse angiver implementeringsklassen, dybest set typen stik
  • opgaver. maks angiver, hvor mange forekomster af vores kildekonnektor, der skal køre parallelt, og
  • emne definerer det emne, som stikket skal sende output til

I dette tilfælde har vi også en forbindelsesspecifik attribut:

  • fil definerer den fil, hvorfra stikket skal læse input

For at dette skal fungere, lad os oprette en grundlæggende fil med noget indhold:

ekko -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

Bemærk, at arbejdsmappen er $ CONFLUENT_HOME.

5.2. Sink Connector Configuration

For vores vaskestik bruger vi referencekonfigurationen på $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

navn = local-file-sink connector.class = FileStreamSink-opgaver.max = 1 fil = test.sink.txt-emner = connect-test

Logisk set indeholder den nøjagtigt de samme parametre, dog denne gang stik. klasse angiver implementeringen af ​​vaskestikket, og fil er det sted, hvor stikket skal skrive indholdet.

5.3. Arbejderkonfig

Endelig er vi nødt til at konfigurere Connect-arbejderen, som integrerer vores to stik og udfører arbejdet med at læse fra kildestikket og skrive til vaskestikket.

Til det kan vi bruge $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java

Noter det plugin.path kan holde en liste over stier, hvor stikimplementeringer er tilgængelige

Da vi bruger stik, der følger med Kafka, kan vi indstille plugin.path til $ CONFLUENT_HOME / del / java. Arbejde med Windows kan det være nødvendigt at give en absolut sti her.

For de andre parametre kan vi lade standardværdierne være:

  • bootstrap.servers indeholder adresserne til Kafka-mæglerne
  • key.converter og værdi.konverter definere konvertererklasser, der serialiserer og deserialiserer dataene, når de strømmer fra kilden til Kafka og derefter fra Kafka til vasken
  • key.converter.schemas.enable og value.converter.schemas.enable er konverteringsspecifikke indstillinger
  • offset.storage.file.filename er den vigtigste indstilling, når du kører Connect i standalone-tilstand: den definerer, hvor Connect skal gemme sine offset-data
  • offset.flush.interval.ms definerer det interval, hvormed medarbejderen forsøger at begå forskydninger for opgaver

Og listen over parametre er ret moden, så tjek den officielle dokumentation for en komplet liste.

5.4. Kafka Connect i standalone-tilstand

Og med det kan vi starte vores første connectoropsætning:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. ejendomme

For det første kan vi inspicere indholdet af emnet ved hjælp af kommandolinjen:

$ CONFLUENT_HOME / bin / kafka-konsol-forbruger - bootstrap-server localhost: 9092 --topisk tilslutningstest - fra start

Som vi kan se, tog kildestikket dataene fra test.txt fil, omdannede den til JSON og sendte den til Kafka:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "nyttelast": "bar"}

Og hvis vi kigger på mappen $ CONFLUENT_HOME, kan vi se, at en fil test.sink.txt blev oprettet her:

kat $ CONFLUENT_HOME / test.sink.txt foo bar

Da vaskestikket udvinder værdien fra nyttelast attribut og skriver det til destinationsfilen, dataene i test.sink.txt har indholdet af originalen test.txt fil.

Lad os nu tilføje flere linjer til test.txt.

Når vi gør det, ser vi, at kildestikket registrerer disse ændringer automatisk.

Vi skal kun sørge for at indsætte en ny linje i slutningen, ellers betragter kildestikket ikke den sidste linje.

Lad os på dette tidspunkt stoppe Connect-processen, da vi starter Connect i distribueret tilstand i et par linjer.

6. Connect's REST API

Indtil nu lavede vi alle konfigurationer ved at videregive ejendomsfiler via kommandolinjen. Da Connect er designet til at køre som en tjeneste, er der imidlertid også en REST API tilgængelig.

Som standard er den tilgængelig på // localhost: 8083. Et par slutpunkter er:

  • GET / stik - returnerer en liste med alle stik i brug
  • GET / stik / {navn} - returnerer detaljer om et specifikt stik
  • POST / stik - opretter et nyt stik; anmodningslegemet skal være et JSON-objekt, der indeholder et strengnavnfelt og et objektkonfigurationsfelt med stikkonfigurationsparametre
  • GET / stik / {navn} / status - returnerer connectorens aktuelle status - inklusive hvis den kører, mislykkes eller pauses - hvilken medarbejder den er tildelt, fejloplysninger, hvis den ikke fungerer, og status for alle dens opgaver
  • SLET / stik / {navn} - sletter et stik, stopper elegant alle opgaver og sletter dets konfiguration
  • GET / stik-plugins - returnerer en liste over connector-plugins installeret i Kafka Connect-klyngen

Den officielle dokumentation giver en liste med alle slutpunkter.

Vi bruger REST API til at oprette nye stik i det følgende afsnit.

7. Kafka Connect i distribueret tilstand

Den enkeltstående tilstand fungerer perfekt til udvikling og test samt mindre opsætninger. Men hvis vi ønsker at udnytte fuldt ud den distribuerede karakter af Kafka, er vi nødt til at starte Connect i distribueret tilstand.

Ved at gøre dette lagres forbindelsesindstillinger og metadata i Kafka-emner i stedet for filsystemet. Som et resultat er arbejderknudepunkterne virkelig statsløse.

7.1. Starter Connect

En referencekonfiguration for distribueret tilstand kan findes på $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parametre er for det meste de samme som for standalone-tilstand. Der er kun få forskelle:

  • group.id definerer navnet på Connect-klyngegruppen. Værdien skal være forskellig fra ethvert forbrugergruppe-id
  • offset.storage.topic, config.storage.topic og status.storage.topic definer emner til disse indstillinger. For hvert emne kan vi også definere en replikationsfaktor

Igen giver den officielle dokumentation en liste med alle parametre.

Vi kan starte Connect i distribueret tilstand som følger:

$ CONFLUENT_HOME / bin / connect-distribueret $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. Tilføjelse af stik ved hjælp af REST API

Nu sammenlignet med den standalone startkommando, passerede vi ingen forbindelseskonfigurationer som argumenter. I stedet skal vi oprette stik ved hjælp af REST API.

For at oprette vores eksempel fra før skal vi sende to POST-anmodninger til // localhost: 8083 / stik indeholdende følgende JSON-strukturer.

Først skal vi oprette kroppen til kildekonnektoren POST som en JSON-fil. Her kalder vi det tilslut-fil-kilde.json:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-distribution.txt", "topic ":" forbindelsesdistribueret}}}

Bemærk, hvordan dette ligner den referencekonfigurationsfil, vi brugte første gang.

Og så POSTER vi det:

curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Derefter gør vi det samme for vaskestikket og kalder filen connect-file-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "task.max": 1, "file": "test-distribution.sink.txt", "topics": "connect-distribution"}}

Og udfør POST som før:

curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Hvis det er nødvendigt, kan vi kontrollere, at denne opsætning fungerer korrekt:

$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topic connect-distribution --from-beginning {"schema": {"type": "string", "optional": false}, "nyttelast": "foo"} {"skema": {"type": "streng", "valgfri": falsk}, "nyttelast": "bar"}

Og hvis vi kigger på mappen $ CONFLUENT_HOME, kan vi se, at en fil testdistribueret.sink.txt blev oprettet her:

kat $ CONFLUENT_HOME / testdistribueret.sink.txt foo bar

Efter at vi har testet den distribuerede opsætning, lad os rydde op ved at fjerne de to stik:

krølle -X SLET // localhost: 8083 / stik / lokal fil-kilde krølle -X SLET // lokalhost: 8083 / stik / lokal fil-vask

8. Transformering af data

8.1. Understøttede transformationer

Transformationer giver os mulighed for at foretage enkle og lette ændringer til individuelle meddelelser.

Kafka Connect understøtter følgende indbyggede transformationer:

  • Indsæt felt - Tilføj et felt ved hjælp af enten statiske data eller optag metadata
  • ReplaceField - Filtrer eller omdøb felter
  • MaskField - Erstat et felt med den gyldige nulværdi for typen (f.eks. Nul eller en tom streng)
  • HoistField - Pak hele begivenheden som et enkelt felt inde i en struktur eller et kort
  • ExtractField - Uddrag et specifikt felt fra struktur og kort og inkluder kun dette felt i resultaterne
  • SetSchemaMetadata - Rediger skemaets navn eller version
  • TidsstempelRouter - Rediger emnet for en post baseret på originalt emne og tidsstempel
  • RegexRouter - Rediger emnet for en post baseret på det originale emne, en erstatningsstreng og et regulært udtryk

En transformation konfigureres ved hjælp af følgende parametre:

  • transformerer - En komma-adskilt liste over aliaser til transformationerne
  • transformerer. $ alias.type - Klassenavn til transformation
  • transformerer. $ alias. $ transformationSpecificConfig - Konfiguration til den respektive transformation

8.2. Anvendelse af en transformer

Lad os oprette følgende to transformationer for at teste nogle transformationsfunktioner:

  • Lad os først pakke hele meddelelsen som en JSON-struktur
  • Lad os derefter tilføje et felt til den struktur

Før vi anvender vores transformationer, er vi nødt til at konfigurere Connect til at bruge skemaløs JSON ved at ændre connect-distributed.properties:

key.converter.schemas.enable = falsk værdi.converter.schemas.enable = false

Derefter skal vi genstarte Connect igen i distribueret tilstand:

$ CONFLUENT_HOME / bin / connect-distribueret $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

Igen er vi nødt til at oprette kroppen til kildekonnektoren POST som en JSON-fil. Her kalder vi det connect-file-source-transform.json.

Udover de allerede kendte parametre tilføjer vi et par linjer til de to nødvendige transformationer:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-transformation.txt", "topic ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-fil-kilde "}}

Lad os derefter udføre POST:

curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Lad os skrive nogle linjer til vores test-transformation.txt:

Foo Bar

Hvis vi nu inspicerer forbinde-transformation emne, bør vi få følgende linjer:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. Brug af Ready Connectors

Efter at have brugt disse enkle stik, lad os se på mere avancerede brugsklare stik, og hvordan man installerer dem.

9.1. Hvor finder du stik?

Forudbyggede stik fås fra forskellige kilder:

  • Et par stik er pakket med almindelig Apache Kafka (kilde og vask til filer og konsol)
  • Nogle flere stik er samlet med Confluent Platform (ElasticSearch, HDFS, JDBC og AWS S3)
  • Tjek også Confluent Hub, som er en slags appbutik til Kafka-stik. Antallet af tilbudte stik vokser kontinuerligt:
    • Confluent-stik (udviklet, testet, dokumenteret og understøttes fuldt ud af Confluent)
    • Certificerede stik (implementeret af en tredjepart og certificeret af Confluent)
    • Community-udviklede og -støttede stik
  • Ud over det giver Confluent også en Connectors-side med nogle stik, der også er tilgængelige på Confluent Hub, men også med nogle flere community-stik
  • Og endelig er der også leverandører, der leverer stik som en del af deres produkt. For eksempel leverer Landoop et streamingbibliotek kaldet Lenses, som også indeholder et sæt med ~ 25 open source-stik (mange af dem krydser også andre steder)

9.2. Installation af stik fra Confluent Hub

Virksomhedsversionen af ​​Confluent indeholder et script til installation af stik og andre komponenter fra Confluent Hub (scriptet er ikke inkluderet i Open Source-versionen). Hvis vi bruger virksomhedsversionen, kan vi installere et stik ved hjælp af følgende kommando:

$ CONFLUENT_HOME / bin / confluent-hub install confluentinc / kafka-connect-mqtt: 1.0.0-preview

9.3. Installation af stik manuelt

Hvis vi har brug for et stik, som ikke er tilgængeligt på Confluent Hub, eller hvis vi har Open Source-versionen af ​​Confluent, kan vi installere de nødvendige stik manuelt. Til det er vi nødt til at downloade og pakke ud stikket samt flytte de medfølgende libs til den mappe, der er angivet som plugin.path.

For hvert stik skal arkivet indeholde to mapper, der er interessante for os:

  • Det lib mappen indeholder stikburken, for eksempel kafka-connect-mqtt-1.0.0-preview.jar, samt nogle flere krukker, der kræves af stikket
  • Det etc mappen indeholder en eller flere referencekonfigurationsfiler

Vi er nødt til at flytte lib mappe til $ CONFLUENT_HOME / del / javaeller hvilken sti vi angav som plugin.path i connect-standalone.properties og connect-distributed.properties. Ved at gøre det kan det også give mening at omdøbe mappen til noget meningsfuldt.

Vi kan bruge konfigurationsfilerne fra etc enten ved at henvise til dem, mens de starter i standalone-tilstand, eller vi kan bare gribe egenskaberne og oprette en JSON-fil ud fra dem.

10. Konklusion

I denne vejledning har vi kigget på, hvordan du installerer og bruger Kafka Connect.

Vi kiggede på typer stik, både kilde og vask. Vi kiggede også på nogle funktioner og tilstande, som Connect kan køre i. Derefter gennemgik vi transformere. Og endelig lærte vi, hvor vi skulle få fat i, og hvordan vi installerede tilpassede stik.

Som altid kan konfigurationsfilerne findes på GitHub.