Python paho-mqtt使用心得
阿新 • • 發佈:2022-04-18
一、概述
一)基本概念
使用回撥處理從MQTT代理返回的資料,要使用回撥需要先定義回撥函式然後將其指派給客戶端例項(client)。
例如:
# 定義一個回撥函式
def on_connect(client, userdata, flags, rc):
print("Connection returned " + str(rc))
# 將回調函式指派給客戶端例項
client.on_connect = on_connect
所有的回撥函式都有client和userdata引數。
client是呼叫回撥的客戶端例項;
userdata可以使任何型別的使用者資料,可以在建立新客戶端例項時設定或者使用user_data_set(userdata)設定。
二)paho-mqtt總的說來分為三部分:
**種類:** 1.伺服器連線on_connect()/伺服器斷開 on_disconnect() 2.資訊的回撥 on_message() 3.資訊的釋出on_publish()/資訊的訂閱on_subscribe() **介紹:** 1. 使用connect()/connect_async() 連線MQTT代理 2. 頻繁的呼叫loop()來維持與MQTT代理之間的流量 2.1. 或者使用loop_start()來設定一個執行緒為你呼叫loop() 2.2. 或者在一個阻塞的函式中呼叫loop_forever()來為你呼叫loop() 3.使用subscribe()訂閱一個主題(topic)並接受訊息(messages) 4.使用publish()來發送訊息 5.使用disconnect()來斷開與MQTT代理的連線
二、paho-mqtt 在Python中的安裝方法
pip install paho-mqtt
三、on_connect()回撥函式介紹
當代理響應連線請求時呼叫。
on_connect(client, userdata, flags, rc):
rc的值決定了連線成功或者不成功:
值 連線情況
0 連線成功
1 協議版本錯誤
2 無效的客戶端標識
3 伺服器無法使用
4 錯誤的使用者名稱或密碼
5 未經授權
import paho.mqtt.client as mqtt #定義一個on_connect方法 def on_connect(client,userdata,flags,rc): return str(rc) class IotSubDevViewSet(viewsets.ModelViewSet): #系統啟動後,會把SUBSCRIBED狀態的裝置加入訂閱程序 def init_subscribe(): iotsubdevs = IotSubDev.objects.all() for iotsubdev in iotsubdevs: try: devices_pk = iotsubdev.device.id client = mqtt.Client() client.username_pw_set(username=settings.MQTT_USERNAME, password=settings.MQTT_PASSWORD)#設定mqtt伺服器使用者名稱和密碼 client.on_connect = on_connect client.on_message = on_message rc = client.connect(settings.MQTT_HOST, port=1883, keepalive=60) if(rc==0 and iotsubdev.status=="SUBSCRIBED"): print("初始化開始sub") client.subscribe(topic=str(devices_pk),qos=0) client.loop_start() print("初始化sub結束") else: # print("連線失敗") pass except: pass init_subscribe()
三、on_message()回撥函式介紹
import json
def on_message(client, userdata, msg):
msg = msg.payload #將資訊轉換成json格式
try:
params = json.loads(msg)
except:
return (False)
#Beilai BL102
if tmp =="sensorDatas":
for dc_tmp in params[tmp]:
print(dc_tmp)
timestamp = datetime.now()
try:
ctrlchannel = CtrlChannel.objects.filter(id=dc_tmp['flag']).first()
metricdata = MetricData(ctrlchannel=ctrlchannel,
timestamp=timestamp,
value=dc_tmp['value'],
direction="UP")
metricdata.save()
except:
pass
else:
tmp = "Wrong Parameters"
return tmp
print("Subscribed is OK")
return True
四、on_publish()回撥函式介紹
import paho.mqtt.publish as publish
class MetricDataViewSet(viewsets.ModelViewSet):
""""
list:
查詢資料點資訊列表
create:
建立資料點資訊
如果方向為DOWN,支援MQTT釋出資訊
retrieve:
查詢資料點資訊詳情
update:
更新資料點資訊,不建議使用
partial_update:
更新資料點資訊的部分屬性,不建議使用
destroy:
刪除資料點資訊
"""
serializer_class = MetricDataSerializer
permission_classes = (permissions.IsAuthenticated,)
# pagination_class = StanderResultsSetPagination
authentication_classes = (authentication.JWTAuthentication,)
queryset = MetricData.objects.all()
def create(self,request , *args, **kwargs):
serializer =self.get_serializer(data=request.data)
if serializer.is_valid(raise_exception=True):
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)
try:
if request.data['direction'] == 'DOWN':
ctrlchannels_id = request.data["ctrlchannel"]
ctrlchannels_value = request.data["value"]
# payload = json.dumps(request.data)
#beilai BL102
jsonload ={"sensorDatas":[{"sensorsId":100,
"flag":ctrlchannels_id,
"value": str(ctrlchannels_value)}],
"down":"down"}
print(jsonload)
payload = json.dumps(jsonload)
# print(ctrlchannels_id)
publish.single(topic=ctrlchannels_id+"/100",
payload=payload,
hostname=settings.MQTT_HOST,
auth={'username':settings.MQTT_USERNAME, 'password':settings.MQTT_PASSWORD})
# self.perform_create(serializer)
except:
pass
return Response(serializer.data,status=status.HTTP_201_CREATED,headers=headers)
return Response(serializer.errors,status=status.HTTP_400_BAD_REQUEST)