基於python實現MQTT釋出訂閱過程原理解析
MQTT簡介
MQTT 全稱為 Message Queuing Telemetry Transport(訊息佇列遙測傳輸)是一種基於釋出/訂閱正規化的“輕量級”訊息協議。該協議構建於TCP/IP協議上。
MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用範圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通訊和物聯網(IoT)。
其在,通過衛星鏈路通訊感測器、偶爾撥號的醫療裝置、智慧家居、及一些小型化裝置中已廣泛使用。
MQTT特點
1、使用釋出/訂閱訊息模式,提供一對多的訊息釋出,解除應用程式耦合。該協議需要客戶端和服務端,而協議中主要有三種身份:釋出者(Publisher)、代理(Broker,伺服器)、訂閱者(Subscriber)。其中,訊息的釋出者和訂閱者都是客戶端,訊息代理是伺服器,而訊息釋出者可以同時是訂閱者,實現了生產者與消費者的脫耦;
2、對負載內容遮蔽的訊息傳輸;
3、使用 TCP/IP 提供網路連線;
4、有三種訊息釋出服務質量:
- “至多一次”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這一級別可用於如下情況,環境感測器資料,丟失一次讀記錄無所謂,因為不久後還會有第二次傳送。
- “至少一次”,確保訊息到達,但訊息重複可能會發生。
- “只有一次”,確保訊息到達一次。這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。
5、小型傳輸,開銷很小(固定長度的頭部是 2 位元組),協議交換最小化,以降低網路流量;
6、使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。
原理
MQTT協議有三種身份:釋出者、代理、訂閱者,釋出者和訂閱者都為客戶端,代理為伺服器,同時訊息的釋出者也可以是訂閱者(為了節約記憶體和流量釋出者和訂閱者一般都會定義在一起)。
MQTT傳輸的訊息分為主題(Topic,可理解為訊息的型別,訂閱者訂閱後,就會收到該主題的訊息內容(payload))和負載(payload,可以理解為訊息的內容)兩部分。
1.MQTT協議實現方式
實現MQTT協議需要客戶端和伺服器端通訊完成,在通訊過程中,MQTT協議中有三種身份:釋出者(Publish)、代理(Broker)(伺服器)、訂閱者(Subscribe)。其中,訊息的釋出者和訂閱者都是客戶端,訊息代理是伺服器,訊息釋出者可以同時是訂閱者。
MQTT傳輸的訊息分為:主題(Topic)和負載(payload)兩部分:
(1)Topic,可以理解為訊息的型別,訂閱者訂閱(Subscribe)後,就會收到該主題的訊息內容(payload);
(2)payload,可以理解為訊息的內容,是指訂閱者具體要使用的內容。
2.網路傳輸與應用訊息
MQTT會構建底層網路傳輸:它將建立客戶端到伺服器的連線,提供兩者之間的一個有序的、無損的、基於位元組流的雙向傳輸。
當應用資料通過MQTT網路傳送時,MQTT會把與之相關的服務質量(QoS)和主題名(Topic)相關連。
3.MQTT客戶端
一個使用MQTT協議的應用程式或者裝置,它總是建立到伺服器的網路連線。客戶端可以:
(1)釋出其他客戶端可能會訂閱的資訊;
(2)訂閱其它客戶端釋出的訊息;
(3)退訂或刪除應用程式的訊息;
(4)斷開與伺服器連線。
4.MQTT服務端
MQTT伺服器以稱為"訊息代理"(Broker),可以是一個應用程式或一臺裝置。它是位於訊息釋出者和訂閱者之間,它可以:
(1)接受來自客戶的網路連線;
(2)接受客戶釋出的應用資訊;
(3)處理來自客戶端的訂閱和退訂請求;
(4)向訂閱的客戶轉發應用程式訊息。
5.MQTT協議中的訂閱、主題、會話
一、訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
二、會話(Session)
每個客戶端與伺服器建立連線後就是一個會話,客戶端和伺服器之間有狀態互動。會話存在於一個網路之間,也可能在客戶端和伺服器之間跨越多個連續的網路連線。
三、主題名(Topic Name)
連線到一個應用程式訊息的標籤,該標籤與伺服器的訂閱相匹配。伺服器會將訊息傳送給訂閱所匹配標籤的每個客戶端。
四、主題篩選器(Topic Filter)
一個對主題名萬用字元篩選器,在訂閱表示式中使用,表示訂閱所匹配到的多個主題。
五、負載(Payload)
訊息訂閱者所具體接收的內容。
6.MQTT協議中的方法
MQTT協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的資料或動態生成資料,這取決於伺服器的實現。通常來說,資源指伺服器上的檔案或輸出。主要方法有:
(1)Connect。等待與伺服器建立連線。
(2)Disconnect。等待MQTT客戶端完成所做的工作,並與伺服器斷開TCP/IP會話。
(3)Subscribe。等待完成訂閱。
(4)UnSubscribe。等待伺服器取消客戶端的一個或多個topics訂閱。
(5)Publish。MQTT客戶端傳送訊息請求,傳送完成後返回應用程式執行緒。
7.應用場景
因為它傳輸訊息具有非同步性(釋出訂閱模式),同時該協議本身的輕量特定,因此可用於輕量級應用,
可作為物聯網的通訊元件使用,例如樹莓派上完全可以搭建一個mqtt伺服器,當未來智慧家居全面普及的時候,
家居中的訊息通訊都可用此實現,如智慧冰箱溫度檢測,房間溫度檢測等資訊都能通過mqtt去實現,遙感資料、
汽車檢測資料、智慧家居、智慧城市、醫療醫護都具有應用場景。
客戶端
#!/usr/bin/env python # encoding: utf-8 """ @version: v1.0 @author: W_H_J @license: Apache Licence @contact: [email protected] @software: PyCharm @file: clicentMqttTest.py @time: 2019/2/22 14:19 @describe: mqtt客戶端 """ import json import sys import os import paho.mqtt.client as mqtt import time sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") TASK_TOPIC = 'test' # 客戶端釋出訊息主題 client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) """ client_id是連線到代理。如果client_id的長度為零或為零,則行為為由使用的協議版本定義。如果使用MQTT v3.1.1, 那麼一個零長度的客戶機id將被髮送到代理,代理將被髮送為客戶端生成一個隨機變數。如果使用MQTT v3.1,那麼id將是 隨機生成的。在這兩種情況下,clean_session都必須為True。如果這在這種情況下不會產生ValueError。 注意:一般情況下如果客戶端服務端啟用兩個監聽那麼客戶端client_id 不能與伺服器相同,如這裡用時間"20190222142358"作為它的id, 如果與伺服器id相同,則無法接收到訊息 """ client = mqtt.Client(client_id,transport='tcp') client.connect("127.0.0.1",1883,60) # 此處埠預設為1883,通訊埠期keepalive預設60 client.loop_start() def clicent_main(message: str): """ 客戶端釋出訊息 :param message: 訊息主體 :return: """ time_now = time.strftime('%Y-%m-%d %H-%M-%S',time.localtime(time.time())) payload = {"msg": "%s" % message,"data": "%s" % time_now} # publish(主題:Topic; 訊息內容) client.publish(TASK_TOPIC,json.dumps(payload,ensure_ascii=False)) print("Successful send message!") return True if __name__ == '__main__': msg = "我是一條測試資料!" clicent_main(msg) client
服務端
#!/usr/bin/env python # encoding: utf-8 """ @version: v1.0 @author: W_H_J @license: Apache Licence @contact: [email protected] @software: PyCharm @file: serverMqttTest.py @time: 2019/2/22 14:35 @describe: mqtt 服務端 """ import json import sys import os import time import paho.mqtt.client as mqtt sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..')) sys.path.append("..") REPORT_TOPIC = 'test' # 主題 def on_connect(client,userdata,flags,rc): print('connected to mqtt with resurt code ',rc) client.subscribe(REPORT_TOPIC) # 訂閱主題 def on_message(client,msg): """ 接收客戶端傳送的訊息 :param client: 連線資訊 :param userdata: :param msg: 客戶端返回的訊息 :return: """ print("Start server!") payload = json.loads(msg.payload.decode('utf-8')) print(payload) def server_conenet(client): client.on_connect = on_connect # 啟用訂閱模式 client.on_message = on_message # 接收訊息 client.connect("127.0.0.1",60) # 連結 # client.loop_start() # 以start方式執行,需要啟動一個守護執行緒,讓服務端執行,否則會隨主執行緒死亡 client.loop_forever() # 以forever方式阻塞執行。 def server_stop(client): client.loop_stop() # 停止服務端 sys.exit(0) def server_main(): client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) client = mqtt.Client(client_id,transport='tcp') server_conenet(client) if __name__ == '__main__': # 啟動監聽 server_main() server
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。