1. 程式人生 > 程式設計 >python 多程序佇列資料處理詳解

python 多程序佇列資料處理詳解

我就廢話不多說了,直接上程式碼吧!

# -*- coding:utf8 -*-
import paho.mqtt.client as mqtt
from multiprocessing import Process,Queue
import time,random,os
import camera_person_num
 
MQTTHOST = "172.19.4.4"
MQTTPORT = 1883
mqttClient = mqtt.Client()
q = Queue() 
 
 
# 連線MQTT伺服器
def on_mqtt_connect():
  mqttClient.connect(MQTTHOST,MQTTPORT,60)
  mqttClient.loop_start()
 
 
# 訊息處理函式
def on_message_come(lient,userdata,msg):
  # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))
  
  q.put(msg.payload.decode("utf-8")) # 放入佇列
  print("產生訊息",msg.payload.decode("utf-8"))
  # 訊息處理開啟多程序
  # p = Process(target=talk,args=("/camera/person/num/result",msg.payload.decode("utf-8")))
  # p.start()
 
 
def consumer(q,pid):
  print("開啟消費序列程序",pid)
  while True:
    msg = q.get()
    # p = Process(target=talk,msg,pid))
    # p.start()
    talk("/camera/person/num/result",pid) 
 
 
# subscribe 訊息訂閱
def on_subscribe():
  mqttClient.subscribe("test123",1) # 主題為"test"
  mqttClient.on_message = on_message_come # 訊息到來處理函式
 
 
# publish 訊息釋出
def on_publish(topic,qos):
  mqttClient.publish(topic,qos);
 
 
# 多程序中釋出訊息需要重新初始化mqttClient
def talk(topic,pid):
  cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
  t_max,t_mean,t_min = cameraPsersonNum.personNum()
  # time.sleep(20)
  print("消費訊息",pid,msg) 
  mqttClient2 = mqtt.Client()
  mqttClient2.connect(MQTTHOST,60)
  mqttClient2.loop_start()
  mqttClient2.publish(topic,'{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}',1)
  mqttClient2.disconnect()
 
 
def main():
  
  on_mqtt_connect()
  on_subscribe()
  for i in range(1,3):
    c1 = Process(target=consumer,args=(q,i))
    c1.start()
  while True:
    pass
 
 
if __name__ == '__main__':
  main()

以上這篇python 多程序佇列資料處理詳解就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支援我們。