MQTT——java簡單測試
阿新 • • 發佈:2021-02-10
服務端程式碼:
1 package bsit.mqtt.demo.one_way; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 6 import org.eclipse.paho.client.mqttv3.MqttException; 7 import org.eclipse.paho.client.mqttv3.MqttMessage; 8 import org.eclipse.paho.client.mqttv3.MqttPersistenceException; 9 import org.eclipse.paho.client.mqttv3.MqttTopic; 10 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 11 /** 12 * 13 * Title:Server 14 * Description: 伺服器向多個客戶端推送主題,即不同客戶端可向伺服器訂閱相同主題 15 * @author chenrl 16 * 2016年1月6日下午3:29:28 17 */ 18 public class Server { 19 20 public static final String HOST = "tcp://192.168.1.3:61613"; 21 public static final String TOPIC = "toclient/124"; 22 public static final String TOPIC125 = "toclient/125"; 23 private static final String clientid = "server"; 24 25 private MqttClient client; 26 private MqttTopic topic; 27 private MqttTopic topic125; 28 private String userName = "admin"; 29 private String passWord = "password"; 30 31 private MqttMessage message; 32 33 public Server() throws MqttException { 34 // MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 35 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 36 connect(); 37 } 38 39 private void connect() { 40 MqttConnectOptions options = new MqttConnectOptions(); 41 options.setCleanSession(false); 42 options.setUserName(userName); 43 options.setPassword(passWord.toCharArray()); 44 // 設定超時時間 45 options.setConnectionTimeout(10); 46 // 設定會話心跳時間 47 options.setKeepAliveInterval(20); 48 try { 49 client.setCallback(new PushCallback()); 50 client.connect(options); 51 topic = client.getTopic(TOPIC); 52 topic125 = client.getTopic(TOPIC125); 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 58 public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, 59 MqttException { 60 MqttDeliveryToken token = topic.publish(message); 61 token.waitForCompletion(); 62 System.out.println("message is published completely! " 63 + token.isComplete()); 64 } 65 66 public static void main(String[] args) throws MqttException { 67 Server server = new Server(); 68 69 server.message = new MqttMessage(); 70 server.message.setQos(2); 71 server.message.setRetained(true); 72 server.message.setPayload("給客戶端124推送的資訊".getBytes()); 73 server.publish(server.topic , server.message); 74 75 server.message = new MqttMessage(); 76 server.message.setQos(2); 77 server.message.setRetained(true); 78 server.message.setPayload("給客戶端125推送的資訊".getBytes()); 79 server.publish(server.topic125 , server.message); 80 81 System.out.println(server.message.isRetained() + "------ratained狀態"); 82 } 83 }
客戶端程式碼:
1 package bsit.mqtt.demo.one_way; 2 3 import java.util.concurrent.Executors; 4 import java.util.concurrent.ScheduledExecutorService; 5 import java.util.concurrent.TimeUnit; 6 7 import org.eclipse.paho.client.mqttv3.MqttClient; 8 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 9 import org.eclipse.paho.client.mqttv3.MqttException; 10 import org.eclipse.paho.client.mqttv3.MqttSecurityException; 11 import org.eclipse.paho.client.mqttv3.MqttTopic; 12 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 13 14 public class Client { 15 16 public static final String HOST = "tcp://192.168.1.3:61613"; 17 public static final String TOPIC = "toclient/124"; 18 private static final String clientid = "client124"; 19 private MqttClient client; 20 private MqttConnectOptions options; 21 private String userName = "admin"; 22 private String passWord = "password"; 23 24 private ScheduledExecutorService scheduler; 25 26 private void start() { 27 try { 28 // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 29 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 30 // MQTT的連線設定 31 options = new MqttConnectOptions(); 32 // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 33 options.setCleanSession(true); 34 // 設定連線的使用者名稱 35 options.setUserName(userName); 36 // 設定連線的密碼 37 options.setPassword(passWord.toCharArray()); 38 // 設定超時時間 單位為秒 39 options.setConnectionTimeout(10); 40 // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 41 options.setKeepAliveInterval(20); 42 // 設定回撥 43 client.setCallback(new PushCallback()); 44 MqttTopic topic = client.getTopic(TOPIC); 45 //setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 46 options.setWill(topic, "close".getBytes(), 2, true); 47 48 client.connect(options); 49 //訂閱訊息 50 int[] Qos = {1}; 51 String[] topic1 = {TOPIC}; 52 client.subscribe(topic1, Qos); 53 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 59 public static void main(String[] args) throws MqttException { 60 Client client = new Client(); 61 client.start(); 62 } 63 }
MQTT訂閱回撥類:
1 package bsit.mqtt.demo.one_way; 2 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttMessage; 6 7 /** 8 * 釋出訊息的回撥類 9 * 10 * 必須實現MqttCallback的介面並實現對應的相關介面方法CallBack 類將實現 MqttCallBack。 11 * 每個客戶機標識都需要一個回撥例項。在此示例中,建構函式傳遞客戶機標識以另存為例項資料。 12 * 在回撥中,將它用來標識已經啟動了該回調的哪個例項。 13 * 必須在回撥類中實現三個方法: 14 * 15 * public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的釋出。 16 * 17 * public void connectionLost(Throwable cause)在斷開連線時呼叫。 18 * 19 * public void deliveryComplete(MqttDeliveryToken token)) 20 * 接收到已經發布的 QoS 1 或 QoS 2 訊息的傳遞令牌時呼叫。 21 * 由 MqttClient.connect 啟用此回撥。 22 * 23 */ 24 public class PushCallback implements MqttCallback { 25 26 public void connectionLost(Throwable cause) { 27 // 連線丟失後,一般在這裡面進行重連 28 System.out.println("連線斷開,可以做重連"); 29 } 30 31 public void deliveryComplete(IMqttDeliveryToken token) { 32 System.out.println("deliveryComplete---------" + token.isComplete()); 33 } 34 35 public void messageArrived(String topic, MqttMessage message) throws Exception { 36 // subscribe後得到的訊息會執行到這裡面 37 System.out.println("接收訊息主題 : " + topic); 38 System.out.println("接收訊息Qos : " + message.getQos()); 39 System.out.println("接收訊息內容 : " + new String(message.getPayload())); 40 } 41 }
執行服務端程式碼,可看到伺服器會給客戶端124/125各推送一條訊息,
在執行124客戶端程式碼,可看到124客戶端接收的資訊:
然後把客戶端程式碼的Topic改為TOPIC = "toclient/125";clientid = "client125";再執行該段程式碼,可看到125客戶端接收的資訊:
多個客戶端訂閱同一主題,其clientid必不相同。客戶端124/125訂閱各自主題的內容,但是不同時間啟動,都在啟動後接收到各自資訊,這體現出了伺服器的推送功能。同樣的,傳送的主題資訊,可以在伺服器的topic可以看到,訪問路徑是:http://127.0.0.1:61680/
其實,如若服務端和客戶端相互通訊,即客戶端可以訂閱可以釋出,服務端可以訂閱也可以釋出,則可不區分服務端客戶端,兩邊程式碼幾乎一樣。類似,兩個客戶端都在訂閱同一主題,這時由第三個客戶端釋出這一主題的請求,前兩個客戶端同樣可以接受該主題的內容,這時三個客戶端的程式碼幾乎一樣,只是前兩個是訂閱,後一個是釋出。