OpenStack: RabbitMQ
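The components of nova talk to each other through a message queue, and RabbitMQ is one way to implement it. The official RabbitMQ documentation has a very good Get Started guide that goes from simple to advanced, is built around examples, and is easy to follow. My current understanding of RabbitMQ is that it lets you customize, very flexibly, whatever message sending and receiving mechanism you want.
It involves four roles: producer, consumer, exchange, and queue.
The producer sends messages and the consumer receives them, with an exchange and a queue in between. The producer sends a message to the exchange; the exchange decides the routing, that is, which queue the message should go to; the consumer then takes the message from the queue and processes it. The flow is roughly: producer -> exchange -> queue -> consumer.
Of these roles, I think the exchange is the key one. The examples below use three exchange types: direct, topic, and fanout (RabbitMQ also has a fourth type, headers). The most powerful of the three is topic, which can reproduce the behavior of both direct and fanout.
direct routes on a single condition: when the exchange decides which queue gets a message, it checks only whether the message's routing_key is exactly equal to the key the queue was bound with;
fanout routes by broadcast: the message is sent to every queue bound to the exchange;
topic routes on multiple conditions: the routing_key is a dot-separated list of words that is matched against binding patterns, so topic alone can do what direct and fanout do.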
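In a program, the "condition" mentioned above is the routing_key, which appears in two places:
1. Every message is sent with a routing_key, which says what kind of message it is;
2. Every queue has to be bound to an exchange, and the binding supplies a routing_key, which says what kind of messages the queue wants to receive.
With these two pieces, the exchange can use the routing_key to deliver each message to the right queue.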
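To make the two uses of routing_key concrete, here is a minimal in-memory sketch of a direct and a fanout exchange. It does not talk to RabbitMQ at all: `ToyExchange`, its methods, and the queue names are hypothetical names made up for this illustration, and the real broker does considerably more.

```python
class ToyExchange:
    """In-memory stand-in for an exchange (hypothetical, not part of pika)."""

    def __init__(self, exchange_type):
        self.exchange_type = exchange_type  # 'direct' or 'fanout'
        self.bindings = []                  # (queue_name, binding_key) pairs
        self.queues = {}                    # queue_name -> list of message bodies

    def bind(self, queue_name, binding_key=''):
        # Use 2 of routing_key: the key a queue is bound with.
        self.bindings.append((queue_name, binding_key))
        self.queues.setdefault(queue_name, [])

    def publish(self, routing_key, body):
        # Use 1 of routing_key: the key attached to each message.
        delivered = set()
        for queue_name, binding_key in self.bindings:
            matches = (self.exchange_type == 'fanout'
                       or binding_key == routing_key)
            if matches and queue_name not in delivered:
                self.queues[queue_name].append(body)
                delivered.add(queue_name)


direct = ToyExchange('direct')
direct.bind('errors_only', 'error')
direct.bind('everything', 'info')
direct.bind('everything', 'error')   # same queue bound a second time
direct.publish('error', 'disk full')
direct.publish('info', 'started')
direct.publish('debug', 'dropped')   # no binding matches, so it is discarded

fanout = ToyExchange('fanout')
fanout.bind('q1')
fanout.bind('q2')
fanout.publish('ignored', 'hello')   # the routing key plays no role here

print(direct.queues)  # {'errors_only': ['disk full'], 'everything': ['disk full', 'started']}
print(fanout.queues)  # {'q1': ['hello'], 'q2': ['hello']}
```

The only difference between the two types in this sketch is the `matches` test, which is the point: the exchange type decides how the message key and the binding key are compared.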
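Those are the basic ideas. Now let's look at the six examples in the official documentation, which go from simple to advanced:
(I really like this style of documentation. It builds up step by step, which suits beginners; it avoids obscure vocabulary; the combination of examples and pictures makes it easy to understand; and it even has a little humor that makes you smile. It shows how careful and meticulous the authors are, and I hope I can develop the same style.)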
1. Hello World
The simplest case: send one message, receive it, and print it.
send.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import pika

    # 1. Establish a connection with the RabbitMQ server.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 2. Declare the queue the message will be delivered to; we name it 'hello'.
    channel.queue_declare(queue='hello')

    # 3. Publish through the default exchange, identified by an empty string.
    #    It lets us say exactly which queue the message should go to: the
    #    queue name is given in the routing_key parameter.
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")

    # 4. Close the connection.
    connection.close()
recv.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import pika

    # 1. Establish a connection with the RabbitMQ server.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 2. Make sure the queue exists. queue_declare can be run as many times
    #    as we like; only one queue will be created.
    channel.queue_declare(queue='hello')
    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 3. Define a callback function. Whenever a message arrives, the pika
    #    library calls this function.
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))

    # 4. Subscribe the callback to a queue: tell RabbitMQ that this function
    #    should receive messages from our 'hello' queue.
    channel.basic_consume(queue='hello',
                          on_message_callback=callback,
                          auto_ack=True)

    # 5. Enter a never-ending loop that waits for data and runs callbacks
    #    whenever necessary.
    channel.start_consuming()
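2. Multiple consumers
This example is basically the same as the first one, except that several consumers are started and the messages simulate a realistic workload: each message keeps a consumer busy for a while. How do multiple consumers coordinate in that situation? All of this can be controlled from the program.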
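Before the actual scripts, a toy calculation shows why the dispatch policy matters. RabbitMQ's default is to hand messages to consumers in round-robin order; the recv.py in this example uses `basic_qos(prefetch_count=1)` so that a worker only gets a new message once it has acknowledged the previous one. The sketch below is a pure simulation under simplifying assumptions (all messages are already queued, dispatch itself costs nothing); the function names and durations are made up for illustration.

```python
import heapq


def round_robin_makespan(durations, workers):
    """Message i goes to worker i % workers, regardless of how busy it is."""
    busy_until = [0] * workers
    for i, seconds in enumerate(durations):
        busy_until[i % workers] += seconds
    return max(busy_until)


def fair_dispatch_makespan(durations, workers):
    """Each message goes to whichever worker becomes free first."""
    free_at = [0] * workers
    heapq.heapify(free_at)
    for seconds in durations:
        earliest = heapq.heappop(free_at)
        heapq.heappush(free_at, earliest + seconds)
    return max(free_at)


# Alternating heavy (5 s) and light (1 s) tasks, two workers.
durations = [5, 1, 5, 1, 5, 1]
print(round_robin_makespan(durations, 2))    # 15: one worker gets every heavy task
print(fair_dispatch_makespan(durations, 2))  # 11
```

With round-robin, one worker ends up with all the heavy tasks while the other sits idle; fair dispatch spreads the load according to who is actually free.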
send.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import sys

    import pika

    # 1. Establish a connection with the RabbitMQ server.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 2. Declare the queue the message will be delivered to; we name it
    #    'task_queue'. durable=True makes the queue persistent.
    channel.queue_declare(queue='task_queue', durable=True)

    message = ' '.join(sys.argv[1:]) or "Hello World!"

    # 3. Publish through the default exchange, identified by an empty string.
    #    It lets us say exactly which queue the message should go to: the
    #    queue name is given in the routing_key parameter.
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make the message persistent
                          ))
    print(" [x] Sent %r" % (message,))

    # 4. Close the connection.
    connection.close()
recv.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import time

    import pika

    # 1. Establish a connection with the RabbitMQ server.
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 2. Make sure the queue exists. queue_declare can be run as many times
    #    as we like; only one queue will be created.
    #    durable=True makes the queue persistent.
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')

    # 3. Define a callback function. Whenever a message arrives, the pika
    #    library calls this function.
    #
    #    The callback sends an ack to tell RabbitMQ that the task is done, so
    #    the broker can release the message. If a worker dies, RabbitMQ never
    #    receives the ack and redelivers the message to another worker.
    #    Do not forget the basic_ack call on the last line: without it,
    #    unacknowledged messages pile up and RabbitMQ eats more and more memory.
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % (body,))
        time.sleep(body.count(b'.'))  # one second of simulated work per dot
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Fair dispatch: tell RabbitMQ not to give a worker more than one
    # unacknowledged message at a time.
    channel.basic_qos(prefetch_count=1)

    # 4. Subscribe the callback to a queue: tell RabbitMQ that this function
    #    should receive messages from our 'task_queue' queue.
    channel.basic_consume(queue='task_queue',
                          on_message_callback=callback,
                          auto_ack=False)  # manual acknowledgments (the default)

    # 5. Enter a never-ending loop that waits for data and runs callbacks
    #    whenever necessary.
    channel.start_consuming()
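The acknowledgment behavior described in the comments of this recv.py can be sketched without a broker. The class below is a hypothetical in-memory model (`ToyQueue` and its method names are my own, not pika's): a delivered message stays in an "unacked" table until it is acknowledged, and goes back to the front of the queue if its consumer dies first.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a queue with manual acks (hypothetical)."""

    def __init__(self):
        self.ready = deque()   # messages waiting to be delivered
        self.unacked = {}      # delivery_tag -> (consumer, body)
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self, consumer):
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, body)  # kept until acknowledged
        return tag, body

    def ack(self, tag):
        del self.unacked[tag]                 # now the message can be forgotten

    def consumer_died(self, consumer):
        # Requeue everything this consumer received but never acknowledged.
        for tag in sorted(self.unacked, reverse=True):
            owner, body = self.unacked[tag]
            if owner == consumer:
                self.ready.appendleft(body)
                del self.unacked[tag]


q = ToyQueue()
q.publish('task A')
q.publish('task B')
tag_a, _ = q.deliver('worker-1')
tag_b, _ = q.deliver('worker-2')
q.ack(tag_b)                    # worker-2 finishes task B
q.consumer_died('worker-1')     # worker-1 crashes before acking task A
tag, body = q.deliver('worker-2')
print(body)                     # task A
```

The model also shows where the memory goes if the ack is forgotten: entries in `unacked` are never removed, so the table only grows.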
3. fanout exchange example:
send.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import sys

    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare an exchange named 'logs' of type fanout (that is, broadcast).
    # An exchange receives messages from producers and forwards them to queues.
    # There are four exchange types: direct, topic, headers and fanout.
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',  # a fanout exchange ignores this value
                          body=message)
    print(" [x] Sent %r" % (message,))
    connection.close()
recv.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare the 'logs' exchange if it has not been declared yet;
    # otherwise this simply reuses the existing one.
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Declare a temporary queue; queue='' lets the server pick a random name.
    # exclusive=True deletes the queue when the consumer's connection closes.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Bind the queue to the exchange, so the exchange sends messages to it.
    channel.queue_bind(exchange='logs', queue=queue_name)
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r" % (body,))

    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    channel.start_consuming()
4. direct exchange example:
Note that one queue can be bound to the same exchange several times, each time with a different routing_key.
send.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import sys

    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare a direct exchange named 'direct_logs'.
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'

    # The message is sent to the direct exchange with a routing_key,
    # which identifies what kind of message it is.
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
recv.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import sys

    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare a direct exchange named 'direct_logs'.
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
        sys.exit(1)

    # Bind the queue to the direct exchange. The routing_key argument tells
    # the exchange which kind of message the queue wants to receive.
    # A queue can be bound several times to the same direct exchange with
    # different routing_keys, meaning it wants several kinds of messages.
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    channel.start_consuming()
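5. topic exchange example
Here the routing_key used when binding can take a form somewhat like a regular expression, but the only special characters are "*" and "#": "*" stands for exactly one word, and "#" stands for zero or more words. If an incoming message's routing_key matches the pattern a queue was bound with, the message is forwarded to that queue.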
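The matching rule is small enough to write down. The function below is my own sketch of the rule as described above (words separated by dots, "*" for exactly one word, "#" for zero or more words); `topic_matches` is a hypothetical helper, not something pika or RabbitMQ exposes, and the broker's real implementation is certainly different.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the binding pattern."""
    pattern = binding_key.split('.')
    words = routing_key.split('.')

    def match(p, w):
        if p == len(pattern):
            return w == len(words)          # both must be used up together
        if pattern[p] == '#':
            # '#' either matches nothing more, or swallows one more word.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        return pattern[p] in ('*', words[w]) and match(p + 1, w + 1)

    return match(0, 0)


print(topic_matches('*.orange.*', 'quick.orange.rabbit'))       # True
print(topic_matches('*.orange.*', 'quick.orange.male.rabbit'))  # False
print(topic_matches('lazy.#', 'lazy'))                          # True
print(topic_matches('#', 'anything.at.all'))                    # True
print(topic_matches('kern.critical', 'kern.info'))              # False
```

The last two calls show why topic can stand in for the other two types: a binding of just "#" accepts every message, like fanout, and a binding without any wildcard only accepts an identical routing_key, like direct.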
send.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import sys

    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare a topic exchange named 'topic_logs'.
    # A topic exchange allows routing based on multiple criteria.
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'

    # The message is sent to the topic exchange with a routing_key,
    # which identifies what kind of message it is.
    # A topic routing_key is a list of dot-separated words, such as
    # 'topic.host' or 'topic.topic1.topic3'. On the binding side, '*'
    # (one word) and '#' (zero or more words) can stand in for words.
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
recv.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import sys

    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare a topic exchange named 'topic_logs'.
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    # Bind the queue to the topic exchange. The routing_key argument tells
    # the exchange which kind of message the queue wants to receive.
    # A queue can be bound several times to the same topic exchange with
    # different routing_keys, meaning it wants several kinds of messages.
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(queue=queue_name,
                          on_message_callback=callback,
                          auto_ack=True)
    channel.start_consuming()
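6. RPC (Remote Procedure Call)
My current understanding of this one is that you send a message and also need to get a result back, so the message makes a round trip: the client publishes a request that carries a reply_to queue and a correlation_id, and the server publishes the result to that queue with the same correlation_id.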
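The round trip can be sketched in memory first, with two plain deques standing in for the request queue and the reply queue. Everything here is a simplification for illustration: the function names are hypothetical, the "server" squares its input instead of computing Fibonacci numbers, and the reply_to field holds the deque itself where the real protocol carries a queue name.

```python
import uuid
from collections import deque

rpc_queue = deque()        # requests travel client -> server
callback_queue = deque()   # replies travel server -> client


def client_send(n):
    """Publish a request and return the correlation id that identifies it."""
    corr_id = str(uuid.uuid4())
    rpc_queue.append({'reply_to': callback_queue,
                      'correlation_id': corr_id,
                      'body': str(n)})
    return corr_id


def server_step():
    """Handle one request and send the result to its reply_to queue."""
    request = rpc_queue.popleft()
    result = int(request['body']) ** 2   # stand-in for the real work
    request['reply_to'].append({'correlation_id': request['correlation_id'],
                                'body': str(result)})


def client_receive(corr_id):
    """Return the reply for corr_id, skipping replies that belong elsewhere."""
    while callback_queue:
        reply = callback_queue.popleft()
        if reply['correlation_id'] == corr_id:
            return int(reply['body'])
    return None


# A stale reply from some earlier request is already sitting in the queue.
callback_queue.append({'correlation_id': 'stale', 'body': '-1'})

corr_id = client_send(7)
server_step()
answer = client_receive(corr_id)
print(answer)  # 49
```

The stale entry is there to show what the correlation_id is for: without the comparison in `client_receive`, the client would have returned -1.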
client.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import uuid

    import pika

    class FibonacciRpcClient(object):

        def __init__(self):
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='localhost'))
            self.channel = self.connection.channel()

            # Exclusive, server-named queue on which replies will arrive.
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue

            self.channel.basic_consume(queue=self.callback_queue,
                                       on_message_callback=self.on_response,
                                       auto_ack=True)
            self.response = None
            self.corr_id = None

        def on_response(self, ch, method, props, body):
            # Accept only the reply that belongs to the outstanding request.
            if self.corr_id == props.correlation_id:
                self.response = body

        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,
                                       ),
                                       body=str(n))
            # Process network events until on_response stores the reply.
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)

    fibonacci_rpc = FibonacciRpcClient()

    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % (response,))
server.py:
    #!/usr/bin/env python3
    # Requires pika 1.x.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')

    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)

    def on_request(ch, method, props, body):
        n = int(body)
        print(" [.] fib(%s)" % (n,))
        response = fib(n)

        # Send the result to the queue named in reply_to, echoing the
        # request's correlation_id so the client can match it.
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
A few RabbitMQ-related commands:
1. List the queues in RabbitMQ and how many messages each queue holds:
    $ sudo rabbitmqctl list_queues
2. List the exchanges in RabbitMQ:
    $ sudo rabbitmqctl list_exchanges
3. List the bindings between exchanges and queues:
    $ sudo rabbitmqctl list_bindings
4. Start/stop RabbitMQ:
    $ sudo invoke-rc.d rabbitmq-server stop/start/etc.