mqtt基於paho的訊息訂閱接收的JAVA程式碼
阿新 • • 發佈:2018-12-30
看到網上的部分程式碼,對於訂閱主題後,使用mqttCallBack介面來接收訊息,雖然這種方法也可以接收到訊息,但是mqtt Paho提供了正規的方法去接收訊息,這裡分享一下自己的demo。
使用callback介面訂閱類:
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import edu.jia.pub.Publish; public class Subscribe { public static final String HOST = "tcp://116.196.99.111:1883"; public static final String TOPIC = "topic"; private static final String clientid = "Client Subscribe"; private MqttClient client; private MqttConnectOptions options; private String msg = null; // private String userName = "admin"; // private String passWord = "password"; // private ScheduledExecutorService scheduler; public MqttClient connect() throws MqttException { // host為主機名,clientid即連線MQTT的客戶端ID,一般以唯一識別符號表示,MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存 this.client = new MqttClient(HOST, clientid, new MemoryPersistence()); // MQTT的連線設定 options = new MqttConnectOptions(); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 options.setCleanSession(true); // 設定連線的使用者名稱 // options.setUserName(userName); // // 設定連線的密碼 // options.setPassword(passWord.toCharArray()); // 設定超時時間 單位為秒 options.setConnectionTimeout(10); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息 判斷客戶端是否線上,但這個方法並沒有重連的機制 options.setKeepAliveInterval(20); // 設定回撥 this.client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { // 連線丟失後,一般在這裡面進行重連 System.out.println("連線斷開,可以做重連"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe後得到的訊息會執行到這裡面 System.out.println("接收訊息的主題 : " + topic); System.out.println("接收訊息的質量Qos : " + message.getQos()); // msg = new String(message.getPayload()); System.out.println(">>>>>>>>>>>>>>>>>>>" + msg); } }); // MqttTopic topic = client.getTopic(TOPIC); // setWill方法,如果專案中需要知道客戶端是否掉線可以呼叫該方法。設定最終埠的通知訊息 // options.setWill(topic, "close".getBytes(), 2, true); this.client.connect(options); return this.client; } public IMqttToken subscribe(MqttClient client) throws MqttException { // 訂閱訊息 int Qos = 2; String topic1 = TOPIC; IMqttToken token = client.subscribeWithResponse(topic1, Qos); String str = new String(token.getResponse().getPayload()); System.out.println("============================" + str); return token; } public static void main(String[] args) throws Throwable { System.out.println("下發配置"); Thread.sleep(3000); System.out.println("配置已經下發"); Thread.sleep(3000); System.out.println("監聽回傳訊息"); Subscribe sub = new Subscribe(); MqttClient client = sub.connect(); Thread.sleep(3000); System.out.println("建立連線"); IMqttToken token = sub.subscribe(client); if (token.isComplete()) { System.out.println("完成訂閱"); Publish Publish = new Publish(); Publish.setMessage(new MqttMessage()); Publish.getMessage().setQos(2); Publish.getMessage().setRetained(true); Publish.getMessage().setPayload("2018/06-------mqtt服務端測試 msg2".getBytes()); // 重寫publish方法 Publish.publish(Publish.getTopic(), Publish.getMessage()); Publish.getClient().disconnect(); } if (sub.msg == null) { Thread.sleep(3000); if (sub.msg!=null) { System.out.println("====================" + sub.msg); } else { System.out.println("time out "); } } } }
使用subscribe方法接收訊息:
package edu.jia.sub; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class TestSub { private static int qos = 2; private static String broker = "tcp://116.196.99.111:1883"; private static MqttClient connect(String clientId) throws MqttException{ MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.connect(connOpts); return mqttClient; } public static void sub(MqttClient mqttClient,String topic) throws MqttException{ int[] Qos = {qos}; String[] topics = {topic}; mqttClient.subscribe(topics, Qos); } private static void runsub(String clientId, String topic) throws MqttException{ MqttClient mqttClient = connect(clientId); if(mqttClient != null){ sub(mqttClient,topic); } mqttClient.subscribe(topic,2, new IMqttMessageListener() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // TODO Auto-generated method stub System.out.println(new String(message.getPayload())); } }); } public static void main(String[] args) throws MqttException{ runsub("testSub", "test"); } }
測試:在遠端主機,開啟mqtt服務:mosquitto -c /etc/mosquitto/mosquitto.conf
mostuitto_pub -p 1883 -q 2 -t "test " -m "test ---- ok "
多說幾句:subscribe提供了三個過載方法,大家可以根據自己的需要選擇需要的過載函式,如果對於listener要求比較高的話,可以將內部類單獨寫出來繼承介面即可。