在 Python 中使用 MQTT的方法
Python 是一種廣泛使用的解釋型、高階程式設計、通用型程式語言。Python 的設計哲學強調程式碼的可讀性和簡潔的語法(尤其是使用空格縮排劃分程式碼塊,而非使用大括號或者關鍵詞)。Python 讓開發者能夠用更少的程式碼表達想法,不管是小型還是大型程式,該語言都試圖讓程式的結構清晰明瞭。
MQTT 是一種基於釋出/訂閱模式的 輕量級物聯網訊息傳輸協議 ,可以用極少的程式碼和頻寬為聯網裝置提供實時可靠的訊息服務,它廣泛應用於物聯網、移動網際網路、智慧硬體、車聯網、電力能源等行業。
本文主要介紹如何在 Python 專案中使用 paho-mqtt 客戶端庫 ,實現客戶端與 MQTT 伺服器的連線、訂閱、取消訂閱、收發訊息等功能。
專案初始化
本專案使用 Python 3.6 進行開發測試,讀者可用如下命令確認 Python 的版本。
➜ ~ python3 --version Python 3.6.7
選擇 MQTT 客戶端庫
paho-mqtt 是目前 Python 中使用較多的 MQTT 客戶端庫,它在 Python 2.7 或 3.x 上為客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支援。它還提供了一些幫助程式功能,使將訊息釋出到 MQTT 伺服器變得非常簡單。
Pip 安裝 Paho MQTT 客戶端
Pip 是 Python 包管理工具,該工具提供了對 Python 包的查詢、下載、安裝、解除安裝的功能。
pip3 install -i https://pypi.doubanio.com/simple paho-mqtt
Python MQTT 使用
連線 MQTT 伺服器
本文將使用 EMQ X 提供的 免費公共 MQTT 伺服器 ,該服務基於 EMQ X 的 MQTT 物聯網雲平臺 建立。伺服器接入資訊如下:
- Broker: broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
匯入 Paho MQTT客戶端
from paho.mqtt import client as mqtt_client
設定 MQTT Broker 連線引數
設定 MQTT Broker 連線地址,埠以及 topic,同時我們呼叫 Python random.randint 函式隨機生成 MQTT 客戶端 id。
broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" client_id = f'python-mqtt-{random.randint(0,1000)}'
編寫 MQTT 連線函式
編寫連接回調函式 on_connect ,該函式將在客戶端連線後被呼叫,在該函式中可以依據 rc 來判斷客戶端是否連線成功。通常同時我們將建立一個 MQTT 客戶端,該客戶端將連線到 broker.emqx.io 。
def connect_mqtt(): def on_connect(client,userdata,flags,rc): if rc == 0: print("Connected to MQTT Broker!") else: print("Failed to connect,return code %d\n",rc) # Set Connecting Client ID client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker,port) return client
釋出訊息
首先定義一個 while 迴圈語句,在迴圈中我們將設定每秒呼叫 MQTT 客戶端 publish 函式向 /python/mqtt 主題傳送訊息。
def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic,msg) # result: [0,1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1
訂閱訊息
編寫訊息回撥函式 on_message ,該函式將在客戶端從 MQTT Broker 收到訊息後被呼叫,在該函式中我們將打印出訂閱的 topic 名稱以及接收到的訊息內容。
def subscribe(client: mqtt_client): def on_message(client,msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message
完整程式碼
訊息釋出程式碼
# python 3.6 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0,1000)}' def connect_mqtt(): def on_connect(client,rc) client = mqtt_client.Client(client_id) client.on_connect = on_connect client.connect(broker,port) return client def publish(client): msg_count = 0 while True: time.sleep(1) msg = f"messages: {msg_count}" result = client.publish(topic,msg) # result: [0,1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run()
訊息訂閱程式碼
# python 3.6 import random import time from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0,1] status = result[0] if status == 0: print(f"Send `{msg}` to topic `{topic}`") else: print(f"Failed to send message to topic {topic}") msg_count += 1 def run(): client = connect_mqtt() client.loop_start() publish(client) if __name__ == '__main__': run() 訊息訂閱程式碼 # python3.6 import random from paho.mqtt import client as mqtt_client broker = 'broker.emqx.io' port = 1883 topic = "/python/mqtt" # generate client ID with pub prefix randomly client_id = f'python-mqtt-{random.randint(0,100)}' def connect_mqtt() -> mqtt_client: def on_connect(client,port) return client def subscribe(client: mqtt_client): def on_message(client,msg): print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic") client.subscribe(topic) client.on_message = on_message def run(): client = connect_mqtt() subscribe(client) client.loop_forever() if __name__ == '__main__': run()
測試
訊息釋出
執行 MQTT 訊息釋出程式碼,我們將看到客戶端連線成功,並且成功將訊息釋出。
python3 pub.py
訊息訂閱
執行 MQTT 訊息訂閱程式碼,我們將看到客戶端連線成功,並且成功接收到釋出的訊息。
python3 sub.py
總結
至此,我們完成了使用 paho-mqtt 客戶端連線到 公共 MQTT 伺服器 ,並實現了測試客戶端與 MQTT 伺服器的連線、訊息釋出和訂閱。
與 C ++ 或 Java 之類的高階語言不同,Python 比較適合裝置側的業務邏輯實現,使用 Python 您可以減少程式碼上的邏輯複雜度,降低與裝置的互動成本。我們相信在物聯網領域 Python 將會有更廣泛的應用。
接下來我們將會陸續釋出更多關於物聯網開發及 Python 的相關文章,敬請關注。
以上就是在 Python 中使用 MQTT的方法的詳細內容,更多關於Python 中使用 MQTT的資料請關注我們其它相關文章!