1. 程式人生 > >MQTT客戶端JAVA程式碼----fusesource mqtt-client

MQTT客戶端JAVA程式碼----fusesource mqtt-client

fusesource版本:mqtt-client-1.7-uber.jar
下載地址:https://github.com/fusesource/mqtt-client

fusesource提供三種mqtt client api: 阻塞API,基於Futur的API和回撥API。其中,回撥API是最複雜的也是效能最好的,另外兩種均是對回撥API的封裝。 我們下面就簡單介紹一下回調API的使用方法。


import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;


import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;

import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;

public class MqttClient {
public static void main(String[] args)
{
 try {
   MQTT mqtt=new MQTT();



   //MQTT設定說明
   mqtt.setHost("tcp://10.1.58.191:1883");
   mqtt.setClientId("876543210"); //用於設定客戶端會話的ID。在setCleanSession(false);被呼叫時,MQTT伺服器利用該ID獲得相應的會話。此ID應少於23個字元,預設根據本機地址、埠和時間自動生成
   mqtt.setCleanSession(false); //若設為false,MQTT伺服器將持久化客戶端會話的主體訂閱和ACK位置,預設為true
   mqtt.setKeepAlive((short) 60);//定義客戶端傳來訊息的最大時間間隔秒數,伺服器可以據此判斷與客戶端的連線是否已經斷開,從而避免TCP/IP超時的長時間等待
   mqtt.setUserName("admin");//伺服器認證使用者名稱
   mqtt.setPassword("admin");//伺服器認證密碼

   mqtt.setWillTopic("willTopic");//設定“遺囑”訊息的話題,若客戶端與伺服器之間的連線意外中斷,伺服器將釋出客戶端的“遺囑”訊息
   mqtt.setWillMessage("willMessage");//設定“遺囑”訊息的內容,預設是長度為零的訊息
   mqtt.setWillQos(QoS.AT_LEAST_ONCE);//設定“遺囑”訊息的QoS,預設為QoS.ATMOSTONCE
   mqtt.setWillRetain(true);//若想要在釋出“遺囑”訊息時擁有retain選項,則為true
   mqtt.setVersion("3.1.1");

   //失敗重連線設定說明
   mqtt.setConnectAttemptsMax(10L);//客戶端首次連線到伺服器時,連線的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
   mqtt.setReconnectAttemptsMax(3L);//客戶端已經連線到伺服器,但因某種原因連線斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
   mqtt.setReconnectDelay(10L);//首次重連線間隔毫秒數,預設為10ms
   mqtt.setReconnectDelayMax(30000L);//重連線間隔毫秒數,預設為30000ms
   mqtt.setReconnectBackOffMultiplier(2);//設定重連線指數迴歸。設定為1則停用指數迴歸,預設為2

   //Socket設定說明
         mqtt.setReceiveBufferSize(65536);//設定socket接收緩衝區大小,預設為65536(64k)
         mqtt.setSendBufferSize(65536);//設定socket傳送緩衝區大小,預設為65536(64k)
         mqtt.setTrafficClass(8);//設定傳送資料包頭的流量型別或服務型別欄位,預設為8,意為吞吐量最大化傳輸

         //頻寬限制設定說明
         mqtt.setMaxReadRate(0);//設定連線的最大接收速率,單位為bytes/s。預設為0,即無限制
         mqtt.setMaxWriteRate(0);//設定連線的最大發送速率,單位為bytes/s。預設為0,即無限制

         //選擇訊息分發佇列
         mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若沒有呼叫方法setDispatchQueue,客戶端將為連線新建一個佇列。如果想實現多個連線使用公用的佇列,顯式地指定佇列是一個非常方便的實現方法

         //設定跟蹤器
   mqtt.setTracer(new Tracer(){
             @Override
             public void onReceive(MQTTFrame frame) {
                 System.out.println("recv: "+frame);
             }

             @Override
             public void onSend(MQTTFrame frame) {
                 System.out.println("send: "+frame);
             }

             @Override
             public void debug(String message, Object... args) {
                 System.out.println(String.format("debug: "+message, args));
             }
         });



         //使用回撥式API
   final CallbackConnection callbackConnection=mqtt.callbackConnection();

   //連線監聽
   callbackConnection.listener(new Listener() {

   //接收訂閱話題釋出的訊息
   @Override
   public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
    System.out.println("=============receive msg================"+new String(payload.toByteArray()));
     onComplete.run();
   }

   //連線失敗
   @Override
   public void onFailure(Throwable value) {
    System.out.println("===========connect failure===========");
    callbackConnection.disconnect(null);
   }

      //連線斷開
   @Override
   public void onDisconnected() {
    System.out.println("====mqtt disconnected=====");

   }

   //連線成功
   @Override
   public void onConnected() {
    System.out.println("====mqtt connected=====");

   }
  });



   //連線
   callbackConnection.connect(new Callback() {

     //連線失敗
       public void onFailure(Throwable value) {
           System.out.println("============連線失敗:"+value.getLocalizedMessage()+"============");
       }

       // 連線成功
       public void onSuccess(Void v) {

           //訂閱主題
           Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
           callbackConnection.subscribe(topics, new Callback() {
               //訂閱主題成功
            public void onSuccess(byte[] qoses) {
                   System.out.println("========訂閱成功=======");
               }
            //訂閱主題失敗
               public void onFailure(Throwable value) {
                System.out.println("========訂閱失敗=======");
                callbackConnection.disconnect(null);
               }
           });


            //釋出訊息
           callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
               public void onSuccess(Void v) {
                 System.out.println("===========訊息釋出成功============");
               }
               public void onFailure(Throwable value) {
                System.out.println("========訊息釋出失敗=======");
                callbackConnection.disconnect(null);
               }
           });


       }
   });



   while(true)
   {

   }


 } catch (Exception e) {
  e.printStackTrace();
 }

}
}