1. 程式人生 > 其它 >Python paho-mqtt使用心得

Python paho-mqtt使用心得

一、概述
一)基本概念

使用回撥處理從MQTT代理返回的資料,要使用回撥需要先定義回撥函式然後將其指派給客戶端例項(client)。
例如:

# 定義一個回撥函式
def on_connect(client, userdata, flags, rc):
    print("Connection returned " + str(rc))

# 將回調函式指派給客戶端例項
client.on_connect = on_connect

所有的回撥函式都有client和userdata引數。
client是呼叫回撥的客戶端例項;
userdata可以使任何型別的使用者資料,可以在建立新客戶端例項時設定或者使用user_data_set(userdata)設定。

二)paho-mqtt總的說來分為三部分:

 **種類:**
  1.伺服器連線on_connect()/伺服器斷開 on_disconnect()    
  2.資訊的回撥 on_message()  
  3.資訊的釋出on_publish()/資訊的訂閱on_subscribe()
 **介紹:**
  1. 使用connect()/connect_async() 連線MQTT代理
  2. 頻繁的呼叫loop()來維持與MQTT代理之間的流量
    2.1. 或者使用loop_start()來設定一個執行緒為你呼叫loop()
    2.2. 或者在一個阻塞的函式中呼叫loop_forever()來為你呼叫loop()
  3.使用subscribe()訂閱一個主題(topic)並接受訊息(messages)
  4.使用publish()來發送訊息
  5.使用disconnect()來斷開與MQTT代理的連線

二、paho-mqtt 在Python中的安裝方法

pip install paho-mqtt

三、on_connect()回撥函式介紹

當代理響應連線請求時呼叫。
on_connect(client, userdata, flags, rc):
rc的值決定了連線成功或者不成功:

值	連線情況
0	連線成功
1	協議版本錯誤
2	無效的客戶端標識
3	伺服器無法使用
4	錯誤的使用者名稱或密碼
5	未經授權
import paho.mqtt.client as mqtt
#定義一個on_connect方法
def on_connect(client,userdata,flags,rc):
    return str(rc)

class IotSubDevViewSet(viewsets.ModelViewSet):
 #系統啟動後,會把SUBSCRIBED狀態的裝置加入訂閱程序
    def init_subscribe():
        iotsubdevs = IotSubDev.objects.all()
        for iotsubdev in iotsubdevs:
            try:
                devices_pk = iotsubdev.device.id
                client = mqtt.Client()
                client.username_pw_set(username=settings.MQTT_USERNAME, password=settings.MQTT_PASSWORD)#設定mqtt伺服器使用者名稱和密碼

                client.on_connect = on_connect
                client.on_message = on_message
                rc = client.connect(settings.MQTT_HOST, port=1883, keepalive=60)

                if(rc==0 and iotsubdev.status=="SUBSCRIBED"):
                    print("初始化開始sub")
                    client.subscribe(topic=str(devices_pk),qos=0)
                    client.loop_start()
                    print("初始化sub結束")
                else:
                    # print("連線失敗")
                    pass

            except:
                    pass

    init_subscribe()

三、on_message()回撥函式介紹

import json
def on_message(client, userdata, msg):
    msg = msg.payload  #將資訊轉換成json格式
    try:
        params = json.loads(msg)
    except:
        return (False)

       #Beilai BL102
        if tmp =="sensorDatas":
            for dc_tmp in params[tmp]:
                print(dc_tmp)
                timestamp = datetime.now()
                try:
                    ctrlchannel = CtrlChannel.objects.filter(id=dc_tmp['flag']).first()
                    metricdata = MetricData(ctrlchannel=ctrlchannel,
                                            timestamp=timestamp,
                                            value=dc_tmp['value'],
                                            direction="UP")
                    metricdata.save()
                except:
                    pass
        else:
            tmp = "Wrong Parameters"
            return tmp
    print("Subscribed is OK")
    return True

四、on_publish()回撥函式介紹

import paho.mqtt.publish as publish
class MetricDataViewSet(viewsets.ModelViewSet):
    """"
    list:
    查詢資料點資訊列表

    create:
    建立資料點資訊
    如果方向為DOWN,支援MQTT釋出資訊

    retrieve:
    查詢資料點資訊詳情

    update:
    更新資料點資訊,不建議使用

    partial_update:
    更新資料點資訊的部分屬性,不建議使用

    destroy:
    刪除資料點資訊

    """
    serializer_class = MetricDataSerializer
    permission_classes = (permissions.IsAuthenticated,)
    # pagination_class = StanderResultsSetPagination
    authentication_classes = (authentication.JWTAuthentication,)
    queryset = MetricData.objects.all()

    def create(self,request , *args, **kwargs):
        serializer =self.get_serializer(data=request.data)
        if serializer.is_valid(raise_exception=True):
            self.perform_create(serializer)
            headers = self.get_success_headers(serializer.data)

            try:
                if request.data['direction'] == 'DOWN':
                    ctrlchannels_id = request.data["ctrlchannel"]
                    ctrlchannels_value = request.data["value"]
                    # payload = json.dumps(request.data)
                    #beilai BL102
                    jsonload ={"sensorDatas":[{"sensorsId":100,
                                              "flag":ctrlchannels_id,
                                              "value": str(ctrlchannels_value)}],
                              "down":"down"}
                    print(jsonload)
                    payload = json.dumps(jsonload)

                    # print(ctrlchannels_id)
                    publish.single(topic=ctrlchannels_id+"/100",
                                   payload=payload,
                                   hostname=settings.MQTT_HOST,
                                   auth={'username':settings.MQTT_USERNAME, 'password':settings.MQTT_PASSWORD})

                    # self.perform_create(serializer)
            except:
                pass

            return Response(serializer.data,status=status.HTTP_201_CREATED,headers=headers)
        return Response(serializer.errors,status=status.HTTP_400_BAD_REQUEST)