1. 程式人生 > 程式設計 >在 Python 中使用 MQTT的方法

在 Python 中使用 MQTT的方法

Python 是一種廣泛使用的解釋型、高階程式設計、通用型程式語言。Python 的設計哲學強調程式碼的可讀性和簡潔的語法(尤其是使用空格縮排劃分程式碼塊,而非使用大括號或者關鍵詞)。Python 讓開發者能夠用更少的程式碼表達想法,不管是小型還是大型程式,該語言都試圖讓程式的結構清晰明瞭。

MQTT 是一種基於釋出/訂閱模式的 輕量級物聯網訊息傳輸協議 ,可以用極少的程式碼和頻寬為聯網裝置提供實時可靠的訊息服務,它廣泛應用於物聯網、移動網際網路、智慧硬體、車聯網、電力能源等行業。

本文主要介紹如何在 Python 專案中使用 paho-mqtt 客戶端庫 ,實現客戶端與 MQTT 伺服器的連線、訂閱、取消訂閱、收發訊息等功能。

專案初始化

本專案使用 Python 3.6 進行開發測試,讀者可用如下命令確認 Python 的版本。

➜ ~ python3 --version    
Python 3.6.7

選擇 MQTT 客戶端庫

paho-mqtt 是目前 Python 中使用較多的 MQTT 客戶端庫,它在 Python 2.7 或 3.x 上為客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支援。它還提供了一些幫助程式功能,使將訊息釋出到 MQTT 伺服器變得非常簡單。

Pip 安裝 Paho MQTT 客戶端

Pip 是 Python 包管理工具,該工具提供了對 Python 包的查詢、下載、安裝、解除安裝的功能。

pip3 install -i https://pypi.doubanio.com/simple paho-mqtt

Python MQTT 使用

連線 MQTT 伺服器

本文將使用 EMQ X 提供的 免費公共 MQTT 伺服器 ,該服務基於 EMQ X 的 MQTT 物聯網雲平臺 建立。伺服器接入資訊如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

匯入 Paho MQTT客戶端

from paho.mqtt import client as mqtt_client

設定 MQTT Broker 連線引數

設定 MQTT Broker 連線地址,埠以及 topic,同時我們呼叫 Python random.randint 函式隨機生成 MQTT 客戶端 id。

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0,1000)}'

編寫 MQTT 連線函式

編寫連接回調函式 on_connect ,該函式將在客戶端連線後被呼叫,在該函式中可以依據 rc 來判斷客戶端是否連線成功。通常同時我們將建立一個 MQTT 客戶端,該客戶端將連線到 broker.emqx.io 。

def connect_mqtt():
 def on_connect(client,userdata,flags,rc):
  if rc == 0:
   print("Connected to MQTT Broker!")
  else:
   print("Failed to connect,return code %d\n",rc)
 # Set Connecting Client ID
 client = mqtt_client.Client(client_id)
 client.on_connect = on_connect
 client.connect(broker,port)
 return client

釋出訊息

首先定義一個 while 迴圈語句,在迴圈中我們將設定每秒呼叫 MQTT 客戶端 publish 函式向 /python/mqtt 主題傳送訊息。

def publish(client):
  msg_count = 0
  while True:
   time.sleep(1)
   msg = f"messages: {msg_count}"
   result = client.publish(topic,msg)
   # result: [0,1]
   status = result[0]
   if status == 0:
    print(f"Send `{msg}` to topic `{topic}`")
   else:
    print(f"Failed to send message to topic {topic}")
   msg_count += 1

訂閱訊息

編寫訊息回撥函式 on_message ,該函式將在客戶端從 MQTT Broker 收到訊息後被呼叫,在該函式中我們將打印出訂閱的 topic 名稱以及接收到的訊息內容。

def subscribe(client: mqtt_client):
 def on_message(client,msg):
  print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

 client.subscribe(topic)
 client.on_message = on_message

完整程式碼

訊息釋出程式碼

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0,1000)}'


def connect_mqtt():
 def on_connect(client,rc)

 client = mqtt_client.Client(client_id)
 client.on_connect = on_connect
 client.connect(broker,port)
 return client


def publish(client):
 msg_count = 0
 while True:
  time.sleep(1)
  msg = f"messages: {msg_count}"
  result = client.publish(topic,msg)
  # result: [0,1]
  status = result[0]
  if status == 0:
   print(f"Send `{msg}` to topic `{topic}`")
  else:
   print(f"Failed to send message to topic {topic}")
  msg_count += 1


def run():
 client = connect_mqtt()
 client.loop_start()
 publish(client)


if __name__ == '__main__':
 run()

訊息訂閱程式碼

# python 3.6

import random
import time

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0,1]
  status = result[0]
  if status == 0:
   print(f"Send `{msg}` to topic `{topic}`")
  else:
   print(f"Failed to send message to topic {topic}")
  msg_count += 1


def run():
 client = connect_mqtt()
 client.loop_start()
 publish(client)


if __name__ == '__main__':
 run()
訊息訂閱程式碼
# python3.6

import random

from paho.mqtt import client as mqtt_client


broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0,100)}'


def connect_mqtt() -> mqtt_client:
 def on_connect(client,port)
 return client


def subscribe(client: mqtt_client):
 def on_message(client,msg):
  print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

 client.subscribe(topic)
 client.on_message = on_message


def run():
 client = connect_mqtt()
 subscribe(client)
 client.loop_forever()


if __name__ == '__main__':
 run()

測試

訊息釋出

執行 MQTT 訊息釋出程式碼,我們將看到客戶端連線成功,並且成功將訊息釋出。

python3 pub.py

在 Python 中使用 MQTT的方法

訊息訂閱

執行 MQTT 訊息訂閱程式碼,我們將看到客戶端連線成功,並且成功接收到釋出的訊息。

python3 sub.py

在 Python 中使用 MQTT的方法

總結

至此,我們完成了使用 paho-mqtt 客戶端連線到 公共 MQTT 伺服器 ,並實現了測試客戶端與 MQTT 伺服器的連線、訊息釋出和訂閱。

與 C ++ 或 Java 之類的高階語言不同,Python 比較適合裝置側的業務邏輯實現,使用 Python 您可以減少程式碼上的邏輯複雜度,降低與裝置的互動成本。我們相信在物聯網領域 Python 將會有更廣泛的應用。

接下來我們將會陸續釋出更多關於物聯網開發及 Python 的相關文章,敬請關注。

以上就是在 Python 中使用 MQTT的方法的詳細內容,更多關於Python 中使用 MQTT的資料請關注我們其它相關文章!