1. 程式人生 > >MQTT伺服器的搭建與測試pub/sub通訊過程

MQTT伺服器的搭建與測試pub/sub通訊過程

MQTT是一個即時通訊協議,採用輕量級釋出和訂閱訊息傳輸機制。專門設計用於低頻寬或者高昂的網路費用的通訊過程中。以及提供三種不同質量的訊息服務:

  • 1.”至多一次”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這一級別可用於如下情況,環境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送。
  • 2.”至少一次”,確保訊息到達,但訊息重複可能會發生。
  • 3.”只有一次”,確保訊息到達一次。這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。

對於實現了MQTT協議的訊息代理軟體有眾多。Mosquitto,npm社群的mosca,Apache社群的ActivityMQ等等

分別嘗試了這三種的搭建過程,最Mosquitto容易上手。

安裝Mosquitto

#下載原始碼包
wget http://mosquitto.org/files/source/mosquitto-1.4.5.tar.gz
# 解壓
tar zxfv mosquitto-1.4.5.tar.gz
# 進入目錄
cd mosquitto-1.4.5
# 編譯
make
# 安裝
sudo make install

執行Mosquitto

Mosquitto的配置檔案存放在/etc/mosquitto/mosquitto.conf

配置檔案具體的配置內容為:

# =================================================================
# General configuration # ================================================================= # 客戶端心跳的間隔時間 #retry_interval 20 # 系統狀態的重新整理時間 #sys_interval 10 # 系統資源的回收時間,0表示儘快處理 #store_clean_interval 10 # 服務程序的PID #pid_file /var/run/mosquitto.pid # 服務程序的系統使用者 #user mosquitto # 客戶端心跳訊息的最大併發數 #max_inflight_messages 10
# 客戶端心跳訊息快取佇列 #max_queued_messages 100 # 用於設定客戶端長連線的過期時間,預設永不過期 #persistent_client_expiration # ================================================================= # Default listener # ================================================================= # 服務繫結的IP地址 #bind_address # 服務繫結的埠號 #port 1883 # 允許的最大連線數,-1表示沒有限制 #max_connections -1 # cafile:CA證書檔案 # capath:CA證書目錄 # certfile:PEM證書檔案 # keyfile:PEM金鑰檔案 #cafile #capath #certfile #keyfile # 必須提供證書以保證資料安全性 #require_certificate false # 若require_certificate值為true,use_identity_as_username也必須為true #use_identity_as_username false # 啟用PSK(Pre-shared-key)支援 #psk_hint # SSL/TSL加密演算法,可以使用“openssl ciphers”命令獲取 # as the output of that command. #ciphers # ================================================================= # Persistence # ================================================================= # 訊息自動儲存的間隔時間 #autosave_interval 1800 # 訊息自動儲存功能的開關 #autosave_on_changes false # 持久化功能的開關 persistence true # 持久化DB檔案 #persistence_file mosquitto.db # 持久化DB檔案目錄 #persistence_location /var/lib/mosquitto/ # ================================================================= # Logging # ================================================================= # 4種日誌模式:stdout、stderr、syslog、topic # none 則表示不記日誌,此配置可以提升些許效能 log_dest none # 選擇日誌的級別(可設定多項) #log_type error #log_type warning #log_type notice #log_type information # 是否記錄客戶端連線資訊 #connection_messages true # 是否記錄日誌時間 #log_timestamp true # ================================================================= # Security # ================================================================= # 客戶端ID的字首限制,可用於保證安全性 #clientid_prefixes # 允許匿名使用者 #allow_anonymous true # 使用者/密碼檔案,預設格式:username:password #password_file # PSK格式密碼檔案,預設格式:identity:key #psk_file # pattern write sensor/%u/data # ACL許可權配置,常用語法如下: # 使用者限制:user <username> # 話題限制:topic [read|write] <topic> # 正則限制:pattern write sensor/%u/data #acl_file # ================================================================= # Bridges # ================================================================= # 允許服務之間使用“橋接”模式(可用於分散式部署) #connection <name> #address <host>[:<port>] #topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] # 設定橋接的客戶端ID #clientid # 橋接斷開時,是否清除遠端伺服器中的訊息 #cleansession false # 是否釋出橋接的狀態資訊 #notifications true # 設定橋接模式下,訊息將會發布到的話題地址 # $SYS/broker/connection/<clientid>/state #notification_topic # 設定橋接的keepalive數值 #keepalive_interval 60 # 橋接模式,目前有三種:automatic、lazy、once #start_type automatic # 橋接模式automatic的超時時間 #restart_timeout 30 # 橋接模式lazy的超時時間 #idle_timeout 60 # 橋接客戶端的使用者名稱 #username # 橋接客戶端的密碼 #password # bridge_cafile:橋接客戶端的CA證書檔案 # bridge_capath:橋接客戶端的CA證書目錄 # bridge_certfile:橋接客戶端的PEM證書檔案 # bridge_keyfile:橋接客戶端的PEM金鑰檔案 #bridge_cafile #bridge_capath #bridge_certfile #bridge_keyfile # 自己的配置可以放到以下目錄中 include_dir /etc/mosquitto/conf.d

啟動Mosquitto服務:

mosquitto -c /etc/mosquitto/mosquitto.conf -d (MQTT協議使用1883埠,檢視該埠驗證是否啟動成功)

java使用MQTT的訂閱釋出

Maven依賴為:

        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.12</version>
        </dependency>

訂閱(Sub)端HelloWorld:

訂閱一個名為foo的主題,訊息級別為 AT_LEAST_ONCE

             MQTT mqtt = new MQTT();
                try {
                    mqtt.setHost("tcp://192.168.2.112:1883");
                    System.out.println("start");
                    BlockingConnection connection = mqtt.blockingConnection();
                    connection.connect();
                    System.out.println(connection == null);
                    connection.subscribe(new Topic[] { new Topic("foo", QoS.AT_LEAST_ONCE) });
                    while (true) {
                        Message message = connection.receive();
                        System.out.println("MQTTFutureClient.Receive Message " + "Topic Title :" + message.getTopic()
                                + " context :" + String.valueOf(message.getPayloadBuffer()));
                    }
                } catch (Exception e) {
                }
                System.out.println("end");

釋出(pub)端:

給一個名為foo的主題推送訊息,訊息級別為 AT_LEAST_ONCE

                MQTT mqtt = new MQTT();
                try {
                    mqtt.setHost("tcp://192.168.2.112:1883");
                    System.out.println("start");
                    BlockingConnection connection = mqtt.blockingConnection();
                    connection.connect();
                    System.out.println(connection == null);
                    for (int i = 0; i <100000; i++) {
                        connection.publish("foo", "HelloWQEQWEQ".getBytes(), QoS.AT_LEAST_ONCE, false);
                    }
                } catch (Exception e) {
                }
                System.out.println("end");

通過執行便可以看到訊息成功pub到sub端。

至於連線型別,該客戶端提供了三種

  • BlockingConnection 阻塞式
  • CallbackConnection 回撥函式式
  • FutureConnection 非同步式

java客戶端還能制定更多的通訊細節

// 連線前清空會話資訊 ,若設為false,MQTT伺服器將持久化客戶端會話的主體訂閱和ACK位置,預設為true
            mqtt.setCleanSession(CLEAN_START);
            // 設定心跳時間
            // ,定義客戶端傳來訊息的最大時間間隔秒數,伺服器可以據此判斷與客戶端的連線是否已經斷開,從而避免TCP/IP超時的長時間等待
            mqtt.setKeepAlive(KEEP_ALIVE);
            // 設定客戶端id,用於設定客戶端會話的ID。在setCleanSession(false);被呼叫時,MQTT伺服器利用該ID獲得相應的會話。
            // 此ID應少於23個字元,預設根據本機地址、埠和時間自動生成
            mqtt.setClientId(CLIENT_ID);
 //設定“遺囑”訊息的內容,預設是長度為零的訊息 mqtt.setWillMessage("willMessage");
             * //設定“遺囑”訊息的QoS,預設為QoS.ATMOSTONCE
             * mqtt.setWillQos(QoS.AT_LEAST_ONCE);
             * //若想要在釋出“遺囑”訊息時擁有retain選項,則為true mqtt.setWillRetain(true);
             * //設定“遺囑”訊息的話題,若客戶端與伺服器之間的連線意外中斷,伺服器將釋出客戶端的“遺囑”訊息
             * mqtt.setWillTopic("willTopic");
             */

            // ==失敗重連線設定說明
            // 設定重新連線的次數
            // ,客戶端已經連線到伺服器,但因某種原因連線斷開時的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
            mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
            // 設定重連的間隔時間 ,首次重連線間隔毫秒數,預設為10ms
            mqtt.setReconnectDelay(RECONNECTION_DELAY);
            // 客戶端首次連線到伺服器時,連線的最大重試次數,超出該次數客戶端將返回錯誤。-1意為無重試上限,預設為-1
            // mqtt.setConnectAttemptsMax(10L);

MQTT提供的pub/sub確實比redis的pub/sub機制強大些。