Introduktion til RSocket

1. Introduktion

I denne tutorial tager vi et første kig på RSocket, og hvordan det muliggør klient-server-kommunikation.

2. Hvad er? RSocket?

RSocket er en binær, punkt-til-punkt kommunikationsprotokol beregnet til brug i distribuerede applikationer. I den forstand giver det et alternativ til andre protokoller som HTTP.

En fuld sammenligning mellem RSocket og andre protokoller ligger uden for denne artikels anvendelsesområde. I stedet fokuserer vi på en nøglefunktion i RSocket: dens interaktionsmodeller.

RSocket tilbyder fire interaktionsmodeller. Med det i tankerne undersøger vi hver enkelt med et eksempel.

3. Maven-afhængigheder

RSocket behøver kun to direkte afhængigheder til vores eksempler:

 io.rsocket rsocket-core 0.11.13 io.rsocket rsocket-transport-netty 0.11.13 

Rsocket-core og rsocket-transport-netty afhængigheder er tilgængelige på Maven Central.

En vigtig note er, at RSocket-biblioteket ofte bruger reaktive streams. Det Strøm og Mono klasser bruges i hele denne artikel, så en grundlæggende forståelse af dem vil være nyttigt.

4. Serveropsætning

Lad os først oprette Server klasse:

offentlig klasse Server {privat endelig Engangsserver; public Server () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (new RSocketImpl ())) .transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start (). abonner (); } offentlig ugyldig disposition () {this.server.dispose (); } privat klasse RSocketImpl udvider AbstractRSocket {}}

Her bruger vi RSocketFactory for at konfigurere og lytte til en TCP-bøsning. Vi passerer i vores skik RSocketImpl til at håndtere anmodninger fra klienter. Vi tilføjer metoder til RSocketImpl som vi går.

For at starte serveren skal vi blot instantiere den:

Server server = ny server ();

En enkelt serverinstans kan håndtere flere forbindelser. Som et resultat understøtter kun en serverinstans alle vores eksempler.

Når vi er færdige, bortskaffes metode stopper serveren og frigiver TCP-porten.

4. Interaktionsmodeller

4.1. Anmodning / svar

RSocket leverer en anmodnings- / svarmodel - hver anmodning modtager et enkelt svar.

For denne model opretter vi en simpel tjeneste, der returnerer en besked tilbage til klienten.

Lad os starte med at tilføje en metode til vores udvidelse af Abstrakt stikkontakt, RSocketImpl:

@ Override offentlig Mono requestResponse (nyttelast nyttelast) {prøv {returner Mono.just (nyttelast); // reflekter nyttelasten tilbage til afsender} fangst (Undtagelse x) {returner Mono.error (x); }}

Det requestResponse metode returnerer et enkelt resultat for hver anmodning, som vi kan se ved Mono svarstype.

Nyttelast er den klasse, der indeholder beskedindhold og metadata. Det bruges af alle interaktionsmodeller. Indholdet af nyttelasten er binært, men der er bekvemhedsmetoder, der understøtter Snor-baseret indhold.

Dernæst kan vi oprette vores klientklasse:

offentlig klasse ReqResClient {privat endelig RSocket-stik; public ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public String callBlocking (String string) {return socket .requestResponse (DefaultPayload.create (string)) .map (Payload :: getDataUtf8) .block (); } offentlig ugyldig disposition () {this.socket.dispose (); }}

Klienten bruger RSocketFactory.connect () metode til at starte en sokkelforbindelse med serveren. Vi bruger requestResponse metode på stikket for at sende en nyttelast til serveren.

Vores nyttelast indeholder Snor videregivet til klienten. Når Mono svar ankommer, kan vi bruge getDataUtf8 () metode til at få adgang til Snor svarets indhold.

Endelig kan vi køre integrationstesten for at se anmodning / svar i aktion. Vi sender en Snor til serveren og kontrollere, at det samme Snor returneres:

@Test offentlig ugyldig nårSendingAString_thenRevceiveTheSameString () {ReqResClient-klient = ny ReqResClient (); String string = "Hej RSocket"; assertEquals (string, client.callBlocking (string)); client.dispose (); }

4.2. Fire-and-Glem

Med ild-og-glem-modellen, klienten modtager intet svar fra serveren.

I dette eksempel sender klienten simulerede målinger til serveren i 50 ms intervaller. Serveren offentliggør målingerne.

Lad os tilføje en brand-og-glem-handler til vores server i RSocketImpl klasse:

@ Override offentlig Mono fireAndForget (nyttelast) {prøv {dataPublisher.publish (nyttelast); // videresende nyttelastretur Mono.empty (); } fange (Undtagelse x) {returner Mono.error (x); }}

Denne handler ligner meget anmodnings- / svarhandleren. Imidlertid, fireAndForget vender tilbage Mono i stedet for Mono.

Det dataPublisher er en forekomst af org.reactivestreams.Publisher. Således gør den nyttelasten tilgængelig for abonnenter. Vi bruger det i anmodningen / stream-eksemplet.

Derefter opretter vi brand-og-glem-klienten:

offentlig klasse FireNForgetClient {privat endelig RSocket-stik; private endelige listedata; offentlig FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** Send binær hastighed (flyde) hver 50 ms * / offentlig ugyldig sendData () {data = Collections.unmodifiableList (generereData ()); Flux.interval (Duration.ofMillis (50)) .take (data.size ()) .map (dette :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

Opsætningen af ​​soklen er nøjagtig den samme som før.

Det sendData () metoden bruger en Strøm stream for at sende flere beskeder. For hver besked påberåber vi os stikkontakt :: fireAndForget.

Vi er nødt til at abonnere på Mono svar for hver besked. Hvis vi glemmer at abonnere så stikkontakt :: fireAndForget vil ikke udføre.

Det flatMap operatøren sørger for, at Ugyldig svar sendes til abonnenten, mens blockLast operatør fungerer som abonnent.

Vi venter til næste afsnit for at køre brand-og-glem-testen. På det tidspunkt opretter vi en anmodning / stream-klient for at modtage de data, der blev skubbet af brand-og-glem-klienten.

4.3. Anmod / stream

I anmodnings- / stream-modellen en enkelt anmodning kan modtage flere svar. For at se dette i aktion kan vi bygge på eksemplet med ild og glem. For at gøre det, lad os anmode om en strøm for at hente de målinger, vi sendte i det foregående afsnit.

Som før, lad os starte med at tilføje en ny lytter til RSocketImpl på serveren:

@Override public Flux requestStream (nyttelast nyttelast) {returner Flux.from (dataPublisher); }

Det requestStream handler returnerer a Strøm strøm. Som vi husker fra det foregående afsnit, er fireAndForget handler offentliggjorde indgående data til dataPublisher. Nu opretter vi en Strøm stream ved hjælp af det samme dataPublisher som begivenhedskilde. Ved at gøre dette vil måledataene strømme asynkront fra vores brand-og-glem klient til vores anmodning / stream klient.

Lad os oprette anmodningen / streame klienten næste:

offentlig klasse ReqStreamClient {privat endelig RSocket-stik; offentlig ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public Flux getDataStream () {return socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } offentlig ugyldig disposition () {this.socket.dispose (); }}

Vi opretter forbindelse til serveren på samme måde som vores tidligere klienter.

I getDataStream ()vi bruger socket.requestStream () for at modtage en Flux-stream fra serveren. Fra den strøm udtrækker vi Flyde værdier fra de binære data. Endelig returneres strømmen til den, der ringer op, så den, der ringer, kan abonnere på den og behandle resultaterne.

Lad os nu teste. Vi verificerer rundtur fra brand-og-glemmer at anmode om / streame.

Vi kan hævde, at hver værdi modtages i samme rækkefølge, som den blev sendt. Derefter kan vi hævde, at vi modtager det samme antal værdier, der blev sendt:

@Test offentlig ugyldig nårSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = ny FireNForgetClient (); ReqStreamClient streamClient = ny ReqStreamClient (); Listedata = fnfClient.getData (); Liste dataReceived = ny ArrayList (); Engangsabonnement = streamClient.getDataStream () .index (). Abonnement (tuple -> {assertEquals ("Forkert værdi", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. tilføj (tuple.getT2 ());}, err -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... bortskaff klient- og abonnementsassertEquals ("Forkert datatælling modtaget", data.size (), dataReceived.size ()); }

4.4. Kanal

Kanalmodellen giver tovejskommunikation. I denne model flyder meddelelsesstrømme asynkront i begge retninger.

Lad os oprette en simpel spil simulering for at teste dette. I dette spil bliver hver side af kanalen en spiller. Når spillet kører, sender disse spillere beskeder til den anden side med tilfældige tidsintervaller. Den modsatte side vil reagere på beskederne.

For det første opretter vi handler på serveren. Som før tilføjer vi til RSocketImpl:

@Override public Flux requestChannel (Publisher payloads) {Flux.from (payloads) .subscribe (gameController :: processPayload); returner Flux.from (gameController); }

Det requestChannel handler har Nyttelast streams til både input og output. Det Forlægger inputparameter er en strøm af nyttelast modtaget fra klienten. Når de ankommer, overføres disse nyttelast til gameController :: processPayload fungere.

Som svar returnerer vi en anden Strøm stream tilbage til klienten. Denne stream er oprettet fra vores gameController, som også er en Forlægger.

Her er et resumé af GameController klasse:

offentlig klasse GameController implementerer Publisher {@Override public void subscribe (Subscriber subscriber) {// send Payload-meddelelser til abonnenten med tilfældige intervaller} public void processPayload (Payload payload) {// reager på meddelelser fra den anden spiller}}

Når GameController modtager en abonnent, begynder den at sende beskeder til den abonnent.

Lad os derefter oprette klienten:

offentlig klasse ChannelClient {privat endelig RSocket-stik; privat final GameController gameController; offentlig ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)). start () .block (); this.gameController = ny GameController ("Client Player"); } public void playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } offentlig ugyldig disposition () {this.socket.dispose (); }}

Som vi har set i vores tidligere eksempler, opretter klienten forbindelse til serveren på samme måde som de andre klienter.

Klienten opretter sin egen forekomst af GameController.

Vi bruger socket.requestChannel () at sende vores Nyttelast streame til serveren. Serveren reagerer med en egen Payload-strøm.

Som nyttelast modtaget fra serveren sender vi dem til vores gameController :: processPayload handler.

I vores spil simulering er klienten og serveren spejlbilleder af hinanden. Det er, hver side sender en strøm af Nyttelast og modtage en strøm af Nyttelast fra den anden ende.

Strømmene kører uafhængigt uden synkronisering.

Lad os endelig køre simuleringen i en test:

@Test offentlig ugyldig når RunChannelGame_thenLogTheResults () {ChannelClient-klient = ny ChannelClient (); client.playGame (); client.dispose (); }

5. Konklusion

I denne indledende artikel har vi udforsket interaktionsmodellerne leveret af RSocket. Den fulde kildekode for eksemplerne kan findes i vores Github-arkiv.

Sørg for at tjekke RSocket-webstedet for en dybere diskussion. Især giver FAQ og motivationsdokumenter en god baggrund.


$config[zx-auto] not found$config[zx-overlay] not found