1. 程式人生 > 實用技巧 >pb格式的檔案與rabbitmq連用

pb格式的檔案與rabbitmq連用

背景:

這篇主要講解的是pb協議輸入如何放入到mq進行生產和消費,與實際原理都是一樣的,與python使用protobuf協議傳輸固定格式協議資料這個文章相對應

https://www.cnblogs.com/chongyou/p/13543340.html

工作中的運用

在我們工作中經常會使用到生產資料與消費資料,一般普通的訊息生產如下

生產訊息

import pika

def SendToMQ(data):

    ip = '172.XX.233.XX'    # mq的連線地址
    ex = 'test-ex-msgrecv'   #mq的exchange
    rkey = 'test-rkey-msgrecv-msgdb-64v'   #mq的rkey 
    credentials = pika.PlainCredentials('xx', 'xx')      #mq的賬戶和密碼  ,注意我們需要使用到pika,
    parameters = pika.ConnectionParameters('%s'%(ip),5672,'/',credentials)
    connection = pika.BlockingConnection(parameters) #連線時的一系列的配置
    channel = connection.channel()   #建立一個頻道

    channel.exchange_declare(exchange=ex,   #這一塊都在說明mq的配置
                             exchange_type='direct',   #mq型別
                             durable=False)    #是否持久化
    # sendstr = RecvMsg()
    channel.basic_publish(exchange=ex,      #使用的exchange
                          routing_key=rkey,   #使用的那個rkey
                          body="張君字串")    #傳輸的內容,什麼內容都可以

    channel.close()
    connection.close()

  

消費

import pika

import logging
import time
from datetime import datetime


def Ex_To_MQ(hostIP,rkey):
    print("MQ config:hostIP:%s,rkey:%s"%(hostIP,rkey))
    logging.debug("MQ is starting")
    credentials=pika.PlainCredentials('push','testmq')    
    connection=pika.BlockingConnection(pika.ConnectionParameters(hostIP,5672,'/',credentials))
    channel=connection.channel()
    test_queue = 'test-sms-delivery-2'
    channel.exchange_declare(exchange='test-ex-xx',exchange_type='direct')   #注意生產的與消費的exchange要一致
    result = channel.queue_declare(queue=test_queue)
    q_name = result.method.queue

    channel.queue_bind(exchange='test-ex-xx', #繫結的exchange
                       queue=q_name,
                       routing_key=rkey)  
    def unpack(data):
        j = jpush_protocol_pb2.JPushProtocol()
        j.Clear()
        j.ParseFromString(data)
        print(str(j))
        return j

    def callback(ch, method, properties, body):
        print "body:",body
        a = unpack(body)
        logging.debug("[message batch received] %r:%r"%(method.routing_key,a))

    channel.basic_consume(callback,
                          queue=q_name,
                          no_ack=True)
    channel.start_consuming()

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG,
        format='[%(levelname)s][%(funcName)s][%(lineno)d][%(asctime)s][%(message)s]',
        filename='./log/consume_ex2_conn.log',
        filemode='w'
        )
    logging.debug("the process is starting")
    jobs=[]

    hostIP="xx.xx.xx.118"
    rkey="test-rkey-xx-xx-64v"
    Ex_To_MQ(hostIP,rkey)

  

與PB協議檔案連用

其實與pb檔案一起使用,主要是把data資料的源更換成pb協議的就行了,結合上一篇文章,那麼生產訊息就成了

#-*- coding: utf-8 -*-
import string
import pika
import json
import random
import types
import time

import   jpush_protocol_pb2
msg_ctrl = jpush_protocol_pb2.JPushProtocol()

def ReturnRid():
    rid, sid = random.randint(4294967296, 18446744073709551616), random.randint(4294967296, 18446744073709551616)
    return rid, sid

def usage():
    print("""Usage:<appkey><uid><mid>
    for example:
        python %s a47e89aaa2b3123537d91da0 6013066725 [1088953989,]""" % (sys.argv[0]))


def GetMsgCtrl(appkey, msgid,uid):
    #head = jpush_protocol_pb2.JPushProtocolHead()
    msg_ctrl.head.ver = 0
    msg_ctrl.head.uid = uid
    msg_ctrl.head.app_key = appkey
    msg_ctrl.head.platform = 0
    msg_ctrl.head.rom = 0
    msg_ctrl.head.ctime = int(time.time())

    msg_ctrl.body.msg.msgid = int(msgid)
    msg_ctrl.body.msg.real_uid = int(uid)
    msg_ctrl.body.msg.platform = jpush_protocol_pb2.JPushPushMessage.ANDROID
    print(msg_ctrl)
    str_msg = msg_ctrl.SerializeToString()
    pare=msg_ctrl.ParseFromString(str_msg)
    print(pare)
    print(str_msg)
    return str_msg




def SendToMQ(data):

    ip = '172.19.233.118'
    ex = 'test-ex-msgrecv'
    rkey = 'test-rkey-msgrecv-msgdb-64v'
    credentials = pika.PlainCredentials('push', 'testmq')
    parameters = pika.ConnectionParameters('%s'%(ip),5672,'/',credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()

    channel.exchange_declare(exchange=ex,
                             exchange_type='direct',
                             durable=False)
    # sendstr = RecvMsg()
    channel.basic_publish(exchange=ex,
                          routing_key=rkey,
                          body=data)

    channel.close()
    connection.close()