En guide til Apache Crunch

1. Introduktion

I denne vejledning demonstrerer vi Apache Crunch med et eksempel på en databehandlingsapplikation. Vi kører denne applikation ved hjælp af MapReduce-rammen.

Vi starter med kort at dække nogle Apache Crunch-koncepter. Så springer vi ind i en prøveapp. I denne app udfører vi tekstbehandling:

  • Først og fremmest læser vi linjerne fra en tekstfil
  • Senere deler vi dem i ord og fjerner nogle almindelige ord
  • Derefter grupperer vi de resterende ord for at få en liste over unikke ord og deres antal
  • Endelig skriver vi denne liste til en tekstfil

2. Hvad er crunch?

MapReduce er en distribueret, parallel programmeringsramme til behandling af store mængder data på en klynge af servere. Software-rammer som Hadoop og Spark implementerer MapReduce.

Crunch giver en ramme til at skrive, teste og køre MapReduce-rørledninger i Java. Her skriver vi ikke MapReduce-jobene direkte. I stedet definerer vi datarørledningen (dvs. operationerne til at udføre input, behandling og output trin) ved hjælp af Crunch API'er. Crunch Planner kortlægger dem til MapReduce-jobene og udfører dem efter behov.

Derfor koordineres hver Crunch-datarørledning af en forekomst af Rørledning interface. Denne grænseflade definerer også metoder til aflæsning af data i en pipeline via Kilde forekomster og udskrivning af data fra en pipeline til Mål tilfælde.

Vi har 3 grænseflader til at repræsentere data:

  1. PC-samling - en uforanderlig, distribueret samling af elementer
  2. PTable<>, V> - et uforanderligt, distribueret, ikke-ordnet multikort over nøgler og værdier
  3. PGroupedTable<>, V> - et distribueret, sorteret kort over nøgler af typen K til en Iterabel V der kan gentages nøjagtigt en gang

DoFn er basisklassen for alle databehandlingsfunktioner. Det svarer til Mapper, Reducer og Combiner klasser i MapReduce. Vi bruger det meste af udviklingen på at skrive og teste logiske beregninger ved hjælp af den.

Nu hvor vi er mere fortrolige med Crunch, lad os bruge det til at oprette applikationseksemplet.

3. Opsætning af et Crunch-projekt

Lad os først og fremmest oprette et Crunch-projekt med Maven. Vi kan gøre det på to måder:

  1. Tilføj de krævede afhængigheder i pom.xml fil til et eksisterende projekt
  2. Brug en arketype til at generere et startprojekt

Lad os se hurtigt på begge tilgange.

3.1. Maven afhængigheder

For at tilføje Crunch til et eksisterende projekt, lad os tilføje de krævede afhængigheder i pom.xml fil.

Lad os først tilføje crunch-core bibliotek:

 org.apache.crunch crunch-core 0.15.0 

Lad os derefter tilføje hadoop-klient bibliotek til at kommunikere med Hadoop. Vi bruger den version, der matcher Hadoop-installationen:

 org.apache.hadoop hadoop-client 2.2.0 leveret 

Vi kan tjekke Maven Central for de nyeste versioner af crunch-core og hadoop-klientbiblioteker.

3.2. Maven arketype

En anden tilgang er hurtigt at generere et startprojekt ved hjælp af Maven-arketypen leveret af Crunch:

mvn arketype: generer -Dfilter = org.apache.crunch: crunch-arketype 

Når du bliver bedt om af ovenstående kommando, leverer vi Crunch-versionen og detaljerne om projektartefakter.

4. Crunch Pipeline Setup

Efter oprettelsen af ​​projektet skal vi oprette en Rørledning objekt. Crunch har 3 Rørledning implementeringer:

  • MRPipeline - udfører inden for Hadoop MapReduce
  • SparkPipeline - udføres som en serie af gnistrørledninger
  • MemPipeline - udfører i-hukommelsen på klienten og er nyttig til enhedstest

Normalt udvikler og tester vi ved hjælp af en forekomst af MemPipeline. Senere bruger vi en forekomst af MRPipeline eller SparkPipeline til faktisk udførelse.

Hvis vi havde brug for en pipeline i hukommelsen, kunne vi bruge den statiske metode getInstance at få MemPipeline eksempel:

Pipeline pipeline = MemPipeline.getInstance ();

Men for nu, lad os oprette en forekomst af MRPipeline at udføre applikationen med Hadoop:

Pipeline pipeline = ny MRPipeline (WordCount.class, getConf ());

5. Læs inputdata

Efter oprettelse af pipeline-objektet ønsker vi at læse inputdata. Det Rørledning interface giver en bekvem metode til at læse input fra en tekstfil, readTextFile (stienavn).

Lad os kalde denne metode til at læse inputtekstfilen:

PCollection linjer = pipeline.readTextFile (inputPath);

Ovenstående kode læser tekstfilen som en samling af Snor.

Lad os som næste trin skrive en test case til læsning af input:

@Test offentlig ugyldighed givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Pipeline pipeline = MemPipeline.getInstance (); PCollection lines = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, lines.asCollection () .getValue () .størrelse ()); }

I denne test verificerer vi, at vi får det forventede antal linjer, når vi læser en tekstfil.

6. Trin til databehandling

Efter at have læst inputdataene er vi nødt til at behandle dem. Crunch API indeholder et antal underklasser af DoFn til at håndtere almindelige databehandlingsscenarier:

  • FilterFn - filtrerer medlemmer af en samling baseret på en boolsk tilstand
  • MapFn - kortlægger hver indgangspost til nøjagtigt en udgangspost
  • CombineFn - kombinerer et antal værdier til en enkelt værdi
  • Deltag i FN - udfører sammenføjninger såsom indre sammenføjning, venstre ydre sammenføjning, højre ydre sammenføjning og fuld ydre sammenføjning

Lad os implementere følgende databehandlingslogik ved hjælp af disse klasser:

  1. Opdel hver linje i inputfilen i ord
  2. Fjern stopordene
  3. Tæl de unikke ord

6.1. Del en linje med tekst i ord

Lad os først og fremmest oprette Tokenizer klasse for at opdele en linje i ord.

Vi udvider DoFn klasse. Denne klasse har en abstrakt metode kaldet behandle. Denne metode behandler inputregistreringer fra a PC-samling og sender output til en Emitter.

Vi er nødt til at implementere opdelingslogikken i denne metode:

public class Tokenizer udvider DoFn {privat statisk endelig Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @ Overstyr offentlig ugyldig proces (String line, Emitter emitter) {for (String word: SPLITTER.split (line)) {emitter.emit (word); }}} 

I ovenstående implementering har vi brugt Splitter klasse fra Guava-biblioteket for at udtrække ord fra en linje.

Lad os derefter skrive en enhedstest til Tokenizer klasse:

@RunWith (MockitoJUnitRunner.class) offentlig klasse TokenizerUnitTest {@Mock privat emitter emitter; @ Test offentlig ugyldighed givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEitted () {Tokenizer splitter = ny Tokenizer (); splitter.process ("hej verden", emitter); verificer (emitter) .emit ("hej"); verificere (emitter) .emit ("verden"); verificereNoMoreInteractions (emitter); }}

Ovenstående test bekræfter, at de korrekte ord returneres.

Lad os endelig dele de læste linjer fra inputtekstfilen ved hjælp af denne klasse.

Det parallelDo metode til PC-samling interface anvender det givne DoFn til alle elementerne og returnerer et nyt PC-samling.

Lad os kalde denne metode på linjesamlingen og passere en forekomst af Tokenizer:

PC-samlingsord = lines.parallelDo (ny Tokenizer (), Writables.strings ()); 

Som et resultat får vi listen over ord i inputtekstfilen. Vi fjerner stopordene i næste trin.

6.2. Fjern stopord

På samme måde som det foregående trin, lad os oprette en StopWordFilter klasse for at filtrere stopord ud.

Imidlertid vil vi udvide FilterFn i stedet for DoFn. FilterFn har en abstrakt metode kaldet acceptere. Vi har brug for at implementere filtreringslogikken i denne metode:

offentlig klasse StopWordFilter udvider FilterFn {// Engelske stopord, lånt fra Lucene. privat statisk endelig Sæt STOP_WORDS = ImmutableSet .copyOf (ny streng [] {"a", "og", "er", "som", "ved", "være", "men", "ved", "for" , "hvis", "i", "ind i", "er", "det", "nej", "ikke", "af", "på", "eller", "s", "sådan", " t "," det "," det "," deres "," derefter "," der "," disse "," de "," dette "," til "," var "," vil "," med " }); @ Override public boolean accept (String word) {return! STOP_WORDS.contains (word); }}

Lad os derefter skrive enhedstesten til StopWordFilter klasse:

offentlig klasse StopWordFilterUnitTest {@Test public void givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("the")); assertFalse (filter.accept ("a")); } @Test offentlig ugyldighed givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = nyt StopWordFilter (); assertTrue (filter.accept ("Hej")); assertTrue (filter.accept ("Verden")); } @Test offentlig ugyldighed givenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("This", "is", "a", "test", "phrase"); PCollection noStopWords = ord.filter (nyt StopWordFilter ()); assertEquals (ImmutableList.of ("Denne", "test", "sætning"), Lists.newArrayList (noStopWords.materialize ())); }}

Denne test verificerer, at filtreringslogikken udføres korrekt.

Lad os endelig bruge StopWordFilter for at filtrere listen over ord, der blev genereret i det forrige trin. Det filter metode til PC-samling interface anvender det givne FilterFn til alle elementerne og returnerer et nyt PC-samling.

Lad os kalde denne metode på ordsamlingen og videregive en forekomst af StopWordFilter:

PCollection noStopWords = ord.filter (nyt StopWordFilter ());

Som et resultat får vi den filtrerede ordsamling.

6.3. Tæl unikke ord

Efter at have fået den filtrerede ordsamling, vil vi tælle, hvor ofte hvert ord forekommer. PC-samling interface har en række metoder til at udføre almindelige sammenlægninger:

  • min - returnerer minimumselementet i samlingen
  • maks - returnerer det maksimale element i samlingen
  • længde - returnerer antallet af elementer i samlingen
  • tælle - returnerer a PTable der indeholder antallet af hvert unikke element i samlingen

Lad os bruge tælle metode til at få de unikke ord sammen med deres antal:

// Tællemetoden anvender en række Crunch-primitiver og returnerer // et kort over de unikke ord i input PC-samling til deres antal. PTable tæller = noStopWords.count ();

7. Angiv output

Som et resultat af de foregående trin har vi en ordtabel og deres antal. Vi vil skrive dette resultat til en tekstfil. Det Rørledning interface giver bekvemme metoder til at skrive output:

ugyldig skrivning (samling af pc-samling, målmål); ugyldig skrivning (PCollection-samling, Target target, Target.WriteMode writeMode); ugyldig writeTextFile (PCollection samling, String pathName);

Lad os derfor kalde writeTextFile metode:

pipeline.writeTextFile (tæller, outputPath); 

8. Administrer udførelse af rørledning

Alle trin hidtil har netop defineret datarørledningen. Intet input er blevet læst eller behandlet. Dette er fordi Crunch bruger doven eksekveringsmodel.

Det kører ikke MapReduce-job, før en metode, der styrer jobplanlægning og -udførelse, påkaldes på pipeline-grænsefladen:

  • løb - udarbejder en eksekveringsplan for at oprette de krævede output og udfører den derefter synkront
  • Færdig - kører de resterende job, der kræves for at generere output, og renser derefter oprettede mellemliggende datafiler
  • runAsync - svarer til kørselsmetoden, men udføres på en ikke-blokerende måde

Lad os derfor kalde Færdig metode til at udføre rørledningen som MapReduce-job:

PipelineResult resultat = pipeline.done (); 

Ovenstående erklæring kører MapReduce-job for at læse input, behandle dem og skrive resultatet til outputkataloget.

9. Sætte rørledningen sammen

Indtil videre har vi udviklet og enhedstestet logikken til at læse inputdata, behandle dem og skrive til outputfilen.

Lad os derefter sætte dem sammen for at bygge hele datapipelinen:

public int run (String [] args) kaster Undtagelse {String inputPath = args [0]; Streng outputPath = args [1]; // Opret et objekt til at koordinere oprettelse og udførelse af pipeline. Pipeline pipeline = ny MRPipeline (WordCount.class, getConf ()); // Henvis en given tekstfil til en samling af strenge. PCollection linjer = pipeline.readTextFile (inputPath); // Definer en funktion, der opdeler hver linje i en PC-samling af strenge i // en PC-samling, der består af de enkelte ord i filen. // Det andet argument indstiller serialiseringsformatet. PC-samlingsord = lines.parallelDo (ny Tokenizer (), Writables.strings ()); // Tag ordsamlingen og fjern kendte stopord. PCollection noStopWords = ord.filter (nyt StopWordFilter ()); // Tællemetoden anvender en række Crunch-primitiver og returnerer // et kort over de unikke ord i input PC-samling til deres antal. PTable tæller = noStopWords.count (); // Instruer rørledningen til at skrive de resulterende optællinger til en tekstfil. pipeline.writeTextFile (tæller, outputPath); // Udfør rørledningen som en MapReduce. PipelineResult resultat = pipeline.done (); return result.succeeded ()? 0: 1; }

10. Konfiguration af Hadoop-start

Datarørledningen er således klar.

Vi har dog brug for koden for at starte den. Lad os derfor skrive vigtigste metode til at starte applikationen:

offentlig klasse WordCount udvider konfigureret værktøj {offentligt statisk ugyldigt hoved (String [] args) kaster undtagelse {ToolRunner.run (ny konfiguration (), ny WordCount (), args); }

ToolRunner.run analyserer Hadoop-konfigurationen fra kommandolinjen og udfører MapReduce-jobbet.

11. Kør applikation

Den komplette ansøgning er nu klar. Lad os køre følgende kommando for at bygge den:

mvn-pakke 

Som et resultat af ovenstående kommando får vi den pakkede applikation og en speciel jobkrukke i målmappen.

Lad os bruge denne jobkrukke til at udføre applikationen på Hadoop:

hadoop jar target / crunch-1.0-SNAPSHOT-job.jar 

Applikationen læser inputfilen og skriver resultatet til outputfilen. Outputtfilen indeholder unikke ord sammen med deres antal svarende til følgende:

[Tilføj, 1] [Tilføjet, 1] [Beundring, 1] [Indrømmelse, 1] [Tillæg, 1]

Ud over Hadoop kan vi køre applikationen inden for IDE, som en enkeltstående applikation eller som enhedstest.

12. Konklusion

I denne vejledning oprettede vi en databehandlingsapplikation, der kører på MapReduce. Apache Crunch gør det let at skrive, teste og udføre MapReduce-rørledninger i Java.

Som sædvanlig kan den fulde kildekode findes på Github.


$config[zx-auto] not found$config[zx-overlay] not found