1. 程式人生 > >MQTT協議之訂閱及釋出(使用paho-mqtt-client或mqttv3實現)

MQTT協議之訂閱及釋出(使用paho-mqtt-client或mqttv3實現)

另外一個MQTT釋出訂閱客戶端paho-mqtt-client或mqttv3採用回撥的方式實現訊息的接收,下面看一下實現:

1.訊息接收回調類

[java] view plain copy  print?
  1. package cn.smartslim.mqtt.demo.paho;  
  2. import org.eclipse.paho.client.mqttv3.MqttCallback;  
  3. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
  4. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  5. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  6. /**  
  7.  * 釋出訊息的回撥類  
  8.  *   
  9.  * 必須實現MqttCallback的介面並實現對應的相關介面方法  
  10.  *      ◦CallBack 類將實現 MqttCallBack。每個客戶機標識都需要一個回撥例項。在此示例中,建構函式傳遞客戶機標識以另存為例項資料。在回撥中,將它用來標識已經啟動了該回調的哪個例項。  
  11.  *  ◦必須在回撥類中實現三個方法:  
  12.  *   
  13.  *  public void messageArrived(MqttTopic topic, MqttMessage message) 
     
  14.  *  接收已經預訂的釋出。  
  15.  *   
  16.  *  public void connectionLost(Throwable cause)  
  17.  *  在斷開連線時呼叫。  
  18.  *   
  19.  *  public void deliveryComplete(MqttDeliveryToken token))  
  20.  *      接收到已經發布的 QoS 1 或 QoS 2 訊息的傳遞令牌時呼叫。  
  21.  *  ◦由 MqttClient.connect 啟用此回撥。  
  22.  *   
  23.  */
  24. publicclass PushCallback implements MqttCallback {  
  25.     public
    void connectionLost(Throwable cause) {  
  26.         // 連線丟失後,一般在這裡面進行重連
  27.         System.out.println("連線斷開,可以做重連");  
  28.     }  
  29.     publicvoid deliveryComplete(MqttDeliveryToken token) {  
  30.         // publish後會執行到這裡
  31.         System.out.println("deliveryComplete---------"+ token.isComplete());  
  32.     }  
  33.     publicvoid messageArrived(MqttTopic topic, MqttMessage message) throws Exception {  
  34.         // subscribe後得到的訊息會執行到這裡面
  35.         System.out.println("接收訊息主題:"+topic.getName());  
  36.         System.out.println("接收訊息Qos:"+message.getQos());  
  37.         System.out.println("接收訊息內容:"+new String(message.getPayload()));  
  38.     }  
  39. }  
2.服務端釋出訊息 [java] view plain copy  print?
  1. package cn.smartslim.mqtt.demo.paho;  
  2. import org.eclipse.paho.client.mqttv3.MqttClient;  
  3. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
  4. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;  
  5. import org.eclipse.paho.client.mqttv3.MqttException;  
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;  
  7. import org.eclipse.paho.client.mqttv3.MqttPersistenceException;  
  8. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  9. import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;  
  10. publicclass Server {  
  11.     publicstaticfinal String HOST = "tcp://192.168.36.102:1883";  
  12.     publicstaticfinal String TOPIC = "tokudu/yzq124";  
  13.     privatestaticfinal String clientid ="server";   
  14.     private MqttClient client;  
  15.     private MqttTopic topic;  
  16.     private String userName = "test";  
  17.     private String passWord = "test";  
  18.     private MqttMessage message;  
  19.     public Server() throws MqttException {  
  20.          //MemoryPersistence設定clientid的儲存形式,預設為以記憶體儲存
  21.         client = new MqttClient(HOST, clientid, new MemoryPersistence());  
  22.         connect();  
  23.     }  
  24.     privatevoid connect() {  
  25.         MqttConnectOptions options = new MqttConnectOptions();  
  26.         options.setCleanSession(false);  
  27.         options.setUserName(userName);  
  28.         options.setPassword(passWord.toCharArray());  
  29.         // 設定超時時間
  30.         options.setConnectionTimeout(10);  
  31.         // 設定會話心跳時間
  32.         options.setKeepAliveInterval(20);  
  33.         try {  
  34.                client.setCallback(new PushCallback());  
  35.                client.connect(options);  
  36.                topic = client.getTopic(TOPIC);  
  37.         } catch (Exception e) {  
  38.                e.printStackTrace();  
  39.         }  
  40.     }  
  41.     publicvoid publish(MqttMessage message) throws MqttPersistenceException, MqttException{  
  42.         MqttDeliveryToken token = topic.publish(message);  
  43.         token.waitForCompletion();  
  44.         System.out.println(token.isComplete()+"========");  
  45.     }  
  46.     publicstaticvoid main(String[] args) throws MqttException {  
  47.         Server server =  new Server();  
  48.         server.message = new MqttMessage();  
  49.         server.message.setQos(1);  
  50.         server.message.setRetained(true);  
  51.         server.message.setPayload("eeeeeaaaaaawwwwww---".getBytes());  
  52.          server.publish(server.message);  
  53.          System.out.println(server.message.isRetained()+"------ratained狀態");  
  54.     }  
  55. }  
3.客戶端接收訊息 [java] view plain copy  print?
  1. package cn.smartslim.mqtt.demo.paho;  
  2. import java.util.concurrent.Executors;  
  3. import java.util.concurrent.ScheduledExecutorService;  
  4. import java.util.concurrent.TimeUnit;  
  5. import org.eclipse.paho.client.mqttv3.MqttClient;  
  6. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;  
  7. import org.eclipse.paho.client.mqttv3.MqttException;  
  8. import org.eclipse.paho.client.mqttv3.MqttSecurityException;  
  9. import org.eclipse.paho.client.mqttv3.MqttTopic;  
  10. import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;  
  11. publicclass Client {  
  12.     publicstaticfinal String HOST = "tcp://192.168.36.102:1883";  
  13.     publicstaticfinal String TOPIC = "tokudu/yzq124";  
  14.     privatestaticfinal String clientid = "client";  
  15.     private MqttClient client;  
  16.     private MqttConnectOptions options;  
  17.     private String userName = "test";  
  18.     private String passWord = "test";  
  19.     private ScheduledExecutorService scheduler;  
  20.     //重新連結
  21.     publicvoid startReconnect() {  
  22.         scheduler = Executors.newSingleThreadScheduledExecutor();  
  23.         scheduler.scheduleAtFixedRate(new Runnable() {  
  24.             publicvoid run() {  
  25.                 if (!client.isConnected()) {  
  26.                     try {  
  27.                         client.connect(options);  
  28. 相關推薦

    MQTT協議訂閱釋出使用paho-mqtt-clientmqttv3實現

    另外一個MQTT釋出訂閱客戶端paho-mqtt-client或mqttv3採用回撥的方式實現訊息的接收,下面看一下實現: 1.訊息接收回調類 [java] view plain copy  print? package cn.smartslim.mqt

    使用MQTT協議訂閱釋出模式,實現從伺服器推送訊息到客戶端功能。

    3、手機端設定相同的連線主題引數,訂閱訊息。關鍵程式碼:PC端//伺服器埠 賬號 密碼 private String host = "tcp://127.0.0.1:61613"; private String userName = "admin"; private Stri

    solr搜索入門原理

    solr solr入門 1 solr簡介solr官方文檔:http://wiki.apache.org/solr/DataImportHandler 下載地址:http://www.apache.org/dyn/closer.cgi/lucene/solr/2 solr入門我們使

    【MySQL資料庫】效能優化索引優化

    一、Mysql效能優化之影響效能的因素 1.商業需求的影響 不合理的需求造成的資源投入產出,這裡就用一個看上去很簡單的功能分析。需求:一個論壇帖子的總量統計,附加要求:實時更新。從功能上看來是非常容易實現的,執行一條select count(*)from表名就可以得到結果,但是如果我們採

    MQTT協議Mosquitto安裝和使用

        Mosquitto是一款實現了 MQTT v3.1 協議的開源訊息代理軟體,提供輕量級的,支援釋出/訂閱的的訊息推送模式,使裝置對裝置之間的短訊息通訊簡單易用。 1.下載解壓、安裝Mosquitto:http://mosquitto.org/files/source

    mqtt協議 activeMq、apollo的使用

    在這裡先講講activeMq 與 apollo 的關係:         apollo 是 ActiveMQ的子工程,是 ActiveMQ的下一代訊息代理。         apollo 是一

    安卓快取LruCache設計非同步+快取圖片載入器LruCacheImageLoader

    一、LruCache LruCache是一套記憶體快取的解決方案,演算法基於LRU。 LRU:Least Recently Used(近期最少使用)。LruCache基於LRU演算法的快取策略。 LruCache是一個泛型類,其以強引用的方式

    Java for Web學習筆記九十:訊息和叢集5利用websocket實現訂閱釋出

    叢集中的訂閱和釋出 利用spring framework在本app內的訂閱和釋出十分簡單。當我們系統越來越複雜的時候,我們需要向其他app釋出訊息。本學習將給出一個通過websocket來實現不同app之間訊息的訂購和釋出。 在小例子中,我們在所有節點之間都建立webSoc

    MQTT QOS等級訂閱釋出的關係

    釋出publish和訂閱subscribe都可以指定qos等級。 pub時指定的qos是跟伺服器有關係的,比如qos2時,是保證伺服器只收到一次,而不是最終的訂閱者。 訂閱者在sub時雖然指定了qos,但是收到的訊息不一定就是指定qos等級的訊息,而可能是降

    Django專案建立---Templates擴充套件

    1.建立步驟 在APP的根目錄下建立名為templates的目錄,然後在該目錄 下建立HTML檔案(檔案上滑鼠右鍵,選擇New–>HTML File),命名為index.html,程式碼如下: <!DOCTYPE html> &l

    MQTT協議moquette 安裝使用

        在MQTT 官網 (http://mqtt.org/software)中有眾多MQTT的實現方式。具體參看官網,Moquette是基於netty(老版本使用的是mina) 的模型的一個Java MQTT broker,支援websocket,SSL。 如果想直接啟動

    安卓快取DiskLruCache設計非同步+快取圖片載入器DiskCacheImageLoader

    DiskLruCache DiskLruCache是一套硬碟快取的解決方案,演算法同LruCache基於LRU演算法。 DiskLruCache不是由Google官方編寫的,這個類沒有被包含在Android API當中。這個類需要從網上下載

    2017面向對象程序設計Java 第1周學習指導要求2017.8.24-2017.8.27

    令行 str applet 面向 學習目標 對象 com 變量 課程學習 2017面向對象程序設計(Java) 第1周學習指導及要求(2017.8.24-2017.8.27) 學習目標 了解課程上課方式及老師教學要求,掌握課程學習必要的軟件工具; 簡單了解Java特點

    Fortify漏洞Cross-Site ScriptingXSS 跨站腳本攻擊

    puts 私人 解決方案 sta 行為 sel getpara image 字母   書接上文,繼續對Fortify漏洞進行總結,本篇主要針對XSS跨站腳步攻擊漏洞進行總結如下: 1、Cross-Site Scripting(XSS 跨站腳本攻擊) 1.1、產生原因: 1.

    周記A Fresh Start2018/9/2-2018/9/8

    自己 計算 三分 專業課 探索 可見 不出 技術 學習過程 新學期、新開始、新面貌、新姿態、新目標、新動力……希望自己不忘初心,在自己的地圖上摸索自己的路,然後一直走下去,永不回頭。在此平臺立下一個flag:至少每周一記,包括本周內所做所想所感所悟,繼而更加堅定以後的征程,

    axios配置使用發起請求時帶上token

    ima exp The push .post 設置 export host 接口 1.安裝 利用npm安裝 npm install axios --save 2.引入即可使用 import axios from ‘axios‘ 3.目錄 4.各個文件設置: (1

    java8重新認識HashMap轉自美團技術團隊

    java8之重新認識HashMap(轉自美團技術團隊) java8之重新認識HashMap 摘要 HashMap是Java程式設計師使用頻率最高的用於對映(鍵值對)處理的資料型別。隨著JDK(Java Developmet Kit)版本的更新,JDK1.8對HashMap底層的

    週記期中已至2018/10/29-2018/11/5

    開學至今,已有倆月,課程基本都已過半,意味著前一半的時光一去不復返,後一半的時光要承受著更沉重的負擔,硬體的課很多很難懂很抽象,《嵌入式系統》、《計算機組成原理》,還有很多專業性很強的課,《計算機網路》、《Linux作業系統》、《.net開發》還有《專業外語》,總之,要著手準備複習。 今天上的《計算機組成原

    楊老師課堂JavaScript懸浮事件滑鼠移入移出事件

     今天給大家分享一個簡單的JavaScript事件案例: 該事件屬於懸浮事件 改程式碼邏輯非常簡單,主要是 當滑鼠移動到按鈕上顯示一個盒子,移開之後盒子隱藏 JavaScript事件中     onmouseover 代表的是滑鼠指標移動到指定的物件

    軟體測試流程規範參考大華為的規範

    軟體測試流程及規範(參考大華為的規範) 參考某大佬(窩真不知道是哪位大佬)總結的測試流程並結合在華為做測試學到的規範,整理的我們公司的測試流程,分享是一種美德,so開始你的閱讀吧~ 軟體測試流程及規範 一、目標 制定完整且具體的測試路線和流程,為快速、高效和高質量的軟體測試提供基礎流