Introduktion til Netflix Mantis

1. Oversigt

I denne artikel ser vi på Mantis-platformen udviklet af Netflix.

Vi udforsker de vigtigste Mantis-koncepter ved at oprette, køre og undersøge et streambehandlingsjob.

2. Hvad er mantis?

Mantis er en platform til opbygning af stream-processing applikationer (job). Det giver en nem måde at styre implementering og livscyklus af job. Desuden er det letter ressourceallokering, opdagelse og kommunikation mellem disse job.

Derfor kan udviklere fokusere på den faktiske forretningslogik, samtidig med at de har støtte fra en robust og skalerbar platform til at køre deres applikationer med høj volumen, lav latenstid og ikke-blokerende.

Et mantisjob består af tre forskellige dele:

  • det kilde, ansvarlig for at hente data fra en ekstern kilde
  • en eller flere niveauer, ansvarlig for behandling af indgående hændelsesstrømme
  • og en håndvask der indsamler de behandlede data

Lad os nu udforske hver af dem.

3. Opsætning og afhængigheder

Lad os starte med at tilføje mantis-runtime og jackson-databind afhængigheder:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databind 

Lad os nu implementere Mantis til konfiguration af vores jobdatakilde Kilde grænseflade:

offentlig klasse RandomLogSource implementerer kilde {@Override public Observable call (Context context, Index index) {return Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } privat streng createRandomLogEvent (langt kryds) {// generere en tilfældig logindgangsstreng ...}}

Som vi kan se, genererer den simpelthen tilfældige logindgange flere gange i sekundet.

4. Vores første job

Lad os nu oprette et mantis-job, der simpelthen samler loghændelser fra vores RandomLogSource. Senere tilføjer vi gruppe- og aggregeringstransformationer for et mere komplekst og interessant resultat.

Til at begynde med, lad os oprette en LogEvent enhed:

offentlig klasse LogEvent implementerer JsonType {privat langt indeks; privat strengniveau; privat streng besked; // ...}

Lad os derefter tilføje vores TransformLogStage.

Det er et simpelt trin, der implementerer ScalarComputation-grænsefladen og deler en logindgang for at opbygge en LogEvent. Det filtrerer også eventuelle forkerte formaterede strenge ud:

offentlig klasse TransformLogStage implementerer ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). kort (LogEvent :: ny); }}

4.1. Kører jobbet

På dette tidspunkt har vi nok byggesten til at sammensætte vores Mantis-job:

offentlig klasse LogCollectingJob udvider MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), new ScalarToScalar.Config ()) .sink (Sinks.eagerSub. LogEvent :: toJsonString))) .metadata (ny Metadata.Builder (). Build ()) .create (); }}

Lad os se nærmere på vores job.

Som vi kan se, strækker det sig MantisJobProvider. Først henter den data fra vores RandomLogSource og anvender TransformLogStage til de hentede data. Endelig sender den de behandlede data til den indbyggede vask, der ivrigt abonnerer og leverer data via SSE.

Lad os nu konfigurere vores job til at udføre lokalt ved opstart:

@SpringBootApplication offentlig klasse MantisApplication implementerer CommandLineRunner {// ... @ Override public void run (String ... args) {LocalJobExecutorNetworked.execute (new LogCollectingJob (). GetJobInstance ()); }}

Lad os køre applikationen. Vi får vist en logmeddelelse som:

... Serverer moderne HTTP SSE-server vask på port: 86XX

Lad os nu oprette forbindelse til vasken ved hjælp af krølle:

$ curl localhost: 86XX data: {"index": 86, "level": "WARN", "message": "login-forsøg"} data: {"index": 87, "level": "ERROR", "message ":" bruger oprettet "} data: {" index ": 88," level ":" INFO "," message ":" bruger oprettet "} data: {" index ": 89," level ":" INFO ", "message": "loginforsøg"} data: {"index": 90, "level": "INFO", "message": "bruger oprettet"} data: {"index": 91, "level": "FEJL "," message ":" bruger oprettet "} data: {" index ": 92," level ":" WARN "," message ":" loginforsøg "} data: {" index ": 93," level ": "INFO", "message": "bruger oprettet"} ...

4.2. Konfiguration af vasken

Indtil videre har vi brugt den indbyggede vask til at indsamle vores behandlede data. Lad os se om vi kan tilføje mere fleksibilitet til vores scenario ved at tilbyde en brugerdefineret vask.

Hvad hvis vi f.eks. Vil filtrere logfiler efter besked?

Lad os oprette en LogSink der implementerer Håndvask grænseflade:

public class LogSink implementerer Sink {@Override public void call (Context context, PortRequest portRequest, Observable logEventObservable) {SelfDocumentingSink sink = new ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicateM (filterBy). ; logEventObservable.subscribe (); sink.call (kontekst, portRequest, logEventObservable); } private Predicate filterByLogMessage () {return new Predicate ("filter by message", parameters -> {if (parameters! = null && parameters.containsKey ("filter")) {return logEvent -> logEvent.getMessage (). indeholder ( parameters.get ("filter"). get (0));} returner logEvent -> sand;}); }}

I denne vaskimplementering konfigurerede vi et predikat, der bruger filter parameter for kun at hente logfiler, der indeholder tekstsættet i filter parameter:

$ curl localhost: 8874? filter = login data: {"index": 93, "level": "FEJL", "message": "loginforsøg"} data: {"index": 95, "level": "INFO "," message ":" loginforsøg "} data: {" index ": 97," level ":" FEJL "," message ":" loginforsøg "} ...

Bemærk Mantis tilbyder også et stærkt forespørgselssprog, MQL, der kan bruges til forespørgsel, transformation og analyse af streamdata på en SQL-måde.

5. Stage-lænkning

Lad os nu antage, at vi er interesserede i at vide, hvor mange FEJL, ADVARE, eller INFO logposter, vi har i et givet tidsinterval. Til dette tilføjer vi to trin til vores job og sammenkæder dem.

5.1. Gruppering

Lad os først oprette en GroupLogStage.

Denne fase er en ToGroupComputation implementering, der modtager en LogEvent streame data fra det eksisterende TransformLogStage. Derefter grupperer det poster efter logningsniveau og sender dem til næste trin:

offentlig klasse GroupLogStage implementerer ToGroupComputation {@ Override public Observable call (Context context, Observable logEvent) {return logEvent.map (log -> new MantisGroup (log.getLevel (), log)); } offentlig statisk ScalarToGroup.Config config () {returner ny ScalarToGroup.Config () .description ("Gruppér hændelsesdata efter niveau") .codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

Vi har også oprettet en brugerdefineret scenekonfiguration ved at give en beskrivelse, den codec, der skal bruges til serialisering af output, og tillod dette scenes opkaldsmetode at køre samtidigt ved hjælp af concurrentInput ().

En ting at bemærke er, at dette trin er vandret skalerbart. Det betyder, at vi kan køre så mange forekomster af denne fase som nødvendigt. Også værd at nævne, når det implementeres i en Mantis-klynge, dette trin sender data til det næste trin, så alle begivenheder, der tilhører en bestemt gruppe, lander på den samme arbejder i næste trin.

5.2. Aggregerende

Før vi går videre og opretter den næste fase, skal vi først tilføje en LogAggregate enhed:

offentlig klasse LogAggregate implementerer JsonType {privat endelig heltalstælling; privat endelig strengniveau; }

Lad os nu oprette den sidste fase i kæden.

Denne fase gennemføres GroupToScalarComputation og omdanner en strøm af loggrupper til en skalar LogAggregate. Det gør det ved at tælle, hvor mange gange hver logtype vises i strømmen. Derudover har den også en LogAggregationDuration parameter, som kan bruges til at kontrollere størrelsen på aggregeringsvinduet:

offentlig klasse CountLogStage implementerer GroupToScalarComputation {privat int varighed; @Override public void init (Context context) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @ Override public Observable call (Context context, Observable mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (group -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> new LogAggregate (count, group.getKey ())))); } offentlig statisk GroupToScalar.Config config () {returner ny GroupToScalar.Config () .description ("sum events for a log level") .codec (JacksonCodecs.pojo (LogAggregate.class)) .withParameters (getParameters ()); } offentlig statisk liste getParameters () {Liste params = ny ArrayList (); params.add (ny IntParameter () .name ("LogAggregationDuration"). beskrivelse ("vinduesstørrelse til aggregering i millisekunder") .validator (Validators.range (100, 10000)) .defaultValue (5000) .build ()); returparametre; }}

5.3. Konfigurer og kør jobbet

Det eneste, der er tilbage at gøre nu, er at konfigurere vores job:

public class LogAggregationJob udvider MantisJobProvider {@Override public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), TransformLogStage.stageConfig ()) .stage (new GroupLogStage (), group. stage) (ny CountLogStage (), CountLogStage.config ()). sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (ny Metadata.Builder (). build ()) .create (); }}

Så snart vi kører applikationen og udfører vores nye job, kan vi se logoptællingerne hentes hvert par sekunder:

$ curl localhost: 8133 data: {"count": 3, "level": "ERROR"} data: {"count": 13, "level": "INFO"} data: {"count": 4, "level ":" WARN "} data: {" count ": 8," level ":" FEJL "} data: {" count ": 5," level ":" INFO "} data: {" count ": 7," level ":" WARN "} ...

6. Konklusion

For at opsummere har vi i denne artikel set, hvad Netflix Mantis er, og hvad det kan bruges til. Desuden kiggede vi på hovedkoncepterne, brugte dem til at opbygge job og udforskede tilpassede konfigurationer til forskellige scenarier.

Som altid er den komplette kode tilgængelig på GitHub.