ETL med Spring Cloud Data Flow

1. Oversigt

Spring Cloud Data Flow er et cloud-native værktøjssæt til opbygning af datarørledninger og batchprocesser i realtid. Spring Cloud Data Flow er klar til at blive brugt til en række brugssager til databehandling som simpel import / eksport, ETL-behandling, begivenhedsstreaming og forudsigende analyse.

I denne vejledning lærer vi et eksempel på ETL (Real-Extract Transform and Load) i realtid ved hjælp af en stream-pipeline, der udtrækker data fra en JDBC-database, omdanner dem til enkle POJO'er og indlæser dem i en MongoDB.

2. ETL og Event-Stream Processing

ETL - udtræk, transformer og indlæs - blev ofte omtalt som en proces, der batch-indlæser data fra flere databaser og systemer til et fælles datalager. I dette datalager er det muligt at udføre tung databehandlingsbehandling uden at gå på kompromis med systemets samlede ydeevne.

Imidlertid ændrer nye tendenser den måde, hvordan dette gøres på. ETL har stadig en rolle i overførslen af ​​data til datalager og datasøer.

I dag kan dette gøres med streams i en event-stream-arkitektur ved hjælp af Spring Cloud Data Flow.

3. Dataskilte med forårsky

Med Spring Cloud Data Flow (SCDF) kan udviklere oprette datarørledninger i to varianter:

  • Langvarige stream-applikationer i realtid ved hjælp af Spring Cloud Stream
  • Kortvarige batch-opgaveapplikationer ved hjælp af Spring Cloud Task

I denne artikel dækker vi den første, en langvarig streamingapplikation baseret på Spring Cloud Stream.

3.1. Spring Cloud Stream-applikationer

SCDF Stream-rørledningerne er sammensat af trin, hvorhvert trin er en applikation bygget i Spring Boot-stil ved hjælp af Spring Cloud Stream-mikrorammen. Disse applikationer er integreret af en messaging-middelvare som Apache Kafka eller RabbitMQ.

Disse applikationer er klassificeret i kilder, processorer og dræn. Sammenlignet med ETL-processen kan vi sige, at kilden er "ekstraktet", processoren er "transformeren", og vasken er "belastning" -delen.

I nogle tilfælde kan vi bruge en applikationsstarter i et eller flere trin i rørledningen. Dette betyder, at vi ikke behøver at implementere en ny applikation til et trin, men i stedet konfigurere en eksisterende applikationsstarter, der allerede er implementeret.

En liste over applikationsstartere kan findes her.

3.2. Spring Cloud Data Flow Server

Det sidste stykke af arkitekturen er Spring Cloud Data Flow Server. SCDF-serveren implementerer applikationerne og rørledningsstrømmen ved hjælp af Spring Cloud Deployer Specification. Denne specifikation understøtter SCDF-cloud-native smag ved at distribuere til en række moderne driftstider, såsom Kubernetes, Apache Mesos, Garn og Cloud Foundry.

Vi kan også køre strømmen som en lokal implementering.

Flere oplysninger om SCDF-arkitekturen kan findes her.

4. Opsætning af miljø

Før vi starter, er vi nødt til det vælg delene af denne komplekse implementering. Det første stykke at definere er SCDF-serveren.

Til testning vi bruger SCDF Server Local til lokal udvikling. Til produktionsinstallationen kan vi senere vælge en cloud-native runtime, som SCDF Server Kubernetes. Vi kan finde listen over serverkørselstider her.

Lad os nu kontrollere systemkravene for at køre denne server.

4.1. Systemkrav

For at køre SCDF-serveren bliver vi nødt til at definere og opsætte to afhængigheder:

  • messaging-mellemware og
  • RDBMS.

Til messaging-mellemware, vi arbejder med RabbitMQ, og vi vælger PostgreSQL som RDBMS til lagring af vores pipeline stream-definitioner.

For at køre RabbitMQ skal du downloade den nyeste version her og starte en RabbitMQ-forekomst ved hjælp af standardkonfigurationen eller køre følgende Docker-kommando:

docker-kørsel - navn dataflow-rabbit -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-management

Som det sidste installationstrin skal du installere og køre PostgreSQL RDBMS på standardporten 5432. Opret derefter en database, hvor SCDF kan gemme sine streamdefinitioner ved hjælp af følgende script:

OPRET DATABASE dataforløb;

4.2. Spring Cloud Data Flow Server Local

For at køre SCDF Server Local kan vi vælge at starte serveren ved hjælp af docker-komponere, eller vi kan starte det som et Java-program.

Her kører vi SCDF Server Local som et Java-program. For at konfigurere applikationen skal vi definere konfigurationen som Java-applikationsparametre. Vi har brug for Java 8 i systemstien.

For at være vært for krukker og afhængigheder skal vi oprette en hjemmemappe til vores SCDF-server og downloade SCDF-serverens lokale distribution i denne mappe. Du kan downloade den seneste distribution af SCDF Server Local her.

Vi er også nødt til at oprette en lib-mappe og placere en JDBC-driver der. Den seneste version af PostgreSQL-driveren er tilgængelig her.

Lad os endelig køre den lokale SCDF-server:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = gæst \ --spring.rabbitmq.password = gæst

Vi kan kontrollere, om den kører ved at se på denne URL:

// localhost: 9393 / dashboard

4.3. Spring Cloud Data Flow Shell

SCDF Shell er en kommandolinjeværktøj, der gør det nemt at komponere og implementere vores applikationer og rørledninger. Disse Shell-kommandoer kører over Spring Cloud Data Flow Server REST API.

Download den nyeste version af krukken til din SCDF-hjemmemappe, der er tilgængelig her. Når det er gjort, skal du køre følgende kommando (opdater versionen efter behov):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | '_ \ | '__ | | '_ \ / _` | | | | | / _ \ | | | | / _` | ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _ ` | __ / _` | | _ | | / _ \ \ / / / \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / | ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / Velkommen til Spring Cloud Data Flow shell. For hjælp, tryk TAB, eller skriv "hjælp". dataflow:>

Hvis i stedet for “dataflow:> ” du får "server-ukendt:> ” i den sidste linje kører du ikke SCDF-serveren på localhost. I dette tilfælde skal du køre følgende kommando for at oprette forbindelse til en anden vært:

server-ukendt:> konfigurationsserver for dataforløb // {host}

Nu er Shell tilsluttet SCDF-serveren, og vi kan køre vores kommandoer.

Den første ting, vi skal gøre i Shell, er at importere applikationsstarterne. Find den nyeste version her til RabbitMQ + Maven i Spring Boot 2.0.x, og kør følgende kommando (igen, opdater versionen, her “Darwin-SR1", efter behov):

$ dataflow:> appimport --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

For at kontrollere de installerede applikationer skal du køre følgende Shell-kommando:

$ dataflow:> appliste

Som et resultat skal vi se en tabel, der indeholder alle de installerede applikationer.

SCDF tilbyder også en grafisk grænseflade, der hedder Flo, som vi kan få adgang til via denne adresse: // localhost: 9393 / dashboard. Imidlertid er dens anvendelse ikke omfattet af denne artikel.

5. Komponering af en ETL-rørledning

Lad os nu oprette vores stream-pipeline. For at gøre dette bruger vi JDBC Source-applikationsstarter til at udtrække oplysninger fra vores relationsdatabase.

Vi opretter også en brugerdefineret processor til transformation af informationsstrukturen og en brugerdefineret vask til at indlæse vores data i en MongoDB.

5.1. Uddrag - Forberedelse af en relationsdatabase til udvinding

Lad os oprette en database med navnet på crm og en tabel med navnet på kunde:

OPRET DATABASE crm;
Opret TABEL-kunde (id bigint IKKE NULL, importeret boolsk DEFAULT falsk, kundenavn karakter varierer (50), PRIMÆR NØGLE (id))

Bemærk, at vi bruger et flag importeret, der gemmer, hvilken post der allerede er importeret. Vi kan også gemme disse oplysninger i en anden tabel, hvis det er nødvendigt.

Lad os nu indsætte nogle data:

INDSÆT I Kunde (id, kundenavn, importeret) VÆRDIER (1, 'John Doe', falsk);

5.2. Transform - Mapping JDBC Felter til MongoDB Felter struktur

Til transformationstrinnet foretager vi en simpel oversættelse af feltet Kundenavn fra kildetabellen til et nyt felt navn. Andre transformationer kunne udføres her, men lad os holde eksemplet kort.

For at gøre dette opretter vi et nyt projekt med navnet kundetransformering. Den nemmeste måde at gøre dette på er ved at bruge Spring Initializr-webstedet til at oprette projektet. Når du er kommet til webstedet, skal du vælge et gruppe- og et artefaktnavn. Vi bruger com.kunde og kundetransformering, henholdsvis.

Når dette er gjort, skal du klikke på knappen "Generer projekt" for at downloade projektet. Derefter pakkes projektet ud og importeres til din foretrukne IDE, og tilføj følgende afhængighed til pom.xml:

 org.springframework.cloud spring-cloud-stream-binder-rabbit 

Nu er vi klar til at begynde at kode feltnavnkonvertering. For at gøre dette opretter vi Kunde klasse til at fungere som en adapter. Denne klasse vil modtage Kundenavn via sætnavn () metode og udsender dens værdi via getName metode.

Det @JsonProperty annoteringer vil udføre transformationen, mens deserialisering fra JSON til Java:

offentlig klasse kunde {privat Lang id; privat strengnavn; @JsonProperty ("kunde_navn") offentligt ugyldigt sætnavn (strengnavn) {dette.navn = navn; } @JsonProperty ("navn") offentlig String getName () {returnavn; } // Getters og Setters}

Processoren skal modtage data fra et input, udføre transformationen og binde resultatet til en outputkanal. Lad os oprette en klasse til at gøre dette:

import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) offentlig klasse CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) offentlig kunde convertToPojo (Kundens nyttelast) {return nyttelast; }}

I ovenstående kode kan vi observere, at transformationen sker automatisk. Inputet modtager dataene, da JSON og Jackson deserialiserer dem til en Kunde objekt ved hjælp af sæt metoder.

Det modsatte er for output, dataene serialiseres til JSON ved hjælp af metoder.

5.3. Load - Sink i MongoDB

På samme måde som transformationstrinnet vi opretter endnu et maven-projekt, nu med navnet kunde-mongodb-håndvask. Gå igen til Spring Initializr, for gruppen vælger com.kunde, og vælg for artefakten kunde-mongodb-vask. Skriv derefter MongoDB i afhængigheds-søgefeltet, og download projektet.

Derefter skal du pakke den ud og importere den til din foretrukne IDE.

Tilføj derefter den samme ekstra afhængighed som i kundetransformering projekt.

Nu opretter vi en anden Kunde klasse til modtagelse af input i dette trin:

import org.springframework.data.mongodb.core.mapping.Document; @Document (collection = "kunde") offentlig klasse kunde {privat Lang id; privat strengnavn; // Getters og Setters}

For at synke Kunde, opretter vi en lytterklasse, der gemmer kundenheden ved hjælp af Kundeopbevaring:

@EnableBinding (Sink.class) offentlig klasse CustomerListener {@Autowired private CustomerRepository repository; @StreamListener (Sink.INPUT) offentlig ugyldig gemme (Kundekunde) {repository.save (kunde); }}

Og Kundeopbevaring, i dette tilfælde er en MongoRepository fra Spring Data:

import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; @Repository offentlig grænseflade CustomerRepository udvider MongoRepository {} 

5.4. Stream Definition

Nu, begge brugerdefinerede applikationer er klar til at blive registreret på SCDF Server. For at opnå dette skal du kompilere begge projekter ved hjælp af kommandoen Maven mvn installere.

Vi registrerer dem derefter ved hjælp af Spring Cloud Data Flow Shell:

app-register - navn kundetransform - type processor --uri maven: //com.kunde: kunde-transformation: 0.0.1-SNAPSHOT
app-register --navn kunde-mongodb-vask - type vask --uri maven: //com.customer: customer-mongodb-sink: jar: 0.0.1-SNAPSHOT

Lad os endelig kontrollere, om applikationerne er gemt på SCDF, kør applikationslistekommandoen i skallen:

app-liste

Som et resultat skal vi se begge applikationer i den resulterende tabel.

5.4.1. Stream Pipeline Domain-Specific Language - DSL

En DSL definerer konfigurationen og dataflyden mellem applikationerne. SCDF DSL er enkel. I det første ord definerer vi programmets navn efterfulgt af konfigurationerne.

Syntaksen er også en Unix-inspireret pipeline-syntaks, der bruger lodrette bjælker, også kendt som "rør", til at forbinde flere applikationer:

http --port = 8181 | log

Dette opretter en HTTP-applikation, der serveres i port 8181, som sender enhver modtaget kropsnyttelast til en log.

Lad os nu se, hvordan du opretter DSL-streamdefinitionen af ​​JDBC-kilden.

5.4.2. JDBC Source Stream Definition

Nøglekonfigurationerne for JDBC-kilden er forespørgsel og opdatering.forespørgsel vælger ulæste poster, mens opdatering ændrer et flag for at forhindre, at de aktuelle poster genlæses.

Vi definerer også JDBC-kilden til afstemning i en fast forsinkelse på 30 sekunder og afstemning på maksimalt 1000 rækker. Endelig definerer vi konfigurationerne af forbindelsen, som driver, brugernavn, adgangskode og forbindelses-URL:

jdbc --query = 'VÆLG id, kundenavn FRA public.customer WHERE importeret = false' --update = 'UPDATE public.customer SET importeret = true WHERE id i (: id)' --max-rows-per-poll = 1000 - fast forsinkelse = 30 --time-unit = SECONDS - driver-class-name = org.postgresql.Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - adgangskode = postgres

Flere JDBC-kildekonfigurationsegenskaber kan findes her.

5.4.3. Kunde MongoDB Sink Stream Definition

Da vi ikke definerede forbindelseskonfigurationerne i application.properties af kunde-mongodb-vask, konfigurerer vi gennem DSL-parametre.

Vores ansøgning er fuldt ud baseret på MongoDataAutoConfiguration. Du kan tjekke de andre mulige konfigurationer her. Dybest set definerer vi spring.data.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. Opret og implementer strømmen

For at oprette den endelige streamdefinition skal du først gå tilbage til Shell og udføre følgende kommando (uden linjeskift er de lige indsat for læsbarhed):

stream create --name jdbc-to-mongodb --definition "jdbc --query = 'VÆLG id, kundenavn FRA public.customer WHERE importeret = false' --fixed-delay = 30 --max-rows-per-poll = 1000 --update = 'UPDATE kundesæt importeret = sand WHERE id i (: id)' - time-unit = SECONDS --password = postgres - driver-class-name = org.postgresql.Driver --username = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

Denne stream DSL definerer en stream ved navn jdbc-til-mongodb. Næste, vi distribuerer strømmen efter dens navn:

stream deploy --name jdbc-to-mongodb 

Endelig skal vi se placeringen af ​​alle tilgængelige logfiler i logoutputtet:

Logfiler vil være i {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink Logs vil være i {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongodb /jdbc-to-mongodb.customer-transform Logfiler vil være i {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Konklusion

I denne artikel har vi set et komplet eksempel på en ETL-datarørledning ved hjælp af Spring Cloud Data Flow.

Mest bemærkelsesværdigt så vi konfigurationerne af en applikationsstarter, oprettede en ETL-streampipeline ved hjælp af Spring Cloud Data Flow Shell og implementerede brugerdefinerede applikationer til vores læsning, transformation og skrivning af data.

Som altid kan eksempelkoden findes i GitHub-projektet.