開源MQTT中介軟體:moquette
經常會在專案中用到訊息傳遞,在不同的場景下,訊息傳遞的要求是不一樣的。java世界中,jms的規範可遵循,同時也有開源的相關軟體來支援。 本文來說說一下mqtt,以及moquette。在選擇mqtt的中介軟體時較為糾結,對於非大眾化的開源框架的使用沒有底氣。好在有原始碼,研究原始碼,經過大量測試,效果還可以,推薦給大家。
溝通交流群:
經測試過程發現moquette存在一些問題,已修改,可能是認識的問題,也可能是出發點不一樣。
總之,修改如下:
1. 修改訊息佇列長度為32,避免了原來訊息佇列超過最大條數之後,publish出錯的情況 修改了storage的建構函式,使其更通用
2. 修改了每次都對clientId的判定,針對client首次連結的情況 修改了離線訊息簽收時的空指標異常 棄用了一些不常用的模組
3. 添加了redis儲存實現 redis採用了現有conf配置機制 重新設計了session的儲存結構,以便後續新增分片處理
4. 修改工程的結構,獨立了common模組,同時將redis,mapdb,broker建在common基礎上
5. 針對publish的記憶體洩漏,進行了修改(原本以為是netty洩露),經過兩天的不眠不休的除錯,發現是moquette未回收導致
使用說明文件:
1.簡介
Moquette是一款開源的訊息代理,整個系統基於java開發,以netty為基礎完整實現了MQTT協議的。
基於測試,moquette的客戶端承載量及訊息的推送速度都比較客觀,在大批量頻繁短線上線的情景下,也可以承受。
Moquette程式碼是完全開源的,測試過程中的問題進行了一定的修改,擴充套件實現了基於redis儲存的機制。
2.使用
2.1配置檔案
Moquette所使用的配置檔案位於其根目錄下的config裡,包括以下:
- acl.conf 許可權配置
- hazelcast.xml 叢集配置
- password_file.conf 使用者密碼配置
- moquette.conf 主配置
下面將詳細講解各配置檔案
許可權配置
基於檔案的許可權配置較為複雜,以下為示例格式,將針對該示例具體說明。
user admin
topic write mqtt/log
pattern write mqtt/log/+
topic read mqtt/lost
user client
topic read mqtt/log
pattern read mqtt/log/%c
topic write mqtt/lost
[user admin] 指示一個使用者admin。其後的條目,代表該使用者的相關topic的讀寫許可權,一直到另一個user結束。
[topic write wifi/log] 代表隊wifi/log主題具有write許可權,topic命令指定特定的主題名稱,不能帶有萬用字元。
[pattern write wifi/log/+] 使用萬用字元指示符合規則的一定數量的topic的許可權。
許可權分類:
- write
- read
- writeread
叢集配置
Moquette的叢集配置實用的是hazelcast。Hazelcast是基於java編寫的資料同步工具。在moquette中,用於不同節點訊息的同步。
<network>
<public-address>IP1:5701</public-address>
<port>5701</port>
<join>
<multicast enabled="false" />
<tcp-ip enabled="true">
<required-member>IP2:5701</required-member>
</tcp-ip>
</join>
</network>
public-address:代表了當前節點的IP及埠
required-member:代表了叢集中的其他節點。
各節點的叢集模式建立後,各節點是對等關係,無主從之分
使用者管理
該檔案用於系統的可登入使用者,例項格式如下:
#*********************************************
# Each line define a user login in the format
# <username>:sha256(<password>)
#*********************************************
#NB this password is sha256(passwd)
admin:8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92
client:8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92
該檔案的格式非常簡單:
每行代表了一個使用者及其密碼,用:分割,密碼是sha256摘要後的結果。
關於client(消費者)所使用的使用者,大部分情況下,client只需要其clientId來區分,因此後臺可針對業務型別建立不同的使用者分給client使用,不需要為每個clientId都建立使用者。
主配置
主配置中包含了較多內容,介紹如下:
1.埠
port 1883
websocket_port 8383
port 1883 是broker的主埠,預設為MQTT協議的1883埠
由於系統提供了websocket功能,可以使用websocket的方式使用(該模式未進行測試)。
2. SSL埠及配置
# ssl_port 8883
#jks_path serverkeystore.jks
#key_store_password passw0rdsrv
#key_manager_password passw0rdsrv
對於有較高安全要求的系統,可以新增SSL支援。
3.IP繫結限制
#*********************************************************************
# The interface to bind the server
# 0.0.0.0 means "any"
#*********************************************************************
host 0.0.0.0
4.儲存設定
storage_class io.moquette.persistence.redis.RedisStorageService
由於基於不同儲存的實現的效能,差異性較大,moquette預設採用記憶體儲存的模式,該模式有很高的效能,但存在單點崩潰下,訊息丟失的風險(由於叢集負載的使用,可降低該問題發生的影響範圍)。
如果對儲存過於看重,效能可求次,可使用基於redis的儲存實現,其自帶的mapdb的儲存實現,錯誤較多。
在不設定的情況,預設採用的基於memory的儲存實現。
5.啟用許可權訪問
#*********************************************************************
# acl_file:
# defines the path to the ACL file relative to moquette home dir
# contained in the moquette.path system property
#*********************************************************************
acl_file config/acl.conf
以上代表了,broker將以acl.conf中的內容為基礎進行授權鑑權。
6.是否允許匿名訪問
#*********************************************************************
# allow_anonymous is used to accept MQTT connections also from not
# authenticated clients.
# - false to accept ONLY client connetions with credentials.
# - true to accept client connection without credentails, validating
# only against the password_file, the ones that provides.
#*********************************************************************
allow_anonymous false
以上代表不允許匿名訪問,必須使用使用者名稱及密碼才可以訪問。
7.使用者密碼檔案配置
#*********************************************************************
# password_file:
# defines the path to the file that contains the credentials for
# authenticated client connection. It's relative to moquette home dir
# defined by the system property moquette.path
#*********************************************************************
password_file config/password_file.conf
以上代表broker將使用password檔案進行鑑權,若不需要則可以將其註釋掉。
8.epoll的啟用
#*********************************************************************
# Netty Configuration
#*********************************************************************
#
# Linux systems can use epoll instead of nio. To get a performance
# gain and reduced GC.
# http://netty.io/wiki/native-transports.html for more information
#
netty.epoll true
在linux系統下,提供的epoll機制,可使系統能夠承載更高的終端。以上代表啟用epoll。在機器硬體較好的情況下,epoll模式提升明顯。
9.叢集配置
#hazelcast
#intercept.handler io.moquette.interception.HazelcastInterceptHandler
叢集配置的情況下,需要開啟以上配置,開啟配置的前提是hazelcast.xml檔案已配置。
10.Redis配置
#redis storage
redis.host localhost
redis.port 6379
redis.password
redis.database 0
redis.prefix monitor:
在store_class已經配置為redis的情況下,需要配置以上引數,由於叢集模式使用hazelcast,目前的基於redis的實現,不具備分片等功能,但鍵值的設計已經具備。
2.2 啟動
Moquette程式碼工程採用maven管理,採用maven install可以打包一個在linux下執行的檔案,打包後的格式如下:
Lib目錄是所有使用到的lib檔案,分為:
- Netty相關
- Hazelcast
- Log相關
- Redis儲存實現引入的lib:在使用memory的模式下,redis相關可以刪除,減少包的大小。
Bin目錄:
linux下的moquette.sh啟動方式:
預設不是以後臺執行的方式,需要使用以下命令執行:
linux下的moquette.sh啟動方式:
預設不是以後臺執行的方式,需要使用以下命令執行:
setsid ./moquette.sh &
nohup命令模式會找不到配置好的log輸出。
Windows下以bat命令執行
2.3 客戶端
實現moqtt協議的客戶端存在很多種,針對該broker,目前測試使用的是eclipse-phao的,該客戶端實現提供了多種語言版本,便於不同終端使用。
針對不同的語言版本,可提供的功能存在不同,目前broker預設沒有實現除mqtt協議規範中提到的功能。
重連機制,需要客戶端想法實現該機制,避免客戶端掉線後只能重啟才能連結的境界。
3. 測試
該broker經過了多次測試。
測試場景:
1.機器配置
記憶體 | CPU |
---|---|
8G | 4核 |
Client所執行機器多樣,每臺機器執行5000個client。
Publish為普通windows機器,兩個publisher,每個5個傳送執行緒,平均每秒100條訊息。
2.訊息傳送速度
- 10秒一條群發的情況下,測試3論。
- 每秒100條點對點的訊息情況下,測試3輪。
- 每輪測試20到30幾個小時。
3.客戶端情況
Client在clean Session的情況下,broker的記憶體佔用較低,僅400M左右
在不清理會話的情況下,記憶體佔用較高,在client大批量反覆掉線重連情況下記憶體佔用達到2G
心跳設定60s,過短(低於30s)的心跳,對broker來說不能承受。
4叢集情況
搭建了兩個節點的叢集,通過nginx進行tcp負載,客戶端測試數量為3萬。
3.1承載量測試
1.單broker從8000,15000,18000,25000幾個級別的測試,在不傳送訊息的情況下,這幾個級別的客戶端都可以連線。
承載量 | 結果 |
---|---|
8000 | OK |
15000 | OK |
18000 | OK |
25000 | OK |
2.在傳送訊息的情況
承載量 | 結果 |
---|---|
8000 | OK |
15000 | OK |
18000 | NG |
25000 | NG |
在每10秒一條群發訊息的情況下,單點broker的無掉線承載量是15000,18000發生較多的掉線情況。
3.2訊息接收速度
在10秒一條群發訊息及每秒100條點對點訊息的傳送情況下,訊息的接受速度都在1秒以內。
3.3所佔記憶體
基於實現分析,記憶體的佔用主要由client在不清理會話進行連結掉線後產生的訊息積累,在原有基於記憶體的實現機制中,為每個client儲存1024條訊息,在超過1024條後,訊息會導致publish端出錯。修改後的實現,為每個離線client儲存最新的32條訊息,超過32條的將被丟棄。
基於redis的實現,訊息目前沒有設定棄用或過期機制。
測試期間的記憶體分析:
IP1的broker,總記憶體佔用情況如下
記憶體穩定在800M左右,處於穩定狀態。
3.4注意問題
基於記憶體的儲存實現,目前僅儲存32條離線訊息,超過32條將丟棄原有的。
遺願訊息內容必須為ascii,不能為其他字元。遺願訊息主要用於客戶端掉線後的處理。
客戶端的心跳不能設定過小,否則broker的承載量將嚴重下降,建議60s以上。
遺留的問題:
- 在心跳之間的時間段,測試發現存在broker誤簽收的情況。
- 以上問題,在業務實際使用過程中,採取業務簽收等方式,避免訊息質量的不可靠性的出現。