springboot + rabbitmq 做智慧家居,我也沒想到會這麼簡單
阿新 • • 發佈:2020-06-21
>本文收錄在個人部落格:[www.chengxy-nds.top](http://www.chengxy-nds.top),共享技術資源,共同進步
前一段有幸參與到一個智慧家居專案的開發,由於之前都沒有過這方面的開發經驗,所以對智慧硬體的開發模式和技術棧都頗為好奇。
![智慧可燃氣體報警器](https://img-blog.csdnimg.cn/20200619094346691.jpg)
產品是一款可燃氣體報警器,如果家中燃氣洩露濃度到達一定閾值,報警器檢測到並上傳氣體濃度值給後臺,後臺以電話、簡訊、微信等方式,提醒使用者家中可能有氣體洩漏。
使用者還可能向報警器發一些關閉報警、調整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200619174414764.png)
但當我真正的參與其中開發時,其實有一點小小的失望,因為在整個研發過程中,並沒用到什麼新的技術,還是常規的幾種中介軟體,只不過換個用法而已。
技術選型用`rabbitmq` 來做核心的元件,主要考慮到運維成本低,組內成員使用的熟練度比較高。
---
下面和小夥伴分享一下如何用 `springboot` + `rabbitmq` 搭建物聯網(`IOT`)平臺,其實智慧硬體也沒想象的那麼高不可攀!
很多小夥伴可能有點懵?`rabbitmq` 不是訊息佇列嗎?**怎麼又能做智慧硬體了**?
其實`rabbitmq`有兩種協議,我們平時接觸的訊息佇列是用的`AMQP`協議,而用在智慧硬體中的是`MQTT`協議。
### 一、什麼是 MQTT協議?
`MQTT` 全稱(Message Queue Telemetry Transport):一種基於釋出/訂閱(`publish`/`subscribe`)模式的`輕量級`通訊協議,通過訂閱相應的主題來獲取訊息,是物聯網(`Internet of Thing`)中的一個標準傳輸協議。
該協議將訊息的釋出者(`publisher`)與訂閱者(`subscriber`)進行分離,因此可以在不可靠的網路環境中,為遠端連線的裝置提供可靠的訊息服務,使用方式與傳統的MQ有點類似。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200609165130729.png?#pic_center)
`TCP`協議位於傳輸層,`MQTT` 協議位於應用層,`MQTT` 協議構建於`TCP/IP`協議上,也就是說只要支援`TCP/IP`協議棧的地方,都可以使用`MQTT`協議。
### 二、為什麼要用 MQTT協議?
`MQTT`協議為什麼在物聯網(IOT)中如此受偏愛?而不是其它協議,比如我們更為熟悉的 `HTTP`協議呢?
- 首先`HTTP`協議它是一種同步協議,客戶端請求後需要等待伺服器的響應。而在物聯網(IOT)環境中,裝置會很受制於環境的影響,比如頻寬低、網路延遲高、網路通訊不穩定等,顯然非同步訊息協議更為適合`IOT`應用程式。
- `HTTP`是單向的,如果要獲取訊息客戶端必須發起連線,而在物聯網(IOT)應用程式中,裝置或感測器往往都是客戶端,這意味著它們無法被動地接收來自網路的命令。
- 通常需要將一條命令或者訊息,傳送到網路上的所有裝置上。`HTTP`要實現這樣的功能不但很困難,而且成本極高。
### 三、MQTT協議介紹
前邊說過`MQTT`是一種輕量級的協議,它只專注於發訊息, 所以此協議的結構也非常簡單。
#### MQTT資料包
在`MQTT`協議中,一個`MQTT`資料包由:`固定頭`(Fixed header)、 `可變頭`(Variable header)、 `訊息體`(payload)三部分構成。
- 固定頭(Fixed header),所有資料包中都有固定頭,包含資料包型別及資料包的分組標識。
- 可變頭(Variable header),部分資料包型別中有可變頭。
- 內容訊息體(Payload),存在於部分資料包類,是客戶端收到的具體訊息內容。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200620130648304.png#pic_center)
**1、固定頭**
固定頭部,使用兩個位元組,共16位:
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200621001520940.png)
(4-7)位表示訊息型別,使用4位二進位制表示,可代表如下的16種訊息型別,不過 0 和 15位置屬於保留待用,所以共14種訊息事件型別。
![圖片源於網路,如有侵權聯絡刪除](https://img-blog.csdnimg.cn/20200620215046195.png)
**DUP Flag(重試標識)**
DUP Flag:保證訊息可靠傳輸,訊息是否已送達的標識。預設為0,只佔用一個位元組,表示第一次傳送,當值為1時,表示當前訊息先前已經被傳送過。
**QoS Level(訊息質量等級)**
QoS Level:訊息的質量等級,後邊會詳細介紹
**RETAIN(持久化)**
- 值為`1`:表示傳送的訊息需要一直持久儲存,而且不受伺服器重啟影響,不但要傳送給當前的訂閱者,且以後新加入的客戶端訂閱了此`Topic`,訂閱者也會馬上得到推送。
**注意**:新加入的訂閱者,只會取出最新的一個`RETAIN flag = 1`的訊息推送。
- 值為`0`:僅為當前訂閱者推送此訊息。
**Remaining Length(剩餘長度)**
在當前訊息中剩餘的`byte`(位元組)數,包含可變頭部和訊息體payload。
**2、可變頭**
固定頭部僅定義了訊息型別和一些標誌位,一些訊息的元資料需要放入可變頭部中。可變頭部內容位元組長度 + 訊息體payload = 剩餘長度。
可變頭部居於固定頭部和payload中間,包含了協議名稱,版本號,連線標誌,使用者授權,心跳時間等內容。
可變頭存在於這些型別的訊息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。
**3、訊息體payload**
訊息體payload只存在於`CONNECT`、`PUBLISH`、`SUBSCRIBE`、`SUBACK`、`UNSUBSCRIBE`這幾種型別的訊息:
- `CONNECT`:包含客戶端的`ClientId`、訂閱的`Topic`、`Message`以及`使用者名稱`和`密碼`。
- `PUBLISH`:向對應主題傳送訊息。
- `SUBSCRIBE`:要訂閱的主題以及`QoS`。
- `SUBACK`:伺服器對於`SUBSCRIBE`所申請的主題及`QoS`進行確認和回覆。
- `UNSUBSCRIBE`:取消要訂閱的主題。
#### 訊息質量(QoS )
`訊息質量`(Quality of Service),即訊息的傳送質量,釋出者(`publisher`)和訂閱者(`subscriber`)都可以指定`qos`等級,有`QoS 0`、`QoS 1`、`QoS 2`三個等級。
下邊分別說明一下這三個等級的區別。
**1、Qos 0**:`At most once`(至多一次),只發送一次訊息,不保證訊息是否成功送達,沒有確認機制,訊息可能會丟失或重複。
![圖片源於網路,如有侵權聯絡刪除](https://img-blog.csdnimg.cn/20200620185008447.jpg)
**2、Qos 1**:`At least once`(至少一次),相對於`QoS 0`而言`Qos 1`增加了`ack`確認機制,傳送者(`publisher`)推送訊息到MQTT代理(`broker`)時,兩者自身都會先持久化訊息,只有當`publisher` 或者 `Broker`分別收到 `PUBACK`確認時,才會刪除自身持久化的訊息,否則就會重發。
但有個問題,儘管我們可以通過確認來保證一定收到客戶端 或 伺服器的`message`,可我們卻不能保證僅收到一次`message`,也就是當客戶端`publisher`沒收到`Broker`的`puback`或者 `Broker`沒有收到`subscriber`的`puback`,那麼就會一直重發。
**publisher -> broker 大致流程:**
1. publisher store msg -> publish ->broker (傳遞message)
1. broker -> puback -> publisher delete msg (確認傳遞成功)
![圖片源於網路,如有侵權聯絡刪除](https://img-blog.csdnimg.cn/20200620185035599.jpg#pic_center)
**3、Qos 2**:`Exactly once`(只有一次),相對於`QoS 1`,`QoS 2`升級實現了僅接受一次`message`,`publisher` 和 `broker` 同樣對訊息進行持久化,其中 `publisher` 快取了`message`和 對應的`msgID`,而 `broker` 快取了 `msgID`,可以保證訊息不重複,由於又增加了一個`confirm` 機制,整個流程變得複雜很多。
**publisher -> broker 大致流程:**
1. publisher store msg -> publish ->broker -> broker store
1. msgID(傳遞message) broker -> puberc (確認傳遞成功)
2. publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
3. broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)
![圖片源於網路,如有侵權聯絡刪除](https://img-blog.csdnimg.cn/20200620185008459.jpg)
#### LWT(最後遺囑)
`LWT` 全稱為 `Last Will and Testament`,其實遺囑是一個由客戶端預先定義好的主題和對應訊息,附加在`CONNECT`的資料包中,包括`遺願主題`、`遺願 QoS`、`遺願訊息`等。
當MQTT代理 `Broker` 檢測到有客戶端`client`非正常斷開連線時,再由伺服器主動釋出此訊息,然後相關的訂閱者會收到訊息。
**舉個栗子**:聊天室中所有人都訂閱一個叫`talk`的主題 ,但小富由於網路抖動突然斷開了連結,這時聊天室中所有訂閱主題 `talk`的客戶端都會收到一個 “`小富離開聊天室`” 的遺願訊息。
遺囑的相關引數:
- `Will Flag`:是否使用 LWT,1 開啟
- `Will Topic`:遺願主題名,不可使用萬用字元
- `Will Qos`:釋出遺願訊息時使用的 QoS
- `Will Retain`:遺願訊息的 Retain 標識
- `Will Message`:遺願訊息內容
**那客戶端`Client` 有哪些場景是非正常斷開連線呢?**
- `Broker` 檢測到底層的 I/O 異常;
- 客戶端 未能在心跳 `Keep Alive` 的間隔內和 `Broker` 進行訊息互動;
- 客戶端 在關閉底層 `TCP` 連線前沒有傳送 `DISCONNECT` 資料包;
- 客戶端 傳送錯誤格式的資料包到 `Broker`,導致關閉和客戶端的連線等。
**注意**:當客戶端通過釋出 `DISCONNECT` 資料包斷開連線時,屬於正常斷開連線,並不會觸發 `LWT` 的機制,與此同時`Broker` 還會丟棄掉當前客戶端在連線時指定的相關 `LWT` 引數。
### 四、MQTT協議應用場景
`MQTT`協議廣泛應用於物聯網、移動網際網路、智慧硬體、車聯網、電力能源等領域。使用的場景也是非常非常多,下邊列舉一些:
- 物聯網M2M通訊,物聯網大資料採集
- Android訊息推送,WEB訊息推送
- 移動即時訊息,例如Facebook Messenger
- 智慧硬體、智慧傢俱、智慧電器
- 車聯網通訊,電動車站樁採集
- 智慧城市、遠端醫療、遠端教育
- 電力、石油與能源等行業市場
### 五、程式碼實現
具體 `rabbitmq` 的環境搭建就不贅述了,網上教程比較多,有條件的用伺服器,沒條件的像我搞個`Windows`版的也很快樂嘛。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/2020060911125490.jpg?#pic_center)
#### 1、啟用 rabbitmq的mqtt協議
我們先開啟 `rabbitmq` 的 `mqtt`協議,因為預設安裝下是關閉的,命令如下:
```javascript
rabbitmq-plugins enable rabbitmq_mqtt
```
#### 2、mqtt 客戶端依賴包
上一步中安裝`rabbitmq`環境並開啟 `mqtt`協議後,實際上`mqtt` 訊息代理服務就搭建好了,接下來要做的就是實現客戶端訊息的推送和訂閱。
這裡使用`spring-integration-mqtt`、`org.eclipse.paho.client.mqttv3`兩個工具包實現。
```javascript
org.springframework.integration
spring-integration-mqtt
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.0
```
#### 3、訊息傳送者
訊息的傳送比較簡單,主要是應用到`@ServiceActivator`註解,需要注意`messageHandler.setAsync`屬性,如果設定成`false`,關閉非同步模式傳送訊息時可能會阻塞。
```javascript
@Configuration
public class IotMqttProducerConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
}
```
`MQTT` 對外提供傳送訊息的`API`時,需要使用`@MessagingGateway` 註解,去提供一個訊息閘道器代理,引數`defaultRequestChannel` 指定傳送訊息繫結的`channel`。
可以實現三種`API`介面,`payload` 為傳送的訊息,`topic` 傳送訊息的主題,`qos` 訊息質量。
```javascript
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {
// 向預設的 topic 傳送訊息
void sendMessage2Mqtt(String payload);
// 向指定的 topic 傳送訊息
void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
// 向指定的 topic 傳送訊息,並指定服務質量引數
void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
```
#### 4、訊息訂閱
訊息訂閱和我們平時用的MQ訊息監聽實現思路基本相似,`@ServiceActivator`註解表明當前方法用於處理`MQTT`訊息,`inputChannel` 引數指定了用於接收訊息的`channel`。
```javascript
/**
* @Author: xiaofu
* @Description: 訊息訂閱配置
* @date 2020/6/8 18:24
*/
@Configuration
public class IotMqttSubscriberConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel iotMqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(iotMqttInputChannel());
return adapter;
}
/**
* @author xiaofu
* @description 訊息訂閱
* @date 2020/6/8 18:20
*/
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler handlerTest() {
return message -> {
try {
String string = message.getPayload().toString();
System.out.println("接收到訊息:" + string);
} catch (MessagingException ex) {
//logger.info(ex.getMessage());
}
};
}
}
```
### 六、測試訊息
額~ 由於本渣渣對硬體一竅不通,為了模擬硬體的傳送訊息,只能藉助一下工具,其實硬體端實現`MQTT`協議,跟我們前邊的基本沒什麼區別,只不過換種語言嵌入到硬體中而已。
這裡選的測試工具為`mqttbox`,下載地址:`http://workswithweb.com/mqttbox.html`
#### 1、測試訊息傳送
我們用先用`mqttbox`模擬向主題`mqtt_test_topic`傳送訊息,看後臺是否能成功接收到。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/2020061918163049.png)
看到後臺成功拿到了向主題`mqtt_test_topic`傳送的訊息。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200619182208808.png?)
#### 2、測試訊息訂閱
用`mqttbox`模擬訂閱主題`mqtt_test_topic`,在後臺向主題`mqtt_test_topic`傳送一條訊息,這裡我簡單的寫了個`controller`呼叫API傳送訊息。
http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是後臺向主題 mqtt_test_topic 傳送的訊息
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200619183645343.png)
我們看`mqttbox`的訂閱訊息,已經成功的接收到了後臺的訊息,到此我們的`MQTT`通訊環境就算搭建成功了。如果把`mqttbox`工具換成具體硬體裝置,整個流程就是我們常說的智慧家居了,其實真的沒那麼難。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200619183705312.png)
### 七、應用注意事項
在我們實際的生產環境中遇到過的問題,這裡分享一下讓大家少踩坑。
#### clientId 要唯一
在客戶端`connect`連線的時,會有一個`clientId` 引數,需要每個客戶端都保持唯一的。但我們在開發測試階段`clientId`直接在程式碼中寫死了,而且服務都是單例項部署,並沒有暴露出什麼問題。
```javascript
MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
```
然而在生產環境內側的時候,由於服務是多例項叢集部署,結果出現了下邊的奇怪問題。同一時間內只能有一個客戶端能拿到訊息,其他客戶端不但不能消費訊息,而且還在不斷的掉線重連:`Lost connection: 已斷開連線; retrying...`。
![在這裡插入圖片描述](https://img-blog.csdnimg.cn/20200619190020404.png)
這就是由於`clientId`相同導致客戶端間相互競爭消費,最後將`clientId`獲取方式換成從發號器中拿,問題就好了,所以這個地方是需要特別注意的。
平時程式在開發環境沒問題,可偏偏到了生產環境就一大堆問題,很多都是因為服務部署方式不同導致的。所以多學習分散式還是很有必要的。
### 八、其他中介軟體
`MQTT`它只是一種協議,支援`MQTT`協議的訊息中介軟體產品非常多,下邊的也只是其中的一部分
- Mosquitto
- Eclipse Paho
- RabbitMQ
- Apache ActiveMQ
- HiveMQ
- JoramMQ
- ThingMQ
- VerneMQ
- Apache Apollo
- emqttd Xively
- IBM Websphere
.....
### 總結
我也是第一次做和硬體相關的專案,之前聽到智慧家居都會覺得好高大上,但實際上手開發後發現,技術嘛萬變不離其宗,也只是換種用法而已。
雙手奉上專案 demo 的`github`地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git
感興趣的小夥伴可以下載跑一跑,實現起來非常的簡單。
* * *
**原創不易,燃燒秀髮輸出內容,希望你能有一丟丟收穫!**
整理了幾百本各類技術電子書,送給小夥伴們,關注公號回覆【666】自行領取。和一些小夥伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就加入我們吧!
![](https://imgconvert.csdnimg.cn/aHR0cHM6Ly91c2VyLWdvbGQtY2RuLnhpdHUuaW8vMjAyMC8yLzQvMTcwMGU0Mjk1MDQzMjQ0Yg?x-oss-process=image/for