Håndtering af Amazon SQS-køer i Java

1. Oversigt

I denne vejledning undersøger vi, hvordan du gør det brug Amazons SQS (Simple Queue Service) ved hjælp af Java SDK.

2. Forudsætninger

De Maven-afhængigheder, AWS-kontoindstillinger og klientforbindelse, der er nødvendige for at bruge Amazon AWS SDK til SQS, er de samme som i denne artikel her.

Forudsat at vi har oprettet en forekomst af AWS legitimationsoplysninger, som beskrevet i den forrige artikel, kan vi gå videre og oprette vores SQS-klient:

AmazonSQS sqs = AmazonSQSClientBuilder.standard () .withCredentials (new AWSStaticCredentialsProvider (credentials)) .withRegion (Regions.US_EAST_1) .build (); 

3. Oprettelse af køer

Når vi har oprettet vores SQS-klient, oprettelse af køer er ret ligetil.

3.1. Oprettelse af en standard kø

Lad os se, hvordan vi kan oprette en standardkø. At gøre dette, vi bliver nødt til at oprette en forekomst af CreateQueueRequest:

CreateQueueRequest createStandardQueueRequest = ny CreateQueueRequest ("baeldung-kø"); Streng standardQueueUrl = sqs.createQueue (createStandardQueueRequest) .getQueueUrl (); 

3.2. Oprettelse af en FIFO-kø

Oprettelse af en FIFO svarer til oprettelse af en standardkø. Vi bruger stadig en forekomst af CreateQueueRequest, som vi gjorde tidligere. Kun denne gang, vi bliver nødt til at videregive i køattributter og indstille FifoQueue attribut til rigtigt:

KortkøAttributter = nyt HashMap (); queueAttributes.put ("FifoQueue", "true"); queueAttributes.put ("ContentBasedDeduplication", "true"); CreateQueueRequest createFifoQueueRequest = ny CreateQueueRequest ("baeldung-queue.fifo"). MedAttributter (køAttributter); String fifoQueueUrl = sqs.createQueue (createFifoQueueRequest) .getQueueUrl (); 

4. Udstationering af meddelelser i køer

Når vi har konfigureret vores køer, kan vi begynde at sende beskeder.

4.1. Udstationering af en besked til en standardkø

For at sende meddelelser til en standardkø skal vi skal oprette en forekomst af SendMessageRequest.

Derefter vedhæfter vi et kort over meddelelsesattributter til denne anmodning:

KortmeddelelseAttributter = ny HashMap (); messageAttributes.put ("AttributeOne", ny MessageAttributeValue () .withStringValue ("Dette er en attribut") .withDataType ("String")); SendMessageRequest sendMessageStandardQueue = ny SendMessageRequest () .withQueueUrl (standardQueueUrl) .withMessageBody ("En simpel besked.") .WithDelaySeconds (30) .withMessageAttributter (meddelelsesattributter); sqs.sendMessage (sendMessageStandardQueue); 

Det withDelaySeconds () angiver, hvor længe meddelelsen skal ankomme til køen.

4.2. Udstationering af en besked til en FIFO-kø

Den eneste forskel, i dette tilfælde, er den vi bliver nødt til at specificere gruppe som meddelelsen hører til:

SendMessageRequest sendMessageFifoQueue = ny SendMessageRequest () .withQueueUrl (fifoQueueUrl) .withMessageBody ("En anden enkel besked.") .WithMessageGroupId ("baeldung-group-1") .withMessageAttributes (messageAttributes)

Som du kan se i kodeeksemplet ovenfor, specificerer vi gruppen ved hjælp af withMessageGroupId ().

4.3. Udstationering af flere meddelelser i en kø

Det kan vi også sende flere beskeder til en kø ved hjælp af en enkelt anmodning. Vi opretter en liste over SendMessageBatchRequestEntry som vi sender ved hjælp af en forekomst af SendMessageBatchRequest:

Liste messageEntries = ny ArrayList (); messageEntries.add (ny SendMessageBatchRequestEntry () .withId ("id-1") .withMessageBody ("batch-1") .withMessageGroupId ("baeldung-group-1")); messageEntries.add (ny SendMessageBatchRequestEntry () .withId ("id-2") .withMessageBody ("batch-2") .withMessageGroupId ("baeldung-group-1")); SendMessageBatchRequest sendMessageBatchRequest = ny SendMessageBatchRequest (fifoQueueUrl, messageEntries); sqs.sendMessageBatch (sendMessageBatchRequest);

5. Læsning af meddelelser fra køer

Vi kan modtage beskeder fra vores kø ved påberåber sig modtag besked () metode på en forekomst af ReceiveMessageRequest:

ReceiveMessageRequest receiveMessageRequest = ny ReceiveMessageRequest (fiveoQueueUrl) .withWaitTimeSeconds (10) .withMaxNumberOfMessages (10); Liste over sqsMessages = sqs.receiveMessage (receiveMessageRequest) .getMessages (); 

Ved brug af withMaxNumberOfMessages (), vi specificerer, hvor mange meddelelser der skal hentes fra køen - selvom det skal bemærkes, at det maksimale er 10.

Metoden withWaitTimeSeconds () muliggør lang afstemning. Lang afstemning er en måde at begrænse antallet af modtagne meddelelsesanmodninger, vi sender til SQS.

Kort sagt betyder det, at vi venter op til det angivne antal sekunder for at hente en besked. Hvis der ikke er nogen meddelelser i køen i den varighed, returneres anmodningen tom. Hvis en meddelelse ankommer til køen i løbet af denne periode, returneres den.

Vi kan få attributterne og kroppen til en given meddelelse:

sqsMessages.get (0) .getAttributter (); sqsMessages.get (0) .getBody ();

6. Sletning af en besked fra en kø

For at slette en besked bruger vi en DeleteMessageRequest:

sqs.deleteMessage (ny DeleteMessageRequest () .withQueueUrl (fifoQueueUrl) .withReceiptHandle (sqsMessages.get (0) .getReceiptHandle ())); 

7. Dead Letter-køer

En dødbogskø skal være af samme type som basiskøen - det skal være FIFO, hvis basiskøen er FIFO, og standard, hvis basiskøen er standard. I dette eksempel bruger vi en standardkø.

Det første vi skal gøre er at opret hvad der bliver vores døde bogstavskø:

String deadLetterQueueUrl = sqs.createQueue ("baeldung-dead-letter-queue"). GetQueueUrl (); 

Derefter skal vi få vores nyoprettede køs ARN (Amazon Resource Name):

GetQueueAttributesResult deadLetterQueueAttributes = sqs.getQueueAttributts (nye GetQueueAttributesRequest (deadLetterQueueUrl) .withAttributeNames ("QueueArn")); String deadLetterQueueARN = deadLetterQueueAttributes.getAttributes () .get ("QueueArn"); 

Endelig, vi indstil denne nyoprettede kø til at være vores oprindelige standardkø's dødbogskø:

SetQueueAttributesRequest queueAttributesRequest = ny SetQueueAttributesRequest () .withQueueUrl (standardQueueUrl) .addAttributesEntry ("RedrivePolicy", "{\" maxReceiveCount \ ": \" 2 \ "," + "\" deadLetterTargetA \ "dead \ Letter \ TargetA "}"); sqs.setQueueAttributes (queueAttributesRequest); 

JSON-pakken, vi satte i addAttributesEntry () metode, når man bygger vores SetQueueAttributesRequest instans indeholder de oplysninger, vi har brug for: det maxReceiveCount er 2, hvilket betyder, at hvis en meddelelse modtages så mange gange, antages den ikke at være blevet behandlet korrekt og sendes til vores døde brevkø.

Det deadLetterTargetArn attribut peger vores standardkø til vores nyoprettede døde bogstavskø.

8. Overvågning

Vi kan tjek, hvor mange beskeder der i øjeblikket er i en given kø, og hvor mange der er på flugt med SDK. Først skal vi oprette en GetQueueAttributesRequest.

Derfra kontrollerer vi køens tilstand:

GetQueueAttributesRequest getQueueAttributesRequest = ny GetQueueAttributesRequest (standardQueueUrl) .withAttributeNames ("Alle"); GetQueueAttributesResult getQueueAttributesResult = sqs.getQueueAttributts (getQueueAttributesRequest); System.out.println (String.format ("Antallet af meddelelser i køen:% s", getQueueAttributesResult.getAttributes () .get ("ApproximateNumberOfMessages"))); System.out.println (String.format ("Antallet af meddelelser under flyvning:% s", getQueueAttributesResult.getAttributes () .get ("ApproximateNumberOfMessagesNotVisible")));

Mere dybtgående overvågning kan opnås ved hjælp af Amazon Cloud Watch.

9. Konklusion

I denne artikel har vi set hvordan administrere SQS-køer ved hjælp af AWS Java SDK.

Som normalt kan alle kodeeksempler, der bruges i artiklen, findes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found