MQTT簡單demo(java)
上次已經簡單的談了一些MQTT協議的一些知識,今天就來就上次的知識具體的Java實現。
現在就來具體說說實現這一步吧。中間的時間也是有點久。
MQTT消息的發送和訂閱都是依賴MQTT服務器的,沒有MQTT服務器,你的客戶端是無法訂閱和發送消息的。所以在最開始的時候,可以選擇性的在你的電腦上面安裝一個MQTT服務器。MQTT服務器有很多,大家也可以在網上去找一些安裝教程,這裏因為和我要講內容關系不大,所以不再累述。
MQTT協議中是沒有發送者和接收者·的概念,所有的連接都是用戶,所以一個MQTT連接既可以發送消息,也可以接收消息。就等於所有的連接都是客戶端。下面我的客戶端代碼也是如此,因為公司這邊接收的信息先是要進行認證,認證成功後再接收有用的信息。這時,客戶端在根據設備的信息來控制網關上面的設備,達到遠程控制設備的目的。因為要使用服務器來轉發消息,所以對於服務器的測試也是比較重要的,但是我使用的是公司的服務器,所以這一塊我的了解比較少。但是我這邊有一些工具,谷歌瀏覽器的插件MQTTLens。可能會幫助你。(需要翻閱墻體)
MQTT使用的庫也是有很多的,下面的網址也是列舉了MQTT支持的庫,有java的,也有c的。網址如下:https://github.com/mqtt/mqtt.github.io/wiki/libraries。因為最開始我的接觸還是比較淺,使用的是:Fusesource mqtt-client。所以java的demo也是基於這個庫的,但是後來和spring整合的時候發現有一些問題,因為spring支持的只有一個庫,就是Eclipse Paho Java。但是原理都是一樣的,大家可以自己去決定,我的簡單的demo代碼還是基於Fusesource mqtt-client。在下一篇Spring和MQTT整合中使用的是Eclipse Paho Java。
下面就說一說具體的思路,這邊我的代碼是基於公司的網關需求,所以先說一說公司網關的具體流程。首先,網關會一直發送身份驗證消息,等待客戶端認證,客戶端認證通過後,會發送具體有用的信息。客戶端這時在根據網關信息發送控制命令,到達控制的目的。在這個過程中,客戶端有訂閱和發送,所以一個客戶端就練習了發送消息和訂閱消息。這就是公司的具體操作流程。下面就說一說代碼的流程。
運行時要使用jar包,也可使用maven,但是使用maven時要註意版本。
具體的jar包和maven依賴在網址:https://gitee.com/iots/mqtt-client
依賴為:
<dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.12</version> </dependency>
下面開始編寫demo
首先先要配置MQTT的一些配置,配置比較多,也很繁瑣。
主要是配置主機號和端口號,根據自己的配置編寫代碼,在配置其他的一些細節配置,主要是和連接有關的。
代碼如下:
// MQTT設置說明 // 設置主機號 mqtt.setHost("tcp://10.168.5.208:1883"); // 用於設置客戶端會話的ID。在setCleanSession(false);被調用時,MQTT服務器利用該ID獲得相應的會話。此ID應少於23個字符,默認根據本機地址、端口和時間自動生成 mqtt.setClientId("876543210"); // 若設為false,MQTT服務器將持久化客戶端會話的主體訂閱和ACK位置,默認為true mqtt.setCleanSession(false); // 定義客戶端傳來消息的最大時間間隔秒數,服務器可以據此判斷與客戶端的連接是否已經斷開,從而避免TCP/IP超時的長時間等待 mqtt.setKeepAlive((short) 60); // 服務器認證用戶名 mqtt.setUserName("admin"); // 服務器認證密碼 mqtt.setPassword("admin"); // 設置“遺囑”消息的話題,若客戶端與服務器之間的連接意外中斷,服務器將發布客戶端的“遺囑”消息 mqtt.setWillTopic("willTopic"); // 設置“遺囑”消息的內容,默認是長度為零的消息 mqtt.setWillMessage("willMessage"); // 設置“遺囑”消息的QoS,默認為QoS.ATMOSTONCE mqtt.setWillQos(QoS.AT_LEAST_ONCE); // 若想要在發布“遺囑”消息時擁有retain選項,則為true mqtt.setWillRetain(true); // 設置版本 mqtt.setVersion("3.1.1"); // 失敗重連接設置說明 // 客戶端首次連接到服務器時,連接的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1 mqtt.setConnectAttemptsMax(10L); // 客戶端已經連接到服務器,但因某種原因連接斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,默認為-1 mqtt.setReconnectAttemptsMax(3L); // 首次重連接間隔毫秒數,默認為10ms mqtt.setReconnectDelay(10L); // 重連接間隔毫秒數,默認為30000ms mqtt.setReconnectDelayMax(30000L); // 設置重連接指數回歸。設置為1則停用指數回歸,默認為2 mqtt.setReconnectBackOffMultiplier(2); // Socket設置說明 // 設置socket接收緩沖區大小,默認為65536(64k) mqtt.setReceiveBufferSize(65536); // 設置socket發送緩沖區大小,默認為65536(64k) mqtt.setSendBufferSize(65536); // 設置發送數據包頭的流量類型或服務類型字段,默認為8,意為吞吐量最大化傳輸 mqtt.setTrafficClass(8); // 帶寬限制設置說明 // 設置連接的最大接收速率,單位為bytes/s。默認為0,即無限制 mqtt.setMaxReadRate(0); // 設置連接的最大發送速率,單位為bytes/s。默認為0,即無限制 mqtt.setMaxWriteRate(0); // 選擇消息分發隊列 // 若沒有調用方法setDispatchQueue,客戶端將為連接新建一個隊列。如果想實現多個連接使用公用的隊列,顯式地指定隊列是一個非常方便的實現方法 mqtt.setDispatchQueue(Dispatch.createQueue("foo"));
上面都是一些配置的問題,具體情況自己決定配置。具體的配置也可以參考下面的網址,這個網址也有詳細的描述:https://gitee.com/iots/mqtt-client。
下面開始講講連接和訂閱和發送主題
fusesource提供三種mqtt client api,分別為阻塞API,基於Futur的API和回調API。
其中,阻塞API是在MQTT.connectBlocking方法建立連接和提供阻斷API的連接。
基於Futur的API則是:在MQTT.connectFuture方法建立連接,為您提供了一個與結合Futur的連接。所有操作的連接是無阻塞的,並且經由返回的結果。
回調API是最復雜的也是性能最好的,另外兩種均是對回調API的封裝。
因為回調API有些復雜,現在只是介紹回調API的封裝。就是前兩個,前兩個的區別是第一個為阻塞的,第二個不是阻塞。下面開始代碼演示。
第一個阻塞API。代碼如下:
// 使用future連接 FutureConnection connection = mqtt.futureConnection(); Future<Void> f1 = connection.connect(); f1.await(); // 訂閱消息 Future<byte[]> f2 = connection.subscribe(new Topic[] { new Topic("datasources/1/1", QoS.AT_LEAST_ONCE) }); // byte[] qoses = f2.await(); // 發送身份驗證消息. // Future<Void> f3 = connection.publish("foo", "Hello".getBytes(), // QoS.AT_LEAST_ONCE, false); // 接收訂閱消息.. Future<Message> receive = connection.receive(); // 打印消息. Message message = receive.await(); System.out.println(String.valueOf(message.getPayloadBuffer())); // 回應 message.ack(); // Future<Void> f4 = connection.disconnect(); f4.await();
第三個是最難的,我這邊的代碼也是有點亂,直接上代碼吧。
// 監聽 connection.listener(new Listener() { @Override public void onPublish(UTF8Buffer topicmsg, Buffer msg, Runnable ack) { // utf-8 is used for dealing with the garbled String topic = topicmsg.utf8().toString(); String payload = msg.utf8().toString(); System.out.println(topic + " " + payload); String Amsg = AuthenticationSendDemo.Authentication(topic, payload); if (topic.equals("datasources/req")) { // 重起一個阻塞線程 connection.getDispatchQueue().execute(new Runnable() { public void run() { connection.publish("datasources/17/01/req_ack", Amsg.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() { @Override public void onSuccess(Void args) { // 表示發布主題成功 System.out.println("發布成功!"); System.out.println("發布的消息" + Amsg); } @Override public void onFailure(Throwable throwable) { // 表示發布主題失敗 System.out.println("發布失敗!"); } }); } }); } // 表示監聽成功 ack.run(); } @Override public void onFailure(Throwable value) { // 表示監聽失敗 } // execute only once when connection is ended @Override public void onDisconnected() { // 表示監聽到斷開連接 System.out.println("斷開連接!!"); } // execute only once when connecting started @Override public void onConnected() { // 表示監聽到連接成功 System.out.println("haha"); System.out.println(); } });
因為代碼中使用到了線程和回調,我對於這兩個掌握的也不是很好,也不再這裏亂扯,有大佬知道比較好的寫法最好指點一下。在這裏感謝。
三種寫法都寫完了,下面談一談感想和中間遇到的問題。
以為看具體的文檔實在太多了,現在公司還在忙著趕項目,我這邊時間也不是很多,代碼的整理以後有時間在說。我感覺最重要的還是對於協議的一些掌握和體會,這些要比上面的代碼重要的多,因為你最終的代碼還是要和項目整合的,和Spring整合的時候你會發現這些都是框架提供好了,你需要做的就是填參數,但是整合中遇到的問題的解決辦法都是你從寫上面的代碼中得到的。
因為剛開始寫代碼,所以代碼中的註釋也是非常多的,這裏也不再累述。寫上面的代碼的時候遇到了很多的問題,解決的網站都在我第一篇MQTT博客中,比如MQTT的官網,網上的文章都是抄的,要不就是一知半解(我也是)。最終還是看自己的深入體會。
就這樣吧,結束。
MQTT簡單demo(java)