Pålidelig besked med JGroups

1. Oversigt

JGroups er en Java API til pålidelig meddelelsesudveksling. Den har en simpel grænseflade, der giver:

  • en fleksibel protokolstak, inklusive TCP og UDP
  • fragmentering og genmontering af store meddelelser
  • pålidelig unicast og multicast
  • fejlregistrering
  • flowkontrol

Samt mange andre funktioner.

I denne vejledning opretter vi en simpel applikation til udveksling Snor meddelelser mellem applikationer og levering af delt tilstand til nye applikationer, når de slutter sig til netværket.

2. Opsætning

2.1. Maven afhængighed

Vi er nødt til at tilføje en enkelt afhængighed af vores pom.xml:

 org.jgroups jgroups 4.0.10.Final 

Den seneste version af biblioteket kan kontrolleres på Maven Central.

2.2. Netværk

JGroups forsøger at bruge IPV6 som standard. Afhængigt af vores systemkonfiguration kan dette resultere i, at applikationer ikke kan kommunikere.

For at undgå dette indstiller vi java.net.preferIPv4Stack til rigtigt egenskab, når du kører vores applikationer her:

java -Djava.net.preferIPv4Stack = sand com.baeldung.jgroups.JGroupsMessenger 

3. JKanaler

Vores forbindelse til et JGroups-netværk er en JChannel. Kanalen slutter sig til en klynge og sender og modtager meddelelser samt information om netværkets tilstand.

3.1. Oprettelse af en kanal

Vi opretter en JChannel med en sti til en konfigurationsfil. Hvis vi udelader filnavnet, ser det efter udp.xml i den aktuelle arbejdsmappe.

Vi opretter en kanal med en eksplicit navngivet konfigurationsfil:

JChannel-kanal = ny JChannel ("src / main / resources / udp.xml"); 

JGroups-konfiguration kan være meget kompliceret, men standard UDP- og TCP-konfigurationer er tilstrækkelige til de fleste applikationer. Vi har inkluderet filen til UDP i vores kode og vil bruge den til denne vejledning.

For mere information om konfiguration af transport se JGroups manualen her.

3.2. Tilslutning af en kanal

Når vi har oprettet vores kanal, skal vi deltage i en klynge. En klynge er en gruppe noder, der udveksler meddelelser.

Tilslutning til en klynge kræver et klyngenavn:

channel.connect ("Baeldung"); 

Den første node, der forsøger at deltage i en klynge, opretter den, hvis den ikke findes. Vi ser denne proces i aktion nedenfor.

3.3. Navngivning af en kanal

Noder identificeres med et navn, så jævnaldrende kan sende dirigerede meddelelser og modtage underretninger om, hvem der går ind i og forlader klyngen. JGroups tildeler automatisk et navn, eller vi kan indstille vores eget:

channel.name ("bruger1");

Vi bruger disse navne nedenfor til at spore, når noder kommer ind og forlader klyngen.

3.4. Lukning af en kanal

Oprydning af kanaler er afgørende, hvis vi ønsker, at jævnaldrende modtager rettidig underretning om, at vi er forladt.

Vi lukker en JChannel med sin tætte metode:

channel.close ()

4. Ændringer i klyngevisning

Med en oprettet JChannel er vi nu klar til at se jævnaldrende i klyngen og udveksle meddelelser med dem.

JGroups opretholder klyngetilstand inde i Udsigt klasse. Hver kanal har en enkelt Udsigt af netværket. Når visningen ændres, leveres den via viewAccepted () ring tilbage.

Til denne vejledning udvider vi Modtageradaptor API-klasse, der implementerer alle de interface-metoder, der kræves til en applikation.

Det er den anbefalede måde at implementere tilbagekald på.

Lad os tilføje viewAccepted til vores ansøgning:

public void viewAccepted (View newView) {private View lastView; hvis (lastView == null) {System.out.println ("Modtaget oprindelig visning:"); newView.forEach (System.out :: println); } andet {System.out.println ("Modtaget ny visning."); Liste newMembers = View.newMembers (lastView, newView); System.out.println ("Nye medlemmer:"); newMembers.forEach (System.out :: println); Liste exMembers = View.leftMembers (lastView, newView); System.out.println ("Afsluttede medlemmer:"); exMembers.forEach (System.out :: println); } lastView = newView; } 

Hver Udsigt indeholder en Liste af Adresse objekter, der repræsenterer hvert medlem af klyngen. JGroups tilbyder bekvemmelighedsmetoder til at sammenligne en visning med en anden, som vi bruger til at opdage nye eller forladte medlemmer af klyngen.

5. Afsendelse af beskeder

Beskedhåndtering i JGroups er ligetil. EN Besked indeholder en byte array og Adresse objekter svarende til afsenderen og modtageren.

Til denne vejledning bruger vi Strenge læses fra kommandolinjen, men det er let at se, hvordan et program kan udveksle andre datatyper.

5.1. Broadcast-beskeder

EN Besked oprettes med en destination og et byte-array; JChannel indstiller afsenderen til os. Hvis målet er nul, hele klyngen modtager beskeden.

Vi accepterer tekst fra kommandolinjen og sender den til klyngen:

System.out.print ("Indtast en besked:"); String line = in.readLine (). ToLowerCase (); Beskedbesked = ny besked (null, line.getBytes ()); channel.send (besked); 

Hvis vi kører flere forekomster af vores program og sender denne besked (efter at vi har implementeret modtage() metode nedenfor), ville alle modtage den, inklusive afsenderen.

5.2. Blokering af vores beskeder

Hvis vi ikke vil se vores meddelelser, kan vi indstille en egenskab til det:

channel.setDiscardOwnMessages (true); 

Når vi kører den forrige test, modtager meddelelsesafsenderen ikke sin udsendelsesbesked.

5.3. Direkte beskeder

Afsendelse af en direkte besked kræver en gyldig Adresse. Hvis vi henviser til noder efter navn, har vi brug for en måde at slå op på Adresse. Heldigvis har vi Udsigt for det.

Den nuværende Udsigt er altid tilgængelig fra JChannel:

privat Valgfri getAddress (strengnavn) {Vis visning = channel.view (); returner view.getMembers (). stream () .filter (adresse -> name.equals (address.toString ())) .findAny (); } 

Adresse navne er tilgængelige via klassen toString () metode, så vi søger blot i Liste af klyngemedlemmer til det navn, vi ønsker.

Så vi kan acceptere et navn fra konsollen, finde den tilknyttede destination og sende en direkte besked:

Adressedestination = null; System.out.print ("Indtast en destination:"); String destinationName = in.readLine (). ToLowerCase (); destination = getAddress (destinationName) .orElseThrow (() -> ny undtagelse ("Destination ikke fundet"); Beskedbesked = ny besked (destination, "Hej der!"); channel.send (besked); 

6. Modtagelse af beskeder

Vi kan sende beskeder, lad os nu tilføje, prøv at modtage dem nu.

Lad os tilsidesætte Modtageradaptor tom modtagemetode:

offentlig ugyldig modtagelse (beskedbesked) {String line = Besked modtaget fra: "+ message.getSrc () +" til: "+ message.getDest () +" -> "+ message.getObject (); System.out.println (linje);} 

Da vi ved, indeholder meddelelsen en Snor, vi kan sikkert passere getObject () til System.out.

7. Statsudveksling

Når en knude kommer ind i netværket, skal det muligvis hente tilstandsoplysninger om klyngen. JGroups giver en statsoverførselsmekanisme til dette.

Når en node slutter sig til klyngen, kalder den simpelthen getState (). Klyngen henter normalt staten fra det ældste medlem i gruppen - koordinatoren.

Lad os tilføje et antal udsendelser til vores applikation. Vi tilføjer en ny medlemsvariabel og forøger den indeni modtage():

privat heltal messageCount = 0; offentlig ugyldig modtagelse (Beskedbesked) {String line = "Besked modtaget fra:" + besked.getSrc () + "til:" + besked.getDest () + "->" + besked.getObject (); System.out.println (linje); hvis (message.getDest () == null) {messageCount ++; System.out.println ("Antal meddelelser:" + meddelelsesantal); }} 

Vi tjekker for en nul destination, fordi hvis vi tæller direkte beskeder, vil hver node have et andet nummer.

Dernæst tilsidesætter vi to yderligere metoder i Modtageradaptor:

public void setState (InputStream input) {try {messageCount = Util.objectFromStream (new DataInputStream (input)); } fange (Undtagelse e) {System.out.println ("Fejl ved fjernelse af tilstand!"); } System.out.println (messageCount + "er det aktuelle meddelelsestal."); } offentlig ugyldighed getState (OutputStream-output) kaster Undtagelse {Util.objectToStream (messageCount, ny DataOutputStream (output)); } 

Svarende til meddelelser overfører JGroups tilstand som en matrix af bytes.

JGroups leverer en InputStream til koordinatoren for at skrive staten til og en OutputStream for den nye node at læse. API'en giver bekvemmelighedsklasser til serialisering og deserialisering af dataene.

Bemærk, at adgang til tilstandsinformation i produktionskode skal være trådsikker.

Endelig tilføjer vi opkaldet til getState () til vores opstart, når vi opretter forbindelse til klyngen:

channel.connect (clusterName); channel.getState (null, 0); 

getState () accepterer en destination, hvorfra staten skal anmodes, og en timeout i millisekunder. EN nul destination angiver koordinatoren og 0 betyder ikke timeout.

Når vi kører denne app med et par noder og udveksler udsendelsesmeddelelser, ser vi antallet af beskeder tæller.

Så hvis vi tilføjer en tredje klient eller stopper og starter en af ​​dem, ser vi den nytilsluttede node udskrive det korrekte antal meddelelser.

8. Konklusion

I denne vejledning brugte vi JGroups til at oprette en applikation til udveksling af meddelelser. Vi brugte API'en til at overvåge, hvilke noder der var forbundet til og forlod klyngen, og også til at overføre klyngetilstand til en ny node, da den sluttede sig.

Kodeprøver kan som altid findes på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found