Guide til CountDownLatch i Java

1. Introduktion

I denne artikel giver vi en guide til CountDownLatch klasse og demonstrere, hvordan den kan bruges i et par praktiske eksempler.

I det væsentlige ved hjælp af en CountDownLatch vi kan få en tråd til at blokere, indtil andre tråde har gennemført en given opgave.

2. Brug i samtidig programmering

Kort sagt, en CountDownLatch har en tæller felt, som du kan reducere, som vi har brug for. Vi kan derefter bruge den til at blokere en opkaldstråd, indtil den er talt ned til nul.

Hvis vi foretog nogle parallelle processer, kunne vi instantiere CountDownLatch med samme værdi for tælleren som et antal tråde, vi vil arbejde på tværs af. Derefter kunne vi bare ringe nedtælling () efter hver tråd er færdig, hvilket garanterer, at en afhængig tråd ringer vente() vil blokere, indtil arbejderens tråde er færdige.

3. Venter på, at en pulje af tråde er færdig

Lad os prøve dette mønster ved at oprette en Arbejder og ved hjælp af en CountDownLatch felt for at signalere, når det er afsluttet:

offentlig klasse Arbejdsværktøjer Runnable {private List outputScraper; private CountDownLatch countDownLatch; offentlig arbejdstager (liste outputScraper, CountDownLatch countDownLatch) {this.outputScraper = outputScraper; this.countDownLatch = countDownLatch; } @ Override public void run () {doSomeWork (); outputScraper.add ("Nedtælles"); countDownLatch.countDown (); }}

Lad os derefter oprette en test for at bevise, at vi kan få en CountDownLatch at vente på Arbejder forekomster, der skal udføres:

@Test offentlig ugyldig nårParallelProcessing_thenMainThreadWillBlockUntilCompletion () kaster InterruptedException {List outputScraper = Collections.synchronizedList (new ArrayList ()); CountDownLatch countDownLatch = ny CountDownLatch (5); Listearbejdere = Stream .generate (() -> ny tråd (ny arbejder (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (tråd :: start); countDownLatch.await (); outputScraper.add ("Latch frigivet"); assertThat (outputScraper) .containsExactly ("Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Latch released"); }

Naturligvis vil "Latch frigivet" altid være den sidste output - da det afhænger af CountDownLatch frigive.

Bemærk, at hvis vi ikke ringede vente(), ville vi ikke være i stand til at garantere rækkefølgen af ​​udførelsen af ​​trådene, så testen ville tilfældigt mislykkes.

4. En pool af tråde, der venter på at begynde

Hvis vi tog det foregående eksempel, men denne gang startede tusindvis af tråde i stedet for fem, er det sandsynligt, at mange af de tidligere vil være færdigbehandlet, før vi endda har ringet Start() på de senere. Dette kan gøre det vanskeligt at forsøge at reproducere et samtidighedsproblem, da vi ikke kunne få alle vores tråde til at køre parallelt.

Lad os få den for at komme omkring dette Nedtællingslås at arbejde anderledes end i det foregående eksempel. I stedet for at blokere en overordnet tråd, indtil nogle børnetråde er færdige, kan vi blokere hver underordnet tråd, indtil alle de andre er startet.

Lad os ændre vores løb() metode, så den blokerer inden behandling:

offentlig klasse WaitingWorker implementerer Runnable {private List outputScraper; private CountDownLatch readyThreadCounter; privat CountDownLatch callingThreadBlocker; privat CountDownLatch completedThreadCounter; public WaitingWorker (List outputScraper, CountDownLatch readyThreadCounter, CountDownLatch callingThreadBlocker, CountDownLatch completedThreadCounter) {this.outputScraper = outputScraper; this.readyThreadCounter = readyThreadCounter; this.callingThreadBlocker = callingThreadBlocker; this.completedThreadCounter = completeThreadCounter; } @ Override offentlig ugyldig kørsel () {readyThreadCounter.countDown (); prøv {callingThreadBlocker.await (); doSomeWork (); outputScraper.add ("Nedtælles"); } fange (InterruptedException e) {e.printStackTrace (); } endelig {completeThreadCounter.countDown (); }}}

Lad os nu ændre vores test, så den blokerer indtil alt Arbejdere er startet, fjerner blokeringen af Arbejdstagere, og blokerer derefter indtil Arbejdere er færdig:

@Test offentlig ugyldig nårDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime () kaster InterruptedException {List outputScraper = Collections.synchronizedList (new ArrayList ()); CountDownLatch readyThreadCounter = ny CountDownLatch (5); CountDownLatch callingThreadBlocker = ny CountDownLatch (1); CountDownLatch completedThreadCounter = ny CountDownLatch (5); Listearbejdere = Stream .generate (() -> ny tråd (ny WaitingWorker (outputScraper, readyThreadCounter, callingThreadBlocker, completeThreadCounter))) .limit (5) .collect (toList ()); workers.forEach (tråd :: start); readyThreadCounter.await (); outputScraper.add ("Arbejdere klar"); callingThreadBlocker.countDown (); completeThreadCounter.await (); outputScraper.add ("Arbejdere færdige"); assertThat (outputScraper) .containsExactly ("Workers ready", "Counted down", "Counted down", "Counted down", "Counted down", "Counted down", "Workers complete"); }

Dette mønster er virkelig nyttigt til at forsøge at gengive samtidige fejl, som det kan bruges til at tvinge tusindvis af tråde til at prøve at udføre nogle logik parallelt.

5. Afslutning af en Nedtællingslås Tidlig

Nogle gange kan vi løbe ind i en situation, hvor Arbejdere afslut ved en fejl, før du tæller ned CountDownLatch. Dette kan resultere i, at det aldrig når nul og vente() slutter aldrig:

@ Override public void run () {if (true) {throw new RuntimeException ("Åh kære, jeg er en BrokenWorker"); } countDownLatch.countDown (); outputScraper.add ("Nedtælles"); }

Lad os ændre vores tidligere test for at bruge en BrokenWorker, for at vise hvordan vente() vil blokere for evigt:

@Test offentlig ugyldig nårFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck () kaster InterruptedException {List outputScraper = Collections.synchronizedList (new ArrayList ()); CountDownLatch countDownLatch = ny CountDownLatch (5); Listearbejdere = Stream .generate (() -> ny tråd (ny BrokenWorker (outputScraper, countDownLatch))) .limit (5) .collect (toList ()); workers.forEach (tråd :: start); countDownLatch.await (); }

Det er klart, at dette ikke er den adfærd, vi ønsker - det ville være meget bedre for applikationen at fortsætte end uendelig blokering.

For at omgå dette, lad os tilføje et timeout-argument til vores opfordring til vente().

boolsk afsluttet = countDownLatch.await (3L, TimeUnit.SECONDS); assertThat (afsluttet) .isFalse ();

Som vi kan se, vil testen til sidst timeout og vente() kommer tilbage falsk.

6. Konklusion

I denne hurtige vejledning har vi demonstreret, hvordan vi kan bruge en CountDownLatch for at blokere en tråd, indtil andre tråde er færdigbehandlet.

Vi har også vist, hvordan det kan bruges til at debugge problemer med samtidighed ved at sikre, at tråde kører parallelt.

Implementeringen af ​​disse eksempler findes på GitHub; dette er et Maven-baseret projekt, så det skal være let at køre som det er.