1. 程式人生 > >MQTT協議實現Eclipse Paho學習總結一

MQTT協議實現Eclipse Paho學習總結一

一、概述

遙測傳輸 (MQTT) 是輕量級基於代理的釋出/訂閱的訊息傳輸協議,設計思想是開放、簡單、輕量、易於實現。這些特點使它適用於受限環境。例如,但不僅限於此:

  • 網路代價昂貴,頻寬低、不可靠。
  • 在嵌入裝置中執行,處理器和記憶體資源有限。

該協議的特點有:

  • 使用釋出/訂閱訊息模式,提供一對多的訊息釋出,解除應用程式耦合。
  • 對負載內容遮蔽的訊息傳輸。
  • 使用 TCP/IP 提供網路連線。
  • 有三種訊息釋出服務質量:
    • “至多一次”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這一級別可用於如下情況,環境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送。
    • “至少一次”,確保訊息到達,但訊息重複可能會發生。
    • “只有一次”,確保訊息到達一次。這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。
  • 小型傳輸,開銷很小(固定長度的頭部是 2 位元組),協議交換最小化,以降低網路流量。
  • 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
因為MQTT是輕量級的釋出/訂閱的訊息傳輸協議,因此很多應用都可以借用MQTT的思想,比如Facebook的的Messager據說就是按照MQTT的協議編寫的。如果需要了解這個協議,簡單的讀一下其協議的主要內容其實是不能深刻理解其中的意思的,就像你看了XMPP的協議之後,不讀smack很快就會遺忘掉這個協議的樣子一樣,程式設計師對程式碼的熱愛程度會遠遠大多文件(初級碼農),於是乎讀了一下MQTT的實現Eclipse Paho,一下是一些簡單的總結。

二、MQTT協議實現Eclipse Paho

因為MQTT是輕量級的釋出/訂閱的訊息傳輸協議,其實現Eclipse Paho,也是非常的輕量級,相比smack程式碼真是小巫見大巫了,看過smack之後,再看Eclipse Paho你心裡會豁然開朗,原來程式碼這麼少啊!其主要實現包如下:

其中主要的程式碼集中在畫框的三個包內。 org.eclipse.paho.client.mqttv3:主要用於對外提供服務,即整個Eclipse Paho對外的視窗,當你的程式需要呼叫Eclipse Paho時,直接呼叫org.eclipse.paho.client.mqttv3包內的類就能實現Eclipse Paho所提供的整個功能。當然你也可以呼叫其他包內的類,這要看你對整個程式碼的瞭解程度了。
org.eclipse.paho.client.mqttv3.internal:看看單詞internal你可能就猜到了,沒錯,這就是第一個包的主要功能實現,這個包有承上啟下的功能,首先對第一包提供功能的實現,其次呼叫剩下包中的類以實現MQTT協議的規定。
org.eclipse.paho.client.mqttv3.internal.nls:主要是國際化相關的檔案,開啟這個包之後,你會欣喜的看到messages_zh_CN.properties,有中文實現!
org.eclipse.paho.client.mqttv3.internal.security:當然是跟安全相關,其中包含了MQTT協議所規定的實現的TLS協議實現,當然在Java中tls的實現當然是SSLSocket。
org.eclipse.paho.client.mqttv3.internal.wire:主要是資訊的載體,也就是socket之上傳輸的心跳包,訂閱,釋出資訊等報文資訊。
org.eclipse.paho.client.mqttv3.logging:日誌。
org.eclipse.paho.client.mqttv3.persist:主要用於儲存已經發送的資料包。從這裡可以看出,MQTT協議最初的面向目標即感測器之間資訊的傳輸,其實現採用了,將資料包儲存的檔案當中的方式(MqttDefaultFilePersistence)保證了資料肯定能夠傳送到伺服器,不管程式崩潰不崩潰,網路好不好,只要傳送的資料包沒有收到確認,這個資料包就一直儲存在檔案當中,直到其傳送出去為止。
org.eclipse.paho.client.mqttv3.util:工具類。
這些包中,最主要的包就是上圖中包含在框中的包,這三個包中,最主要的就是org.eclipse.paho.client.mqttv3.internal這個包,因此只要你看懂了這個包中的主要的類,那麼你就拿下了MQTT協議的實現Eclipse Paho!!

三、MQTT協議的報文類別

3.1 MQTT協議規定報文

1.連線請求(CONNECT)
 當一個從客戶端到伺服器的TCP/IP套接字連線被建立時,必須用一個連線流來建立一個協議級別的會話。
2.連線請求確認(CONNECTACK)
 連線請求確認報文(CONNECTACK)是伺服器發給客戶端,用以確認客戶端的連線請求
3.釋出報文(PUBLISH)
客戶端釋出報文到伺服器端,用來提供給有著不同需求的訂閱者們。每個釋出的報文都有一個主題,這是一個分層的名稱空間,他定義了報文來源分類,方便訂閱者訂閱他們需要的主題。訂閱者們可以註冊自己的需要的報文類別。
4.釋出確認報文(PUBACK)
釋出確認報文(PUBACK)是對服務質量級別為1的釋出報文的應答。他可以是伺服器對釋出報文的客戶端的報文確認,也可以是報文訂閱者對釋出報文的伺服器的應答。
5.釋出確認報文(PUBREC)
PUBREC報文是對服務質量級別為2的釋出報文的應答。這是服務質量級別為2的協議流的第二個報文。PUBREC是由伺服器端對釋出報文的客戶端的應答,或者是報文訂閱者對釋出報文的伺服器的應答。
6.釋出確認報文(PUBREL)
PUBREL是報文釋出者對來自伺服器的PUBREC報文的確認,或者是伺服器對來自報文訂閱者的PUBREC報文的確認。它是服務質量級別為2的協議流的第三個報文。
7.確定釋出完成(PUBCOMP)
PUBCOMP報文是伺服器對報文釋出者的PUBREL報文的應答,或者是報文訂閱者對伺服器的PUBREL報文的應答。它是服務質量級別為2的協議流的第四個也是最後一個報文。
8.訂閱命名的主題(SUBSCRIBE)
訂閱報文(SUBSCRIBE)允許一個客戶端在伺服器上註冊一個或多個感興趣的主題名字。釋出給這些主題的報文作為釋出報文從伺服器端交付給客戶端。訂閱報文也描述了訂閱者想要收到的釋出報文的服務質量等級。
9. 訂閱報文確認(SUBACK)
當伺服器收到客戶端發來的訂閱報文時,將傳送訂閱報文的確認報文給客戶端。一個這樣的確認報文包含一列被授予的服務質量等級。被授予的服務質量等級次序和對應的訂閱報文中的主題名稱的次序相符。
10. 退訂命名的主題(UNSUBSCRIBE)
退訂主題的報文是從客戶端發往伺服器端,用以退訂命名的主題。
11. 退訂確認(UNSUBACK)
退訂確認報文是從伺服器發往客戶端,用以確認客戶端發來的退訂請求報文。
12. Ping請求(PINGREQ)
Ping請求報文是從連線的客戶端發往伺服器端,用來詢問伺服器端是否還存在。
13. Ping應答(PINGRESP)
Ping應答報文是從伺服器端發往Ping請求的客戶端,對客戶端的Ping請求進行確認。

14. 斷開通知(DISCONNECT)
斷開通知報文是從客戶端發往伺服器端用來指明將要關閉它的TCP/IP連線,他允許徹底地斷開,而非只是下線。如果客戶端已經和乾淨會話標誌集聯絡,那麼所有先前關於客戶端維護的資訊將被丟棄。一個伺服器在收到斷開報文之後,不能依賴客戶端關閉TCP/IP連線。

3.2 Eclipse Paho的對報文的實現

Eclipse Paho對MQTT協議報文的實現,主要在org.eclipse.paho.client.mqttv3.internal.wire包下,

其下包含了對MQTT協議14中報文的主要實現如下:
從以上看,其傳送一個數據包後,伺服器端必須回覆一個確認包,這為傳輸資料包的魯棒性,降低丟包率,提高準確性提供了很好實現。不同於IM協議MXPP,沒有對資料的確認。

3.3 心跳包

還有一個重要一點就是對其對心跳包的設定,看心跳包,主要是要看public class MqttPingReq extends MqttWireMessage 這個類!
  1. publicclass MqttPingReq extends MqttWireMessage {  
  2.     public MqttPingReq() {  
  3.         super(MqttWireMessage.MESSAGE_TYPE_PINGREQ);  
  4.     }  
  5.     /** 
  6.      * Returns <code>false</code> as message IDs are not required for MQTT 
  7.      * PINGREQ messages. 
  8.      */
  9.     publicboolean isMessageIdRequired() {  
  10.         returnfalse;  
  11.     }  
  12.     protectedbyte[] getVariableHeader() throws MqttException {  
  13.         returnnewbyte[0];  
  14.     }  
  15.     protectedbyte getMessageInfo() {  
  16.         return0;  
  17.     }  
  18.     public String getKey() {  
  19.         returnnew String("Ping");  
  20.     }  
  21. }  

當然只看這個類,也無法知道其心跳包的內容,這時候,我們需要從其傳送的內容當中逆向推出其心跳包的內容。 我們先看其傳送的的模組:找到public class CommsSender implements Runnable 類,看到其有一個private MqttOutputStream out;私有欄位,一看這個方法,我們就能判斷,這個欄位就是輸出流,然後,我們順藤摸瓜,看public class MqttOutputStream extends OutputStream這個類,你會看到這樣一個方法:
  1. /** 
  2.      * Writes an <code>MqttWireMessage</code> to the stream. 
  3.      */
  4.     publicvoid write(MqttWireMessage message) throws IOException, MqttException {  
  5.         byte[] bytes = message.getHeader();  
  6.         byte[] pl = message.getPayload();  
  7. //      out.write(message.getHeader());
  8. //      out.write(message.getPayload());
  9.         out.write(bytes,0,bytes.length);  
  10.         out.write(pl,0,pl.length);  
  11.     }  
哦,這下好了,原來,其傳送的是header和payload,然後,我們就可以看心跳包的header和payload是什麼。 public class MqttPingReq extends MqttWireMessage心跳包下有
  1. protectedbyte[] getVariableHeader() throws MqttException {  
  2.     returnnewbyte[0];  
  3. }  
這個方法,我們就知道了,這個肯定是父類MqttWireMessage中getHeader呼叫的方法,然後再回到MqttWireMessage,果真getHeader方法如下:
  1. publicbyte[] getHeader() throws MqttException {  
  2.         if (encodedHeader == null) {  
  3.             try {  
  4.                 int first = ((getType() & 0x0f) << 4) ^ (getMessageInfo() & 0x0f);  
  5.                 byte[] varHeader = getVariableHeader();  
  6.                 int remLen = varHeader.length + getPayload().length;  
  7.                 ByteArrayOutputStream baos = new ByteArrayOutputStream();  
  8.                 DataOutputStream dos = new DataOutputStream(baos);  
  9.                 dos.writeByte(first);//1個位元組
  10.                 dos.write(encodeMBI(remLen));//1個位元組
  11.                 dos.write(varHeader);//0個位元組
  12.                 dos.flush();  
  13.                 encodedHeader = baos.toByteArray();  
  14.             } catch(IOException ioe) {  
  15.                 thrownew MqttException(ioe);  
  16.             }  
  17.         }  
  18.         return encodedHeader;  
  19.     }  
而MqttWireMessage中還有一個getPayload方法,這個方法MqttPingReq 沒有重寫,也就是說,預設呼叫這個方法。
  1. /** 
  2.      * Sub-classes should override this method to supply the payload bytes. 
  3.      */
  4.     publicbyte[] getPayload() throws MqttException {  
  5.         returnnewbyte[0];//0個位元組
  6.     }  
也就是說MQTT的心跳包只有2個位元組!