Introduktion til Apache Flink med Java

1. Oversigt

Apache Flink er en Big Data-behandlingsramme, der giver programmerere mulighed for at behandle den enorme mængde data på en meget effektiv og skalerbar måde.

I denne artikel introducerer vi nogle af de centrale API-koncepter og standard datatransformationer, der er tilgængelige i Apache Flink Java API. Den flydende stil med denne API gør det nemt at arbejde med Flinks centrale konstruktion - den distribuerede samling.

Først vil vi se på Flinks Datasæt API-transformationer og brug dem til at implementere et ordtællingsprogram. Så tager vi et kort kig på Flinks DataStream API, som giver dig mulighed for at behandle strømme af begivenheder i realtid.

2. Maven-afhængighed

For at komme i gang skal vi tilføje Maven-afhængigheder til flink-java og flink-test-værktøjer biblioteker:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Core API-koncepter

Når vi arbejder med Flink, har vi brug for at vide nogle ting, der er relateret til API'et:

  • Hvert Flink-program udfører transformationer på distribuerede dataindsamlinger. Der findes en række forskellige funktioner til transformation af data, herunder filtrering, kortlægning, sammenføjning, gruppering og aggregering
  • EN håndvask operation i Flink udløser udførelsen af ​​en stream for at producere det ønskede resultat af programmet, såsom at gemme resultatet i filsystemet eller udskrive det til standardoutputtet
  • Flinktransformationer er dovne, hvilket betyder at de ikke udføres før a håndvask operation påberåbes
  • Apache Flink API understøtter to driftsformer - batch og realtid. Hvis du har at gøre med en begrænset datakilde, der kan behandles i batch-tilstand, bruger du Datasæt API. Hvis du ønsker at behandle ubegrænsede datastrømme i realtid, skal du bruge DataStream API

4. DataSet API-transformationer

Indgangspunktet til Flink-programmet er en forekomst af Udførelse Miljø klasse - dette definerer den kontekst, hvor et program udføres.

Lad os oprette en Udførelse Miljø for at starte vores behandling:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

Bemærk, at når du starter applikationen på den lokale maskine, udfører den behandling på den lokale JVM. Hvis du vil starte behandlingen på en klynge af maskiner, skal du installere Apache Flink på disse maskiner og konfigurere Udførelse Miljø derfor.

4.1. Oprettelse af et datasæt

For at begynde at udføre datatransformationer er vi nødt til at forsyne vores program med dataene.

Lad os oprette en forekomst af Datasæt klasse ved hjælp af vores Udførelsesmiljø:

DataSet-beløb = env.fromElements (1, 29, 40, 50);

Du kan oprette en Datasæt fra flere kilder, såsom Apache Kafka, en CSV, fil eller næsten enhver anden datakilde.

4.2. Filtrer og reducer

Når du opretter en forekomst af Datasæt klasse, kan du anvende transformationer til den.

Lad os sige, at du vil filtrere tal, der er over en bestemt tærskel, og derefter summe dem alle sammen. Du kan bruge filter() og reducere() transformationer for at opnå dette:

int-tærskel = 30; Liste indsamle = beløb .filter (a -> a> tærskel). Reducere ((heltal, t1) -> heltal + t1). Indsamle (); assertThat (collect.get (0)). isEqualTo (90); 

Bemærk, at indsamle() metoden er en håndvask operation, der udløser de faktiske datatransformationer.

4.3. Kort

Lad os sige, at du har en Datasæt af Person genstande:

privat statisk klasse Person {privat int alder; privat strengnavn; // standard konstruktører / getters / setters}

Lad os derefter oprette en Datasæt af disse objekter:

DataSet personDataSource = env.fromCollection (Arrays.asList (ny person (23, "Tom"), ny person (75, "Michael")));

Antag, at du kun vil udtrække alder felt fra hvert objekt i samlingen. Du kan bruge kort() transformation for kun at få et specifikt felt af Person klasse:

Liste aldre = personDataSource .map (p -> p.age) .collect (); hævder, at (aldre) .har størrelse (2); hævder, at (aldre). indeholder (23, 75);

4.4. Tilslutte

Når du har to datasæt, kan du måske deltage i dem på nogle id Mark. Til dette kan du bruge tilslutte() transformation.

Lad os oprette samlinger af transaktioner og adresser for en bruger:

Tuple3-adresse = ny Tuple3 (1, "5th Avenue", "London"); Datasæt adresser = env.fromElements (adresse); Tuple2 firstTransaction = ny Tuple2 (1, "Transaktion_1"); Datasæt transaktioner = env.fromElements (firstTransaction, new Tuple2 (12, "Transaction_2")); 

Det første felt i begge tupler er af et Heltal type, og dette er en id felt, hvor vi ønsker at deltage i begge datasæt.

For at udføre den faktiske tilslutningslogik er vi nødt til at implementere en KeySelector interface til adresse og transaktion:

privat statisk klasse IdKeySelectorTransaction implementerer KeySelector {@Override public Integer getKey (Tuple2 value) {return value.f0; }} privat statisk klasse IdKeySelectorAddress implementerer KeySelector {@Override public Integer getKey (Tuple3 value) {return value.f0; }}

Hver vælger returnerer kun det felt, hvor sammenføjningen skal udføres.

Desværre er det ikke muligt at bruge lambda-udtryk her, fordi Flink har brug for generisk typeinformation.

Lad os derefter implementere fusionerende logik ved hjælp af disse vælgere:

Liste<>> sammenføjet = transaktioner.tilslut (adresser). hvor (ny IdKeySelectorTransaction ()) .equalTo (ny IdKeySelectorAddress ()) .collect (); hævder, at (sluttede) .hasstørrelse (1); assertThat (tilsluttet) .contains (ny Tuple2 (firstTransaction, adresse)); 

4.5. Sortere

Lad os sige, at du har følgende samling af Tuple2:

Tuple2 secondPerson = ny Tuple2 (4, "Tom"); Tuple2 thirdPerson = ny Tuple2 (5, "Scott"); Tuple2 quarterPerson = ny Tuple2 (200, "Michael"); Tuple2 firstPerson = ny Tuple2 (1, "Jack"); Datasæt transaktioner = env.fromElements (4thPerson, secondPerson, thirdPerson, firstPerson); 

Hvis du vil sortere denne samling efter det første felt i tuplen, kan du bruge sortPartitions () transformation:

Liste sorteret = transaktioner .sortPartition (ny IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (sorteret) .containsExactly (firstPerson, secondPerson, thirdPerson, 4thPerson);

5. Antal ord

Ordtællingsproblemet er et, der ofte bruges til at fremvise mulighederne i Big Data-behandlingsrammer. Den grundlæggende løsning indebærer at tælle ordforekomster i et tekstinput. Lad os bruge Flink til at implementere en løsning på dette problem.

Som det første skridt i vores løsning opretter vi en LineSplitter klasse, der deler vores input i tokens (ord), der samler for hvert token a Tuple2 af nøgleværdipar. I hver af disse tupler er nøglen et ord, der findes i teksten, og værdien er heltal (1).

Denne klasse implementerer FlatMapFunction interface, der tager Snor som input og producerer en Tuple2:

offentlig klasse LineSplitter implementerer FlatMapFunction {@Override public void flatMap (String value, Collector out) {Stream.of (value.toLowerCase (). split ("\ W +")). filter (t -> t.length ()> 0) .forEach (token -> out.collect (new Tuple2 (token) , 1))); }}

Vi kalder indsamle() metode til Samler klasse for at skubbe data fremad i behandlingsrørledningen.

Vores næste og sidste trin er at gruppere tuplerne efter deres første elementer (ord) og derefter udføre en sum samlet på de andet elementer for at producere en optælling af ordforekomsterne:

offentlig statisk datasæt startWordCount (ExecutionEnvironment env, List lines) kaster Exception {DataSet text = env.fromCollection (lines); returner text.flatMap (ny LineSplitter ()) .groupBy (0) .aggregate (Aggregations.SUM, 1); }

Vi bruger tre typer af Flink-transformationer: flatMap (), groupBy ()og samlet().

Lad os skrive en test for at hævde, at implementeringen af ​​ordtælling fungerer som forventet:

Listelinjer = Arrays.asList ("Dette er en første sætning", "Dette er en anden sætning med et ord"); Datasæt resultat = WordCount.startWordCount (env, linjer); Liste indsamle = result.collect (); assertThat (collect) .containsExactlyInAnyOrder (ny Tuple2 ("a", 3), ny Tuple2 ("sætning", 2), ny Tuple2 ("ord", 1), ny Tuple2 ("er", 2), ny Tuple2 ( "denne", 2), ny Tuple2 ("anden", 1), ny Tuple2 ("første", 1), ny Tuple2 ("med", 1), ny Tuple2 ("en", 1));

6. DataStream API

6.1. Oprettelse af en DataStream

Apache Flink understøtter også behandling af hændelsesstrømme gennem sin DataStream API. Hvis vi vil begynde at forbruge begivenheder, skal vi først bruge StreamExecutionEnvironment klasse:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

Dernæst kan vi oprette en strøm af begivenheder ved hjælp af udførelse Miljø fra en række kilder. Det kan være en beskedbus som Apache Kafka, men i dette eksempel opretter vi simpelthen en kilde fra et par strengelementer:

DataStream dataStream = executionEnvironment.fromElements ("Dette er en første sætning", "Dette er en anden sætning med et ord");

Vi kan anvende transformationer til hvert element i DataStream som i det normale Datasæt klasse:

SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);

For at udløse udførelsen er vi nødt til at påkalde en sink-operation som f.eks Print() der bare udskriver resultatet af transformationer til standardoutputtet, efter med udføre () metode til StreamExecutionEnvironment klasse:

upperCase.print (); env.execute ();

Det producerer følgende output:

1> DETTE ER EN FØRSTE SINTE 2> DETTE ER EN ANDEN SINTE MED ET ET ORD

6.2. Windowing af begivenheder

Når du behandler en strøm af begivenheder i realtid, kan det være nødvendigt at gruppere begivenheder sammen og anvende nogle beregninger på et vindue af disse begivenheder.

Antag, at vi har en strøm af begivenheder, hvor hver begivenhed er et par, der består af begivenhedsnummeret og tidsstemplet, da begivenheden blev sendt til vores system, og at vi kan tåle begivenheder, der ikke er i orden, men kun hvis de ikke er mere end tyve sekunder for sent.

Lad os i dette eksempel først oprette en strøm, der simulerer to begivenheder, der er adskillige minutter fra hinanden, og definere en tidsstempeludtrækker, der specificerer vores tærskelværdi:

SingleOutputStreamOperator windowed = env.fromElements (new Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), new Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())) .assignTimestampsAndWatermarks (new BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

Lad os derefter definere en vindueshandling for at gruppere vores begivenheder i fem sekunders vinduer og anvende en transformation på disse begivenheder:

SingleOutputStreamOperator reduceret = vinduesvindue. vindueAlle (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); reduced.print ();

Det får det sidste element i hvert fem sekunders vindue, så det udskrives:

1> (15,1491221519)

Bemærk, at vi ikke kan se den anden begivenhed, fordi den ankom senere end den angivne tærskel for forsinkelse.

7. Konklusion

I denne artikel introducerede vi Apache Flink-rammen og kiggede på nogle af de transformationer, der blev leveret med dens API.

Vi implementerede et ordtællingsprogram ved hjælp af Flinks flydende og funktionelle DataSet API. Så kiggede vi på DataStream API og implementerede en simpel transformation i realtid på en strøm af begivenheder.

Implementeringen af ​​alle disse eksempler og kodestykker kan findes på GitHub - dette er et Maven-projekt, så det skal være let at importere og køre som det er.