Introduktion til Kafka-stik
1. Oversigt
Apache Kafka® er en distribueret streamingplatform. I en tidligere vejledning diskuterede vi, hvordan Kafka-forbrugere og producenter implementeres ved hjælp af Spring.
I denne vejledning lærer vi, hvordan du bruger Kafka-stik.
Vi kigger på:
- Forskellige typer Kafka-stik
- Funktioner og tilstande til Kafka Connect
- Stikkonfiguration ved hjælp af egenskabsfiler såvel som REST API
2. Grundlæggende om Kafka Connect og Kafka Connectors
Kafka Connect er en ramme til at forbinde Kafka med eksterne systemer såsom databaser, nøgleværdilagre, søgeindekser og filsystemer ved hjælp af såkaldte Stik.
Kafka-stik er brugsklare komponenter, som kan hjælpe os at importere data fra eksterne systemer til Kafka-emner og eksportere data fra Kafka-emner til eksterne systemer. Vi kan bruge eksisterende stikimplementeringer til almindelige datakilder og dræn eller implementere vores egne stik.
EN kildestik indsamler data fra et system. Kildesystemer kan være hele databaser, streamtabeller eller meddelelsesmæglere. Et kildekonnektor kunne også indsamle metrics fra applikationsservere til Kafka-emner, hvilket gør dataene tilgængelige til streambehandling med lav latenstid.
EN vaskestik leverer data fra Kafka-emner til andre systemer, som kan være indekser som Elasticsearch, batchsystemer som Hadoop eller enhver form for database.
Nogle stik vedligeholdes af samfundet, mens andre understøttes af Confluent eller dets partnere. Virkelig kan vi finde stik til de mest populære systemer som S3, JDBC og Cassandra, for bare at nævne nogle få.
3. Funktioner
Kafka Connect funktioner inkluderer:
- En ramme til at forbinde eksterne systemer med Kafka - det forenkler udvikling, implementering og styring af stik
- Distribueret og enkeltstående tilstande - det hjælper os med at implementere store klynger ved at udnytte den distribuerede karakter af Kafka såvel som opsætninger til udvikling, test og mindre produktionsinstallationer
- REST-interface - vi kan administrere stik ved hjælp af en REST API
- Automatisk offsetstyring - Kafka Connect hjælper os med at håndtere offset-forpligtelsesproces, hvilket sparer os besværet med at implementere denne fejlbehæftede del af connectorudvikling manuelt
- Distribueret og skalerbart som standard - Kafka Connect bruger den eksisterende gruppestyringsprotokol; vi kan tilføje flere arbejdere til at opskalere en Kafka Connect-klynge
- Streaming og batchintegration - Kafka Connect er en ideel løsning til at bygge bro over streaming- og batchdatasystemer i forbindelse med Kafkas eksisterende kapaciteter
- Transformationer - disse gør det muligt for os at foretage enkle og lette ændringer til individuelle meddelelser
4. Opsætning
I stedet for at bruge den almindelige Kafka-distribution downloader vi Confluent Platform, en Kafka-distribution leveret af Confluent, Inc., firmaet bag Kafka. Confluent Platform leveres med nogle ekstra værktøjer og klienter sammenlignet med almindelig Kafka samt nogle ekstra forudbyggede stik.
For vores sag er Open Source-udgaven tilstrækkelig, som kan findes på Confluents websted.
5. Hurtig start Kafka Connect
For det første vil vi diskutere Kafka Connect-princippet, ved hjælp af de mest basale stik, som er filen kilde stik og filen håndvask stik.
Bekvemt leveres Confluent Platform med begge disse stik samt referencekonfigurationer.
5.1. Source Connector Configuration
For kildestikket er referencekonfigurationen tilgængelig på $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:
navn = lokal-fil-kilde connector.class = FileStreamSource opgaver.max = 1 emne = connect-test fil = test.txt
Denne konfiguration har nogle egenskaber, der er almindelige for alle kildekonnektorer:
- navn er et brugerdefineret navn til forbindelsesinstansen
- stik. klasse angiver implementeringsklassen, dybest set typen stik
- opgaver. maks angiver, hvor mange forekomster af vores kildekonnektor, der skal køre parallelt, og
- emne definerer det emne, som stikket skal sende output til
I dette tilfælde har vi også en forbindelsesspecifik attribut:
- fil definerer den fil, hvorfra stikket skal læse input
For at dette skal fungere, lad os oprette en grundlæggende fil med noget indhold:
ekko -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt
Bemærk, at arbejdsmappen er $ CONFLUENT_HOME.
5.2. Sink Connector Configuration
For vores vaskestik bruger vi referencekonfigurationen på $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:
navn = local-file-sink connector.class = FileStreamSink-opgaver.max = 1 fil = test.sink.txt-emner = connect-test
Logisk set indeholder den nøjagtigt de samme parametre, dog denne gang stik. klasse angiver implementeringen af vaskestikket, og fil er det sted, hvor stikket skal skrive indholdet.
5.3. Arbejderkonfig
Endelig er vi nødt til at konfigurere Connect-arbejderen, som integrerer vores to stik og udfører arbejdet med at læse fra kildestikket og skrive til vaskestikket.
Til det kan vi bruge $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:
bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java
Noter det plugin.path kan holde en liste over stier, hvor stikimplementeringer er tilgængelige
Da vi bruger stik, der følger med Kafka, kan vi indstille plugin.path til $ CONFLUENT_HOME / del / java. Arbejde med Windows kan det være nødvendigt at give en absolut sti her.
For de andre parametre kan vi lade standardværdierne være:
- bootstrap.servers indeholder adresserne til Kafka-mæglerne
- key.converter og værdi.konverter definere konvertererklasser, der serialiserer og deserialiserer dataene, når de strømmer fra kilden til Kafka og derefter fra Kafka til vasken
- key.converter.schemas.enable og value.converter.schemas.enable er konverteringsspecifikke indstillinger
- offset.storage.file.filename er den vigtigste indstilling, når du kører Connect i standalone-tilstand: den definerer, hvor Connect skal gemme sine offset-data
- offset.flush.interval.ms definerer det interval, hvormed medarbejderen forsøger at begå forskydninger for opgaver
Og listen over parametre er ret moden, så tjek den officielle dokumentation for en komplet liste.
5.4. Kafka Connect i standalone-tilstand
Og med det kan vi starte vores første connectoropsætning:
$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. ejendomme
For det første kan vi inspicere indholdet af emnet ved hjælp af kommandolinjen:
$ CONFLUENT_HOME / bin / kafka-konsol-forbruger - bootstrap-server localhost: 9092 --topisk tilslutningstest - fra start
Som vi kan se, tog kildestikket dataene fra test.txt fil, omdannede den til JSON og sendte den til Kafka:
{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "optional": false}, "nyttelast": "bar"}
Og hvis vi kigger på mappen $ CONFLUENT_HOME, kan vi se, at en fil test.sink.txt blev oprettet her:
kat $ CONFLUENT_HOME / test.sink.txt foo bar
Da vaskestikket udvinder værdien fra nyttelast attribut og skriver det til destinationsfilen, dataene i test.sink.txt har indholdet af originalen test.txt fil.
Lad os nu tilføje flere linjer til test.txt.
Når vi gør det, ser vi, at kildestikket registrerer disse ændringer automatisk.
Vi skal kun sørge for at indsætte en ny linje i slutningen, ellers betragter kildestikket ikke den sidste linje.
Lad os på dette tidspunkt stoppe Connect-processen, da vi starter Connect i distribueret tilstand i et par linjer.
6. Connect's REST API
Indtil nu lavede vi alle konfigurationer ved at videregive ejendomsfiler via kommandolinjen. Da Connect er designet til at køre som en tjeneste, er der imidlertid også en REST API tilgængelig.
Som standard er den tilgængelig på // localhost: 8083. Et par slutpunkter er:
- GET / stik - returnerer en liste med alle stik i brug
- GET / stik / {navn} - returnerer detaljer om et specifikt stik
- POST / stik - opretter et nyt stik; anmodningslegemet skal være et JSON-objekt, der indeholder et strengnavnfelt og et objektkonfigurationsfelt med stikkonfigurationsparametre
- GET / stik / {navn} / status - returnerer connectorens aktuelle status - inklusive hvis den kører, mislykkes eller pauses - hvilken medarbejder den er tildelt, fejloplysninger, hvis den ikke fungerer, og status for alle dens opgaver
- SLET / stik / {navn} - sletter et stik, stopper elegant alle opgaver og sletter dets konfiguration
- GET / stik-plugins - returnerer en liste over connector-plugins installeret i Kafka Connect-klyngen
Den officielle dokumentation giver en liste med alle slutpunkter.
Vi bruger REST API til at oprette nye stik i det følgende afsnit.
7. Kafka Connect i distribueret tilstand
Den enkeltstående tilstand fungerer perfekt til udvikling og test samt mindre opsætninger. Men hvis vi ønsker at udnytte fuldt ud den distribuerede karakter af Kafka, er vi nødt til at starte Connect i distribueret tilstand.
Ved at gøre dette lagres forbindelsesindstillinger og metadata i Kafka-emner i stedet for filsystemet. Som et resultat er arbejderknudepunkterne virkelig statsløse.
7.1. Starter Connect
En referencekonfiguration for distribueret tilstand kan findes på $ CONFLUENT_HOME/etc/kafka/connect-distributed.properties.
Parametre er for det meste de samme som for standalone-tilstand. Der er kun få forskelle:
- group.id definerer navnet på Connect-klyngegruppen. Værdien skal være forskellig fra ethvert forbrugergruppe-id
- offset.storage.topic, config.storage.topic og status.storage.topic definer emner til disse indstillinger. For hvert emne kan vi også definere en replikationsfaktor
Igen giver den officielle dokumentation en liste med alle parametre.
Vi kan starte Connect i distribueret tilstand som følger:
$ CONFLUENT_HOME / bin / connect-distribueret $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties
7.2. Tilføjelse af stik ved hjælp af REST API
Nu sammenlignet med den standalone startkommando, passerede vi ingen forbindelseskonfigurationer som argumenter. I stedet skal vi oprette stik ved hjælp af REST API.
For at oprette vores eksempel fra før skal vi sende to POST-anmodninger til // localhost: 8083 / stik indeholdende følgende JSON-strukturer.
Først skal vi oprette kroppen til kildekonnektoren POST som en JSON-fil. Her kalder vi det tilslut-fil-kilde.json:
{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-distribution.txt", "topic ":" forbindelsesdistribueret}}}
Bemærk, hvordan dette ligner den referencekonfigurationsfil, vi brugte første gang.
Og så POSTER vi det:
curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors
Derefter gør vi det samme for vaskestikket og kalder filen connect-file-sink.json:
{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "task.max": 1, "file": "test-distribution.sink.txt", "topics": "connect-distribution"}}
Og udfør POST som før:
curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors
Hvis det er nødvendigt, kan vi kontrollere, at denne opsætning fungerer korrekt:
$ CONFLUENT_HOME / bin / kafka-console-consumer --bootstrap-server localhost: 9092 --topic connect-distribution --from-beginning {"schema": {"type": "string", "optional": false}, "nyttelast": "foo"} {"skema": {"type": "streng", "valgfri": falsk}, "nyttelast": "bar"}
Og hvis vi kigger på mappen $ CONFLUENT_HOME, kan vi se, at en fil testdistribueret.sink.txt blev oprettet her:
kat $ CONFLUENT_HOME / testdistribueret.sink.txt foo bar
Efter at vi har testet den distribuerede opsætning, lad os rydde op ved at fjerne de to stik:
krølle -X SLET // localhost: 8083 / stik / lokal fil-kilde krølle -X SLET // lokalhost: 8083 / stik / lokal fil-vask
8. Transformering af data
8.1. Understøttede transformationer
Transformationer giver os mulighed for at foretage enkle og lette ændringer til individuelle meddelelser.
Kafka Connect understøtter følgende indbyggede transformationer:
- Indsæt felt - Tilføj et felt ved hjælp af enten statiske data eller optag metadata
- ReplaceField - Filtrer eller omdøb felter
- MaskField - Erstat et felt med den gyldige nulværdi for typen (f.eks. Nul eller en tom streng)
- HoistField - Pak hele begivenheden som et enkelt felt inde i en struktur eller et kort
- ExtractField - Uddrag et specifikt felt fra struktur og kort og inkluder kun dette felt i resultaterne
- SetSchemaMetadata - Rediger skemaets navn eller version
- TidsstempelRouter - Rediger emnet for en post baseret på originalt emne og tidsstempel
- RegexRouter - Rediger emnet for en post baseret på det originale emne, en erstatningsstreng og et regulært udtryk
En transformation konfigureres ved hjælp af følgende parametre:
- transformerer - En komma-adskilt liste over aliaser til transformationerne
- transformerer. $ alias.type - Klassenavn til transformation
- transformerer. $ alias. $ transformationSpecificConfig - Konfiguration til den respektive transformation
8.2. Anvendelse af en transformer
Lad os oprette følgende to transformationer for at teste nogle transformationsfunktioner:
- Lad os først pakke hele meddelelsen som en JSON-struktur
- Lad os derefter tilføje et felt til den struktur
Før vi anvender vores transformationer, er vi nødt til at konfigurere Connect til at bruge skemaløs JSON ved at ændre connect-distributed.properties:
key.converter.schemas.enable = falsk værdi.converter.schemas.enable = false
Derefter skal vi genstarte Connect igen i distribueret tilstand:
$ CONFLUENT_HOME / bin / connect-distribueret $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties
Igen er vi nødt til at oprette kroppen til kildekonnektoren POST som en JSON-fil. Her kalder vi det connect-file-source-transform.json.
Udover de allerede kendte parametre tilføjer vi et par linjer til de to nødvendige transformationer:
{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-transformation.txt", "topic ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-fil-kilde "}}
Lad os derefter udføre POST:
curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors
Lad os skrive nogle linjer til vores test-transformation.txt:
Foo Bar
Hvis vi nu inspicerer forbinde-transformation emne, bør vi få følgende linjer:
{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}
9. Brug af Ready Connectors
Efter at have brugt disse enkle stik, lad os se på mere avancerede brugsklare stik, og hvordan man installerer dem.
9.1. Hvor finder du stik?
Forudbyggede stik fås fra forskellige kilder:
- Et par stik er pakket med almindelig Apache Kafka (kilde og vask til filer og konsol)
- Nogle flere stik er samlet med Confluent Platform (ElasticSearch, HDFS, JDBC og AWS S3)
- Tjek også Confluent Hub, som er en slags appbutik til Kafka-stik. Antallet af tilbudte stik vokser kontinuerligt:
- Confluent-stik (udviklet, testet, dokumenteret og understøttes fuldt ud af Confluent)
- Certificerede stik (implementeret af en tredjepart og certificeret af Confluent)
- Community-udviklede og -støttede stik
- Ud over det giver Confluent også en Connectors-side med nogle stik, der også er tilgængelige på Confluent Hub, men også med nogle flere community-stik
- Og endelig er der også leverandører, der leverer stik som en del af deres produkt. For eksempel leverer Landoop et streamingbibliotek kaldet Lenses, som også indeholder et sæt med ~ 25 open source-stik (mange af dem krydser også andre steder)
9.2. Installation af stik fra Confluent Hub
Virksomhedsversionen af Confluent indeholder et script til installation af stik og andre komponenter fra Confluent Hub (scriptet er ikke inkluderet i Open Source-versionen). Hvis vi bruger virksomhedsversionen, kan vi installere et stik ved hjælp af følgende kommando:
$ CONFLUENT_HOME / bin / confluent-hub install confluentinc / kafka-connect-mqtt: 1.0.0-preview
9.3. Installation af stik manuelt
Hvis vi har brug for et stik, som ikke er tilgængeligt på Confluent Hub, eller hvis vi har Open Source-versionen af Confluent, kan vi installere de nødvendige stik manuelt. Til det er vi nødt til at downloade og pakke ud stikket samt flytte de medfølgende libs til den mappe, der er angivet som plugin.path.
For hvert stik skal arkivet indeholde to mapper, der er interessante for os:
- Det lib mappen indeholder stikburken, for eksempel kafka-connect-mqtt-1.0.0-preview.jar, samt nogle flere krukker, der kræves af stikket
- Det etc mappen indeholder en eller flere referencekonfigurationsfiler
Vi er nødt til at flytte lib mappe til $ CONFLUENT_HOME / del / javaeller hvilken sti vi angav som plugin.path i connect-standalone.properties og connect-distributed.properties. Ved at gøre det kan det også give mening at omdøbe mappen til noget meningsfuldt.
Vi kan bruge konfigurationsfilerne fra etc enten ved at henvise til dem, mens de starter i standalone-tilstand, eller vi kan bare gribe egenskaberne og oprette en JSON-fil ud fra dem.
10. Konklusion
I denne vejledning har vi kigget på, hvordan du installerer og bruger Kafka Connect.
Vi kiggede på typer stik, både kilde og vask. Vi kiggede også på nogle funktioner og tilstande, som Connect kan køre i. Derefter gennemgik vi transformere. Og endelig lærte vi, hvor vi skulle få fat i, og hvordan vi installerede tilpassede stik.
Som altid kan konfigurationsfilerne findes på GitHub.