Integrering af foråret med AWS Kinesis

1. Introduktion

Kinesis er et værktøj til at indsamle, behandle og analysere datastrømme i realtid, udviklet hos Amazon. En af dens største fordele er, at det hjælper med udviklingen af ​​begivenhedsdrevne applikationer.

I denne vejledning udforsker vi et par biblioteker, der gør det muligt for vores Spring-applikation at producere og forbruge poster fra en Kinesis Stream. Kodeeksemplerne viser den grundlæggende funktionalitet, men repræsenterer ikke den produktionsklare kode.

2. Forudsætning

Før vi går videre, er vi nødt til at gøre to ting.

Den første er at oprette et forårsprojekt, da målet her er at interagere med Kinesis fra et forårsprojekt.

Den anden er at oprette en Kinesis Data Stream. Vi kan gøre dette fra en webbrowser i vores AWS-konto. Et alternativ for AWS CLI fans blandt os er at bruge kommandolinjen. Fordi vi interagerer med det fra kode, skal vi også have adgang til AWS IAM-legitimationsoplysninger, adgangsnøglen og den hemmelige nøgle og regionen.

Alle vores producenter opretter dummy IP-adresseoptegnelser, mens forbrugerne læser disse værdier og viser dem i applikationskonsollen.

3. AWS SDK til Java

Det allerførste bibliotek, vi bruger, er AWS SDK til Java. Dens fordel er, at det giver os mulighed for at styre mange dele af arbejdet med Kinesis Data Streams. Vi kan læse data, producere data, oprette datastrømme og genhardføre datastrømme. Ulempen er, at for at have produktionsklar kode, bliver vi nødt til at kode aspekter som genharding, fejlhåndtering eller en dæmon for at holde forbrugeren i live.

3.1. Maven afhængighed

Amazon-kinesis-klientens Maven-afhængighed vil bringe alt, hvad vi har brug for, for at have fungerende eksempler. Vi tilføjer det nu til vores pom.xml fil:

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. Forårsopsætning

Lad os genbruge AmazonKinesis objekt, der er nødvendigt for at interagere med vores Kinesis Stream. Vi opretter det som en @Bønne inde i vores @SpringBootApplication klasse:

@Bean offentlige AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); returner AmazonKinesisClientBuilder.standard () .withCredentials (ny AWSStaticCredentialsProvider (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }

Lad os derefter definere aws.access.key og aws.secret.key, nødvendigt for den lokale maskine, i application.properties:

aws.access.key = min-aws-adgang-nøgle-går-her aws.secret.key = min-aws-hemmelige-nøgle-går-her

Og vi læser dem ved hjælp af @Værdi kommentar:

@Value ("$ {aws.access.key}") privat streng adgangskode; @Value ("$ {aws.secret.key}") privat streng secretKey;

Af hensyn til enkelheden vil vi stole på @ Planlagt metoder til at oprette og forbruge poster.

3.3. Forbruger

Det AWS SDK Kinesis Consumer bruger en pull-model, hvilket betyder, at vores kode vil trække poster fra skårene i Kinesis-datastrømmen:

GetRecordsRequest recordsRequest = ny GetRecordsRequest (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); while (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (record -> new String (record.getData (). array ())) .forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }

Det GetRecordsRequest objekt bygger anmodningen om streamdata. I vores eksempel har vi defineret en grænse på 25 poster pr. Anmodning, og vi fortsætter med at læse, indtil der ikke er mere at læse.

Vi kan også bemærke, at vi til vores iteration har brugt en GetShardIteratorResult objekt. Vi oprettede dette objekt inde i en @PostConstrucmetode, så vi straks begynder at spore poster:

privat GetShardIteratorResult shardIterator; @PostConstruct privat tomrum buildShardIterator () {GetShardIteratorRequest readShardsRequest = ny GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }

3.4. Producent

Lad os nu se, hvordan man gør det håndtere oprettelsen af ​​poster til vores Kinesis-datastrøm.

Vi indsætter data ved hjælp af en PutRecordsRequest objekt. Til dette nye objekt tilføjer vi en liste, der indeholder flere PutRecordsRequestEntry genstande:

Listeindgange = IntStream.range (1, 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry entry = new PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); returpost;}). indsamle (Collectors.toList ()); PutRecordsRequest createRecordsRequest = nye PutRecordsRequest (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (poster); kinesis.putRecords (createRecordsRequest);

Vi har oprettet en grundlæggende forbruger og en producent af simulerede IP-poster. Alt, hvad der er tilbage at gøre nu, er at køre vores forårsprojekt og se IP'er, der er angivet i vores applikationskonsol.

4. KCL og KPL

Kinesis Client Library (KCL) er et bibliotek, der forenkler forbruget af poster. Det er også et lag med abstraktion over AWS SDK Java API'er til Kinesis-datastrømme. Bag kulisserne håndterer biblioteket belastningsafbalancering på tværs af mange forekomster, reagerer på instansfejl, kontrolpunkterer behandlede poster og reagerer på genharding.

Kinesis Producer Library (KPL) er et bibliotek, der er nyttigt til skrivning til en Kinesis-datastrøm. Det giver også et lag af abstraktion, der sidder over AWS SDK Java API'er til Kinesis Data Streams. For bedre ydeevne håndterer biblioteket automatisk batching, multi-threading og prøv igen logik.

KCL og KPL har begge den største fordel, at de er nemme at bruge, så vi kan fokusere på at producere og forbruge poster.

4.1. Maven afhængigheder

De to biblioteker kan medbringes separat i vores projekt, hvis det er nødvendigt. For at inkludere KPL og KCL i vores Maven-projekt skal vi opdatere vores pom.xml-fil:

 com.amazonaws amazon-kinesis-producer 0.13.1 com.amazonaws amazon-kinesis-client 1.11.2 

4.2. Forårsopsætning

Den eneste forberedelse, vi har brug for, er at sikre, at vi har IAM-legitimationsoplysningerne ved hånden. Værdierne for aws.access.key og aws.secret.key er defineret i vores application.properties fil, så vi kan læse dem med @Værdi når det er nødvendigt.

4.3. Forbruger

Først skal vi oprette en klasse, der implementerer IRecordProcessor interface og definerer vores logik for, hvordan Kinesis datastrømsposter skal håndteres, som skal udskrive dem i konsollen:

offentlig klasse IpProcessor implementerer IRecordProcessor {@Override public void initialize (InitializationInput initializationInput) {} @Override public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords (). forEach (record -> System.out.println (newData) () .array ()))); } @ Override offentlig ugyldig nedlukning (ShutdownInput shutdownInput) {}}

Det næste trin er at definere en fabriksklasse, der implementerer IRecordProcessorFactory interface og returnerer en tidligere oprettet IpProcessor objekt:

offentlig klasse IpProcessorFactory implementerer IRecordProcessorFactory {@Override offentlig IRecordProcessor createProcessor () {returner ny IpProcessor (); }}

Og nu til det sidste trin, vi bruger en Arbejder modsætter sig at definere vores forbrugerpipeline. Vi har brug for en KinesisClientLibConfiguration objekt, der om nødvendigt definerer IAM-legitimationsoplysninger og AWS-regionen.

Vi passerer KinesisClientLibConfigurationog vores IpProcessorFactory objekt, mod vores Arbejder og start det derefter i en separat tråd. Vi holder denne logik med at forbruge optegnelser altid i live ved brug af Arbejder klasse, så vi læser løbende nye poster nu:

BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = ny KinesisClientLibConfiguration (APP_NAME, IPS_STREAM, ny AWSStaticCredentialsProvider (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName ()); endelig arbejderarbejder = ny Worker.Builder () .recordProcessorFactory (ny IpProcessorFactory ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());

4.4. Producent

Lad os nu definere KinesisProducerConfiguration objekt, tilføjelse af IAM-legitimationsoplysninger og AWS-regionen:

BasicAWSCredentials awsCredentials = nye BasicAWSCredentials (accessKey, secretKey); KinesisProducerConfiguration producerConfig = ny KinesisProducerConfiguration () .setCredentialsProvider (ny AWSStaticCredentialsProvider (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regioner.EU_CENTRAL_1.getName ()); this.kinesisProducer = ny KinesisProducer (producerConfig);

Vi inkluderer kinesisProducer objekt oprettet tidligere i en @ Planlagt job og producer poster til vores Kinesis-datastrøm kontinuerligt:

IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())). For hver (post -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_KEY, );

5. Forår Cloud Stream Binder Kinesis

Vi har allerede set to biblioteker, begge oprettet uden for forårets økosystem. Godt så se hvordan Spring Cloud Stream Binder Kinesis kan forenkle vores liv yderligere mens du bygger oven på Spring Cloud Stream.

5.1. Maven afhængighed

Den Maven-afhængighed, vi skal definere i vores ansøgning om Spring Cloud Stream Binder Kinesis, er:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. Forårsopsætning

Når du kører på EC2, opdages de nødvendige AWS-egenskaber automatisk, så der er ingen grund til at definere dem. Da vi kører vores eksempler på en lokal maskine, skal vi definere vores IAM-adgangsnøgle, hemmelige nøgle og region til vores AWS-konto. Vi har også deaktiveret den automatiske registrering af CloudFormation-staknavn for applikationen:

cloud.aws.credentials.access-key = min-aws-adgangsnøgle cloud.aws.credentials.secret-key = min-aws-hemmelige-nøgle cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = falsk

Spring Cloud Stream er pakket med tre grænseflader, som vi kan bruge i vores stream-binding:

  • Det Håndvask er til dataindtagelse
  • Det Kilde bruges til udgivelse af poster
  • Det Processor er en kombination af begge

Vi kan også definere vores egne grænseflader, hvis vi har brug for det.

5.3. Forbruger

At definere en forbruger er et todelt job. Først definerer vi, i application.properties, datastrømmen, hvorfra vi bruger:

spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-gruppe spring.cloud.stream.bindings.input.content-type = text / plain

Og lad os derefter definere et forår @Komponent klasse. Annotationen @EnableBinding (Sink.class) giver os mulighed for at læse fra Kinesis-strømmen ved hjælp af metoden, der er kommenteret med @StreamListener (Sink.INPUT):

@EnableBinding (Sink.class) offentlig klasse IpConsumer {@StreamListener (Sink.INPUT) offentlig tomrumsforbrug (String ip) {System.out.println (ip); }}

5.4. Producent

Producenten kan også opdeles i to. Først skal vi definere vores streamegenskaber indeni application.properties:

spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain

Og så tilføjer vi @EnableBinding (Source.class) på en forår @Komponent og oprette nye testmeddelelser hvert par sekunder:

@Component @EnableBinding (Source.class) offentlig klasse IpProducer {@Autowired privat kilde kilde; @Scheduled (fixedDelay = 3000L) private void produce () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (post -> source.output (). Send ( MessageBuilder.withPayload (post) .build ())); }}

Det er alt, hvad vi har brug for, for at Spring Cloud Stream Binder Kinesis kan fungere. Vi kan simpelthen starte applikationen nu.

6. Konklusion

I denne artikel har vi set, hvordan vi integrerer vores forårsprojekt med to AWS-biblioteker til interaktion med en Kinesis Data Stream. Vi har også set, hvordan vi bruger Spring Cloud Stream Binder Kinesis-biblioteket for at gøre implementeringen endnu lettere.

Kildekoden til denne artikel kan findes på Github.


$config[zx-auto] not found$config[zx-overlay] not found