MQTT伺服器的搭建與測試pub/sub通訊過程
阿新 • • 發佈:2019-02-10
MQTT是一個即時通訊協議,採用輕量級釋出和訂閱訊息傳輸機制。專門設計用於低頻寬或者高昂的網路費用的通訊過程中。以及提供三種不同質量的訊息服務:
- 1.”至多一次”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這一級別可用於如下情況,環境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送。
- 2.”至少一次”,確保訊息到達,但訊息重複可能會發生。
- 3.”只有一次”,確保訊息到達一次。這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。
對於實現了MQTT協議的訊息代理軟體有眾多。Mosquitto,npm社群的mosca,Apache社群的ActivityMQ等等
分別嘗試了這三種的搭建過程,最Mosquitto容易上手。
安裝Mosquitto
#下載原始碼包
wget http://mosquitto.org/files/source/mosquitto-1.4.5.tar.gz
# 解壓
tar zxfv mosquitto-1.4.5.tar.gz
# 進入目錄
cd mosquitto-1.4.5
# 編譯
make
# 安裝
sudo make install
執行Mosquitto
Mosquitto的配置檔案存放在/etc/mosquitto/mosquitto.conf
配置檔案具體的配置內容為:
# =================================================================
# General configuration
# =================================================================
# 客戶端心跳的間隔時間
#retry_interval 20
# 系統狀態的重新整理時間
#sys_interval 10
# 系統資源的回收時間,0表示儘快處理
#store_clean_interval 10
# 服務程序的PID
#pid_file /var/run/mosquitto.pid
# 服務程序的系統使用者
#user mosquitto
# 客戶端心跳訊息的最大併發數
#max_inflight_messages 10
# 客戶端心跳訊息快取佇列
#max_queued_messages 100
# 用於設定客戶端長連線的過期時間,預設永不過期
#persistent_client_expiration
# =================================================================
# Default listener
# =================================================================
# 服務繫結的IP地址
#bind_address
# 服務繫結的埠號
#port 1883
# 允許的最大連線數,-1表示沒有限制
#max_connections -1
# cafile:CA證書檔案
# capath:CA證書目錄
# certfile:PEM證書檔案
# keyfile:PEM金鑰檔案
#cafile
#capath
#certfile
#keyfile
# 必須提供證書以保證資料安全性
#require_certificate false
# 若require_certificate值為true,use_identity_as_username也必須為true
#use_identity_as_username false
# 啟用PSK(Pre-shared-key)支援
#psk_hint
# SSL/TSL加密演算法,可以使用“openssl ciphers”命令獲取
# as the output of that command.
#ciphers
# =================================================================
# Persistence
# =================================================================
# 訊息自動儲存的間隔時間
#autosave_interval 1800
# 訊息自動儲存功能的開關
#autosave_on_changes false
# 持久化功能的開關
persistence true
# 持久化DB檔案
#persistence_file mosquitto.db
# 持久化DB檔案目錄
#persistence_location /var/lib/mosquitto/
# =================================================================
# Logging
# =================================================================
# 4種日誌模式:stdout、stderr、syslog、topic
# none 則表示不記日誌,此配置可以提升些許效能
log_dest none
# 選擇日誌的級別(可設定多項)
#log_type error
#log_type warning
#log_type notice
#log_type information
# 是否記錄客戶端連線資訊
#connection_messages true
# 是否記錄日誌時間
#log_timestamp true
# =================================================================
# Security
# =================================================================
# 客戶端ID的字首限制,可用於保證安全性
#clientid_prefixes
# 允許匿名使用者
#allow_anonymous true
# 使用者/密碼檔案,預設格式:username:password
#password_file
# PSK格式密碼檔案,預設格式:identity:key
#psk_file
# pattern write sensor/%u/data
# ACL許可權配置,常用語法如下:
# 使用者限制:user <username>
# 話題限制:topic [read|write] <topic>
# 正則限制:pattern write sensor/%u/data
#acl_file
# =================================================================
# Bridges
# =================================================================
# 允許服務之間使用“橋接”模式(可用於分散式部署)
#connection <name>
#address <host>[:<port>]
#topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix]
# 設定橋接的客戶端ID
#clientid
# 橋接斷開時,是否清除遠端伺服器中的訊息
#cleansession false
# 是否釋出橋接的狀態資訊
#notifications true
# 設定橋接模式下,訊息將會發布到的話題地址
# $SYS/broker/connection/<clientid>/state
#notification_topic
# 設定橋接的keepalive數值
#keepalive_interval 60
# 橋接模式,目前有三種:automatic、lazy、once
#start_type automatic
# 橋接模式automatic的超時時間
#restart_timeout 30
# 橋接模式lazy的超時時間
#idle_timeout 60
# 橋接客戶端的使用者名稱
#username
# 橋接客戶端的密碼
#password
# bridge_cafile:橋接客戶端的CA證書檔案
# bridge_capath:橋接客戶端的CA證書目錄
# bridge_certfile:橋接客戶端的PEM證書檔案
# bridge_keyfile:橋接客戶端的PEM金鑰檔案
#bridge_cafile
#bridge_capath
#bridge_certfile
#bridge_keyfile
# 自己的配置可以放到以下目錄中
include_dir /etc/mosquitto/conf.d
啟動Mosquitto服務:
mosquitto -c /etc/mosquitto/mosquitto.conf -d (MQTT協議使用1883埠,檢視該埠驗證是否啟動成功)
java使用MQTT的訂閱釋出
Maven依賴為:
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
訂閱(Sub)端HelloWorld:
訂閱一個名為foo的主題,訊息級別為 AT_LEAST_ONCE
MQTT mqtt = new MQTT();
try {
mqtt.setHost("tcp://192.168.2.112:1883");
System.out.println("start");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
System.out.println(connection == null);
connection.subscribe(new Topic[] { new Topic("foo", QoS.AT_LEAST_ONCE) });
while (true) {
Message message = connection.receive();
System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic()
+ " context :" + String.valueOf(message.getPayloadBuffer()));
}
} catch (Exception e) {
}
System.out.println("end");
釋出(pub)端:
給一個名為foo的主題推送訊息,訊息級別為 AT_LEAST_ONCE
MQTT mqtt = new MQTT();
try {
mqtt.setHost("tcp://192.168.2.112:1883");
System.out.println("start");
BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
System.out.println(connection == null);
for (int i = 0; i <100000; i++) {
connection.publish("foo", "HelloWQEQWEQ".getBytes(), QoS.AT_LEAST_ONCE, false);
}
} catch (Exception e) {
}
System.out.println("end");
通過執行便可以看到訊息成功pub到sub端。
至於連線型別,該客戶端提供了三種
- BlockingConnection 阻塞式
- CallbackConnection 回撥函式式
- FutureConnection 非同步式
java客戶端還能制定更多的通訊細節
// 連線前清空會話資訊 ,若設為false,MQTT伺服器將持久化客戶端會話的主體訂閱和ACK位置,預設為true
mqtt.setCleanSession(CLEAN_START);
// 設定心跳時間
// ,定義客戶端傳來訊息的最大時間間隔秒數,伺服器可以據此判斷與客戶端的連線是否已經斷開,從而避免TCP/IP超時的長時間等待
mqtt.setKeepAlive(KEEP_ALIVE);
// 設定客戶端id,用於設定客戶端會話的ID。在setCleanSession(false);被呼叫時,MQTT伺服器利用該ID獲得相應的會話。
// 此ID應少於23個字元,預設根據本機地址、埠和時間自動生成
mqtt.setClientId(CLIENT_ID);
//設定“遺囑”訊息的內容,預設是長度為零的訊息 mqtt.setWillMessage("willMessage");
* //設定“遺囑”訊息的QoS,預設為QoS.ATMOSTONCE
* mqtt.setWillQos(QoS.AT_LEAST_ONCE);
* //若想要在釋出“遺囑”訊息時擁有retain選項,則為true mqtt.setWillRetain(true);
* //設定“遺囑”訊息的話題,若客戶端與伺服器之間的連線意外中斷,伺服器將釋出客戶端的“遺囑”訊息
* mqtt.setWillTopic("willTopic");
*/
// ==失敗重連線設定說明
// 設定重新連線的次數
// ,客戶端已經連線到伺服器,但因某種原因連線斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
// 設定重連的間隔時間 ,首次重連線間隔毫秒數,預設為10ms
mqtt.setReconnectDelay(RECONNECTION_DELAY);
// 客戶端首次連線到伺服器時,連線的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
// mqtt.setConnectAttemptsMax(10L);
MQTT提供的pub/sub確實比redis的pub/sub機制強大些。