1. 程式人生 > >MQTT-Android訂閱和釋出

MQTT-Android訂閱和釋出

訂閱和接收

//    final String serverUri = "tcp://iot.eclipse.org:1883";
    final String serverUri = "tcp://ip:port";
    String clientId = "ExampleAndroidClient";
    final String subscriptionTopic = "subscribe_topic";
    final String publishTopic = "publish_topic";
    final String publishMessage = "Hello World!"
; private static String userName = "admin"; private static String passWord = "password"; private void initConnect() { //封裝好的MQTTClient供操作,傳送和接手等設定都用它 mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), serverUri, clientId); //設定回撥 mqttAndroidClient.setCallback(new
MqttCallbackExtended() { /** * 連線完成回撥 * @param reconnect true 斷開重連,false 首次連線 * @param serverURI 伺服器URI */ @Override public void connectComplete(boolean reconnect, String serverURI) { if (reconnect) { addToHistory("Reconnected to : "
+ serverURI); // Because Clean Session is true, we need to re-subscribe subscribeToTopic(); } else { addToHistory("Connected to: " + serverURI); } } /** * @desc 連線斷開回調 * 可在這裡做一些重連等操作 */ @Override public void connectionLost(Throwable cause) { addToHistory("The Connection was lost."); } /** * 訊息接收,如果在訂閱的時候沒有設定IMqttMessageListener,那麼收到訊息則會在這裡回撥。 * 如果設定了IMqttMessageListener,則訊息回撥在IMqttMessageListener中 * @param topic 該訊息來自的訂閱主題 * @param message 訊息內容 */ @Override public void messageArrived(String topic, MqttMessage message) { addToHistory("Incoming message: " + new String(message.getPayload())); } /** * 交付完成回撥。在publish訊息的時候會收到此回撥. * qos: * 0 傳送完則回撥 * 1 或 2 會在對方收到時候回撥 * @param token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { addToHistory("deliveryComplete,token:" + token); } }); //mqtt連線引數設定 MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); //設定自動重連 mqttConnectOptions.setAutomaticReconnect(true); // 設定是否清空session,這裡如果設定為false表示伺服器會保留客戶端的連線記錄 // 這裡設定為true表示每次連線到伺服器都以新的身份連線 mqttConnectOptions.setCleanSession(false); //設定連線的使用者名稱 mqttConnectOptions.setUserName(userName); //設定連線的密碼 mqttConnectOptions.setPassword(passWord.toCharArray()); // 設定超時時間 單位為秒 mqttConnectOptions.setConnectionTimeout(10); // 設定會話心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向客戶端傳送個訊息判斷客戶端是否線上,但這個方法並沒有重連的機制 mqttConnectOptions.setKeepAliveInterval(20); try { //設定好相關引數後,開始連線 mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { Log.i(TAG, "onSuccess,asyncActionToken"); /*連線成功之後設定連線斷開的緩衝配置*/ DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions(); //開啟 disconnectedBufferOptions.setBufferEnabled(true); //離線後最多快取100調 disconnectedBufferOptions.setBufferSize(100); //不一直持續留存 disconnectedBufferOptions.setPersistBuffer(false); //刪除舊訊息 disconnectedBufferOptions.setDeleteOldestMessages(false); mqttAndroidClient.setBufferOpts(disconnectedBufferOptions); //訂閱主題 subscribeToTopic(); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { addToHistory("Failed to connect to: " + serverUri); } }); } catch (MqttException ex) { ex.printStackTrace(); } } /** * 訂閱主題 */ public void subscribeToTopic() { try { //主題、QOS、context,訂閱監聽,訊息監聽 mqttAndroidClient.subscribe(subscriptionTopic, 0, null, new IMqttActionListener() { @Override public void onSuccess(IMqttToken asyncActionToken) { addToHistory("Subscribed!"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { addToHistory("Failed to subscribe"); } }, new IMqttMessageListener() { /** * @desc 訊息到達回撥 * @param topic * @param message */ @Override public void messageArrived(String topic, MqttMessage message) { // message Arrived! final String messageRecive = new String(message.getPayload()); runOnUiThread(new Runnable() { @Override public void run() { addToHistory("recive message:" + messageRecive); } }); System.out.println("Message: " + topic + " : " + messageRecive); } }); } catch (MqttException ex) { System.err.println("Exception whilst subscribing"); ex.printStackTrace(); } }

傳送

使用Apollo作為伺服器來擔當訊息代理,又有mqttv3封裝庫,接收只需要訂閱主題,傳送只需往某個主題傳送訊息,在mqttv3中也已經封裝好了。

/**
     * 傳送訊息¬
     */
    private static void publish() {
        //主題
        String topic = "主題";
        //訊息內容
        String content = "來嘛,又給你推送訊息";
        //設定qos
        int qos = 0;
        //伺服器URI
        String broker = "tcp://ip:port"
        //客戶端id
        String clientId = "JavaSample";

        MemoryPersistence persistence = new MemoryPersistence();
        try {
            //傳送依舊使用MqttClient
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            sampleClient.setCallback(new MqttCallback() {
                /**
                 * 連線斷開回調
                 * @param throwable
                 */
                @Override
                public void connectionLost(Throwable throwable) {
                    System.out.println("connectionLost");
                }

                /**
                 * 訊息接收回調,如果只發送,這裡沒關係
                 * @param s
                 * @param mqttMessage
                 * @throws Exception
                 */
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    System.out.println("messageArrived:" + s);
                }

                /**
                 * 交付完成回撥。在publish訊息的時候會收到此回撥.
                 * qos:
                 * 0 傳送完則回撥
                 * 1 或 2 會在對方收到時候回撥
                 * @param iMqttDeliveryToken id,可通過id來獲取傳送的訊息內容
                 */
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    try {
                        System.out.println("deliveryComplete token:" + iMqttDeliveryToken.getMessage().toString());
                    } catch (MqttException e) {
                        e.printStackTrace();
                    }
                }
            });
            //連線操作設定
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(passWord.toCharArray());
            connOpts.setCleanSession(false);
            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            /*傳送,使用MqttMessage封裝類來進行傳送*/
            MqttMessage message = new MqttMessage(content.getBytes());
            //設定優先順序
            message.setQos(qos);
            //設定是否被伺服器保留
            message.setRetained(true);
            //使用mqttClient進行傳送到對應主題
            sampleClient.publish(topic, message);
            if (!sampleClient.isConnected()) {
                System.out.println(mqttAndroidClient.getBufferedMessageCount() + " messages in buffer.");
            }
            System.out.println("Message published");
            //傳送完則斷開
//            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
  • 離線後,可以看到離線後傳送訊息是存在了快取裡,在設定了離線快取配置後,快取大小這些都由自己控制啦,待上線之後再自動進行傳送
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 2 messages in buffer.
 paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 3 messages in buffer.
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 4 messages in buffer.
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 5 messages in buffer.
paho.mqtt.java.example I/System.out: LOG: Message Published
paho.mqtt.java.example I/System.out: LOG: 6 messages in buffer.

設定了最大快取條數,那麼達到條數之後,就不會再存再裡面咯。

  • QOS

qos代表服務質量,指的是交通優先順序和資源預留控制機制,而不是接收的服務質量。 服務質量是為不同應用程式,使用者或資料流提供的不同優先順序的能力,或者也可以說是為資料流保證一定的效能水平的能力。
以下是每一個服務質量級別的具體描述
0 :最多一次傳送 (只負責傳送,傳送過後就不管資料的傳送情況)
1 : 至少一次傳送 (確認資料交付)
2 :正好一次傳送 (保證資料交付成功)

demo專案地址:https://github.com/nxSin/paho.mqtt.android。該專案frok自eclipese下專案,如果是Android使用,建議參考這個庫來進行二次封裝使用,內部對其封裝的MqttAndroidClient使用了Service來進行處理連線和收發,完全適用Android場景。

相關推薦

MQTT-Android訂閱釋出

訂閱和接收 // final String serverUri = "tcp://iot.eclipse.org:1883"; final String serverUri = "tcp://ip:port"; String clie

MQTT QOS等級訂閱釋出的關係

釋出publish和訂閱subscribe都可以指定qos等級。 pub時指定的qos是跟伺服器有關係的,比如qos2時,是保證伺服器只收到一次,而不是最終的訂閱者。 訂閱者在sub時雖然指定了qos,但是收到的訊息不一定就是指定qos等級的訊息,而可能是降

Android廣告釋出的市場

首先申明這些都是複製過來的:網址:http://blog.csdn.net/listening_music/article/details/7037941 1.萬普平臺   這個平臺是目前為止收入最高的一個平臺,但也是被各發布渠道拒絕使用該平臺應用上架的廣告平臺。其主要廣告為廣告條、積

ROS 基礎: 在同一個節點裡訂閱釋出訊息

       在一些應用中,可能有的人需要在同一個節點中實現訂閱一個訊息,然後在該訊息的回撥函式中處理一下這些資料後再發布到另一個topic上。 #include <ros/ros.h>

利用redis的訂閱釋出來實現實時監控的一個DEMO(Python版本)

       redis的list型別有個很好的特性,就是每次新增元素後會返回當前list的長度,利用這個特點,我們可以監控它的長度,比如我們的key是使用者註冊的IP地址,list中存放的是已經在此IP地址上註冊的使用者的ID,當用戶數超過1000的時候來發一個告警,而r

RabbitMQ的訂閱釋出步驟詳解

一、關於RabbitMQ搭建和基本概念這裡不做介紹,下面給出實用的參考部落格 RabbitMQ基礎概念及詳細介紹參考文件:http://blog.csdn.net/whycold/article/details/41119807 RabbitMQ入門及環境的搭建:http

linux下使用hiredis非同步API實現sub/pub訊息訂閱釋出的功能

本文轉載自連結:  最近使用redis的c介面——hiredis,使客戶端與redis伺服器通訊,實現訊息訂閱和釋出(PUB/SUB)的功能,我把遇到的一些問題和解決方法列出來供大家學習。        廢話不多說,先貼程式碼。 redis_publisher.

Java for Web學習筆記(九十):訊息叢集(5)利用websocket實現訂閱釋出(上)

叢集中的訂閱和釋出 利用spring framework在本app內的訂閱和釋出十分簡單。當我們系統越來越複雜的時候,我們需要向其他app釋出訊息。本學習將給出一個通過websocket來實現不同app之間訊息的訂購和釋出。 在小例子中,我們在所有節點之間都建立webSoc

MQTT——取消訂閱報文斷開連接報文

style ima es2017 alt 同時 消息 mage xxxxxxxx logs 筆者已經把連接報文,訂閱報文,發布報文都講解了完了。而接下來就是取消訂閱報文和斷開連接報文。和其他的報文比較的話,他們顯示非常簡單。甚至筆者覺得可以不必要拿出來講。只要看一下MQTT

Redis管道釋出訂閱

管道:原子性執行命令 ''' redis-py預設在執行每次請求都會建立(連線池申請連線)和斷開(歸還連線池)一次連線操作, 如果想要在一次請求中指定多個命令,則可以使用pipline實現一次請求指定多個命令, 並且預設情況下一次pipline 是原子性操作 ''' import redis poo

Redis瑞士軍刀:慢查詢,Pipeline釋出訂閱

1.慢查詢 1.1 慢查詢的生命週期 步驟一:client通過網路向Redis傳送一條命令 步驟二:由於Redis是單執行緒應用,可以把Redis想像成一個佇列,client執行的所有命令都在排隊等著server端執行 步驟三:Redis服務端按順序執行命令 步驟四:server端把命令結果通過網路返回給cl

觀察者釋出訂閱模式的區別

       Pub-Sub  Pattern          在“釋出者-訂閱者”模式中,稱為釋出者的訊息傳送者不會將訊息程式設計為直接傳送給稱為訂閱者的特定接收者。這意味著釋出者和訂閱者不知道彼此的存在。存在第三個元件,稱為代理或訊息代理或事件匯流排,它由釋出者和訂閱者都知道,它過濾所有傳入的訊息並相

面向大眾的移動技術:簽名,封裝釋出Android app

作者: Andrew Glover 原文地址 譯者:Ahaha  校對:趙峰 面向大從的移動打樁其它四篇文章地址(校對新增): 新增一個多選擇quiz到你的Android手機app,然後用一個安全數字證書籤名 用網路邏輯,內容為王。但是對與手機使用者來說,互動規則才是王道。對移動app

設計模式——觀察者釋出訂閱模式

最近在學習設計模式,本文就同一個例子對觀察者和釋出訂閱進行探討。觀察者模式    比較概念的解釋是,目標和觀察者是基類,目標提供維護觀察者的一系列方法,觀察者提供更新介面。具體觀察者和具體目標繼承各自的基類,然後具體觀察者把自己註冊到具體目標裡,在具體目標發生變化時候,排程觀

Apache Apollo 服務端搭建與mqtt客戶端(java)訂閱釋出的實現。

注意:本部落格純屬娛樂,不接受任何批評!一、Apache Apollo伺服器搭建:       略。       搭建好後訪問http://localhost:61680  登入賬號密碼:admin/password,然後熟悉下頁面。二、java版客戶端實現訂閱與釋出    

MQTT協議之訂閱釋出(使用paho-mqtt-client或mqttv3實現)

另外一個MQTT釋出訂閱客戶端paho-mqtt-client或mqttv3採用回撥的方式實現訊息的接收,下面看一下實現: 1.訊息接收回調類 [java] view plain copy  print? package cn.smartslim.mqt

redis的訊息佇列釋出訂閱demo

以前做online judge的時候用mysql+時間戳做訊息佇列,現在redis提供了一種現成的訊息佇列的模式,使用redis佇列可以直接模擬訊息通訊的方式,在將併發轉化為非併發時非常有用,同時通訊的雙方不需要關注彼此的資訊,實現解耦合。比如使用者提交了程式碼,我後臺往訊

觀察者模式(Observer)釋出(Publish/訂閱模式(Subscribe)的區別

觀察者模式(Observer)和釋出(Publish/訂閱模式(Subscribe)的區別 在翻閱資料的時候,有人把觀察者(Observer)模式等同於釋出(Publish)/訂閱(Subscribe)模式,也有人認為這兩種模式還是存在差異,而我認為確實是存在差異的,本質上的區別是排程的地方不同。

ROS學習--RoboWare Studio的使用釋出器/訂閱器的編寫與測試

開始ROS學習之前,先按照官網教程和其他老大們寫的部落格安裝好ROS,再安裝一個RoboWare-Studio,雖然用Qt和Eclipse也行,但比較麻煩,這個很方便。 按照大多網路教材的順序,我們先來做一個釋出器和訂閱器。步驟如下: 1.在RoboWare

EMQ程序樹/MQTT連線/訂閱/釋出原始碼流程分析

寫在前面EMQ作為一款優秀的開源MQTT broker,從一些庫的使用以及框架的設計,可以看出作者也是非常精通Erlang的大牛!比如說對於叢集化,作者自己實現了Ekka庫,對於網路併發,作者自己實現了esockd。首先不論這些庫效能如何,但敢於自己造輪子,會造輪子,絕對是對