En guide til Java SynchronousQueue

1. Oversigt

I denne artikel ser vi på SynchronousQueue fra java.util.concurrent pakke.

Kort sagt, denne implementering giver os mulighed for at udveksle information mellem tråde på en trådsikker måde.

2. API-oversigt

Det SynchronousQueue har kun to understøttede operationer: tage() og sætte(), og begge blokerer.

For eksempel, når vi vil føje et element til køen, skal vi ringe til sætte() metode. Denne metode vil blokere, indtil en anden tråd kalder tage() metode, der signalerer, at den er klar til at tage et element.

Selvom SynchronousQueue har en grænseflade til en kø, skal vi tænke på det som et udvekslingspunkt for et enkelt element mellem to tråde, hvor en tråd udleverer et element, og en anden tråd tager dette element.

3. Implementering af overdragelser ved hjælp af en delt variabel

For at se hvorfor SynchronousQueue kan være så nyttigt, vi implementerer en logik ved hjælp af en delt variabel mellem to tråde, og derefter omskriver vi den logik ved hjælp af SynchronousQueue gør vores kode meget enklere og mere læselig.

Lad os sige, at vi har to tråde - en producent og en forbruger - og når producenten indstiller en værdi af en delt variabel, vil vi signalere den kendsgerning til forbrugertråden. Dernæst henter forbrugertråden en værdi fra en delt variabel.

Vi bruger CountDownLatch at koordinere disse to tråde for at forhindre en situation, hvor forbrugeren får adgang til en værdi af en delt variabel, der endnu ikke var indstillet.

Vi definerer en delt stat variabel og a CountDownLatch der vil blive brugt til at koordinere behandlingen:

ExecutorService eksekutor = Executors.newFixedThreadPool (2); AtomicInteger sharedState = nyt AtomicInteger (); CountDownLatch countDownLatch = ny CountDownLatch (1);

Producenten gemmer et tilfældigt heltal i delt stat variabel, og udfør countDown () metode til countDownLatch, signal til forbrugeren, at den kan hente en værdi fra sharedState:

Kørbar producent = () -> {Heltal produceret Element = ThreadLocalRandom .current () .nextInt (); sharedState.set (producedElement); countDownLatch.countDown (); };

Forbrugeren vil vente på countDownLatch bruger vente() metode. Når producenten signaliserer, at variablen blev indstillet, henter forbrugeren den fra sharedState:

Kørbar forbruger = () -> {prøv {countDownLatch.await (); Heltal consumedElement = sharedState.get (); } fange (InterruptedException ex) {ex.printStackTrace (); }};

Sidst men ikke mindst, lad os starte vores program:

eksekutor. udføre (producent); eksekutor. udføre (forbruger); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (countDownLatch.getCount (), 0);

Det producerer følgende output:

At gemme et element: -1507375353 til udvekslingspunktet forbrugte et element: -1507375353 fra udvekslingspunktet

Vi kan se, at dette er meget kode for at implementere en så enkel funktionalitet som at udveksle et element mellem to tråde. I det næste afsnit vil vi forsøge at gøre det bedre.

4. Implementering af handoffs ved hjælp af SynchronousQueue

Lad os nu implementere den samme funktionalitet som i det foregående afsnit, men med en SynchronousQueue. Det har en dobbelt effekt, fordi vi kan bruge det til at udveksle tilstand mellem tråde og til at koordinere den handling, så vi ikke behøver at bruge noget udover SynchronousQueue.

For det første definerer vi en kø:

ExecutorService eksekutor = Executors.newFixedThreadPool (2); SynchronousQue kø = ny SynchronousQueue ();

Producenten kalder en sætte() metode, der blokerer, indtil en anden tråd tager et element fra køen:

Kørbar producent = () -> {Heltal produceretElement = ThreadLocalRandom .current () .nextInt (); prøv {queue.put (producedElement); } fange (InterruptedException ex) {ex.printStackTrace (); }};

Forbrugeren vil simpelthen hente dette element ved hjælp af tage() metode:

Kørbar forbruger = () -> {prøv {Integer consumedElement = queue.take (); } fange (InterruptedException ex) {ex.printStackTrace (); }};

Dernæst starter vi vores program:

eksekutor. udføre (producent); eksekutor. udføre (forbruger); executor.awaitTermination (500, TimeUnit.MILLISECONDS); executor.shutdown (); assertEquals (queue.size (), 0);

Det producerer følgende output:

At gemme et element: 339626897 til udvekslingspunktet forbrugte et element: 339626897 fra udvekslingspunktet

Vi kan se, at en SynchronousQueue bruges som et udvekslingspunkt mellem trådene, hvilket er meget bedre og mere forståeligt end det foregående eksempel, der brugte den delte tilstand sammen med en CountDownLatch.

5. Konklusion

I denne hurtige vejledning så vi på SynchronousQueue konstruere. Vi oprettede et program, der udveksler data mellem to tråde ved hjælp af delt tilstand og derefter omskrev programmet for at udnytte SynchronousQueue konstruere. Dette fungerer som et udvekslingspunkt, der koordinerer producenten og forbrugertråden.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-projekt, så det skal være let at importere og køre, som det er.


$config[zx-auto] not found$config[zx-overlay] not found