1. 程式人生 > >MQTT協議實現Eclipse Paho學習總結

MQTT協議實現Eclipse Paho學習總結

來源 list 設計 cte soc flush play req log

MQTT協議實現Eclipse Paho學習總結

摘自:https://www.cnblogs.com/yfliufei/p/4383852.html

2015-04-01 14:57 by 辣椒醬, 4278 閱讀, 0 評論, 收藏, 編輯轉載自:http://xiaoxinzhou.blog.163.com/blog/static/20704538620145411306821/

一、概述

遙測傳輸 (MQTT) 是輕量級基於代理的發布/訂閱的消息傳輸協議,設計思想是開放、簡單、輕量、易於實現。這些特點使它適用於受限環境。例如,但不僅限於此:

  • 網絡代價昂貴,帶寬低、不可靠。
  • 在嵌入設備中運行,處理器和內存資源有限。

該協議的特點有:

  • 使用發布/訂閱消息模式,提供一對多的消息發布,解除應用程序耦合。
  • 對負載內容屏蔽的消息傳輸。
  • 使用 TCP/IP 提供網絡連接。
  • 有三種消息發布服務質量:
    • “至多一次”,消息發布完全依賴底層 TCP/IP 網絡。會發生消息丟失或重復。這一級別可用於如下情況,環境傳感器數據,丟失一次讀記錄無所謂,因為不久後還會有第二次發送。
    • “至少一次”,確保消息到達,但消息重復可能會發生。
    • “只有一次”,確保消息到達一次。這一級別可用於如下情況,在計費系統中,消息重復或丟失會導致不正確的結果。
  • 小型傳輸,開銷很小(固定長度的頭部是 2 字節),協議交換最小化,以降低網絡流量。
  • 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
因為MQTT是輕量級的發布/訂閱的消息傳輸協議,因此很多應用都可以借用MQTT的思想, 比如Facebook的的Messager據說就是按照MQTT的協議編寫的。如果需要了解這個協議,簡單的讀一下其協議的主要內容其實是不能深刻理解其 中的意思的,就像你看了XMPP的協議之後,不讀smack很快就會遺忘掉這個協議的樣子一樣,程序員對代碼的熱愛程度會遠遠大多文檔(初級碼農),於是 乎讀了一下MQTT的實現Eclipse Paho,一下是一些簡單的總結。

二、MQTT協議實現Eclipse Paho

MQTT有不同語言,不同版本的諸多的實現,詳細信息見http://mqtt.org/software,其中Eclipse Paho只是諸多Java實現中的一個,關於Eclipse Paho的介紹如下http://www.eclipse.org/proposals/technology.paho/,具體下載地址http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.java.git/。因為MQTT是輕量級的發布/訂閱的消息傳輸協議,其實現Eclipse Paho,也是非常的輕量級,相比smack代碼真是小巫見大巫了,看過smack之後,再看Eclipse Paho你心裏會豁然開朗,原來代碼這麽少啊!其主要實現包如下:技術分享圖片
其中主要的代碼集中在畫框的三個包內。 org.eclipse.paho.client.mqttv3: 主要用於對外提供服務,即整個Eclipse Paho對外的窗口,當你的程序需要調用Eclipse Paho時,直接調用org.eclipse.paho.client.mqttv3包內的類就能實現Eclipse Paho所提供的整個功能。當然你也可以調用其他包內的類,這要看你對整個代碼的了解程度了。
org.eclipse.paho.client.mqttv3.internal:看看單詞internal你可能就猜到了,沒錯,這就是第一個包的主要功能實現,這個包有承上啟下的功能,首先對第一包提供功能的實現,其次調用剩下包中的類以實現MQTT協議的規定。 org.eclipse.paho.client.mqttv3.internal.nls:主要是國際化相關的文件,打開這個包之後,你會欣喜的看到messages_zh_CN.properties,有中文實現! org.eclipse.paho.client.mqttv3.internal.security:當然是跟安全相關,其中包含了MQTT協議所規定的實現的TLS協議實現,當然在Java中tls的實現當然是SSLSocket。 org.eclipse.paho.client.mqttv3.internal.wire:主要是信息的載體,也就是socket之上傳輸的心跳包,訂閱,發布信息等報文信息。 org.eclipse.paho.client.mqttv3.logging:日誌。 org.eclipse.paho.client.mqttv3.persist: 主要用於保存已經發送的數據包。從這裏可以看出,MQTT協議最初的面向目標即傳感器之間信息的傳輸,其實現采用了,將數據包保存的文件當中的方式 (MqttDefaultFilePersistence)保證了數據肯定能夠發送到服務器,不管程序崩潰不崩潰,網絡好不好,只要發送的數據包沒有收到 確認,這個數據包就一直保存在文件當中,直到其發送出去為止。 org.eclipse.paho.client.mqttv3.util:工具類。 這些包中,最主要的包就是上圖中包含在框中的包,這三個包中,最主要的就是org.eclipse.paho.client.mqttv3.internal這個包,因此只要你看懂了這個包中的主要的類,那麽你就拿下了MQTT協議的實現Eclipse Paho!!

三、MQTT協議的報文類別

3.1 MQTT協議規定報文

1.連接請求(CONNECT)
當一個從客戶端到服務器的TCP/IP套接字連接被建立時,必須用一個連接流來創建一個協議級別的會話。
2.連接請求確認(CONNECTACK)
連接請求確認報文(CONNECTACK)是服務器發給客戶端,用以確認客戶端的連接請求
3.發布報文(PUBLISH)
客戶端發布報文到服務器端,用來提供給有著不同需求的訂閱者們。每個發布的報文都有一個主題,這是一個分層的命名空間,他定義了報文來源分類,方便訂閱者訂閱他們需要的主題。訂閱者們可以註冊自己的需要的報文類別。
4.發布確認報文(PUBACK)
發布確認報文(PUBACK)是對服務質量級別為1的發布報文的應答。他可以是服務器對發布報文的客戶端的報文確認,也可以是報文訂閱者對發布報文的服務器的應答。
5.發布確認報文(PUBREC)
PUBREC報文是對服務質量級別為2的發布報文的應答。這是服務質量級別為2的協議流的第二個報文。PUBREC是由服務器端對發布報文的客戶端的應答,或者是報文訂閱者對發布報文的服務器的應答。
6.發布確認報文(PUBREL)
PUBREL是報文發布者對來自服務器的PUBREC報文的確認,或者是服務器對來自報文訂閱者的PUBREC報文的確認。它是服務質量級別為2的協議流的第三個報文。
7.確定發布完成(PUBCOMP)
PUBCOMP報文是服務器對報文發布者的PUBREL報文的應答,或者是報文訂閱者對服務器的PUBREL報文的應答。它是服務質量級別為2的協議流的第四個也是最後一個報文。
8.訂閱命名的主題(SUBSCRIBE)
訂閱報文(SUBSCRIBE)允許一個客戶端在服務器上註冊一個或多個感興趣的主題名字。發布給這些主題的報文作為發布報文從服務器端交付給客戶端。訂閱報文也描述了訂閱者想要收到的發布報文的服務質量等級。
9. 訂閱報文確認(SUBACK)
當服務器收到客戶端發來的訂閱報文時,將發送訂閱報文的確認報文給客戶端。一個這樣的確認報文包含一列被授予的服務質量等級。被授予的服務質量等級次序和對應的訂閱報文中的主題名稱的次序相符。
10. 退訂命名的主題(UNSUBSCRIBE)
退訂主題的報文是從客戶端發往服務器端,用以退訂命名的主題。
11. 退訂確認(UNSUBACK)
退訂確認報文是從服務器發往客戶端,用以確認客戶端發來的退訂請求報文。
12. Ping請求(PINGREQ)
Ping請求報文是從連接的客戶端發往服務器端,用來詢問服務器端是否還存在。
13. Ping應答(PINGRESP)
Ping應答報文是從服務器端發往Ping請求的客戶端,對客戶端的Ping請求進行確認。

14. 斷開通知(DISCONNECT)
斷開通知報文是從客戶端發往服務器端用來指明將要關閉它的TCP/IP連接,他允許徹底地斷開,而非只是下線。如果客戶端已經和幹凈會話標誌集聯系,那麽所有先前關於客戶端維護的信息將被丟棄。一個服務器在收到斷開報文之後,不能依賴客戶端關閉TCP/IP連接。

3.2 Eclipse Paho的對報文的實現

Eclipse Paho對MQTT協議報文的實現,主要在org.eclipse.paho.client.mqttv3.internal.wire包下,技術分享圖片其下包含了對MQTT協議14中報文的主要實現如下:技術分享圖片 從以上看,其發送一個數據包後,服務器端必須回復一個確認包,這為傳輸數據包的魯棒性,降低丟包率,提高準確性提供了很好實現。不同於IM協議MXPP,沒有對數據的確認。

3.3 心跳包

還有一個重要一點就是對其對心跳包的設定,看心跳包,主要是要看public class MqttPingReq extends MqttWireMessage 這個類! [java] view plaincopy
  1. public class MqttPingReq extends MqttWireMessage {
  2. public MqttPingReq() {
  3. super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);
  4. }
  5. /**
  6. * Returns <code>false</code> as message IDs are not required for MQTT
  7. * PINGREQ messages.
  8. */
  9. public boolean isMessageIdRequired() {
  10. return false;
  11. }
  12. protected byte[] getVariableHeader() throws MqttException {
  13. return new byte[0];
  14. }
  15. protected byte getMessageInfo() {
  16. return 0;
  17. }
  18. public String getKey() {
  19. return new String("Ping");
  20. }
  21. }
當然只看這個類,也無法知道其心跳包的內容,這時候,我們需要從其發送的內容當中逆向推出其心跳包的內容。 我 們先看其發送的的模塊:找到public class CommsSender implements Runnable 類,看到其有一個private MqttOutputStream out;私有字段,一看這個方法,我們就能判斷,這個字段就是輸出流,然後,我們順藤摸瓜,看public class MqttOutputStream extends OutputStream這個類,你會看到這樣一個方法:[java] view plaincopy
  1. /**
  2. * Writes an <code>MqttWireMessage</code> to the stream.
  3. */
  4. public void write(MqttWireMessage message) throws IOException, MqttException {
  5. byte[] bytes = message.getHeader();
  6. byte[] pl = message.getPayload();
  7. // out.write(message.getHeader());
  8. // out.write(message.getPayload());
  9. out.write(bytes,0,bytes.length);
  10. out.write(pl,0,pl.length);
  11. }
哦,這下好了,原來,其發送的是header和payload,然後,我們就可以看心跳包的header和payload是什麽。 public class MqttPingReq extends MqttWireMessage心跳包下有 [java] view plaincopy
  1. protected byte[] getVariableHeader() throws MqttException {
  2. return new byte[0];
  3. }
這個方法,我們就知道了,這個肯定是父類MqttWireMessage中getHeader調用的方法,然後再回到MqttWireMessage,果真getHeader方法如下:[java] view plaincopy
  1. public byte[] getHeader() throws MqttException {
  2. if (encodedHeader == null) {
  3. try {
  4. int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);
  5. byte[] varHeader = getVariableHeader();
  6. int remLen = varHeader.length + getPayload().length;
  7. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  8. DataOutputStream dos = new DataOutputStream(baos);
  9. dos.writeByte(first);//1個字節
  10. dos.write(encodeMBI(remLen));//1個字節
  11. dos.write(varHeader);//0個字節
  12. dos.flush();
  13. encodedHeader = baos.toByteArray();
  14. } catch(IOException ioe) {
  15. throw new MqttException(ioe);
  16. }
  17. }
  18. return encodedHeader;
  19. }
而MqttWireMessage中還有一個getPayload方法,這個方法MqttPingReq 沒有重寫,也就是說,默認調用這個方法。[java] view plaincopy
  1. /**
  2. * Sub-classes should override this method to supply the payload bytes.
  3. */
  4. public byte[] getPayload() throws MqttException {
  5. return new byte[0];//0個字節
  6. }

也就是說MQTT的心跳包只有2個字節!

MQTT協議實現Eclipse Paho學習總結