Vejledning til java.util.concurrent.BlockingQueue

1. Oversigt

I denne artikel vil vi se på en af ​​de mest nyttige konstruktioner java.util.concurrent for at løse det samtidige producent-forbrugerproblem. Vi ser på en API af BlockingQueue interface, og hvordan metoder fra denne grænseflade gør det lettere at skrive samtidige programmer.

Senere i artiklen viser vi et eksempel på et simpelt program, der har flere producenttråde og flere forbrugertråde.

2. BlockingQueue Typer

Vi kan skelne mellem to typer BlockingQueue:

  • ubegrænset kø - kan vokse næsten på ubestemt tid
  • afgrænset kø - med maksimal kapacitet defineret

2.1. Ubegrænset kø

Oprettelse af ubegrænsede køer er enkel:

BlockingQueue blockingQueue = ny LinkedBlockingDeque ();

Kapaciteten af blockingQueue vil blive indstillet til Heltal.MAX_VALUE. Alle operationer, der føjer et element til den ubegrænsede kø, blokerer aldrig, og det kan således vokse til en meget stor størrelse.

Det vigtigste ved design af et producent-forbrugerprogram ved hjælp af ubegrænset BlockingQueue er, at forbrugerne skal være i stand til at forbruge beskeder så hurtigt som producenterne tilføjer beskeder i køen. Ellers kunne hukommelsen fyldes op, og vi fik en Ikke mere hukommelse undtagelse.

2.2. Afgrænset kø

Den anden type kø er den afgrænsede kø. Vi kan oprette sådanne køer ved at overføre kapaciteten som et argument til en konstruktør:

BlockingQueue blockingQueue = ny LinkedBlockingDeque (10);

Her har vi en blockingQueue der har en kapacitet lig med 10. Det betyder, at når en producent forsøger at føje et element til en allerede fuld kø, afhængigt af en metode, der blev brugt til at tilføje det (tilbud(), tilføje() eller sætte()), vil den blokere, indtil plads til indsættelse af objekt bliver tilgængelig. Ellers mislykkes operationerne.

Brug af afgrænset kø er en god måde at designe samtidige programmer på, fordi når vi indsætter et element i en allerede fuld kø, skal disse operationer vente, indtil forbrugerne indhenter og stille plads i køen. Det giver os nedsættelse uden nogen indsats fra vores side.

3. BlockingQueue API

Der er to typer metoder i BlockingQueue interfacemetoder, der er ansvarlige for at føje elementer til en kø, og metoder, der henter disse elementer. Hver metode fra disse to grupper opfører sig forskelligt, hvis køen er fuld / tom.

3.1. Tilføjelse af elementer

  • tilføje() - vender tilbage rigtigt hvis indsættelse var vellykket, ellers kaster en IllegalStateException
  • sætte () - indsætter det angivne element i en kø og venter på en ledig plads, hvis det er nødvendigt
  • tilbud() - vender tilbage rigtigt hvis indsættelse var vellykket, ellers falsk
  • tilbud (E e, lang timeout, TimeUnit enhed) - forsøger at indsætte element i en kø og venter på en ledig plads inden for en bestemt timeout

3.2. Henter elementer

  • tage() - venter på et hovedelement i en kø og fjerner det. Hvis køen er tom, blokerer den og venter på, at et element bliver tilgængeligt
  • afstemning (lang timeout, TimeUnit enhed) - henter og fjerner hovedet på køen og venter op til den angivne ventetid, hvis det er nødvendigt for at et element bliver tilgængeligt. Vender tilbage nul efter en timeout

Disse metoder er de vigtigste byggesten fra BlockingQueue interface, når man bygger producent-forbrugerprogrammer.

4. Flertrådet producent-forbrugereksempel

Lad os oprette et program, der består af to dele - en producent og en forbruger.

Producenten vil producere et tilfældigt tal fra 0 til 100 og placere dette nummer i et BlockingQueue. Vi har 4 producenttråde og bruger sætte() metode til at blokere, indtil der er ledig plads i køen.

Den vigtige ting at huske er, at vi skal stoppe vores forbrugertråde i at vente på, at et element vises i en kø på ubestemt tid.

En god teknik til at signalere fra producent til forbrugeren, at der ikke er flere beskeder at behandle, er at sende en speciel besked kaldet en giftpiller. Vi er nødt til at sende så mange giftpiller, som vi har forbrugere. Derefter når en forbruger tager den særlige giftpillebesked fra en kø, afslutter den eksekveringen yndefuldt.

Lad os se på en producentklasse:

offentlig klasse NumbersProducer implementerer Runnable {private BlockingQueue numbersQueue; privat endelig intgiftPill; privat endelig int poisonPillPerProducer; offentlige NumbersProducer (BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) {this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } offentlig ugyldig kørsel () {prøv {genererNumre (); } fange (InterruptedException e) {Thread.currentThread (). interrupt (); }} privat tomrum generereNumre () kaster InterruptedException {for (int i = 0; i <100; i ++) {numbersQueue.put (ThreadLocalRandom.current (). nextInt (100)); } for (int j = 0; j <poisonPillPerProducer; j ++) {numbersQueue.put (poisonPill); }}}

Vores producent konstruktør tager som argument BlockingQueue der bruges til at koordinere behandling mellem producent og forbruger. Vi ser den metode generereNumre () vil sætte 100 elementer i en kø. Det kræver også giftpillebesked for at vide, hvilken type besked der skal sættes i en kø, når udførelsen er afsluttet. Denne besked skal sættes poisonPillPerProducer gange ind i en kø.

Hver forbruger tager et element fra en BlockingQueue ved brug af tage() metode, så den blokerer, indtil der er et element i en kø. Efter at have taget en Heltal fra en kø kontrollerer det, om meddelelsen er en giftpiller, hvis ja, er udførelsen af ​​en tråd afsluttet. Ellers udskriver resultatet resultatet på standardoutput sammen med den aktuelle tråds navn.

Dette giver os indsigt i vores forbrugers indre funktion:

offentlig klasse NumbersConsumer implementerer Runnable {privat BlockingQue kø; privat endelig intgiftPill; offentlige NumbersConsumer (BlockingQueue kø, int poisonPill) {this.queue = kø; this.poisonPill = poisonPill; } public void run () {try {while (true) {Integer number = queue.take (); if (number.equals (poisonPill)) {return; } System.out.println (Thread.currentThread (). GetName () + "resultat:" + nummer); }} fange (InterruptedException e) {Thread.currentThread (). interrupt (); }}}

Det vigtige at bemærke er brugen af ​​en kø. Samme som i producentkonstruktøren, passeres en kø som et argument. Vi kan gøre det fordi BlockingQueue kan deles mellem tråde uden eksplicit synkronisering.

Nu hvor vi har vores producent og forbruger, kan vi starte vores program. Vi skal definere køens kapacitet, og vi indstiller den til 100 elementer.

Vi vil have 4 producenttråde, og et antal forbrugertråde vil svare til antallet af tilgængelige processorer:

int BUNDET = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime (). availableProcessors (); int poisonPill = Heltal.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS% N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue (BOUND); for (int i = 1; i <N_PRODUCERS; i ++) {ny tråd (ny NumbersProducer (kø, poisonPill, poisonPillPerProducer)). start (); } for (int j = 0; j <N_CONSUMERS; j ++) {new Thread (new NumbersConsumer (queue, poisonPill)). start (); } ny tråd (ny NumbersProducer (kø, poisonPill, poisonPillPerProducer + mod)). start (); 

BlockingQueue oprettes ved hjælp af konstruktion med en kapacitet. Vi opretter 4 producenter og N-forbrugere. Vi specificerer, at vores giftpiller meddelelse er en Heltal.MAX_VALUE fordi sådan værdi aldrig vil blive sendt af vores producent under normale arbejdsforhold. Det vigtigste at bemærke her er, at BlockingQueue bruges til at koordinere arbejdet mellem dem.

Når vi kører programmet, vil 4 producenttråde sætte tilfældige Heltal i en BlockingQueue og forbrugere vil tage disse elementer fra køen. Hver tråd udskriver navnet på tråden til standardoutput sammen med et resultat.

5. Konklusion

Denne artikel viser en praktisk anvendelse af BlockingQueue og forklarer metoder, der bruges til at tilføje og hente elementer fra det. Vi har også vist, hvordan man bygger et multitrådet producent-forbrugerprogram ved hjælp af BlockingQueue at koordinere arbejdet mellem producenter og forbrugere.

Implementeringen af ​​alle disse eksempler og kodestykker findes i GitHub-projektet - dette er et Maven-baseret projekt, så det skal være let at importere og køre som det er.