MQTT客戶端JAVA程式碼----fusesource mqtt-client
下載地址:https://github.com/fusesource/mqtt-client
fusesource提供三種mqtt client api: 阻塞API,基於Futur的API和回撥API。其中,回撥API是最複雜的也是效能最好的,另外兩種均是對回撥API的封裝。 我們下面就簡單介紹一下回調API的使用方法。
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.DispatchQueue;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.fusesource.mqtt.client.Tracer;
import org.fusesource.mqtt.codec.MQTTFrame;
public class MqttClient {
public static void main(String[] args)
{
try {
MQTT mqtt=new MQTT();
//MQTT設定說明
mqtt.setHost("tcp://10.1.58.191:1883");
mqtt.setClientId("876543210"); //用於設定客戶端會話的ID。在setCleanSession(false);被呼叫時,MQTT伺服器利用該ID獲得相應的會話。此ID應少於23個字元,預設根據本機地址、埠和時間自動生成
mqtt.setCleanSession(false); //若設為false,MQTT伺服器將持久化客戶端會話的主體訂閱和ACK位置,預設為true
mqtt.setKeepAlive((short) 60);//定義客戶端傳來訊息的最大時間間隔秒數,伺服器可以據此判斷與客戶端的連線是否已經斷開,從而避免TCP/IP超時的長時間等待
mqtt.setUserName("admin");//伺服器認證使用者名稱
mqtt.setPassword("admin");//伺服器認證密碼
mqtt.setWillTopic("willTopic");//設定“遺囑”訊息的話題,若客戶端與伺服器之間的連線意外中斷,伺服器將釋出客戶端的“遺囑”訊息
mqtt.setWillMessage("willMessage");//設定“遺囑”訊息的內容,預設是長度為零的訊息
mqtt.setWillQos(QoS.AT_LEAST_ONCE);//設定“遺囑”訊息的QoS,預設為QoS.ATMOSTONCE
mqtt.setWillRetain(true);//若想要在釋出“遺囑”訊息時擁有retain選項,則為true
mqtt.setVersion("3.1.1");
//失敗重連線設定說明
mqtt.setConnectAttemptsMax(10L);//客戶端首次連線到伺服器時,連線的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
mqtt.setReconnectAttemptsMax(3L);//客戶端已經連線到伺服器,但因某種原因連線斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
mqtt.setReconnectDelay(10L);//首次重連線間隔毫秒數,預設為10ms
mqtt.setReconnectDelayMax(30000L);//重連線間隔毫秒數,預設為30000ms
mqtt.setReconnectBackOffMultiplier(2);//設定重連線指數迴歸。設定為1則停用指數迴歸,預設為2
//Socket設定說明
mqtt.setReceiveBufferSize(65536);//設定socket接收緩衝區大小,預設為65536(64k)
mqtt.setSendBufferSize(65536);//設定socket傳送緩衝區大小,預設為65536(64k)
mqtt.setTrafficClass(8);//設定傳送資料包頭的流量型別或服務型別欄位,預設為8,意為吞吐量最大化傳輸
//頻寬限制設定說明
mqtt.setMaxReadRate(0);//設定連線的最大接收速率,單位為bytes/s。預設為0,即無限制
mqtt.setMaxWriteRate(0);//設定連線的最大發送速率,單位為bytes/s。預設為0,即無限制
//選擇訊息分發佇列
mqtt.setDispatchQueue(Dispatch.createQueue("foo"));//若沒有呼叫方法setDispatchQueue,客戶端將為連線新建一個佇列。如果想實現多個連線使用公用的佇列,顯式地指定佇列是一個非常方便的實現方法
//設定跟蹤器
mqtt.setTracer(new Tracer(){
@Override
public void onReceive(MQTTFrame frame) {
System.out.println("recv: "+frame);
}
@Override
public void onSend(MQTTFrame frame) {
System.out.println("send: "+frame);
}
@Override
public void debug(String message, Object... args) {
System.out.println(String.format("debug: "+message, args));
}
});
//使用回撥式API
final CallbackConnection callbackConnection=mqtt.callbackConnection();
//連線監聽
callbackConnection.listener(new Listener() {
//接收訂閱話題釋出的訊息
@Override
public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
System.out.println("=============receive msg================"+new String(payload.toByteArray()));
onComplete.run();
}
//連線失敗
@Override
public void onFailure(Throwable value) {
System.out.println("===========connect failure===========");
callbackConnection.disconnect(null);
}
//連線斷開
@Override
public void onDisconnected() {
System.out.println("====mqtt disconnected=====");
}
//連線成功
@Override
public void onConnected() {
System.out.println("====mqtt connected=====");
}
});
//連線
callbackConnection.connect(new Callback() {
//連線失敗
public void onFailure(Throwable value) {
System.out.println("============連線失敗:"+value.getLocalizedMessage()+"============");
}
// 連線成功
public void onSuccess(Void v) {
//訂閱主題
Topic[] topics = {new Topic("foo", QoS.AT_LEAST_ONCE)};
callbackConnection.subscribe(topics, new Callback() {
//訂閱主題成功
public void onSuccess(byte[] qoses) {
System.out.println("========訂閱成功=======");
}
//訂閱主題失敗
public void onFailure(Throwable value) {
System.out.println("========訂閱失敗=======");
callbackConnection.disconnect(null);
}
});
//釋出訊息
callbackConnection.publish("foo", ("Hello ").getBytes(), QoS.AT_LEAST_ONCE, true, new Callback() {
public void onSuccess(Void v) {
System.out.println("===========訊息釋出成功============");
}
public void onFailure(Throwable value) {
System.out.println("========訊息釋出失敗=======");
callbackConnection.disconnect(null);
}
});
}
});
while(true)
{
}
} catch (Exception e) {
e.printStackTrace();
}
}
}