MQTT版本升級過程及原始碼解析
MQTT版本升級過程及原始碼解析
首先說一下為什麼要寫這篇文章呢,在我發現網上對MQTT的文章介紹實在太少了,可能也是使用這個的頻率比較低吧!還有對問題的定位以及解決的方式和辦法也太少了,所以特意寫這篇文章希望能作出一些貢獻,幫助到一些需要的人。
主要記錄一下MQTT在原先1.2.0版本使用過程中出現的問題,排查問題到升級1.2.1版本過程中出現的問題,通過原始碼一步步排查出最後的問題點,直到符合預期目標。
<!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.1</version> </dependency>
MQTT的搭建及SSL認證可以參考這個部落格:https://www.cnblogs.com/yueli/p/7490453.html 在這不仔細闡述
Qos介紹
在開頭先了解一下Qos的一些含義,也是這個問題為導火線
-
Qos = 0 最多一次的傳輸
釋出者PUBLISH訊息到伺服器(broker),傳送即丟棄。沒有確認訊息,也不知道對方是否收到。網路層面,傳輸壓力小
-
Qos = 1 至少一次的傳輸
釋出者釋出訊息儲存訊息,伺服器(broker)接收到訊息,伺服器(broker)PUBLISH到訂閱者,伺服器(broker)回一個PUBACK資訊到釋出者讓刪除訊息,然後訂閱者接收訊息後PUBACK給伺服器讓刪除訊息。如果失敗了,在一段時間確認資訊沒有收到,傳送方都會將訊息頭的DUP設定為1,然後再次傳送訊息,訊息最少一次到達服務。例如網路延遲等問題,釋出者重複傳送訊息,訂閱者多次訂閱重複訊息
-
Qos = 2 只有一次的傳輸
其實Qos = 2 只是在 1 的基礎上做了改掉的趕腳,在釋出者PUBLISH到伺服器之後多了訊息的確認以及多了訊息msgID的快取,重複資訊的去重。在伺服器PUBLISH到訂閱者之後也多了訊息的確認。
三種情況的區別
0 沒有儲存message,沒有重發機制,啥事也不知道,1和2 的釋出者和伺服器有儲存message,釋出者有重發機制,伺服器都有PUBLISH之後的PUBACK的確認機制,但是2的伺服器多了快取msgID的一項功能,提供了去重功能,防止了訊息的重複傳送,以及訊息的接收的確認機制。訂閱者這邊不過多的介紹,感興趣再去了解。
MQTT1.2.1版本出現的問題
問題的描述:因為專案中使用MQTT通訊的地方比較多,一般都是以Qos = 0 的形式,這段時間發現會時常提醒報錯Too many publishes in progress (32202),看了一下原始碼報錯地方
從這裡可以判定actualInFlight超出自己設定的maxInflight最大值導致的,嘗試加大maxInflight也無用,只是延遲報錯的時間而已
問題排查過程
首先上網搜一下是否有類似問題出現,果不其然有個哥們碰到了,部落格地址:https://blog.csdn.net/lblblblblzdx/article/details/81159478 此文章給我很大幫助,感謝博主,但是最後的解決方案不是很好。
跟蹤釋出過程的原始碼邏輯
第一步:publish的過程
//1 public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext,IMqttActionListener callback) //2 public void sendNoWait(MqttWireMessage message, MqttToken token) //3 void internalSend(MqttWireMessage message, MqttToken token) //4 public void send(MqttWireMessage message, MqttToken token)
第二步:所有Qos型別,在publish訊息的頂級父類中的構造器預設設定msgId = 0
第三步:訊息在send方法中做了主要處理
第四步:根據上面的Qos的介紹,說明我們publish的訊息在這個地方全部都快取在tokens這裡,其實就是放入Hashtable中,不管什麼等級的Qos
在這幾個步驟中,已經快取好資訊,準備非同步傳送,其中的lock機制就不多去解釋
第五步:非同步傳送,其實主要是在客戶端連結完成的時候就已經開始監聽了,connect流程
1 //1 2 public IMqttToken connect(MqttConnectOptions options) 3 4 //2 非同步連結 5 ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken,userContext, callback, reconnecting); 6 connectActionListener.connect(); 7 8 //3 客戶端的通訊連結,包括髮送和接收 9 ClientComms.connect(options, token); 10 11 //4 個人理解是 將通訊資訊塞進執行緒池中,分別開啟發送和接收的執行緒處理 12 ConnectBG conbg = new ConnectBG(this, token, connect, executorService); 13 conbg.start();
第六步:在ConnectBG的run中 new CommsSender. start執行緒run中while迴圈傳送的資訊流程,一直髮送訊息中
第七步:在notifySent的方法中判斷Qos = 0 的作出了判斷及操作
這些差不多就是釋出過程的主題流程結構,瞭解這些才會知道讓你解決問題更加的容易
問題點
在上面的第六步的圖中黃色箭頭指出了問題出現的點,主要是在大資料量高併發的時候,因為在Qos = 0 的時候,在tokens(Hashtable)中的key一直是0,預設初始化。後面的流程中並沒有改變過,在黃色那塊tokenStore.getToken 在傳送之後才remove資料,但是多條資料高併發的時候,在remove資料之後,後一條在get的時候會出現空的狀況,不傳送資訊,導致actualInFlight沒有減,一直增加,一定時間後就會超出最大值。Qos =1 、2 是不會發送這樣的情況,因為他們的messageId是唯一的。
解決辦法
- 將Qos設定為1 (這是網上主流的解決辦法,但是這個太耗費資源問題,在我看來只是規避問題點而已)
- 升級MQTT版本1.2.1 (這個版本解決了剛才說的bug)
- 也可以想想在不升級版本的情況下如何去改善這個問題,重寫那些類可以實現···
解決過程
既然說了第一種解決辦法不是很贊同,那就直接進去第二種辦法吧。升級到1.2.1版,時間:Feb, 2019,但是在升級版本的時候又出現了一些問題,因為改動還是有點多的。
接下來說說1.2.1版本的改動了什麼呢
首先主要改動的是在Qos = 0 的不放入tokens中了,首先想到的是不會get到了,也不用擔心重複了,直接從集合pendingMessages中拿
然後從資訊的自身獲取資料的token,獲取不到再去tokens中拿
以上兩個就是主要解決這個高併發衝突的原因
跳入另一個坑中···
為什麼這麼說呢,在我們開發的意識中,升級版本怎麼也要向下相容吧,那就順其自然的換個版本就完事了,結果一跑起來,一堆紅色的出現,那心情···我太南了。以我的性子,就是不能慣著她,繼續深挖為什麼,既然坑已經有了,就不怕有多深。
另一個坑的過程
第一時間也是上網搜一些為啥,大家的解決也差不多,都是SSL驗證出現問題,但是解決方案也是五花八門
解決方案
第一種:設定系統屬性 類似這樣的文章 https://blog.csdn.net/hxpjava1/article/details/77937026
第二種:有看了一些程式碼了,就是重寫X509TrustManagerImpl,繞過SSL的驗證,試過有用,類似這樣的文章 https://blog.csdn.net/iverson_AL/article/details/100669777
第三種:再深入看一些原始碼,你會看到會什麼會報錯,為什麼會驗證,主要是你的屬性沒有設定好,採用了預設驗證導致
解決過程
這裡主要說一下解決的過程,如何從這些網路文章種找出適合自己的出路。
首先 第一種我就不咋喜歡,動不動就設定了系統全域性屬性,第二種方案,有兩種可能性,一是這個api真的有問題或是不符合自己的專案需要重寫程式碼邏輯結構,二是在不瞭解的情況下直接繞過驗證。我在測試第二個的時候就是如此只是稍微看了一下原始碼,沒有深入看進去,試了一下,果真可以實現我想要目的。
但是過後又想了一下,不該如此,既然開源出來的東西,不可能如此**,應該會有什麼地方可以簡單設定一下的。既然有這個想法就一直深入探究下去,果不其然,真相出來了。
在1.2.1版本中MqttConnectOptions 的httpsHostnameVerificationEnabled屬性預設true,導致不是Https的被驗證不通過導致的,也可能MQTT開發人員安全意識很強,在1.2.0版本中沒有這個概念存在,所以在版本升級的時候需要加上MqttConnectOptions.setHttpsHostnameVerificationEnabled(false);
以下原始碼檢視的過程
若是有Https證書是不會有問題的。
總結一下
其實很簡單的問題,居然整的時候那麼複雜,原因是我們不夠強,面對原始碼的時候還是比較害怕的,還有比較懶吧。
整個過程比較繁瑣,囉嗦吧,耐心看下來,應該有收穫。
操作總結:
- 升級MQTT版本1.2.1
- 若不是https的需要設定為false
- 不要走,跑起來
擴充套件點及疑惑地點可以供參考
轉載請註明出處 https://www.cnblogs.com/zhouguanglin/p/11986446.html
&n