Introduktion til Apache Storm

1. Oversigt

Denne tutorial vil være en introduktion til Apache Storm, et distribueret realtidsberegningssystem.

Vi fokuserer på og dækker:

  • Hvad er Apache Storm præcist, og hvilke problemer det løser
  • Dens arkitektur og
  • Sådan bruges det i et projekt

2. Hvad er Apache Storm?

Apache Storm er gratis og open source distribueret system til realtidsberegninger.

Det giver fejltolerance, skalerbarhed og garanterer databehandling og er især godt til behandling af ubegrænsede datastrømme.

Nogle gode brugssager til Storm kan være behandling af kreditkortoperationer til afsløring af svig eller behandling af data fra smarte hjem for at opdage defekte sensorer.

Storm tillader integration med forskellige databaser og køsystemer, der er tilgængelige på markedet.

3. Maven-afhængighed

Før vi bruger Apache Storm, skal vi medtage stormkerneafhængigheden i vores projekt:

 org.apache.storm storm-core 1.2.2 leveres 

Vi skal kun bruge givet omfang hvis vi har til hensigt at køre vores ansøgning på Storm-klyngen.

For at køre applikationen lokalt kan vi bruge en såkaldt lokal tilstand, der simulerer Storm-klyngen i en lokal proces, i så fald bør vi fjerne stillet til rådighed.

4. Datamodel

Apache Storms datamodel består af to elementer: tupler og streams.

4.1. Tuple

EN Tuple er en ordnet liste over navngivne felter med dynamiske typer. Dette betyder, at vi ikke behøver at udtrykkeligt angive felttyperne.

Storm har brug for at vide, hvordan man serierer alle værdier, der bruges i en tuple. Som standard kan den allerede serieisere primitive typer, Strenge og byte arrays.

Og da Storm bruger Kryo-serialisering, skal vi registrere serializer ved hjælp af Konfig for at bruge de tilpassede typer. Vi kan gøre dette på en af ​​to måder:

For det første kan vi registrere klassen, der skal serienummeres, ved hjælp af dens fulde navn:

Config config = ny Config (); config.registerSerialization (User.class);

I et sådant tilfælde vil Kryo serieisere klassen ved hjælp af FieldSerializer. Som standard serierer dette alle ikke-forbigående felter i klassen, både private og offentlige.

Eller i stedet kan vi give både den klasse, der skal serialiseres, og den serializer, vi ønsker, at Storm skal bruge til den klasse:

Config config = ny Config (); config.registerSerialization (User.class, UserSerializer.class);

For at oprette den tilpassede serializer er vi nødt til at udvide den generiske klasse Serializer der har to metoder skrive og Læs.

4.2. Strøm

EN Strøm er kerneabstraktionen i Storm-økosystemet. Det Strøm er en ubegrænset sekvens af tupler.

Storms tillader behandling af flere streams parallelt.

Hver stream har et id, der leveres og tildeles under erklæring.

5. Topologi

Logikken i Storm-applikationen i realtid er pakket ind i topologien. Topologien består af tud og bolte.

5.1. Tud

Tud er kilderne til strømme. De udsender tupler til topologien.

Tuples kan læses fra forskellige eksterne systemer som Kafka, Kestrel eller ActiveMQ.

Tud kan være pålidelig eller upålidelig. Pålidelig betyder, at tuden kan svare, at tuplen, der ikke er behandlet af Storm. Upålidelig betyder, at tuden ikke svarer, da den vil bruge en brand-og-glemmekanisme til at udsende tuplerne.

For at oprette den brugerdefinerede tud skal vi implementere IRichSpout interface eller udvide enhver klasse, der allerede implementerer grænsefladen, for eksempel et abstrakt BaseRichSpout klasse.

Lad os oprette en upålidelig tud:

offentlig klasse RandomIntSpout udvider BaseRichSpout {privat tilfældig tilfældig; private SpoutOutputCollector outputCollector; @ Overstyr offentlig tomrum åbent (Kortkort, TopologyContext topologyContext, SpoutOutputCollector tudOutputCollector) {tilfældig = ny tilfældig (); outputCollector = tudOutputCollector; } @ Override offentlig ugyldighed nextTuple () {Utils.sleep (1000); outputCollector.emit (nye værdier (random.nextInt (), System.currentTimeMillis ())); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nye felter ("randomInt", "tidsstempel")); }}

Vores skik RandomIntSpout vil generere tilfældigt heltal og tidsstempel hvert sekund.

5.2. Bolt

Bolte behandler tupler i strømmen. De kan udføre forskellige operationer som filtrering, sammenlægninger eller brugerdefinerede funktioner.

Nogle operationer kræver flere trin, og derfor bliver vi nødt til at bruge flere bolte i sådanne tilfælde.

At oprette brugerdefinerede Bolt, skal vi implementere IRichBolt eller til enklere operationer IBasicBolt interface.

Der er også flere hjælperklasser til rådighed til implementering Bolt. I dette tilfælde bruger vi BaseBasicBolt:

public class PrintingBolt udvider BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {System.out.println (tuple); } @Override offentlig ugyldighed erklæreOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

Denne skik Udskrivning Bolt vil simpelthen udskrive alle tupler til konsollen.

6. Oprettelse af en simpel topologi

Lad os sætte disse ideer sammen i en simpel topologi. Vores topologi har en tud og tre bolte.

6.1. RandomNumberSpout

I starten opretter vi en upålidelig tud. Det genererer tilfældige heltal fra intervallet (0,100) hvert sekund:

offentlig klasse RandomNumberSpout udvider BaseRichSpout {privat tilfældig tilfældig; privat SpoutOutputCollector opsamler; @ Overstyr offentlig tomrum åben (Kortkort, TopologyContext topologyContext, SpoutOutputCollector tudOutputCollector) {tilfældig = ny tilfældig (); collector = tudOutputCollector; } @ Override offentlig ugyldighed nextTuple () {Utils.sleep (1000); int operation = random.nextInt (101); lang tidsstempel = System.currentTimeMillis (); Værdier værdier = nye værdier (operation, tidsstempel); collector.emit (værdier); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nye felter ("operation", "tidsstempel")); }}

6.2. FiltreringBolt

Derefter opretter vi en bolt, der filtrerer alle elementer ud med operation lig med 0:

public class FilteringBolt udvider BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); hvis (operation> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nye felter ("operation", "tidsstempel")); }}

6.3. Aggregerende bolt

Lad os derefter oprette en mere kompliceret Bolt der samler alle positive operationer fra hver dag.

Til dette formål bruger vi en bestemt klasse, der er oprettet specielt til implementering af bolte, der fungerer på vinduer i stedet for at fungere på enkelte tupler: BaseWindowedBolt.

Windows er et væsentligt koncept i strømbehandling, der splitter de uendelige strømme i endelige bidder. Vi kan derefter anvende beregninger på hvert stykke. Der er generelt to typer vinduer:

Tidsvinduer bruges til at gruppere elementer fra en given tidsperiode ved hjælp af tidsstempler. Tidsvinduer kan have et andet antal elementer.

Tælvinduer bruges til at oprette vinduer med en defineret størrelse. I et sådant tilfælde vil alle vinduer have samme størrelse, og vinduet vil ikke udsendes, hvis der er færre elementer end den definerede størrelse.

Vores Aggregerende bolt genererer summen af ​​alle positive operationer fra a tidsvindue sammen med dens tids- og begyndelsesstempler:

offentlig klasse AggregatingBolt udvider BaseWindowedBolt {private OutputCollector outputCollector; @ Overstyr offentlig tomrumsforberedelse (Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = collector; } @ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("sumOfOperations", "beginningTimestamp", "endTimestamp")); } @Override offentlig ugyldig udførelse (TupleWindow tupleWindow) {List tuples = tupleWindow.get (); tuples.sort (Comparator.comparing (dette :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("operation")). sum (); Lang begyndelsestidsstempel = getTimestamp (tuples.get (0)); Lang endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Værdier værdier = nye værdier (sumOfOperations, begyndelseTidsstempel, slutTidsstempel); outputCollector.emit (værdier); } privat Lang getTimestamp (Tuple tuple) {return tuple.getLongByField ("tidsstempel"); }}

Bemærk, at det i dette tilfælde er sikkert at få det første element på listen direkte. Det skyldes, at hvert vindue beregnes ved hjælp af tidsstempel felt i Tuple, der skal være mindst et element i hvert vindue.

6.4. FileWritingBolt

Endelig opretter vi en bolt, der tager alle elementer med sumOfOperations større end 2000, skal du serieisere dem og skrive dem til filen:

offentlig klasse FileWritingBolt udvider BaseRichBolt {public static Logger logger = LoggerFactory.getLogger (FileWritingBolt.class); privat BufferedWriter-forfatter; privat streng filPath; private ObjectMapper objectMapper; @ Overstyr offentlig tomrumsoprydning () {prøv {writer.close (); } fange (IOException e) {logger.error ("Kunne ikke lukke forfatter!"); }} @ Overstyr offentlig tomrumsforberedelse (Kortkort, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = ny ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); prøv {writer = new BufferedWriter (new FileWriter (filePath)); } catch (IOException e) {logger.error ("Kunne ikke åbne en fil til skrivning.", e); }} @ Overstyr offentlig tomrum udfør (Tuple tuple) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); lang begyndelsestidsstempel = tuple.getLongByField ("begyndelsestidsstempel"); long endTimestamp = tuple.getLongByField ("endTimestamp"); hvis (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = new AggregatedWindow (sumOfOperations, beginningTimestamp, endTimestamp); prøv {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); forfatter.newLine (); writer.flush (); } fange (IOException e) {logger.error ("Kunne ikke skrive data til fil.", e); }}} // offentlig konstruktør og andre metoder}

Bemærk, at vi ikke behøver at erklære output, da dette vil være den sidste bolt i vores topologi

6.5. Kørsel af topologien

Endelig kan vi trække alt sammen og køre vores topologi:

offentlig statisk ugyldig runTopology () {TopologyBuilder builder = ny TopologyBuilder (); Tud tilfældig = ny RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Boltfiltrering = ny FilteringBolt (); builder.setBolt ("filteringBolt", filtrering). shuffleGrouping ("randomNumberSpout"); Boltaggregering = ny AggregatingBolt () .withTimestampField ("tidsstempel") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", aggregating) .shuffleGrouping ("filteringBolt"); String filePath = "./src/main/resources/data.txt"; Bolt-fil = ny FileWritingBolt (filePath); builder.setBolt ("fileBolt", file) .shuffleGrouping ("aggregatingBolt"); Config config = ny Config (); config.setDebug (falsk); LocalCluster-klynge = ny LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

For at få dataene til at strømme gennem hvert stykke i topologien, skal vi angive, hvordan vi forbinder dem. shuffleGroup giver os mulighed for at angive disse data for filtrering Bolt kommer fra randomNumberSpout.

For hver Bolt, skal vi tilføje shuffleGroup der definerer kilden til elementer til denne bolt. Kilden til elementer kan være en Tud eller en anden Bolt. Og hvis vi indstiller den samme kilde til mere end en bolt, kilden udsender alle elementer til hver af dem.

I dette tilfælde bruger vores topologi LocalCluster at køre jobbet lokalt.

7. Konklusion

I denne vejledning introducerede vi Apache Storm, et distribueret realtids beregningssystem. Vi skabte en tud, nogle bolte og trak dem sammen til en komplet topologi.

Og som altid kan alle kodeeksempler findes på GitHub.