Apache Spark: Forskelle mellem dataframes, datasæt og RDD'er

1. Oversigt

Apache Spark er et hurtigt, distribueret databehandlingssystem. Det udfører databehandling i hukommelsen og bruger cache i hukommelsen og optimeret udførelse, hvilket resulterer i hurtig ydeevne. Det giver API'er på højt niveau til populære programmeringssprog som Scala, Python, Java og R.

I denne hurtige vejledning gennemgår vi tre af de grundlæggende koncepter for Spark: datarammer, datasæt og RDD'er.

2. DataFrame

Spark SQL introducerede en tabulær dataabstraktion kaldet DataFrame siden Spark 1.3. Siden da er det blevet en af ​​de vigtigste funktioner i Spark. Denne API er nyttig, når vi vil håndtere strukturerede og semistrukturerede, distribuerede data.

I afsnit 3 diskuterer vi Resilient Distribuerede datasæt (RDD). DataFrames gemmer data på en mere effektiv måde end RDD'er, dette skyldes, at de bruger de uforanderlige, in-memory, elastiske, distribuerede og parallelle muligheder for RDD'er, men de anvender også et skema til dataene. DataFrames oversætter også SQL-kode til optimerede RDD-operationer på lavt niveau.

Vi kan oprette DataFrames på tre måder:

  • Konvertering af eksisterende RDD'er
  • Kører SQL-forespørgsler
  • Indlæser eksterne data

Spark team introduceret SparkSession i version 2.0 forener det alle forskellige sammenhænge, ​​hvilket sikrer, at udviklere ikke behøver at bekymre sig om at oprette forskellige sammenhænge:

SparkSession session = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

Vi analyserer Tourist.csv fil:

Datasæt data = dataFrameReader.option ("header", "true") .csv ("data / Tourist.csv");

Siden Spark 2.0 blev DataFrame en Datasæt af typen Række, så vi kan bruge en DataFrame som et alias for en Datasæt.

Vi kan vælge bestemte kolonner, som vi er interesseret i. Vi kan også filtrere og gruppere efter en given kolonne:

data.select (col ("country"), col ("year"), col ("value")) .show (); data.filter (col ("country"). equalTo ("Mexico")) .show (); data.groupBy (col ("country")) .count () .show ();

3. Datasæt

Et datasæt er et sæt stærkt typede, strukturerede data. De giver den velkendte objektorienterede programmeringsstil plus fordelene ved typesikkerhed, da datasæt kan kontrollere syntaks og fangstfejl på kompileringstidspunktet.

Datasæt er en udvidelse af DataFrame, så vi kan betragte en DataFrame som en utypet visning af et datasæt.

Spark-teamet frigav Datasæt API i Spark 1.6, og som de nævnte: "Målet med Spark Datasets er at tilvejebringe en API, der giver brugerne mulighed for let at udtrykke transformationer på objektdomæner, samtidig med at de giver ydelsen og robusthedsfordelene ved Spark SQL-eksekveringsmotoren".

Først skal vi oprette en klasse af typen TouristData:

offentlig klasse TouristData {privat strengregion; privat strengland; privat strengår; private String-serien; privat dobbelt værdi; private String fodnoter; privat streng kilde; // ... getters og setters}

For at kortlægge hver af vores optegnelser til den specificerede type skal vi bruge en indkoder. Kodere oversættes mellem Java-objekter og Sparks interne binære format:

// SparkSession initialisering og dataindlæsning DatasætresponsWithSelectedColumn = data.select (col ("region"), col ("country"), col ("year"), col ("series"), col ("value"). Cast ("dobbelt"), kol ("fodnoter"), kol ("kilde")); Datasæt typedDataset = responsWithSelectedColumns .as (Encoders.bean (TouristData.class));

Som med DataFrame kan vi filtrere og gruppere efter specifikke kolonner:

typedDataset.filter ((FilterFunction) -post -> record.getCountry () .equals ("Norge")) .show (); typedDataset.groupBy (typedDataset.col ("country")) .count () .show ();

Vi kan også udføre operationer som at filtrere efter kolonne, der matcher et bestemt område eller beregne summen af ​​en bestemt kolonne for at få den samlede værdi af det:

typedDataset.filter ((FilterFunction) -post -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contains ("udgift")) .groupBy ("land") .agg (sum ("værdi")) .show ();

4. RDD'er

Det modstandsdygtige distribuerede datasæt eller RDD er Sparks primære programmeringsabstraktion. Det repræsenterer en samling af elementer, der er: uforanderlig, elastisk og distribueret.

En RDD indkapsler et stort datasæt, Spark distribuerer automatisk dataene indeholdt i RDD'er over vores klynge og paralleliserer de operationer, vi udfører på dem.

Vi kan kun oprette RDD'er gennem operationer af data i stabil lagring eller operationer på andre RDD'er.

Fejltolerance er vigtig, når vi beskæftiger os med store datasæt, og dataene distribueres på klyngemaskiner. RDD'er er modstandsdygtige på grund af Sparks indbyggede mekanisme til gendannelse af fejl. Spark er afhængig af det faktum, at RDD'er husker, hvordan de blev oprettet, så vi let kan spore slægten for at gendanne partitionen.

Der er to typer operationer, vi kan udføre på RDD'er: Transformationer og handlinger.

4.1. Transformationer

Vi kan anvende transformationer til en RDD for at manipulere dens data. Når denne manipulation er udført, får vi en helt ny RDD, da RDD'er er uforanderlige objekter.

Vi kontrollerer, hvordan vi implementerer Map and Filter, to af de mest almindelige transformationer.

Først skal vi oprette en JavaSparkContext og indlæse dataene som en RDD fra Tourist.csv fil:

SparkConf conf = ny SparkConf (). SetAppName ("store bogstaver") .setMaster ("lokal [*]"); JavaSparkContext sc = ny JavaSparkContext (conf); JavaRDD-turister = sc.textFile ("data / Tourist.csv");

Lad os derefter anvende kortfunktionen for at få navnet på landet fra hver post og konvertere navnet til store bogstaver. Vi kan gemme dette nyligt genererede datasæt som en tekstfil på disken:

JavaRDD upperCaseCountries = tourist.map (linje -> {String [] kolonner = line.split (COMMA_DELIMITER); returner kolonner [1] .toUpperCase ();}). Tydelig (); upperCaseCountries.saveAsTextFile ("data / output / uppercase.txt");

Hvis vi kun vil vælge et specifikt land, kan vi anvende filterfunktionen på vores oprindelige turister RDD:

JavaRDD-turisterInMexico = turister. Filter (linje -> linje.split (COMMA_DELIMITER) [1]. Ligestilling ("Mexico")); touristInMexico.saveAsTextFile ("data / output / touristInMexico.txt");

4.2. Handlinger

Handlinger returnerer en endelig værdi eller gemmer resultaterne på disken efter at have foretaget nogen beregning af dataene.

To af de gentagne gange anvendte handlinger i Spark er Count and Reduce.

Lad os tælle de samlede lande i vores CSV-fil:

// Initialisering af gnistkontekst og dataindlæsning JavaRDD-lande = tourist.map (linje -> {String [] kolonner = line.split (COMMA_DELIMITER); returner kolonner [1];}). Tydelig (); Langt antalOfCountries = country.count ();

Nu beregner vi de samlede udgifter efter land. Vi bliver nødt til at filtrere de poster, der indeholder udgifter i deres beskrivelse.

I stedet for at bruge en JavaRDD, vi bruger en JavaPairRDD. Et par RDD er en type RDD, der kan gemme nøgleværdipar. Lad os kontrollere det næste:

JavaRDD-turistudgifter = turister. Filter (linje -> linje.split (COMMA_DELIMITER) [3]. Indeholder ("udgift")); JavaPairRDD spendPairRdd = touristExpenditure .mapToPair (line -> {String [] columns = line.split (COMMA_DELIMITER); returner ny Tuple2 (kolonner [1], Double.valueOf (kolonner [6]));}); Liste totalByCountry = spendPairRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. Konklusion

For at opsummere skal vi bruge DataFrames eller datasæt, når vi har brug for domænespecifikke API'er, vi har brug for højtstående udtryk som aggregerings-, sum- eller SQL-forespørgsler. Eller når vi ønsker typesikkerhed på kompileringstidspunktet.

På den anden side skal vi bruge RDD'er, når data er ustrukturerede, og vi ikke behøver at implementere et specifikt skema, eller når vi har brug for transformationer og handlinger på lavt niveau.

Som altid er alle kodeeksempler tilgængelige på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found