1. 程式人生 > >MQTT版本升級過程及原始碼解析

MQTT版本升級過程及原始碼解析

MQTT版本升級過程及原始碼解析


  首先說一下為什麼要寫這篇文章呢,在我發現網上對MQTT的文章介紹實在太少了,可能也是使用這個的頻率比較低吧!還有對問題的定位以及解決的方式和辦法也太少了,所以特意寫這篇文章希望能作出一些貢獻,幫助到一些需要的人。

  主要記錄一下MQTT在原先1.2.0版本使用過程中出現的問題,排查問題到升級1.2.1版本過程中出現的問題,通過原始碼一步步排查出最後的問題點,直到符合預期目標。

 

<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.1</version>
</dependency>

  MQTT的搭建及SSL認證可以參考這個部落格:https://www.cnblogs.com/yueli/p/7490453.html 在這不仔細闡述


  Qos介紹 

  在開頭先了解一下Qos的一些含義,也是這個問題為導火線

  • Qos = 0  最多一次的傳輸

       釋出者PUBLISH訊息到伺服器(broker),傳送即丟棄。沒有確認訊息,也不知道對方是否收到。網路層面,傳輸壓力小

  • Qos = 1  至少一次的傳輸

   釋出者釋出訊息儲存訊息,伺服器(broker)接收到訊息,伺服器(broker)PUBLISH到訂閱者,伺服器(broker)回一個PUBACK資訊到釋出者讓刪除訊息,然後訂閱者接收訊息後PUBACK給伺服器讓刪除訊息。如果失敗了,在一段時間確認資訊沒有收到,傳送方都會將訊息頭的DUP設定為1,然後再次傳送訊息,訊息最少一次到達服務。例如網路延遲等問題,釋出者重複傳送訊息,訂閱者多次訂閱重複訊息 

  • Qos = 2  只有一次的傳輸

  其實Qos = 2 只是在 1 的基礎上做了改掉的趕腳,在釋出者PUBLISH到伺服器之後多了訊息的確認以及多了訊息msgID的快取,重複資訊的去重。在伺服器PUBLISH到訂閱者之後也多了訊息的確認。

  三種情況的區別

   0 沒有儲存message,沒有重發機制,啥事也不知道,1和2 的釋出者和伺服器有儲存message,釋出者有重發機制,伺服器都有PUBLISH之後的PUBACK的確認機制,但是2的伺服器多了快取msgID的一項功能,提供了去重功能,防止了訊息的重複傳送,以及訊息的接收的確認機制。訂閱者這邊不過多的介紹,感興趣再去了解。


 MQTT1.2.1版本出現的問題 

  問題的描述:因為專案中使用MQTT通訊的地方比較多,一般都是以Qos = 0 的形式,這段時間發現會時常提醒報錯Too many publishes in progress (32202),看了一下原始碼報錯地方

   從這裡可以判定actualInFlight超出自己設定的maxInflight最大值導致的,嘗試加大maxInflight也無用,只是延遲報錯的時間而已


 問題排查過程

  首先上網搜一下是否有類似問題出現,果不其然有個哥們碰到了,部落格地址:https://blog.csdn.net/lblblblblzdx/article/details/81159478 此文章給我很大幫助,感謝博主,但是最後的解決方案不是很好。

  跟蹤釋出過程的原始碼邏輯

  第一步:publish的過程

//1
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext,IMqttActionListener callback)
//2
public void sendNoWait(MqttWireMessage message, MqttToken token)
//3
void internalSend(MqttWireMessage message, MqttToken token)
//4
public void send(MqttWireMessage message, MqttToken token) 

  第二步:所有Qos型別,在publish訊息的頂級父類中的構造器預設設定msgId = 0  

  第三步:訊息在send方法中做了主要處理

   第四步:根據上面的Qos的介紹,說明我們publish的訊息在這個地方全部都快取在tokens這裡,其實就是放入Hashtable中,不管什麼等級的Qos

   在這幾個步驟中,已經快取好資訊,準備非同步傳送,其中的lock機制就不多去解釋

  第五步:非同步傳送,其實主要是在客戶端連結完成的時候就已經開始監聽了,connect流程

 1 //1
 2 public IMqttToken connect(MqttConnectOptions options)
 3 
 4 //2 非同步連結
 5 ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken,userContext, callback, reconnecting);
 6 connectActionListener.connect();
 7 
 8 //3 客戶端的通訊連結,包括髮送和接收
 9 ClientComms.connect(options, token);
10 
11 //4 個人理解是 將通訊資訊塞進執行緒池中,分別開啟發送和接收的執行緒處理
12 ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
13 conbg.start();

  第六步:在ConnectBG的run中 new CommsSender. start執行緒run中while迴圈傳送的資訊流程,一直髮送訊息中

 

   第七步:在notifySent的方法中判斷Qos = 0 的作出了判斷及操作

   這些差不多就是釋出過程的主題流程結構,瞭解這些才會知道讓你解決問題更加的容易


  問題點

  在上面的第六步的圖中黃色箭頭指出了問題出現的點,主要是在大資料量高併發的時候,因為在Qos = 0 的時候,在tokens(Hashtable)中的key一直是0,預設初始化。後面的流程中並沒有改變過,在黃色那塊tokenStore.getToken 在傳送之後才remove資料,但是多條資料高併發的時候,在remove資料之後,後一條在get的時候會出現空的狀況,不傳送資訊,導致actualInFlight沒有減,一直增加,一定時間後就會超出最大值。Qos =1 、2 是不會發送這樣的情況,因為他們的messageId是唯一的。


  解決辦法

  1. 將Qos設定為1 (這是網上主流的解決辦法,但是這個太耗費資源問題,在我看來只是規避問題點而已)
  2. 升級MQTT版本1.2.1    (這個版本解決了剛才說的bug)      
  3. 也可以想想在不升級版本的情況下如何去改善這個問題,重寫那些類可以實現···

  


 

   解決過程

  既然說了第一種解決辦法不是很贊同,那就直接進去第二種辦法吧。升級到1.2.1版,時間:Feb, 2019,但是在升級版本的時候又出現了一些問題,因為改動還是有點多的。

  接下來說說1.2.1版本的改動了什麼呢


  首先主要改動的是在Qos = 0 的不放入tokens中了,首先想到的是不會get到了,也不用擔心重複了,直接從集合pendingMessages中拿

   然後從資訊的自身獲取資料的token,獲取不到再去tokens中拿

    以上兩個就是主要解決這個高併發衝突的原因


    跳入另一個坑中···

   為什麼這麼說呢,在我們開發的意識中,升級版本怎麼也要向下相容吧,那就順其自然的換個版本就完事了,結果一跑起來,一堆紅色的出現,那心情···我太南了。以我的性子,就是不能慣著她,繼續深挖為什麼,既然坑已經有了,就不怕有多深。

   另一個坑的過程

    第一時間也是上網搜一些為啥,大家的解決也差不多,都是SSL驗證出現問題,但是解決方案也是五花八門


 

  解決方案

       第一種:設定系統屬性  類似這樣的文章 https://blog.csdn.net/hxpjava1/article/details/77937026

   第二種:有看了一些程式碼了,就是重寫X509TrustManagerImpl,繞過SSL的驗證,試過有用,類似這樣的文章 https://blog.csdn.net/iverson_AL/article/details/100669777

   第三種:再深入看一些原始碼,你會看到會什麼會報錯,為什麼會驗證,主要是你的屬性沒有設定好,採用了預設驗證導致


   解決過程

  這裡主要說一下解決的過程,如何從這些網路文章種找出適合自己的出路。

  首先 第一種我就不咋喜歡,動不動就設定了系統全域性屬性,第二種方案,有兩種可能性,一是這個api真的有問題或是不符合自己的專案需要重寫程式碼邏輯結構,二是在不瞭解的情況下直接繞過驗證。我在測試第二個的時候就是如此只是稍微看了一下原始碼,沒有深入看進去,試了一下,果真可以實現我想要目的。

  但是過後又想了一下,不該如此,既然開源出來的東西,不可能如此**,應該會有什麼地方可以簡單設定一下的。既然有這個想法就一直深入探究下去,果不其然,真相出來了。

  在1.2.1版本中MqttConnectOptions 的httpsHostnameVerificationEnabled屬性預設true,導致不是Https的被驗證不通過導致的,也可能MQTT開發人員安全意識很強,在1.2.0版本中沒有這個概念存在,所以在版本升級的時候需要加上MqttConnectOptions.setHttpsHostnameVerificationEnabled(false);

  以下原始碼檢視的過程

    若是有Https證書是不會有問題的。


  總結一下

   其實很簡單的問題,居然整的時候那麼複雜,原因是我們不夠強,面對原始碼的時候還是比較害怕的,還有比較懶吧。

   整個過程比較繁瑣,囉嗦吧,耐心看下來,應該有收穫。

   操作總結:

    1. 升級MQTT版本1.2.1
    2. 若不是https的需要設定為false
    3. 不要走,跑起來

   擴充套件點及疑惑地點可以供參考

  

 

              轉載請註明出處  https://www.cnblogs.com/zhouguanglin/p/11986446.html

&n