程式人生 > Python-rabbitmq例項

Python-rabbitmq例項

rabbitMq.py

#!/usr/bin/python
# coding=utf-8
# author=He


import pika
import sys
import config
import json


class RabbitMqService:
    """Wrapper around a blocking pika connection tied to one queue and one exchange."""

    # Legacy public placeholders kept for compatibility; instances store their
    # state in the private (name-mangled) __channel / __connection attributes.
    channel = ''
    connection = ''

    def __init__(self, queue_name, exchange):
        """
        Open a blocking connection and a channel using the settings in ``config``.

        Errors raised while connecting are printed, not re-raised; in that case
        the connection and/or channel stay ``None``. If the connection or the
        channel reports that it is not open, the process exits with status 1.

        :param queue_name: The queue name
        :param exchange: The exchange name
        :type queue_name: str
        :type exchange: str
        """
        self.__queue_name = queue_name
        self.__exchange = exchange
        # Pre-set so close()/is_closed()/is_open() still work when connecting fails.
        self.__connection = None
        self.__channel = None
        try:
            credentials = pika.PlainCredentials(username=config.mq_user,
                                                password=config.mq_password)
            self.__connection = pika.BlockingConnection(pika.ConnectionParameters(
                config.mq_host, config.mq_port, config.mq_hosts, credentials))
            self.__channel = self.__connection.channel()
            if self.__connection.is_open is False:
                print('RabbitMq Open Connect Fail')
                sys.exit(1)
            if self.__channel.is_open is False:
                print('RabbitMq Open Channel Fail')
                sys.exit(1)
        except Exception as e:
            # sys.exit() raises SystemExit, which is not caught here.
            print(e)

    def send(self, data, queue_durable=True, exchange_type=config.pattern['fanout']):
        """
        Declare the queue and exchange, bind them, and publish ``data``.

        The queue is always declared durable; ``queue_durable`` is applied to
        the exchange declaration. The message is only published when ``data``
        contains a ``'date'`` key.

        :param data: The message body
        :param queue_durable: Durability flag passed to the exchange declaration
        :param exchange_type: The exchange type to use
        :return: Void
        """
        self.__channel.queue_declare(queue=self.__queue_name, passive=False,
                                     durable=True, exclusive=False)
        self.__channel.exchange_declare(exchange=self.__exchange,
                                        exchange_type=exchange_type,
                                        durable=queue_durable)
        self.__channel.queue_bind(queue=self.__queue_name, exchange=self.__exchange)
        if 'date' in data:
            self.__add_queue(data=data)

    def __add_queue(self, data):
        """
        Publish ``data`` as JSON to the exchange, routed by the queue name.

        :param data: The message body
        :return: Void
        """
        self.__channel.basic_publish(routing_key=self.__queue_name,
                                     body=json.dumps(data),
                                     # delivery_mode=2 marks the message as persistent
                                     properties=pika.BasicProperties(delivery_mode=2),
                                     exchange=self.__exchange)

    def get__queue_name(self):
        """
        :rtype: str
        :return: The queue name given to the constructor
        """
        return self.__queue_name

    def get__exchange(self):
        """
        :rtype: str
        :return: The exchange name given to the constructor
        """
        return self.__exchange

    def get__channel(self):
        """
        :return: The channel opened in the constructor, or None if opening failed
        """
        return self.__channel

    def get__connection(self):
        """
        :return: The connection opened in the constructor, or None if connecting failed
        """
        return self.__connection

    def close(self):
        """
        Close the connection unless it is already closed (or was never opened).

        :return: Void
        """
        if not self.is_closed():
            self.__connection.close()

    def is_closed(self):
        """
        :return: bool -- True when the connection is closed or was never opened
        """
        if self.__connection is None:
            return True
        return self.__connection.is_closed

    def is_open(self):
        """
        :return: bool -- True when the connection exists and is open
        """
        if self.__connection is None:
            return False
        return self.__connection.is_open

config.py

# Message queue settings
mq_host = 'localhost'   # message queue host address
mq_user = 'mq'          # message queue user name
mq_password = '123456'  # message queue password
mq_port = 5672          # message queue port
mq_hosts = '/'          # message queue node
# Supported exchange types, keyed by their own name.
pattern = dict(direct='direct', fanout='fanout', topic='topic')
queue_durable = True    # whether to persist

呼叫

生產

import time

import rabbitMq
import config

# Sample record standing in for the row being published; replace with real data.
row = {'shop_id': 1}

mq = rabbitMq.RabbitMqService(queue_name='test', exchange='test')
# send() only publishes payloads that carry a 'date' key.
mq.send({'shop_id': row['shop_id'], 'date': int(time.time())}, queue_durable=True,
        exchange_type=config.pattern['fanout'])
# Release the connection once the message has been handed to the broker.
mq.close()
if mq.is_closed():
    print("OK")

消費

import rabbitMq
import config

mq = rabbitMq.RabbitMqService(queue_name='test', exchange='test')
channel = mq.get__channel()
# Reuse the names the service was created with so both sides stay in sync.
queue_name = mq.get__queue_name()
exchange = mq.get__exchange()

# Declare the exchange and queue and bind them before consuming.
channel.exchange_declare(exchange=exchange, exchange_type=config.pattern['fanout'],
                         durable=config.queue_durable)
channel.queue_declare(queue=queue_name, passive=False, durable=True, exclusive=False,
                      auto_delete=False)
channel.queue_bind(queue=queue_name, exchange=exchange)


def callback(ch, method, properties, body):
    """
    Acknowledge each delivered message.

    :param ch: BlockingChannel
    :param method: spec.Basic.Deliver
    :param properties: spec.BasicProperties
    :param body: str or unicode
    """
    ch.basic_ack(delivery_tag=method.delivery_tag)


# NOTE(review): the callback-first call form matches the pre-1.0 pika API;
# pika 1.x expects basic_consume(queue, on_message_callback) -- confirm the
# installed version before upgrading.
channel.basic_consume(callback, queue=queue_name)
channel.start_consuming()