pb格式的檔案與rabbitmq連用
阿新 • • 發佈:2020-08-28
背景:
這篇主要講解的是pb協議輸入如何放入到mq進行生產和消費,與實際原理都是一樣的,與python使用protobuf協議傳輸固定格式協議資料這個文章相對應
https://www.cnblogs.com/chongyou/p/13543340.html
工作中的運用
在我們工作中經常會使用到生產資料與消費資料,一般普通的訊息生產如下
生產訊息
import pika def SendToMQ(data): ip = '172.XX.233.XX' # mq的連線地址 ex = 'test-ex-msgrecv' #mq的exchange rkey = 'test-rkey-msgrecv-msgdb-64v' #mq的rkey credentials = pika.PlainCredentials('xx', 'xx') #mq的賬戶和密碼 ,注意我們需要使用到pika, parameters = pika.ConnectionParameters('%s'%(ip),5672,'/',credentials) connection = pika.BlockingConnection(parameters) #連線時的一系列的配置 channel = connection.channel() #建立一個頻道 channel.exchange_declare(exchange=ex, #這一塊都在說明mq的配置 exchange_type='direct', #mq型別 durable=False) #是否持久化 # sendstr = RecvMsg() channel.basic_publish(exchange=ex, #使用的exchange routing_key=rkey, #使用的那個rkey body="張君字串") #傳輸的內容,什麼內容都可以 channel.close() connection.close()
消費
import pika import logging import time from datetime import datetime def Ex_To_MQ(hostIP,rkey): print("MQ config:hostIP:%s,rkey:%s"%(hostIP,rkey)) logging.debug("MQ is starting") credentials=pika.PlainCredentials('push','testmq') connection=pika.BlockingConnection(pika.ConnectionParameters(hostIP,5672,'/',credentials)) channel=connection.channel() test_queue = 'test-sms-delivery-2' channel.exchange_declare(exchange='test-ex-xx',exchange_type='direct') #注意生產的與消費的exchange要一致 result = channel.queue_declare(queue=test_queue) q_name = result.method.queue channel.queue_bind(exchange='test-ex-xx', #繫結的exchange queue=q_name, routing_key=rkey) def unpack(data): j = jpush_protocol_pb2.JPushProtocol() j.Clear() j.ParseFromString(data) print(str(j)) return j def callback(ch, method, properties, body): print "body:",body a = unpack(body) logging.debug("[message batch received] %r:%r"%(method.routing_key,a)) channel.basic_consume(callback, queue=q_name, no_ack=True) channel.start_consuming() if __name__=='__main__': logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s][%(funcName)s][%(lineno)d][%(asctime)s][%(message)s]', filename='./log/consume_ex2_conn.log', filemode='w' ) logging.debug("the process is starting") jobs=[] hostIP="xx.xx.xx.118" rkey="test-rkey-xx-xx-64v" Ex_To_MQ(hostIP,rkey)
與PB協議檔案連用
其實與pb檔案一起使用,主要是把data資料的源更換成pb協議的就行了,結合上一篇文章,那麼生產訊息就成了
#-*- coding: utf-8 -*- import string import pika import json import random import types import time import jpush_protocol_pb2 msg_ctrl = jpush_protocol_pb2.JPushProtocol() def ReturnRid(): rid, sid = random.randint(4294967296, 18446744073709551616), random.randint(4294967296, 18446744073709551616) return rid, sid def usage(): print("""Usage:<appkey><uid><mid> for example: python %s a47e89aaa2b3123537d91da0 6013066725 [1088953989,]""" % (sys.argv[0])) def GetMsgCtrl(appkey, msgid,uid): #head = jpush_protocol_pb2.JPushProtocolHead() msg_ctrl.head.ver = 0 msg_ctrl.head.uid = uid msg_ctrl.head.app_key = appkey msg_ctrl.head.platform = 0 msg_ctrl.head.rom = 0 msg_ctrl.head.ctime = int(time.time()) msg_ctrl.body.msg.msgid = int(msgid) msg_ctrl.body.msg.real_uid = int(uid) msg_ctrl.body.msg.platform = jpush_protocol_pb2.JPushPushMessage.ANDROID print(msg_ctrl) str_msg = msg_ctrl.SerializeToString() pare=msg_ctrl.ParseFromString(str_msg) print(pare) print(str_msg) return str_msg def SendToMQ(data): ip = '172.19.233.118' ex = 'test-ex-msgrecv' rkey = 'test-rkey-msgrecv-msgdb-64v' credentials = pika.PlainCredentials('push', 'testmq') parameters = pika.ConnectionParameters('%s'%(ip),5672,'/',credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.exchange_declare(exchange=ex, exchange_type='direct', durable=False) # sendstr = RecvMsg() channel.basic_publish(exchange=ex, routing_key=rkey, body=data) channel.close() connection.close()