MQTT-klient i Java

1. Oversigt

I denne vejledning ser vi, hvordan vi kan tilføje MQTT-meddelelser i et Java-projekt ved hjælp af bibliotekerne leveret af Eclipse Paho-projektet.

2. MQTT Primer

MQTT (MQ Telemetry Transport) er en beskedprotokol der blev oprettet for at imødekomme behovet for en enkel og let metode til at overføre data til / fra enheder med lav strøm, såsom dem, der anvendes i industrielle applikationer.

Med den øgede popularitet af IoT-enheder (Internet of Things) har MQTT oplevet en øget anvendelse, hvilket har ført til standardisering af OASIS og ISO.

Protokollen understøtter et enkelt messaging-mønster, nemlig Publish-Subscribe-mønsteret: hver besked, der sendes af en klient, indeholder et tilknyttet "emne", som bruges af mægleren til at dirigere det til abonnerede klienter. Emnenavne kan være enkle strenge som “olietemp”Eller en kurvelignende streng”motor / 1 / o / min“.

For at modtage meddelelser abonnerer en klient på et eller flere emner ved hjælp af dets nøjagtige navn eller en streng, der indeholder et af de understøttede jokertegn ("#" for emner på flere niveauer og "+" for enkeltniveau ").

3. Opsætning af projekt

For at inkludere Paho-biblioteket i et Maven-projekt skal vi tilføje følgende afhængighed:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Den seneste version af Eclipse Paho Java-biblioteksmodulet kan downloades fra Maven Central.

4. Klientopsætning

Når vi bruger Paho-biblioteket, er det første, vi skal gøre for at sende og / eller modtage meddelelser fra en MQTT-mægler, at opnå en implementering af IMqttClient interface. Denne grænseflade indeholder alle de metoder, der kræves af et program for at oprette en forbindelse til serveren, sende og modtage meddelelser.

Paho kommer ud af kassen med to implementeringer af denne grænseflade, en asynkron (MqttAsyncClient) og en synkron (MqttClient).I vores tilfælde vil vi fokusere på den synkrone version, som har enklere semantik.

Selve opsætningen er en totrins proces: vi opretter først en forekomst af MqttClient klasse, og så forbinder vi den til vores server. Følgende underafsnit beskriver disse trin.

4.1. Oprettelse af et nyt IMqttClient Instans

Følgende kodestykke viser, hvordan du opretter et nyt IMqttClient synkron forekomst:

String publisherId = UUID.randomUUID (). ToString (); IMqttClient-udgiver = ny MqttClient ("tcp: //iot.eclipse.org: 1883", publisherId);

I dette tilfælde, vi bruger den enkleste konstruktør, der er tilgængelig, som tager slutpunktadressen på vores MQTT-mægler og en klient-id, der entydigt identificerer vores klient.

I vores tilfælde brugte vi en tilfældig UUID, så der genereres en ny klient-id ved hver kørsel.

Paho leverer også yderligere konstruktører, som vi kan bruge til at tilpasse persistensmekanismen, der bruges til at gemme ikke-godkendte meddelelser og / eller ScheduledExecutorService bruges til at køre baggrundsopgaver, der kræves af implementeringen af ​​protokolmotoren.

Det serverendepunkt, vi bruger, er en offentlig MQTT-mægler, der hostes af Paho-projektet, som gør det muligt for alle med en internetforbindelse at teste klienter uden behov for godkendelse.

4.2. Opretter forbindelse til serveren

Vores nyoprettede MqttClient forekomst er ikke forbundet til serveren. Vi gør det ved at kalde det Opret forbindelse() metode, valgfrit forbi en MqttConnectOptions eksempel, der giver os mulighed for at tilpasse nogle aspekter af protokollen.

Især kan vi bruge disse muligheder til at videregive yderligere oplysninger såsom sikkerhedsoplysninger, sessiongendannelsestilstand, genforbindelsestilstand og så videre.

Det MqttConnectionOptions klasse udsætter disse muligheder som enkle egenskaber, som vi kan indstille ved hjælp af normale settermetoder. Vi behøver kun at indstille de egenskaber, der kræves til vores scenario - de resterende antager standardværdier.

Koden, der bruges til at oprette forbindelse til serveren, ser typisk sådan ud:

MqttConnectOptions muligheder = nye MqttConnectOptions (); option.setAutomaticReconnect (sand); option.setCleanSession (sand); option.setConnectionTimeout (10); publisher.connect (valgmuligheder);

Her definerer vi vores forbindelsesmuligheder, så:

  • Biblioteket forsøger automatisk at oprette forbindelse igen til serveren i tilfælde af en netværksfejl
  • Det vil kassere usendte beskeder fra en tidligere kørsel
  • Forbindelsestimeout er indstillet til 10 sekunder

5. Afsendelse af beskeder

Afsendelse af beskeder ved hjælp af en allerede tilsluttet MqttClient er meget ligetil. Vi bruger en af offentliggøre() metodevarianter til at sende nyttelasten, som altid er et byte-array, til et givet emneved hjælp af en af ​​følgende servicekvalitetsindstillinger:

  • 0 - "højst en gang" semantik, også kendt som "fyr-og-glem". Brug denne mulighed, når meddelelsestab er acceptabelt, da det ikke kræver nogen form for anerkendelse eller vedholdenhed
  • 1 - “mindst én gang” semantik. Brug denne indstilling, når tab af beskeder ikke er acceptabelt og dine abonnenter kan håndtere dubletter
  • 2 - "nøjagtigt en gang" semantik. Brug denne indstilling, når tab af beskeder ikke er acceptabelt og dine abonnenter kan ikke håndtere dubletter

I vores eksempler på projekt, MotortemperaturSensor klasse spiller rollen som en mock-sensor, der producerer en ny temperaturaflæsning hver gang vi påberåber sig dens opkald() metode.

Denne klasse implementerer Kan kaldes interface, så vi let kan bruge det med en af ExecutorService implementeringer tilgængelige i java.util.concurrent pakke:

offentlig klasse EngineTemperatureSensor implementerer kaldbar {// ... private medlemmer udeladt offentlig EngineTemperatureSensor (IMqttClient-klient) {this.client = client; } @ Override public Void call () kaster undtagelse {if (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetained (true); client.publish (TOPIC, msg); returnere null; } privat MqttMessage readEngineTemp () {dobbelt temp = 80 + rnd.nextDouble () * 20.0; byte [] nyttelast = String.format ("T:% 04.2f", temp) .getBytes (); returner ny MqttMessage (nyttelast); }}

Det MqttMessage indkapsler selve nyttelasten, den anmodede servicekvalitet og også tilbageholdt flag for meddelelsen. Dette flag indikerer for mægleren, at den skal beholde denne meddelelse, indtil den forbruges af en abonnent.

Vi kan bruge denne funktion til at implementere en "sidst kendt god" opførsel, så når en ny abonnent opretter forbindelse til serveren, modtager den den bevarede besked med det samme.

6. Modtagelse af beskeder

For at modtage beskeder fra MQTT-mægleren, vi er nødt til at bruge en af abonner () metode varianter, som giver os mulighed for at specificere:

  • Et eller flere emnefiltre til beskeder, vi ønsker at modtage
  • Den tilknyttede QoS
  • Tilbagekaldshåndtereren til behandling af modtagne meddelelser

I det følgende eksempel viser vi, hvordan du tilføjer en beskedlytter til en eksisterende IMqttClient instans for at modtage beskeder fra et givet emne. Vi bruger en CountDownLatch som en synkroniseringsmekanisme mellem vores tilbagekald og hovedudførelsestråden, hvor det reduceres hver gang en ny besked ankommer.

I prøvekoden har vi brugt en anden IMqttClient eksempel for at modtage beskeder. Vi gjorde det bare for at gøre mere klart, hvilken klient der gør hvad, men dette er ikke en Paho-begrænsning - hvis du vil, kan du bruge den samme klient til at offentliggøre og modtage meddelelser:

CountDownLatch receivedSignal = ny CountDownLatch (10); subscriber.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte [] nyttelast = msg.getPayload (); // ... nyttelasthåndtering udeladt receivedSignal.countDown ();}); receivedSignal.await (1, TimeUnit.MINUTES);

Det abonner () ovenfor anvendte variant tager en IMqttMessageListener eksempel som dets andet argument.

I vores tilfælde bruger vi en simpel lambda-funktion, der behandler nyttelasten og reducerer en tæller. Hvis der ikke kommer nok meddelelser i det angivne tidsvindue (1 minut), vises vente() metode kaster en undtagelse.

Når du bruger Paho, behøver vi ikke udtrykkeligt bekræfte modtagelse af meddelelse. Hvis tilbagekaldet vender tilbage normalt, antager Paho, at det er et vellykket forbrug og sender en bekræftelse til serveren.

Hvis tilbagekaldet kaster et Undtagelse, lukkes klienten. Bemærk, at dette vil medføre tab af beskeder sendt med QoS-niveau på 0.

Meddelelser sendt med QoS niveau 1 eller 2 sendes igen af ​​serveren, når klienten er tilsluttet igen og abonnerer på emnet igen.

7. Konklusion

I denne artikel demonstrerede vi, hvordan vi kan tilføje support til MQTT-protokollen i vores Java-applikationer ved hjælp af biblioteket, der leveres af Eclipse Paho-projektet.

Dette bibliotek håndterer alle detaljer på lavt niveau, hvilket giver os mulighed for at fokusere på andre aspekter af vores løsning og samtidig give god plads til at tilpasse vigtige aspekter af dens interne funktioner, såsom vedholdenhed af meddelelser.

Koden vist i denne artikel er tilgængelig på GitHub.


$config[zx-auto] not found$config[zx-overlay] not found