Kom godt i gang med Stream Processing med Spring Cloud Data Flow

1. Introduktion

Spring Cloud Data Flow er en cloud-native programmerings- og driftsmodel til mikroservices, der kan sammensættes.

Med Spring Cloud Data Flow, kan udviklere oprette og orkestrere datarørledninger til almindelige brugssager såsom dataindtagelse, realtidsanalyse og dataimport / eksport.

Disse datarørledninger findes i to varianter, streaming og batch datarørledninger.

I det første tilfælde forbruges eller produceres en ubegrænset mængde data via messaging middleware. I det andet tilfælde behandler den kortvarige opgave et endeligt sæt data og afsluttes derefter.

Denne artikel vil fokusere på streaming-behandling.

2. Arkitektonisk oversigt

De vigtigste komponenter, som denne type arkitektur er Ansøgninger, det Data Flow Serverog målkørselstiden.

Ud over disse nøglekomponenter har vi normalt også en Data flow shell og en meddelelsesmægler inden for arkitekturen.

Lad os se alle disse komponenter mere detaljeret.

2.1. Ansøgninger

En streaming-datarørledning inkluderer typisk forbrug af begivenheder fra eksterne systemer, databehandling og polyglot-persistens. Disse faser kaldes almindeligvis Kilde, Processorog Håndvask i Spring Cloud terminologi:

  • Kilde: er det program, der bruger begivenheder
  • Processor: bruger data fra Kilde, udfører noget behandling på det og udsender de behandlede data til den næste applikation i pipelinen
  • Håndvask: enten forbruger fra en Kilde eller Processor og skriver dataene til det ønskede persistenslag

Disse applikationer kan pakkes på to måder:

  • Spring Boot uber-jar, der er hostet i et maven-arkiv, fil, http eller enhver anden Spring-ressourceimplementering (denne metode vil blive brugt i denne artikel)
  • Docker

Mange kilder, processorer og sink applikationer til almindelige brugssager (f.eks. Jdbc, hdfs, http, router) er allerede leveret og klar til brug af Spring Cloud Data Flow hold.

2.2. Kørselstid

Der er også behov for en kørselstid, for at disse applikationer kan udføres. De understøttede driftstider er:

  • Cloud Foundry
  • Apache GARN
  • Kubernetes
  • Apache Mesos
  • Lokal server til udvikling (som vil blive brugt i denne artikel)

2.3. Data Flow Server

Komponenten, der er ansvarlig for implementering af applikationer til en runtime, er Data Flow Server. Der er en Data Flow Server eksekverbar jar leveret til hver af målkørselstiderne.

Det Data Flow Server er ansvarlig for fortolkning:

  • En stream DSL, der beskriver den logiske strøm af data gennem flere applikationer.
  • Et implementeringsmanifest, der beskriver kortlægning af applikationer til runtime.

2.4. Data flow shell

Data Flow Shell er en klient for Data Flow Server. Skallen giver os mulighed for at udføre den DSL-kommando, der er nødvendig for at interagere med serveren.

Som et eksempel ville DSL til at beskrive datastrømmen fra en http-kilde til en jdbc-vask blive skrevet som “http | jdbc ”. Disse navne i DSL er registreret hos Data Flow Server og kortlægge applikationsartefakter, der kan hostes i Maven- eller Docker-arkiver.

Spring tilbyder også en grafisk grænseflade, navngivet Flotil oprettelse og overvågning af streaming datarørledninger. Imidlertid er brugen uden for diskussionen af ​​denne artikel.

2.5. Beskedmægler

Som vi har set i eksemplet i det foregående afsnit, har vi brugt rørsymbolet til definitionen af ​​datastrømmen. Rørsymbolet repræsenterer kommunikationen mellem de to applikationer via messaging middleware.

Det betyder, at vi har brug for en meddelelsesmægler, der er i gang i målmiljøet.

De to messaging middleware-mæglere, der understøttes, er:

  • Apache Kafka
  • RabbitMQ

Og så, nu hvor vi har et overblik over de arkitektoniske komponenter - det er tid til at opbygge vores første streambehandlingspipeline.

3. Installer en meddelelsesmægler

Som vi har set har applikationerne i pipeline brug for en messaging-middelware til at kommunikere. I forbindelse med denne artikel går vi videre med RabbitMQ.

For de fulde detaljer om installationen kan du følge instruktionerne på det officielle websted.

4. Den lokale datastrømsserver

For at fremskynde processen med at generere vores applikationer bruger vi Spring Initializr; med sin hjælp kan vi få vores Spring Boot applikationer på få minutter.

Når du er kommet til hjemmesiden, skal du blot vælge en Gruppe og en Artefakt navn.

Når dette er gjort, skal du klikke på knappen Generer projekt for at starte download af Maven-artefakten.

Når download er afsluttet, skal du pakke projektet ud og importere det som et Maven-projekt i din valgte IDE.

Lad os tilføje en Maven-afhængighed til projektet. Som vi har brug for Dataflow Local Server biblioteker, lad os tilføje spring-cloud-starter-dataflow-server-local afhængighed:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Nu er vi nødt til at kommentere Spring Boot hovedklasse med @EnableDataFlowServer kommentar:

@EnableDataFlowServer @ SpringBootApplication offentlig klasse SpringDataFlowServerApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

Det er alt. Vores Local Data Flow Server er klar til at blive henrettet:

mvn spring-boot: løb

Applikationen starter op på port 9393.

5. Datastrømskallen

Gå igen til Spring Initializr og vælg en Gruppe og Artefakt navn.

Når vi har downloadet og importeret projektet, lad os tilføje en fjeder-cloud-dataflow-shell-afhængighed:

 org.springframework.cloud spring-cloud-dataflow-shell 

Nu skal vi tilføje @EnableDataFlowShell kommentar til Spring Boot hovedklasse:

@EnableDataFlowShell @ SpringBootApplication offentlig klasse SpringDataFlowShellApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

Vi kan nu køre skallen:

mvn spring-boot: løb

Når skallen kører, kan vi skrive Hjælp kommando i prompten for at se en komplet liste over kommandoer, som vi kan udføre.

6. Kildeapplikationen

Tilsvarende opretter vi på Initializr nu en simpel applikation og tilføjer en Stream kanin afhængighed kaldet spring-cloud-starter-stream-rabbit:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 

Vi tilføjer derefter @EnableBinding (Source.class) kommentar til Spring Boot hovedklasse:

@EnableBinding (Source.class) @SpringBootApplication offentlig klasse SpringDataFlowTimeSourceApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

Nu skal vi definere kilden til de data, der skal behandles. Denne kilde kan være en hvilken som helst potentiel endeløs arbejdsbyrde (internet-of-things-sensordata, 24/7 hændelsesbehandling, online transaktionsdata indtages).

I vores prøveapplikation producerer vi en begivenhed (for enkelheds skyld et nyt tidsstempel) hvert 10. sekund med en Poller.

Det @InboundChannelAdapter annotation sender en besked til kildens outputkanal ved hjælp af returværdien som meddelelsens nyttelast:

@Bean @InboundChannelAdapter (værdi = Source.OUTPUT, poller = @Poller (fixedDelay = "10000", maxMessagesPerPoll = "1")) offentlig MessageSource timeMessageSource () {return () -> MessageBuilder.withPayload (ny dato (). GetTime ()). build (); } 

Vores datakilde er klar.

7. Processoransøgningen

Derefter opretter vi en applikation og tilføjer en Stream kanin afhængighed.

Vi tilføjer derefter @EnableBinding (Processor.class) kommentar til Spring Boot hovedklasse:

@EnableBinding (Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

Dernæst skal vi definere en metode til at behandle de data, der kommer fra kildeapplikationen.

For at definere en transformer skal vi kommentere denne metode med @Transformer kommentar:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transformation (Long timestamp) {DateFormat dateFormat = new SimpleDateFormat ("åååå / MM / dd tt: mm: åå"); String date = dateFormat.format (tidsstempel); Retur dato; }

Det konverterer et tidsstempel fra 'input'-kanalen til en formateret dato, der sendes til' output'-kanalen.

8. Sink-applikationen

Den sidste applikation, der skal oprettes, er Sink-applikationen.

Gå igen til Spring Initializr og vælg en Gruppe, en Artefakt navn. Efter download af projektet, lad os tilføje en Stream kanin afhængighed.

Tilføj derefter @EnableBinding (Sink.class) kommentar til Spring Boot hovedklasse:

@EnableBinding (Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

Nu har vi brug for en metode til at opfange de meddelelser, der kommer fra processorapplikationen.

For at gøre dette skal vi tilføje @StreamListener (Sink.INPUT) kommentar til vores metode:

@StreamListener (Sink.INPUT) public void loggerSink (String date) {logger.info ("Modtaget:" + dato); }

Metoden udskriver simpelthen tidsstemplet omdannet i en formateret dato til en logfil.

9. Registrer en Stream-app

Spring Cloud Data Flow Shell giver os mulighed for at registrere en Stream-app i App-registreringsdatabasen ved hjælp af app-register kommando.

Vi skal angive et unikt navn, applikationstype og en URI, der kan løses til appgenstanden. Angiv “for typenkilde“, “processor“, Eller“håndvask“.

Når en URI leveres med maven-skemaet, skal formatet være i overensstemmelse med følgende:

maven: //: [: [:]]:

For at registrere Kilde, Processor og Håndvask applikationer, der tidligere er oprettet, skal du gå til Spring Cloud Data Flow Shell og udsted følgende kommandoer fra prompten:

app register - navn tidskilde - type kilde --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT app register --name time -processor --type processor --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-processor: jar: 0.0.1-SNAPSHOT app register - navn logning-sink --type sink --uri maven: //com.baeldung.spring.cloud: spring-data-flow-logging-sink: jar: 0.0.1-SNAPSHOT 

10. Opret og implementer strømmen

For at oprette en ny streamdefinition skal du gå til Spring Cloud Data Flow Shell og udfør følgende shell-kommando:

stream create --name time-to-log --definition 'time-source | tidsprocessor | logning-vask '

Dette definerer en stream med navnet time-to-log baseret på DSL-udtrykket ‘Tidskilde | tidsprocessor | logning-vask '.

Derefter udføres følgende shell-kommando for at implementere strømmen:

stream deploy --name time-to-log

Det Data Flow Server løser tidskilde, tidsprocessorog logning-vask at maven koordinater og bruger dem til at starte tidskilde, tidsprocessor og logning-vask applikationer af strømmen.

Hvis streamen er korrekt implementeret, kan du se i Data Flow Server logger, at modulerne er startet og bundet sammen:

2016-08-24 12: 29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: implementering af app time-to-log.logging-sink instans 0 Logfiler vil være i PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12: 29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: implementering af app time-to-log.time-processor-forekomst 0 Logfiler vil være i PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log.time-processor 2016-08-24 12: 29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: implementering af app time-to-log.time-source-forekomst 0 Logfiler vil være i PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. Gennemgang af resultatet

I dette eksempel sender kilden simpelthen det aktuelle tidsstempel som en besked hvert sekund, processoren formaterer det, og logvasken udsender det formaterede tidsstempel ved hjælp af logningsrammen.

Logfilerne er placeret i det bibliotek, der vises i Data Flow Server'S logoutput, som vist ovenfor. For at se resultatet kan vi hale loggen:

hale -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12: 40: 42.029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Modtaget: 2016/08/24 11:40:01 2016-08-24 12: 40: 52.035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSinkApplication: Modtaget: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Modtaget: 2016/08 / 24 11:40:21

12. Konklusion

I denne artikel har vi set, hvordan man bygger en datarørledning til streambehandling ved brug af Spring Cloud Data Flow.

Vi så også rollen som Kilde, Processor og Håndvask applikationer inde i streamen, og hvordan man tilslutter og binder dette modul inde i en Data Flow Server gennem brug af Data flow shell.

Eksempelkoden findes i GitHub-projektet.