Præcis en gang behandling i Kafka med Java

1. Oversigt

I denne vejledning ser vi på, hvordan Kafka sikrer levering nøjagtigt én gang mellem producent- og forbrugerapplikationer via den nyligt introducerede Transactional API.

Derudover bruger vi denne API til at implementere transaktionsproducenter og forbrugere for at opnå ende-til-ende-levering nøjagtigt én gang i et WordCount-eksempel.

2. Levering af besked i Kafka

På grund af forskellige fejl kan meddelelsessystemer ikke garantere levering af meddelelser mellem producent- og forbrugerapplikationer. Afhængig af hvordan klientapplikationerne interagerer med sådanne systemer, er følgende meddelelsessemantik mulig:

  • Hvis et meddelelsessystem aldrig vil duplikere en besked, men måske går glip af den lejlighedsvise besked, kalder vi det højst en gang
  • Eller hvis det aldrig går glip af en besked, men måske duplikerer den lejlighedsvise besked, kalder vi det mindst en gang
  • Men hvis det altid leverer alle meddelelser uden duplikering, er det nøjagtigt-en gang

Oprindeligt understøttede Kafka kun levering af meddelelser på højst én gang og mindst én gang.

Imidlertid, introduktionen af ​​Transaktioner mellem Kafka-mæglere og klientapplikationer sikrer levering nøjagtigt én gang i Kafka. For at forstå det bedre, lad os hurtigt gennemgå transaktionsklientens API.

3. Maven-afhængigheder

For at arbejde med transaktions-API'et har vi brug for Kafkas Java-klient i vores pom:

 org.apache.kafka kafka-klienter 2.0.0 

4. En transaktion forbruge-transformere-producere Sløjfe

For vores eksempel vil vi forbruge beskeder fra et inputemne, sætninger.

Derefter tæller vi hvert ord for hver sætning og sender de individuelle ordtællinger til et outputemne, tæller.

I eksemplet antager vi, at der allerede findes transaktionsdata i sætninger emne.

4.1. En transaktionsbevidst producent

Så lad os først tilføje en typisk Kafka-producent.

Ejendomme producerProps = nye egenskaber (); producerProps.put ("bootstrap.servers", "localhost: 9092");

Derudover skal vi dog angive en transactional.id og aktivere idempotens:

producerProps.put ("aktivere.idempotens", "sandt"); producerProps.put ("transactional.id", "prod-1"); KafkaProducer producer = ny KafkaProducer (producerProps);

Fordi vi har aktiveret idempotens, bruger Kafka denne transaktions-id som en del af sin algoritme til kopiere enhver besked denne producentsender, der sikrer ubesværet.

Kort sagt, hvis producenten ved et uheld sender den samme besked til Kafka mere end én gang, gør disse indstillinger det muligt at bemærke det.

Alt hvad vi skal gøre er sørg for, at transaktions-id'et er forskelligt for hver producent, dog konsistent på tværs af genstart.

4.2. Aktivering af producenten til transaktioner

Når vi er klar, skal vi også ringe initTransaction at forberede producenten til at bruge transaktioner:

producer.initTransactions ();

Dette registrerer producenten hos mægleren som en, der kan bruge transaktioner, identificere det ved dets transactional.id og et sekvensnummer eller epoke. Til gengæld vil mægleren bruge disse til at fremskrive alle handlinger til en transaktionslog.

Og følgelig mægleren fjerner alle handlinger fra loggen, der tilhører en producent med samme transaktions-id og tidligereepoke, formoder, at de er fra afviklede transaktioner.

4.3. En transaktionsbevidst forbruger

Når vi spiser, kan vi læse alle meddelelserne på en emnepartition i rækkefølge. Selvom, vi kan angive med isolation. niveau at vi skal vente med at læse transaktionsmeddelelser, indtil den tilknyttede transaktion er begået:

Egenskaber consumerProps = nye egenskaber (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("aktivér.auto.forpligtelse", "falsk"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer forbruger = ny KafkaConsumer (consumerProps); consumer.subscribe (singleton (“sætninger”));

Brug af en værdi på read_committed sikrer, at vi ikke læser nogen transaktionsmeddelelser, før transaktionen gennemføres.

Standardværdien af isolation. niveau er read_uncommitted.

4.4. Forbruger og transformerer ved transaktion

Nu hvor vi har producenten og forbrugeren begge konfigureret til at skrive og læse transaktionelt, kan vi forbruge poster fra vores inputemne og tælle hvert ord i hver post:

ConsumerRecords-poster = consumer.poll (ofSeconds (60)); Kort wordCountMap = records.records (nyt TopicPartition ("input", 0)) .stream () .flatMap (record -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (word, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

Bemærk, at der ikke er noget transaktionsmæssigt ved ovenstående kode. Men, siden vi brugte read_committed, det betyder, at ingen meddelelser, der blev skrevet til inputemnet i den samme transaktion, læses af denne forbruger, før de alle er skrevet.

Nu kan vi sende det beregnede antal ord til outputemnet.

Lad os se, hvordan vi kan producere vores resultater, også transaktionelt.

4.5. Send API

For at sende vores optællinger som nye meddelelser, men i samme transaktion, ringer vi beginTransaktion:

producer.beginTransaction ();

Derefter kan vi skrive hver til vores "tæller" emne, hvor nøglen er ordet, og tællingen er værdien:

wordCountMap.forEach ((nøgle, værdi) -> producent.send (nyt ProducerRecord ("tæller", nøgle, værdi.tilString ())));

Bemærk, at da producenten kan opdele dataene ved hjælp af nøglen, betyder det, at transaktionsmeddelelser kan spænde over flere partitioner, som hver læses af separate forbrugere. Derfor gemmer Kafka-mægleren en liste over alle opdaterede partitioner til en transaktion.

Bemærk også, inden for en transaktion kan en producent bruge flere tråde til at sende poster parallelt.

4.6. Forpligtende forskydninger

Og endelig er vi nødt til at begå vores forskydninger, som vi lige er færdige med at forbruge. Med transaktioner forpligter vi forskydningerne tilbage til det inputemne, vi læser dem fra, som normalt. Også selvom vi send dem til producentens transaktion.

Vi kan gøre alt dette i et enkelt opkald, men vi skal først beregne forskydningerne for hver emnepartition:

KortforskydningerToCommit = ny HashMap (); til (TopicPartition partition: records.partitions ()) {List partitionedRecords = records.records (partition); long offset = partitionedRecords.get (partitionedRecords.size () - 1) .offset (); offsetsToCommit.put (partition, ny OffsetAndMetadata (offset + 1)); }

Bemærk, at det, vi forpligter os til transaktionen, er den kommende modregning, hvilket betyder, at vi skal tilføje 1.

Derefter kan vi sende vores beregnede forskydninger til transaktionen:

producer.sendOffsetsToTransaction (offsetsToCommit, "min gruppe-id");

4.7. Forpligtelse eller afbrydelse af transaktionen

Og endelig kan vi begå transaktionen, som atomisk vil skrive modregningerne til forbruger-forskydninger emne såvel som selve transaktionen:

producer.commitTransaction ();

Dette skyller enhver bufret besked til de respektive partitioner. Derudover stiller Kafka-mægleren alle beskeder i denne transaktion til rådighed for forbrugerne.

Selvfølgelig, hvis noget går galt, mens vi behandler, for eksempel, hvis vi fanger en undtagelse, kan vi ringe abortTransaktion:

prøv {// ... læs fra inputemne // ... transformer // ... skriv til outputemne producer.commitTransaction (); } fange (Undtagelse e) {producer.abortTransaction (); }

Og slip eventuelle buffrede beskeder og fjern transaktionen fra mægleren.

Hvis vi hverken begår eller afbryder før mægler-konfigureret maks. transaktion.timeout.ms, Kafka-mægleren afbryder selve transaktionen. Standardværdien for denne ejendom er 900.000 millisekunder eller 15 minutter.

5. Andet forbruge-transformere-producere Sløjfer

Det, vi lige har set, er et grundlæggende forbruge-transformere-producere loop, der læser og skriver til den samme Kafka-klynge.

Omvendt applikationer, der skal læse og skrive til forskellige Kafka-klynger, skal bruge de ældre commitSync og commitAsync API. Typisk gemmer applikationer forbrugerforskydninger i deres eksterne tilstandslager for at opretholde transaktionalitet.

6. Konklusion

For datakritiske applikationer er end-til-ende nøjagtigt én gang behandling ofte bydende nødvendigt.

I denne vejledning vi så, hvordan vi bruger Kafka til netop dette ved hjælp af transaktioner, og vi implementerede et transaktionsbaseret ordtællingseksempel for at illustrere princippet.

Du er velkommen til at tjekke alle kodeeksemplerne på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found