1. 程式人生 > >openstack——RabbitMQ

openstack——RabbitMQ

nova中各個元件之間的互動是通過“訊息佇列”來實現的,其中一種實現方法就是使用RabbitMQ,對RabbitMQ的使用,官方文件上有一個非常好的Get Started,由淺及深,結合例子,很容易理解。現在對RabbitMQ的理解,就是利用它可以非常靈活的定製自己想要實現的訊息收發機制。

其中,有這樣幾個角色:producer, consumer, exchange, queue

producer是訊息傳送者,consumer是訊息接受者,中間要通過exchange和queue。producer將訊息傳送給exchange,exchange決定訊息的路由,即決定要將訊息傳送給哪個queue,然後consumer從queue中取出訊息,進行處理,大致流程如下圖:

這幾個角色當中,我覺得最關鍵的是這個exchange,它有3種類型:direct, topic, fanout。其中,功能最強大的就是topic,用它完全可以實現direct和fanout的功能。

direct是單條件的路由,即在exchange判斷要將訊息傳送給哪個queue時,判斷的依據只能是一個條件;

fanout是廣播式的路由,即將訊息傳送給所有的queue;

topic是多條件的路由,轉發訊息時,依據的條件是多個,所以只使用topic就可以實現direct和fanout的功能。

上面所說的“條件”,反映到程式中,就是routing_key,這個routing_key出現在兩個地方:

    1. 每一個傳送的訊息都有一個routing_key,表示傳送的是一個什麼樣的訊息;

    2. 每一個queue要和exchange繫結,繫結的時候要提供一個routing_key,表示這個queue想要接收什麼樣的訊息。

這樣,exchange就可以根據routing_key,來將訊息傳送到合適的queue中。

基本的思路就這些吧,下面來看一下官方文件上的那由淺及深的六個例子:

(我很喜歡這種風格的文件,整體由淺及深,適合初學者,其次文章沒有大量的生僻詞彙,而且例子+圖片,比較容易懂,更好的是文章還帶點小小的幽默,不由得讓人匯心一笑,感覺老外做事就是認真細膩,希望自己也能養成這樣的風格)

1. Hello World

最簡單的情況,發一個訊息,接收,打印出來這個訊息。

send.py:

  1. #!/usr/bin/env python
  2. import pika  
  3. # 1. Establish a connection with RabbitMQ server.
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.                'localhost'))  
  6. channel = connection.channel()  
  7. # 2. Create a queue to which the message will be delivered, let's name it 'hello'
  8. channel.queue_declare(queue='hello')  
  9. # 3. Use a default exchange identified by an empty string, which allows us to specify
  10. #    exactly to which queue the message should go. The queue name needs to be specified
  11. #    in the routing_key parameter:
  12. channel.basic_publish(exchange='',  
  13.                       routing_key='hello',  
  14.                       body='Hello World!')  
  15. print" [x] Sent 'Hello World!'"
  16. # 4. Close the connection
  17. connection.close()  

recv.py:

  1. #!/usr/bin/env python
  2. import pika  
  3. # 1. Establish a connection with RabbitMQ server
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. # 2. Make sure that the queue exists,run the command as many times as we like, and only one will be created.
  8. channel.queue_declare(queue='hello')  
  9. print' [*] Waiting for messages. To exit press CTRL+C'
  10. # 3. Define a callback function.Whenever we receive a message, 
  11. #    this callback function is called by the Pika library.
  12. def callback(ch, method, properties, body):  
  13.     print" [x] Received %r" % (body,)  
  14. # 4. Subscribe the callback function to a queue.
  15. #    Tell RabbitMQ that this particular callback function should receive messages from our hello queue.
  16. channel.basic_consume(callback,  
  17.                       queue='hello',  
  18.                       no_ack=True)  
  19. # 5. Enter a never-ending loop that waits for data and runs callbacks whenever necessary.
  20. channel.start_consuming()  

2. 多個consumer

這個例子跟第一個例子基本上一樣,只是啟動了多個consumer,並且模擬真實情況,即傳送的訊息使得consumer在短時間內不能完成工作。在這種情況下,多個consumer是如何協調工作的呢?其實,這些都是可以在程式中進行控制的。

send.py

  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. # 1. Establish a connection with RabbitMQ server.
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.                'localhost'))  
  7. channel = connection.channel()  
  8. # 2. Create a queue to which the message will be delivered, let's name it 'hello'
  9. #    'durable=True' makes the queue persistent
  10. channel.queue_declare(queue='task_queue',durable=True)  
  11. message=' '.join(sys.argv[1:]) or"Hello World!"
  12. # 3. Use a default exchange identified by an empty string, which allows us to specify
  13. #    exactly to which queue the message should go. The queue name needs to be specified
  14. #    in the routing_key parameter:
  15. channel.basic_publish(exchange='',  
  16.                       routing_key='task_queue',  
  17.                       body=message,  
  18.                       properties=pika.BasicProperties(  
  19.                          delivery_mode = 2# make message persistent
  20.                       ))  
  21. print" [x] Sent %r" % (message,)  
  22. # 4. Close the connection
  23. connection.close()  

recv.py:
  1. #!/usr/bin/env python
  2. import pika  
  3. import time  
  4. # 1. Establish a connection with RabbitMQ server
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  6.         host='localhost'))  
  7. channel = connection.channel()  
  8. # 2. Make sure that the queue exists,run the command as many times as we like, and only one will be created.
  9. #    'durable=True' makes the queue persistent
  10. channel.queue_declare(queue='task_queue',durable=True)  
  11. print' [*] Waiting for messages. To exit press CTRL+C'
  12. # 3. Define a callback function.Whenever we receive a message, 
  13. #    this callback function is called by the Pika library.
  14. #
  15. #    Send a ack to tell rabbitmq a task is done, then it can release the message.
  16. #    If a worker dies, rabbitmq fail to receive the ack, it will redeliver the message to another worker.
  17. #    Remember to write the last line code, or rabbitmq will eat more and more memory.
  18. def callback(ch, method, properties, body):  
  19.     print" [x] Received %r" % (body,)  
  20.     time.sleep(body.count('.'))  
  21.     print"[x] Done"
  22.     ch.basic_ack(delivery_tag = method.delivery_tag)   
  23. # Fair dispatch: Tell rabbitmq not give a worker more than one messages at a time
  24. channel.basic_qos(prefetch_count=1)  
  25. # 4. Subscribe the callback function to a queue.
  26. #    Tell RabbitMQ that this particular callback function should receive messages from our hello queue.
  27. channel.basic_consume(callback,  
  28.                       queue='task_queue',  
  29.                       no_ack=False)# turn on the (ack)onwledgment, default is False
  30. # 5. Enter a never-ending loop that waits for data and runs callbacks whenever necessary.
  31. channel.start_consuming()  

3. fanout exchange的例子:

send.py:

  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. # declare a exchange, type is fanout(means broadcast),named 'logs'.
  8. # exchange is used to receive messages form producer, and send messages to queue.
  9. # there are four exchange types: direct, topic, headers and fanout
  10. channel.exchange_declare(exchange='logs',  
  11.                          type='fanout')  
  12. message = ' '.join(sys.argv[1:]) or"info: Hello World!"
  13. channel.basic_publish(exchange='logs',  
  14.                       routing_key=''#routing_key is '', because 'fanout' exchange will ignore its value.
  15.                       body=message)  
  16. print" [x] Sent %r" % (message,)  
  17. connection.close()  

recv.py:
  1. #!/usr/bin/env python
  2. import pika  
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  4.         host='localhost'))  
  5. channel = connection.channel()  
  6. # if a exchange named 'logs' have not declared yet, then declare one, 
  7. # or just use the existed exchange.
  8. channel.exchange_declare(exchange='logs',  
  9.                          type='fanout')  
  10. # declare a temporary queue with a random name
  11. # 'exclusive=True' flag will delete the queue when the consumer dies.
  12. result = channel.queue_declare(exclusive=True)  
  13. queue_name = result.method.queue  
  14. # bind the queue to the exchange, to tell the exchange to send messages to our queue.
  15. channel.queue_bind(exchange='logs',  
  16.                    queue=queue_name)  
  17. print' [*] Waiting for logs. To exit press CTRL+C'
  18. def callback(ch, method, properties, body):  
  19.     print" [x] %r" % (body,)  
  20. channel.basic_consume(callback,  
  21.                       queue=queue_name,  
  22.                       no_ack=True)  
  23. channel.start_consuming()  

4. direct exchange的例子:

需要注意,一個queue是可以和同一個exchange多次繫結的,每次繫結要用不同的routing_key

send.py:

  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. # declare a exchange, type is direct, named 'logs'.
  8. channel.exchange_declare(exchange='direct_logs',  
  9.                          type='direct')  
  10. severity = sys.argv[1if len(sys.argv) > 1else'info'
  11. message = ' '.join(sys.argv[2:]) or'Hello World!'
  12. # a message is sent to the direct exchange with a routing_key.
  13. # a message is identified by the routing_key.
  14. channel.basic_publish(exchange='direct_logs',  
  15.                       routing_key=severity,  
  16.                       body=message)  
  17. print" [x] Sent %r:%r" % (severity, message)  
  18. connection.close()  

recv.py:
  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. # declare a direct exchange named 'direct_logs'
  8. channel.exchange_declare(exchange='direct_logs',  
  9.                          type='direct')  
  10. result = channel.queue_declare(exclusive=True)  
  11. queue_name = result.method.queue  
  12. severities = sys.argv[1:]  
  13. ifnot severities:  
  14.     print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \  
  15.                          (sys.argv[0],)  
  16.     sys.exit(1)  
  17. # Bind the queue to the direct exchange,
  18. # 'routing_key' flag tells the direct exchange which kind of message it wants to receive.
  19. # A queue can bind multiple times to the same direct exchange with different routing_keys,
  20. # which means it wants to receive several kinds of messages.
  21. for severity in severities:  
  22.     channel.queue_bind(exchange='direct_logs',  
  23.                        queue=queue_name,  
  24.                        routing_key=severity)  
  25. print' [*] Waiting for logs. To exit press CTRL+C'
  26. def callback(ch, method, properties, body):  
  27.     print" [x] %r:%r" % (method.routing_key, body,)  
  28. channel.basic_consume(callback,  
  29.                       queue=queue_name,  
  30.                       no_ack=True)  
  31. channel.start_consuming()  

5. topic exchange的例子

這裡的routing_key可以使用一種類似正則表示式的形式,但是特殊字元只能是“*”和“#”,“*”代表一個單詞,“#”代表0個或是多個單詞。這樣傳送過來的訊息如果符合某個queue的routing_key定義的規則,那麼就會轉發給這個queue。如下圖示例:

send.py:

  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. # declare a exchange, type is topic, named 'topic_logs'.
  8. # topic exchange allows to do routing based on multiple criteria.
  9. channel.exchange_declare(exchange='topic_logs',  
  10.                          type='topic')  
  11. severity = sys.argv[1if len(sys.argv) > 1else'anonymous.info'
  12. message = ' '.join(sys.argv[2:]) or'Hello World!'
  13. # a message is sent to the topic exchange with a routing_key.
  14. # a message is identified by the routing_key.
  15. # the topic routing_key can be like 'topic.host','topic.topic1.topic3', etc
  16. # also can use '*'(one word) and '#'(zero or more words) to substitute word(s).
  17. channel.basic_publish(exchange='topic_logs',  
  18.                       routing_key=severity,  
  19.                       body=message)  
  20. print" [x] Sent %r:%r" % (severity, message)  
  21. connection.close()  

recv.py:
  1. #!/usr/bin/env python
  2. import pika  
  3. import sys  
  4. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  5.         host='localhost'))  
  6. channel = connection.channel()  
  7. # declare a topic exchange named 'topic_logs'
  8. channel.exchange_declare(exchange='topic_logs',  
  9.                          type='topic')  
  10. result = channel.queue_declare(exclusive=True)  
  11. queue_name = result.method.queue  
  12. binding_keys = sys.argv[1:]  
  13. ifnot binding_keys:  
  14.     print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)  
  15.     sys.exit(1)  
  16. # Bind the queue to the topic exchange,
  17. # 'routing_key' flag tells the topic exchange which kind of message it wants to receive.
  18. # A queue can bind multiple times to the same direct exchange with different routing_keys,
  19. # which means it wants to receive several kinds of messages.
  20. for binding_key in binding_keys:  
  21.     channel.queue_bind(exchange='topic_logs',  
  22.                        queue=queue_name,  
  23.                        routing_key=binding_key)  
  24. print' [*] Waiting for logs. To exit press CTRL+C'
  25. def callback(ch, method, properties, body):  
  26.     print" [x] %r:%r" % (method.routing_key, body,)  
  27. channel.basic_consume(callback,  
  28.                       queue=queue_name,  
  29.                       no_ack=True)  
  30. channel.start_consuming()  

6. PRC(Remote Procedure Call,遠端過程呼叫)

目前對這個的理解就是傳送一個訊息,然後還要得到一個結果,即訊息要走一個來回。如下圖所示:


client.py:

  1. #!/usr/bin/env python
  2. import pika  
  3. import uuid  
  4. class FibonacciRpcClient(object):  
  5.     def __init__(self):  
  6.         self.connection = pika.BlockingConnection(pika.ConnectionParameters(  
  7.                 host='localhost'))  
  8.         self.channel = self.connection.channel()  
  9.         result = self.channel.queue_declare(exclusive=True)  
  10.         self.callback_queue = result.method.queue  
  11.         self.channel.basic_consume(self.on_response, no_ack=True,  
  12.                                    queue=self.callback_queue)  
  13.     def on_response(self, ch, method, props, body):  
  14.         ifself.corr_id == props.correlation_id:  
  15.             self.response = body  
  16.     def call(self, n):  
  17.         self.response = None
  18.         self.corr_id = str(uuid.uuid4())  
  19.         self.channel.basic_publish(exchange='',  
  20.                                    routing_key='rpc_queue',  
  21.                                    properties=pika.BasicProperties(  
  22.                                          reply_to = self.callback_queue,  
  23.                                          correlation_id = self.corr_id,  
  24.                                          ),  
  25.                                    body=str(n))  
  26.         whileself.response isNone:  
  27.             self.connection.process_data_events()  
  28.         return int(self.response)  
  29. fibonacci_rpc = FibonacciRpcClient()  
  30. print" [x] Requesting fib(30)"
  31. response = fibonacci_rpc.call(30)  
  32. print" [.] Got %r" % (response,)  

server.py:
  1. #!/usr/bin/env python
  2. import pika  
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(  
  4.         host='localhost'))  
  5. channel = connection.channel()  
  6. channel.queue_declare(queue='rpc_queue')  
  7. def fib(n):  
  8.     if n == 0:  
  9.         return0
  10.     elif n == 1:  
  11.         return1
  12.     else:  
  13.         return fib(n-1) + fib(n-2)  
  14. def on_request(ch, method, props, body):  
  15.     n = int(body)  
  16.     print" [.] fib(%s)"  % (n,)  
  17.     response = fib(n)  
  18.     ch.basic_publish(exchange='',  
  19.                      routing_key=props.reply_to,  
  20.                      properties=pika.BasicProperties(correlation_id = \  
  21.                                                      props.correlation_id),  
  22.                      body=str(response))  
  23.     ch.basic_ack(delivery_tag = method.delivery_tag)  
  24. channel.basic_qos(prefetch_count=1)  
  25. channel.basic_consume(on_request, queue='rpc_queue')  
  26. print" [x] Awaiting RPC requests"
  27. channel.start_consuming()  


幾個RabbitMQ相關的命令:

  1. 1. 檢視RabbitMQ中有多少個queue,以及每個queue中有多少個訊息:  
  2. $ sudo rabbitmqctl list_queues  
  3. 2. 檢視RabbitMQ中exchange的情況:  
  4. $ sudo rabbitmqctl list_exchanges  
  5. 3. 檢視RabbitMQ中exchange和queue繫結情況:  
  6. $  sudo   rabbitmqctl list_bindings  
  7. 4. 啟動/停止RabbitMQ:  
  8. $  sudo   invoke-rc.d rabbitmq-server stop/start/etc.