Python-rabbitmq例項
阿新 • • 發佈:2019-02-19
import rabbitMq
#!/usr/bin/python
# coding=utf-8
# author=He
import pika
import sys
import config
import json
class RabbitMqService:
channel = ''
connection = ''
def __init__(self, queue_name, exchange):
"""
:param queue_name: The queue name
:param exchange: The exchange name consists of a non-empty
:type queue_name str
:type exchange str
"""
try:
self.__queue_name = queue_name
self.__exchange = exchange
credentials = pika.PlainCredentials(username=config.mq_user, password=config.mq_password)
self.__connection = pika.BlockingConnection(pika.ConnectionParameters(
config.mq_host, config.mq_port, config.mq_hosts, credentials))
self.__channel = self.__connection.channel()
if self.__connection.is_open is False:
print 'RabbitMq Open Connect Fail'
sys.exit(1)
if self.__channel.is_open is False:
print 'RabbitMq Open Channel Fail'
sys.exit(1)
except Exception as e:
print e
def send(self, data, queue_durable=True, exchange_type=config.pattern['fanout']):
"""
:param data: The message body
:param queue_durable: Survive a reboot of RabbitMQ
:param exchange_type: The exchange type to use
:return: Void
"""
self.__channel.queue_declare(queue=self.__queue_name, passive=False, durable=True, exclusive=False)
self.__channel.exchange_declare(exchange=self.__exchange, exchange_type=exchange_type, durable=queue_durable)
self.__channel.queue_bind(queue=self.__queue_name, exchange=self.__exchange)
if 'date' in data:
self.__add_queue(data=data)
def __add_queue(self, data):
"""
:param data: The message body
:return: Void
"""
self.__channel.basic_publish(routing_key=self.__queue_name, body=json.dumps(data),
properties=pika.BasicProperties(delivery_mode=2), exchange=self.__exchange)
def get__queue_name(self):
"""
:type: string
:return: self.__queue_name
"""
return self.__queue_name
def get__exchange(self):
"""
:type: string
:return: self.__exchange
"""
return self.__exchange
def get__channel(self):
"""
:rtype: pika.synchronous_connection.BlockingChannel
:return: self.__channel
"""
return self.__channel
def get__connection(self):
return self.__connection
def close(self):
"""
:return: Void
"""
is_closed = self.is_closed()
if is_closed is False:
self.__connection.close()
def is_closed(self):
"""
:return: bool
"""
return self.__connection.is_closed
def is_open(self):
"""
:return:bool
"""
return self.__connection.is_open
config.py
# 訊息佇列
mq_host = 'localhost' # 訊息佇列主機地址
mq_user = 'mq' # 訊息佇列使用者名稱
mq_password = '123456' # 訊息佇列密碼
mq_port = 5672 # 訊息佇列埠
mq_hosts = '/' # 訊息佇列節點
pattern = {'direct': 'direct', 'fanout': 'fanout', 'topic': 'topic'}
queue_durable = True # 是否持久化
呼叫
生產
import rabbitMq
import config
mq = rabbitMq.RabbitMqService(queue_name='test', exchange='test')
mq.send({'shop_id': row['shop_id'], 'date': int(time.time())}, queue_durable=True, exchange_type=self.__exchange_type)
if mq.is_closed():
print "OK"
消費
import rabbitMq
import config
mq = rabbitMq.RabbitMqService(queue_name='test', exchange='test')
channel = mq.get__channel()
channel.exchange_declare(exchange=exchange, exchange_type=config.pattern['fanout'],
durable=config.queue_durable)
channel.queue_declare(queue=queue_name, passive=False, durable=True, exclusive=False, auto_delete=False)
channel.queue_bind(queue=queue_name, exchange=exchange)
def clalback(ch, method, properties, body):
"""
:param ch: BlockingChannel
:param method: spec.Basic.Deliver
:param properties: spec.BasicProperties
:param body: str or unicode
"""
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(clalback, queue=queue_name)
channel.start_consuming()