Guide til DelayQueue

1. Oversigt

I denne artikel ser vi på DelayQueue konstruere fra java.util.concurrent pakke. Dette er en blokerende kø, der kan bruges i producent-forbrugerprogrammer.

Det har en meget nyttig egenskab - når forbrugeren ønsker at tage et element fra køen, kan de kun tage det, når forsinkelsen for det pågældende element er udløbet.

2. Implementering Forsinket til Elementer i DelayQueue

Hvert element, vi ønsker at sætte i DelayQueue skal implementere Forsinket interface. Lad os sige, at vi vil oprette en DelayObject klasse. Forekomster af denne klasse vil blive lagt i DelayQueue.

Vi passerer Snor data og delayInMilliseconds som og argumenter til sin konstruktør:

offentlig klasse DelayObject implementerer forsinket {private strengdata; privat lang starttid; public DelayObject (String data, long delayInMilliseconds) {this.data = data; this.startTime = System.currentTimeMillis () + delayInMilliseconds; }

Vi definerer en starttidspunkt - dette er et tidspunkt, hvor elementet skal forbruges fra køen. Dernæst skal vi implementere getDelay () metode - den skal returnere den resterende forsinkelse, der er knyttet til dette objekt i den givne tidsenhed.

Derfor er vi nødt til at bruge TimeUnit.convert () metode til at returnere den resterende forsinkelse i korrekt TimeUnit:

@Override public long getDelay (TimeUnit unit) {long diff = startTime - System.currentTimeMillis (); returner unit.convert (diff, TimeUnit.MILLISECONDS); }

Når forbrugeren forsøger at tage et element fra køen, bliver DelayQueue vil udføre getDelay () for at finde ud af, om det er tilladt at returnere dette element fra køen. Hvis den getDelay () metode returnerer nul eller et negativt tal, det betyder, at det kunne hentes fra køen.

Vi er også nødt til at implementere sammenligne med() metode, fordi elementerne i DelayQueue sorteres efter udløbstiden. Det element, der udløber først, holdes i køens hoved, og elementet med den højeste udløbstid holdes i køens hale:

@Override public int CompareTo (Delayed o) {return Ints.saturatedCast (this.startTime - ((DelayObject) o). StartTime); }

3. DelayQueue Consumer og producent

At kunne teste vores DelayQueue vi er nødt til at implementere producent- og forbrugerlogik. Producentklassen tager køen, antallet af elementer, der skal produceres, og forsinkelsen af ​​hver besked i millisekunder som argumenter.

Så når løb() metoden påberåbes, den sætter elementer i køen og sover i 500 millisekunder efter hvert sæt:

offentlig klasse DelayQueueProducer implementerer Runnable {private BlockingQueue kø; privat heltal numberOfElementsToProduc; privat heltal forsinkelseOfEachProducedMessageMilliseconds; // standardkonstruktør @ Override public void run () {for (int i = 0; i <numberOfElementsToProduc; i ++) {DelayObject object = new DelayObject (UUID.randomUUID (). toString (), delayOfEachProducedMessageMilliseconds); System.out.println ("Put objekt:" + objekt); prøv {queue.put (objekt); Tråd. Søvn (500); } fange (InterruptedException ie) {ie.printStackTrace (); }}}}

Forbrugerens implementering er meget ens, men det holder også styr på antallet af beskeder, der blev forbrugt:

offentlig klasse DelayQueueConsumer implementerer Runnable {private BlockingQueue kø; privat HeltalnummerOfElementsToTake; public AtomicInteger numberOfConsumedElements = new AtomicInteger (); // standardkonstruktører @ Override public void run () {for (int i = 0; i <numberOfElementsToTake; i ++) {prøv {DelayObject object = queue.take (); numberOfConsumedElements.incrementAndGet (); System.out.println ("Forbrugeroptagelse:" + objekt); } fange (InterruptedException e) {e.printStackTrace (); }}}}

4. DelayQueue Brugstest

At teste adfærd hos DelayQueue, vi opretter en producenttråd og en forbrugertråd.

Producenten vil sætte() to objekter ind i køen med 500 millisekunder forsinkelse. Testen hævder, at forbrugeren forbrugte to meddelelser:

@Test offentlig ugyldighed givenDelayQueue_whenProducElement _thenShouldConsumeAfterGivenDelay () kaster InterruptedException {// given ExecutorService eksekutor = Executors.newFixedThreadPool (2); BlockingQueue kø = ny DelayQueue (); int numberOfElementsToProduc = 2; int delayOfEachProducedMessageMilliseconds = 500; DelayQueueConsumer forbruger = ny DelayQueueConsumer (kø, numberOfElementsToProducer); DelayQueueProducer-producent = ny DelayQueueProducer (kø, antalOfElementsToProducer, forsinkelseOfEachProducedMessageMillisekunder); // når executor.submit (producent); executor.submit (forbruger); // derefter executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), numberOfElementsToProducer); }

Vi kan se, at kørsel af dette program vil producere følgende output:

Put objekt: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 1494069868007} Forbrugeroptagelse: {data = '86046157-e8a0-49b2-9cbb-8326124bcab8', startTime = 14940668 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512} Forbrugeroptagelse: {data = 'd47927ef-18c7-449b-b491-5ff30e6795ed', startTime = 1494069868512}

Producenten lægger genstanden, og efter et stykke tid forbruges det første objekt, som forsinkelsen udløb for.

Den samme situation opstod for det andet element.

5. Forbruger ikke i stand til at forbruge på det givne tidspunkt

Lad os sige, at vi har en producent, der producerer et element, der vil udløber om 10 sekunder:

int numberOfElementsToProduc = 1; int delayOfEachProducedMessageMilliseconds = 10_000; DelayQueueConsumer forbruger = ny DelayQueueConsumer (kø, numberOfElementsToProducer); DelayQueueProducer-producent = ny DelayQueueProducer (kø, antalOfElementsToProducer, forsinkelseOfEachProducedMessageMillisekunder);

Vi starter vores test, men den afsluttes efter 5 sekunder. På grund af egenskaberne ved DelayQueue, forbrugeren kan ikke forbruge beskeden fra køen, fordi elementet endnu ikke er udløbet:

executor.submit (producent); executor.submit (forbruger); executor.awaitTermination (5, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 0);

Bemærk, at forbrugerens numberOfConsumedElements har en værdi lig med nul.

6. Producering af et element med øjeblikkelig udløb

Når implementeringerne af Forsinket besked getDelay () metode returnerer et negativt tal, det betyder, at det givne element allerede er udløbet. I denne situation vil producenten straks forbruge dette element.

Vi kan teste situationen med at producere et element med negativ forsinkelse:

int numberOfElementsToProduc = 1; int delayOfEachProducedMessageMilliseconds = -10_000; DelayQueueConsumer forbruger = ny DelayQueueConsumer (kø, numberOfElementsToProducer); DelayQueueProducer-producent = ny DelayQueueProducer (kø, antalOfElementsToProducer, forsinkelseOfEachProducedMessageMillisekunder);

Når vi starter testsagen, forbruger forbrugeren elementet med det samme, fordi det allerede er udløbet:

executor.submit (producent); executor.submit (forbruger); executor.awaitTermination (1, TimeUnit.SECONDS); executor.shutdown (); assertEquals (consumer.numberOfConsumedElements.get (), 1);

7. Konklusion

I denne artikel så vi på DelayQueue konstruere fra java.util.concurrent pakke.

Vi implementerede en Forsinket element, der blev produceret og forbrugt fra køen.

Vi udnyttede vores implementering af DelayQueue at forbruge elementer, der var udløbet.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - som er et Maven-projekt, så det skal være let at importere og køre som det er.


$config[zx-auto] not found$config[zx-overlay] not found