mqtt協議------paho-mqtt協議
- 一、Client模組
- 二、Publish模組
- 三、Subscribe模組
一、Client模組
與MQTT代理(broker)進行通訊的主要類。
(一)使用流程
- 使用
connect()
/connect_async()
連線MQTT代理 - 頻繁的呼叫
loop()
來維持與MQTT代理之間的流量- 或者使用
loop_start()
來設定一個執行緒為你呼叫loop()
- 或者在一個阻塞的函式中呼叫
loop_forever()
來為你呼叫loop()
- 或者使用
- 使用
subscribe()
訂閱一個主題(topic)並接受訊息(messages) - 使用
publish()
來發送訊息 - 使用
disconnect()
來斷開與MQTT代理的連線
(二)回撥(Callbacks)
1.基本概念
使用回撥處理從MQTT代理返回的資料,要使用回撥需要先定義回撥函式然後將其指派給客戶端例項(client)。
例如:
# 定義一個回撥函式
defon_connect(client, userdata, flags, rc):
print("Connection returned " + str(rc))
# 將回調函式指派給客戶端例項
client.on_connect = on_connect
- 1
- 2
- 3
- 4
- 5
- 6
所有的回撥函式都有client
和userdata
引數。 client
是呼叫回撥的客戶端例項; userdata
可以使任何型別的使用者資料,可以在建立新客戶端例項時設定或者使用user_data_set(userdata)設定。
2.回撥種類
(1)on_connect()
當代理響應連線請求時呼叫。 on_connect(client, userdata, flags, rc):
flags
是一個包含代理回覆的標誌的字典; rc
的值決定了連線成功或者不成功:
值 | 連線情況 |
---|---|
0 | 連線成功 |
1 | 協議版本錯誤 |
2 | 無效的客戶端標識 |
3 | 伺服器無法使用 |
4 | 錯誤的使用者名稱或密碼 |
5 | 未經授權 |
(2)on_disconnect()
當與代理斷開連線時呼叫
on_disconnect(client, userdata, rc):
- 1
rc
引數表示斷開狀態。
如果MQTT_ERR_SUCCESS(0),回撥被呼叫以響應disconnect()呼叫。 如果以任何其他值斷開連線是意外的,例如可能出現網路錯誤。
(3)on_message()
on_message(client, userdata, message):
- 1
當收到關於客戶訂閱的主題的訊息時呼叫。 message
是一個描述所有訊息引數的MQTTMessage。
(4)on_publish()
當使用使用publish()
傳送的訊息已經傳輸到代理時被呼叫。
on_publish(client, userdata, mid):
- 1
對於Qos級別為1和2的訊息,這意味著已經完成了與代理的握手。
對於Qos級別為0的訊息,這隻意味著訊息離開了客戶端。 mid
變數與從相應的publish()
返回的mid變數匹配,以允許跟蹤傳出的訊息。
此回撥很重要,因為即使publish()呼叫返回,但並不總意味著訊息已傳送。
(5)on_subscribe()
當代理響應訂閱請求時被呼叫。
on_subscribe(client, userdata, mid, granted_qos):
- 1
mid
變數匹配從相應的subscri
返回的
be()mid
變數。
‘granted_qos’變數是一個整數列表,它提供了代理為每個不同的訂閱請求授予的QoS級別。
(6)on_unsubscribe()
當代理響應取消訂閱請求時呼叫。
on_unsubscribe(client, userdata, mid):
- 1
mid
匹配從相應的unsubscribe()呼叫返回的中間變數。
(7)on_log()
當客戶端有日誌資訊時呼叫
on_log(client, userdata, level, buf):
- 1
level
變數給出了訊息的嚴重性,並且將是MQTT_LOG_INFO,MQTT_LOG_NOTICE,MQTT_LOG_WARNING,MQTT_LOG_ERR和MQTT_LOG_DEBUG中的一個。 buf
變數用於儲存資訊。
(三)方法
1.建構函式Client()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")
- 1
引數 | 含義 |
---|---|
client_id | 連線到代理時使用的唯一客戶端ID字串。 如果client_id長度為零或無,則會隨機生成一個。 在這種情況下,clean_session引數必須為True。 |
clean_session | 一個決定客戶端型別的布林值。 如果為True,那麼代理將在其斷開連線時刪除有關此客戶端的所有資訊。 如果為False,則客戶端是持久客戶端,當客戶端斷開連線時,訂閱資訊和排隊訊息將被保留。 |
userdata | 使用者定義的任何型別的資料作為userdata引數傳遞給回撥函式。 它可能會在稍後使用user_data_set()函式進行更新。 |
protocol | 用於此客戶端的MQTT協議的版本。 可以是MQTTv31或MQTTv311。 |
transport | 設定為“websockets”通過WebSockets傳送MQTT。 保留預設的“tcp”使用原始TCP。 |
示例:
import paho.mqtt.client as mqtt
client = mqtt.Client()
- 1
- 2
2.reinitialise()
reinitialise(client_id="", clean_session=True, userdata=None)
- 1
reinitialise()函式將客戶端重置為其開始狀態,就像它剛剛建立一樣。 它採用與Client()建構函式相同的引數。
示例:
client.reinitialise()
- 1
3.選項函式
這些函式表示可以在客戶端上設定以修改其行為的選項。 在大多數情況下,這必須在連線到代理之前完成。
(1)max_inflight_messages_set()
max_inflight_messages_set(self, inflight)
- 1
設定QoS> 0的訊息的最大數量,該訊息一次可以部分通過其網路流量。預設為20.增加此值將消耗更多記憶體,但可以增加吞吐量。
(2)max_queued_messages_set()
max_queued_messages_set(self, queue_size)
- 1
設定傳出訊息佇列中可等待處理的具有QoS> 0的傳出訊息的最大數量。預設為0表示無限制。 當佇列已滿時,任何其他傳出的訊息都將被丟棄。
(3)message_retry_set()
message_retry_set(retry)
- 1
如果代理沒有響應,設定在重發QoS> 0的訊息之前以秒為單位的時間。預設設定為5秒,通常不需要更改。
(4)ws_set_options()
ws_set_options(self, path="/mqtt", headers=None)
- 1
設定websocket連線選項。 只有在transport =“websockets”被傳入Client()建構函式時才會使用這些選項。
引數 | 含義 |
---|---|
path | 代理使用的mqtt路徑 |
headers | 可以是一個字典,指定應該附加到標準websocket頭部的額外頭部列表,也可以是可呼叫的正常websocket頭部並返回帶有一組頭部以連線到代理的新字典。 |
必須在呼叫connect()之前呼叫。
(4)tls_set()
tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS, ciphers=None)
- 1
- 2
配置網路加密和身份驗證選項。 啟用SSL / TLS支援。
引數 | 含義 |
---|---|
ca_certs | 證書頒發機構證書檔案的字串路徑,該證書檔案將被視為受此客戶端信任。 |
certfile, keyfile | 分別指向PEM編碼的客戶端證書和私鑰的字串。 如果這些引數不是None,那麼它們將用作基於TLS的身份驗證的客戶端資訊。 對此功能的支援取決於代理。 |
cert_reqs | 定義客戶對經紀人施加的證書要求。 預設情況下,這是ssl.CERT_REQUIRED,這意味著代理必須提供證書。 有關此引數的更多資訊,請參閱ssl pydoc。 |
tls_version | 指定要使用的SSL / TLS協議的版本。 預設情況下(如果python版本支援它),檢測到最高的TLS版本。 |
ciphers | 指定哪些加密密碼可供此連線使用的字串,或者使用None來使用預設值。 有關更多資訊,請參閱ssl pydoc。 |
必須在呼叫connect()之前呼叫。
(5)tls_set_context()
配置網路加密和認證上下文。 啟用SSL / TLS支援。
tls_set_context(context=None)
- 1
引數 | 含義 |
---|---|
context | 一個ssl.SSLContext物件。 |
必須在呼叫connect()之前呼叫。
(6)tls_insecure_set()
配置伺服器證書中伺服器主機名的驗證。
tls_insecure_set(value)
- 1
如果value設定為True,則不可能保證您連線的主機不模擬您的伺服器。 這在初始伺服器測試中可能很有用,但是,惡意的第三方通過可以DNS欺騙模擬您的伺服器。
- 請勿在真實系統中使用此功能。 將值設定為True意味著使用加密沒有意義。
- 必須在connect*()之前和tls_set()或tls_set_context()之後呼叫。
(7)enable_logger()
使用標準的Python日誌包啟用日誌記錄。 這可以與on_log回撥方法同時使用
enable_logger(logger=None)
- 1
如果指定了記錄器,那麼將使用該logging.Logger物件,否則將自動建立一個。
按照以下對映將Paho日誌記錄級別轉換為標準日誌級別:
Paho | logging |
---|---|
MQTT_LOG_ERR | ligging.ERROR |
MQTT_LOG_WARNING | logging.WARNING |
MQTT_LOG_NOTICE | logging.INFO (no direct equivalent) |
MQTT_LOG_INFO | logging.INFO |
MQTT_LOG_DEBUG | logging.DEBUG |
(8)disable_logger()
使用標準python日誌包禁用日誌記錄。 這對on_log回撥沒有影響。
disable_logger()
- 1
(9)username_pw_set()
為代理認證設定一個使用者名稱和一個可選的密碼。必須在connect*()之前呼叫。
username_pw_set(username, password=None)
- 1
(10)user_data_set()
設定在生成事件時將傳遞給回撥的私人使用者資料。 將其用於您自己的目的以支援您的應用程式。
user_data_set(userdata)
- 1
(11)will_set()
設定要傳送給代理的遺囑。 如果客戶端斷開而沒有呼叫disconnect(),代理將代表它釋出訊息。
will_set(topic, payload=None, qos=0, retain=False)
- 1
引數 | 含義 |
---|---|
topic | 該遺囑訊息釋出的主題 |
payload | 該訊息將作為遺囑傳送 |
qos | 用於遺囑的服務質量等級 |
retain | 如果設定為True,遺囑訊息將被設定為該主題的“最後已知良好”/保留訊息。 |
如果qos不是0,1或2,或者主題為None或字串長度為零,則引發ValueError。
(11)reconnect_delay_set()
客戶端將自動重試連線。 在每次嘗試之間,它會在min_delay和max_delay之間等待幾秒鐘。
reconnect_delay_set(min_delay=1, max_delay=120)
- 1
當連線丟失時,最初重新連線嘗試延遲min_delay秒。 延遲在隨後的嘗試到中增加一倍。當連線完成時(例如收到CONNACK,而不僅僅是TCP連線建立),延遲重置為min_delay。
4.connect()
connect()函式將客戶端連線到代理。 這是一個阻塞函式。
connect(host, port=1883, keepalive=60, bind_address="")
- 1
引數 | 含義 |
---|---|
host | 遠端代理的主機名或IP地址 |
port | 要連線的伺服器主機的網路埠。 預設為1883 |
keepalive | 與代理通訊之間允許的最長時間段(以秒為單位)。 如果沒有其他訊息正在交換,則它將控制客戶端向代理髮送ping訊息的速率 |
bind_address | 假設存在多個介面,將繫結此客戶端的本地網路介面的IP地址 |
5.connect_async()
與loop_start()一起使用以非阻塞方式連線。 直到呼叫loop_start()之前,連線才會完成。
connect_async(host, port=1883, keepalive=60, bind_address="")
- 1
6.connect_srv()
使用SRV DNS查詢連線到代理以獲取代理地址。
connect_srv(domain, keepalive=60, bind_address="")
- 1
引數 | 含義 |
---|---|
domain | 該DNS域搜尋SRV記錄。 如果無,請嘗試確定本地域名。 |
7.reconnect()
使用先前提供的詳細資訊重新連線到經紀商。 在呼叫此函式之前,您必須先呼叫connect *()。
reconnect()
- 1
8.disconnect()
乾淨地從代理斷開連線。 使用disconnect()不會導致代理髮送遺囑訊息。
disconnect()
- 1
9.loop()
定期呼叫處理網路事件。
loop(timeout=1.0, max_packets=1)
- 1
此呼叫在select()中等待,直到網路套接字可用於讀取或寫入(如果適用),然後處理傳入/傳出資料。
引數 | 含義 |
---|---|
timeout | 此方法最多可阻塞timeout秒 |
max_packets | max_packets引數已過時,應保留為未設定狀態。 |
示例:
run = True
while run:
client.loop()
- 1
- 2
- 3
10.loop_start() / loop_stop()
這些功能實現了到網路迴圈的執行緒介面。
loop_start()loop_stop(force=False)
- 1
- 2
在connect*()之前或之後呼叫loop_start()一次,會在後臺執行一個執行緒來自動呼叫loop()。這釋放了可能阻塞的其他工作的主執行緒。這個呼叫也處理重新連線到代理。
呼叫loop_stop()來停止後臺執行緒。
force引數目前被忽略。
示例:
client.connect("iot.eclipse.org")
client.loop_start()
while True:
temperature = sensor.blocking_read()
client.publish("paho/temperature", temperature)
- 1
- 2
- 3
- 4
- 5
- 6
11.loop_forever()
這是網路迴圈的阻塞形式,直到客戶端呼叫disconnect()時才會返回。它會自動處理重新連線。
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)
- 1
除了使用connect_async時的第一次連線嘗試以外,請使用retry_first_connection = True使其重試第一個連線。這可能會導致客戶端連線到一個不存在的主機的情況。
12.publish()
從客戶端傳送訊息給代理。
publish(topic, payload=None, qos=0, retain=False)
- 1
訊息將會發送給代理,並隨後從代理髮送到訂閱匹配主題的任何客戶端。
引數 | 含義 |
---|---|
topic | 該訊息釋出的主題 |
payload | 要傳送的實際訊息。如果沒有給出,或設定為無,則將使用零長度訊息。 傳遞int或float將導致有效負載轉換為表示該數字的字串。 如果你想傳送一個真正的int / float,使用struct.pack()來建立你需要的負載 |
qos | 服務的質量級別 |
retain | 如果設定為True,則該訊息將被設定為該主題的“最後已知良好”/保留的訊息 |
返回以下屬性和方法的MQTTMessageInfo:
rc:釋出的結果。
內容 | 含義 |
---|---|
MQTT_ERR_SUCCESS | 成功 |
MQTT_ERR_NO_CONN | 客戶端當前未連線 |
MQTT_ERR_QUEUE_SIZE | 當使用max_queued_messages_set來指示訊息既不排隊也不傳送。 |
mid:釋出請求的訊息ID。
如果mid已定義,則可以通過檢查on_publish()回撥中的mid來跟蹤釋出請求。
wait_for_publish():函式將阻塞,直到訊息釋出。 如果訊息未排隊(rc == MQTT_ERR_QUEUE_SIZE),它將引發ValueError。
is_published:如果訊息已釋出,is_published返回True。 如果訊息未排隊(rc == MQTT_ERR_QUEUE_SIZE),它將引發ValueError。
如果主題為無,長度為零或無效(包含萬用字元),qos不是0,1或2之一,或者有效負載長度大於268435455位元組,則會引發ValueError。
13.subscribe()
subscribe(topic, qos=0)
- 1
訂閱一個或多個主題。
這個函式可以用三種不同的方式呼叫:
(1)簡單的字串和整數
subscribe("my/topic", 2)
- 1
引數 | 值 |
---|---|
topic | 一個字串,指定要訂閱的訂閱主題 |
qos | 期望的服務質量等級。 預設為0。 |
(2)字串和整數元組
subscribe(("my/topic", 1))
- 1
引數 | 值 |
---|---|
topic | (topic,qos)的元組。 主題和qos都必須存在於元組中。 |
qos | 沒有使用 |
(3)字串和整數元組的列表
這允許在單個SUBSCRIPTION命令中使用多個主題訂閱,這比使用多個訂閱subscribe()更有效。
subscribe([("my/topic", 0), ("another/topic", 2)])
- 1
引數 | 值 |
---|---|
topic | 格式元組列表(topic,qos)。 topic和qos都必須出現在所有的元組中。 |
qos | 沒有使用 |
該函式返回一個元組(result,mid)。
如果qos不是0,1或2,或者主題為None或字串長度為零,或者topic不是字串,元組或列表,則引發ValueError。
14.unsubscribe()
取消訂閱一個或多個主題。
unsubscribe(topic)
- 1
引數 | 含義 |
---|---|
topic | 主題的單個字串或者字串列表 |
返回一個元組(result, mid)
15.外部事件迴圈支援
(1)loop_read()
loop_read(max_packets=1)
- 1
當套接字準備好讀取時呼叫。 max_packets已過時,應保持未設定狀態。
(2)loop_write()
loop_write(max_packets=1)
- 1
當套接字準備好寫入時呼叫。 max_packets已過時,應保持未設定狀態。
(3)loop_misc()
loop_misc()
- 1
每隔幾秒呼叫一次以處理訊息重試和ping。
(4)socket()
socket()
- 1
返回客戶端中使用的套接字物件,以允許與其他事件迴圈進行互動。
(5)want_write()
want_write()
- 1
如果有資料等待寫入,則返回true,以允許將客戶端與其他事件迴圈連線。
16.全域性輔助函式
client模組還提供了一些全域性幫助函式。
(1)topic_matches_sub(sub,topic)
可用於檢查主題是否與預訂匹配。
(2)connack_string(connack_code)
返回與CONNACK結果關聯的錯誤字串。
(3)error_string(mqtt_errno)
返回與Paho MQTT錯誤號關聯的錯誤字串。
二、Publish模組
該模組提供了一些幫助功能,可以以一次性方式直接釋出訊息。換句話說,它們對於您想要釋出給代理的單個/多個訊息然後斷開與其他任何必需的連線的情況非常有用。
(一)方法
1.Single
將一條訊息釋出給代理,然後徹底斷開連線。
single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311, transport="tcp")
- 1
- 2
- 3
引數 | 含義 |
---|---|
topic | 唯一必需的引數必須是負載將釋出到的主題字串。 |
payload | 要釋出的有效載荷。 如果“”或None,零長度的有效載荷將被髮布 |
qos | 釋出時使用的qos預設為0 |
retain | 設定訊息保留(True)或不(False) |
hostname | 一個包含要連線的代理地址的字串。 預設為localhost |
port | 要連線到代理的埠。 預設為1883 |
client_id | 要使用的MQTT客戶端ID。 如果“”或None,Paho庫會自動生成客戶端ID |
keepalive | 客戶端的存活超時值。 預設為60秒 |
will | 一個包含客戶端遺囑引數的字典,will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.
|
auth | 一個包含客戶端驗證引數的字典,auth = {‘username’:”<username>”, ‘password’:”<password>”}
|
tls | 一個包含客戶端的TLS配置引數的字典,dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}
|
protocol | 選擇要使用的MQTT協議的版本。 使用MQTTv31或MQTTv311。 |
transport | 設定為“websockets”通過WebSockets傳送MQTT。 保留預設的“tcp”使用原始TCP。 |
示例:
import paho.mqtt.publish as publish
publish.single("paho/test/single", "payload", hostname="iot.eclipse.org")
- 1
- 2
- 3
2.Multiple
將多條訊息釋出給代理,然後乾淨地斷開連線。
multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv311, transport="tcp")
- 1
- 2
引數 | 含義 |
---|---|
msgs | 要釋出的訊息列表。 每條訊息是一個字典或一個元組。msg = {‘topic’:”<topic>”, ‘payload’:”<payload>”, ‘qos’:<qos>, ‘retain’:<retain>} 或(“<topic>”, “<payload>”, qos, retain)
|
有關hostname,port,client_id,keepalive,will,auth,tls,protocol,transport的描述,請參閱single()。
示例:
import paho.mqtt.publish as publish
msgs = [{'topic':"paho/test/multiple", 'payload':"multiple 1"},
("paho/test/multiple", "multiple 2", 0, False)]
publish.multiple(msgs, hostname="iot.eclipse.org")
- 1
- 2
- 3
- 4
- 5
三、Subscribe模組
該模組提供了一些幫助功能,以允許直接訂閱和處理訊息。
(1)方法
1.Simple
訂閱一組主題並返回收到的訊息。 這是一個阻塞函式。
simple(topics, qos=0, msg_count=1, retained=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)
- 1
- 2
- 3
引數 | 含義 |
---|---|
topics | 唯一需要的引數是客戶端將訂閱的主題字串。 如果需要訂閱多個主題,這可以是字串或字串列表 |
qos | 訂閱時使用的qos預設為0 |
msg_count | 從代理檢索的訊息數量。 預設為1.如果為1,則返回一個MQTTMessage物件。 如果> 1,則返回MQTTMessages列表 |
retained | 設定為True以考慮保留的訊息,將其設定為False以忽略具有保留標誌設定的訊息 |
hostname | 一個包含要連線的代理地址的字串。 預設為localhost |
port | 要連線到代理的埠。 預設為1883 |
client_id | 要使用的MQTT客戶端ID。 如果“”或None,Paho庫會自動生成客戶端ID |
keepalive | 客戶端的存活超時值。 預設為60秒。 |
will | 一個包含客戶端遺囑引數的字典,will = {‘topic’: “<topic>”, ‘payload’:”<payload”>, ‘qos’:<qos>, ‘retain’:<retain>}.
|
auth | 一個包含客戶端驗證引數的字典,auth = {‘username’:”<username>”, ‘password’:”<password>”}
|
tls | 一個包含客戶端的TLS配置引數的字典,dict = {‘ca_certs’:”<ca_certs>”, ‘certfile’:”<certfile>”, ‘keyfile’:”<keyfile>”, ‘tls_version’:”<tls_version>”, ‘ciphers’:”<ciphers”>}
|
protocol | 選擇要使用的MQTT協議的版本。 使用MQTTv31或MQTTv311。 |
2.Callback
訂閱一組主題並使用使用者提供的回叫處理收到的訊息。
callback(callback, topics, qos=0, userdata=None, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None, tls=None,
protocol=mqtt.MQTTv311)
- 1
- 2
- 3
引數 | 含義 |
---|---|
callback | 一個“on_message”回撥將被用於每個收到的訊息 |
topics | 客戶端將訂閱的主題字串。 如果需要訂閱多個主題,這可以是字串或字串列表。 |
qos | 訂閱時使用的qos預設為0 |
userdata | 使用者提供的物件將在收到訊息時傳遞給on_message回撥函式 |
有關hostname,port,client_id,keepalive,will,auth,tls,protocol的描述,請參閱simple()。
示例:
import paho.mqtt.subscribe as subscribe
defon_message_print(client, userdata, message):
print("%s %s" % (message.topic, message.payload))
subscribe.callback(on_message_print, "paho/test/callback", hostname="iot.eclipse.org"
- 1
- 2
- 3
- 4
- 5
- 6
參考資料:https://pypi.python.org/pypi/paho-mqtt