OpenStack 中 RabbitMQ 的使用 OpenStack 中 RabbitMQ 的使用
OpenStack 中 RabbitMQ 的使用
本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什麼。第二部分將介紹其在 OpenStack 中的使用。
1 RabbitMQ 的基本概念
RabbitMQ 是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(亦稱面向訊息的中介軟體)。
AMQP 是一個定義了在應用或者組織之間傳送訊息的協議的開放標準 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送訊息存在的下列問題:
- 網路是不可靠的 =>訊息需要儲存後再轉發並有出錯處理機制
- 與本地呼叫相比,網路速度慢 =>得非同步呼叫
- 應用之間是不同的(比如不同語言實現、不同作業系統等) =>得與應用無關
- 應用會經常變化 =>同上
AMQP 使用非同步的、應用對應用的、二進位制資料通訊來解決這些問題。
RabbitMQ 是 AMQP 的一種實現,它包括Server (伺服器端)、Client (客戶端) 和 Plugins (外掛)。RabbitMQ 伺服器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)釋出的 3.4.4
1.1 RabbitMQ 的概念非常清晰、簡潔
其基本概念參見下圖:
RabbitMQ 官網 和其它網站上有很多文章來描述其基本概念。簡單說明如下:
- Message (訊息):RabbitMQ 轉發的二進位制物件,包括Headers(頭)、Properties (屬性)和 Data (資料),其中資料部分不是必要的。具體見 1.2 部分的描述。
- Producer(生產者)
- Consumer (消費者):使用佇列 Queue 從 Exchange 中獲取訊息的應用。
- Exchange (交換機):負責接收生產者的訊息並把它轉到到合適的佇列 Queue 。下面有 1.3 部分描述。
-
Queue (佇列):一個儲存Exchange 發來的訊息的緩衝,並將訊息主動傳送給Consumer,或者 Consumer 主動來獲取訊息。見 1.4 部分的描述。
- Binding (繫結):佇列 和 交換機 之間的關係。Exchange 根據訊息的屬性和 Binding 的屬性來轉發訊息。繫結的一個重要屬性是 binding_key。
- Connection (連線)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連線。一些應用需要多個connection,為了節省TCP 連線,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連線的連線。連線需要使用者認證,並且支援 TLS (SSL)。連線需要顯式關閉。
- Virtual Host (虛擬主機) :RabbitMQ 用來進行資源隔離的機制。一個虛機主機會隔離使用者、exchange、queue 等。預設的虛擬主機為 "/"。
1.2 關於訊息 message
訊息結構:
訊息的幾個重要屬性:
- routing_key:Direct 和 Topic 型別的 exchange 會根據本屬性來轉發訊息。
-
delivery_mode: 將其值設定為 2 將用於訊息的持久化,持久化的訊息會被儲存到磁碟上來防止其丟失。下面章節 3 有描述。
- reply_to:一般用來表示RPC實現中客戶端的回撥佇列的名字。下面章節 4 有描述。
- correlation_id:用於使用 RabbitMQ 來實現 RPC的情形。下面章節 4 有描述。
- content_type:表示訊息data的編碼格式名稱。實際上RabbitMQ只負責原樣傳送訊息因此不會使用該屬性,該屬性只被 Publisher 和 Consumer 使用。
訊息的確認/刪除機制:
Consumer 處理訊息可能會失敗,那麼 RabbitMQ 怎麼知道什麼時候來刪除 queue 中的訊息呢?它使用兩種機制:
- 當 RabbitMQ 主動將訊息發給 Consumer 以後,它會刪除訊息
- 當 Consumer 發回一個確認後,RabbitMQ 會刪除訊息。
第二種情況下,如果 RabbitMQ 沒收到確認,它會把訊息重新放進佇列(re-queued)並新增標識 'redelivered' 表明該訊息之前已經發送過 ,如果沒有Consumer的話,訊息將保持到有下一個 Consumer 為止。
Consumer 可以主動告訴 RabbitMQ 訊息處理失敗了(拒絕訊息),並告知RabbitMQ 是刪除訊息還是重新放進佇列。
1.3 exchange 交換機
exchange 有幾個重要的屬性:
- Name 名字:交換機名字。空字串名字的exchange為預設的exchange。
- Type 型別:Direct, Fanout, Topic, Headers。型別決定 exchange 的訊息轉發能力。下面 章節2 有描述。
- durable:值為 True/False。值為 true 的 exchange 在 rabbitmq 重啟後會被自動建立。OpenStack 使用的exchange的該值都為false。
- auto_delete:值為 True/False。設定為 true 的話,當所有消費者的連線都關閉後,該 exchange 會被自動刪除。OpenStack 使用的exchange的該值都為false。
- exclusive:值為 True/False。設定為 true 的話,該 exchange 只允許被建立的connection使用,並且在該 connection 關閉後它會被自動刪除。
RabbitMQ 預設會為每一種型別生成一個或者兩個的預設的 exchange:
- Fanout 型別:名字為 amq.fanout
- Topic 型別: 名字為 amq.topic
- Headers 型別:名字為 amq.match 和 amq.headers
- Direct 型別:名字為空字串的exchange 以及 amq.direct。其中名字為空的exchange比較特殊。在一個 Queue 被建立後,RabbitMQ 會自動建立它和該 exchange 之間的binding,並且設定其 binding_key 為該queue 的名字。這樣,該語句 channel.basic_publish(exchange='', routing_key='hello', body=message) 會讓該預設的 exchange 將該 message 轉發到名字為 'hello' 的佇列中。
1.4 佇列 Queue
佇列同樣有類似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,並且含義相同。
Exchange 會將訊息分發(copy)到符合要求的所有佇列中。
Consumer 可以主動獲取或者被動接受Queue裡面的訊息:
- 被動接收訊息(訂閱訊息 "push API"):使用 basic.consume(short reserved-1, queue-name queue, consumer-tag consumer-tag,no-local no-local, no-ack no-ack, bit exclusive, no-wait no-wait,table arguments)
方法。見 5.1 示例程式碼。 - 主動獲取訊息 ("pull API"): 使用 basic.get(short reserved-1, queue-name queue, no-ack no-ack) 方法。
一個 Queue 允許有多個 Consumer,比如利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,訊息會在這些 Consumer 之間根據 channel 的 prefetch level 做分發(請參見AQMP: QoS or message prefetching),如果該值一樣的話,訊息會被平均分發給這些Consumer。
1.5 rabbitmqctl Cli
RabbitMQ 提供Cli rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。常用到的命令有:
- stop/start_app
- add/delete/list_vhosts
- list_queues/exchanges/bindings/connections/channels
- trace_on/off
2 訊息轉發機制
Exchange 根據它自身的型別 type、訊息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發訊息。
Exchange 的型別 Type | 使用的訊息屬性 | 使用的Binding 屬性 | 轉發模式 |
Fanout | - (忽略訊息的轉發屬性) | - (忽略binding的轉發屬性) | Exchange 將訊息轉發到所有與它有 binding 關係的佇列中。 這種方法轉發效率較高。OpenStack 大量使用這種型別的 exchange。 |
Direct | routing_key (任意的字串,比如 "abc") | binding_key (任意的字串,比如 "abc") | Exchange 只將訊息轉到 binding 的 binding_key 等於訊息的 routing_key 的佇列中。 |
Topic | routing_key (以 "." 分割的多單詞字串,比如 abc.efg.hij) | binding_key (包含 "#" 和 "*" 的以 “.” 分割的多單詞字串,比如 *.efg.*) | Exchange 只將訊息轉到訊息的 routing_key 和 binding 的 binding_key 匹配的佇列中。匹配規則如下: (1)兩者以"."分割的單詞數目相同 (2)"*"可代表一個單詞 (3)"#“可代表零個或多個單詞 |
Headers | headers (訊息頭) | binding_key | Exchange 只將訊息轉到訊息的 headers 和 binding 的 binding_key 匹配的佇列中。匹配規則待研究。 OpenStack不使用該型別的exchange。 |
參考文件:
https://www.rabbitmq.com/getstarted.html 這裡有詳細的闡述和示例原始碼。
http://www.cnblogs.com/starof/p/4173413.html 這裡有官網文件的中文版。
3 持久化
訊息的持久化意味著在 RabbitMQ 被重啟後,訊息依然還在。要實現持久化,得實現幾個相關元件的持久化:
(1). 交換機的持久化,需要將其 durable 屬性設為 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)
(2). 佇列的持久化,需要將其 durable 屬性設定為 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
(3). 訊息的持久化,需要將其 Delivery Mode 屬性設定成2 。msg.properties["delivery_mode"] = 24 RPC
可以使用 RabbitMQ 來實現 RPC 機制,這裡說說其實現原理:
過程:
(1). 客戶端 Client 設定訊息的 routing key 為 Service 的佇列 op_q;設定訊息的 reply-to 屬性為返回的 response 的目標佇列 reponse_q,設定其 correlation_id 為以隨機UUID,然後將訊息發到 exchange。比如 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
(2). Exchange 將訊息轉發到 Service 的 op_q
(3). Service 收到該訊息後進行處理,然後將response 發到 exchange,並設定訊息的 routing_key 為原訊息的 reply_to 屬性,以及設定其 correlation_id 為原訊息的 correlation_id 。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))
(4). Exchange 將訊息轉發到 reponse_q
(5). Client 逐一接受 response_q 中的訊息,檢查訊息的 correlation_id 是否為等於它發出的訊息的correlation_id,是的話表明該訊息為它需要的response。
這裡有詳細的闡述。
5 Python AMQP SDK
常用的Python AMQP SDK包括:
- py-amqplib (支援 AMQP 0.8): http://barryp.org/software/py-amqplib/
- pika (支援 AMQP 0.9.1,Python 2.6 和 2.7):https://github.com/pika/pika
- txamqp: https://launchpad.net/txamqp
5.1 一個簡單的使用 py-amqplib 的 Consumer 實現
#建立Connection和Channel連線到 RabbitMQ 伺服器 conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel() #建立 queue result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False) #建立 exchange result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,) #建立 binding result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug") #回撥函式,當有 message 到達 queue 後,該函式會被呼叫 def recv_callback(msg): print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag) #如果no_ack=False的話,可以需要發回一個確認
#啟動一個 consumer,consumer_tag 是該 consumer 的一個唯一識別符號
#no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有訊息發到 queue while True: chan.wait()
#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()
5.2 一個簡單的使用 py-amqplib 的 Producer 實現程式碼
from amqplib import client_0_8 as amqp import sys
#建立 connection 和 channel conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#建立 message msg = amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2
#傳送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel chan.close() conn.close()
5.3 使用 pika
5.3.1 安裝 pika
wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c tar zxvf pika-0.9.14.tar.gz cd pika-0.9.14 python setup.py install
5.3.2 使用 pika 程式設計(來源)
#!/usr/bin/env python ''' rabbitmq trace scripts. require (rabbitmq_tracing): $ sudo rabbitmq-plugins enable rabbitmq_tracing usage: $ sudo rabbitmqctl trace_on $ ./rabbitmqtrace.py << output >> ''' import sys import time from optparse import OptionParser import pika __AUTHOR__ = 'smallfish' __VERSION__ = '0.0.1' def _out(args): print time.strftime('%Y-%m-%d %H:%M:%S'), args def _run(host, port, vhost, user, password): conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost, credentials=pika.PlainCredentials(user, password))) chan = conn.channel() def _on_message(ch, method, properties, body): ret = {} ret['routing_key'] = method.routing_key ret['headers'] = properties.headers ret['body'] = body _out(ret) _out('start subscribe amq.rabbitmq.trace') ret = chan.queue_declare(exclusive=False, auto_delete=True) queue = ret.method.queue chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#') chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#') chan.basic_consume(_on_message, queue=queue, no_ack=True) chan.start_consuming() def main(): parser = OptionParser('usage: %prog') parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default') parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default') parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default') parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default') parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default') (options, args) = parser.parse_args() _run(options.host, options.port, options.vhost, options.user, options.password) if __name__ == '__main__': main()
6 外掛和訊息追蹤
RabbitMQ 支援使用外掛來支援 Management, Federation, Shovel 和 STOMP。所有的外掛都在這裡。
6.1 rabbitmq-management 外掛
它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還可以通過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。
6.2 RabbitMQ 的 firehose 機制
該機制提供了一個檢視被轉發的訊息的途徑。當開啟 firehose 的時候,RabbitMQ 會自動建立 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你可以程式設計建立queue 從這兩個 exchange 裡面獲取 trace 和 log,從而觀察每一個被處理的訊息。這裡有一個開原始碼實現。
6.3 rabbitmq_tracing 外掛
rabbitmq_tracing 外掛在 management 外掛增加了訊息追蹤的方法,它是從 firehose 中獲取資料。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你可以在 management GUI 中追蹤訊息:
自此,RabbitMQ 基本上算熟悉了,接下來可以開始分析 OpenStack 中是如何使用 RabbitMQ 了。
本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什麼。第二部分將介紹其在 OpenStack 中的使用。
1 RabbitMQ 的基本概念
RabbitMQ 是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(亦稱面向訊息的中介軟體)。
AMQP 是一個定義了在應用或者組織之間傳送訊息的協議的開放標準 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送訊息存在的下列問題:
- 網路是不可靠的 =>訊息需要儲存後再轉發並有出錯處理機制
- 與本地呼叫相比,網路速度慢 =>得非同步呼叫
- 應用之間是不同的(比如不同語言實現、不同作業系統等) =>得與應用無關
- 應用會經常變化 =>同上
AMQP 使用非同步的、應用對應用的、二進位制資料通訊來解決這些問題。
RabbitMQ 是 AMQP 的一種實現,它包括Server (伺服器端)、Client (客戶端) 和 Plugins (外掛)。RabbitMQ 伺服器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)釋出的 3.4.4,而 OpenStack Juno 中使用的 Server 是 2014年3月釋出的 3.2.4 版本。現在 RabbitMQ 支援的 AMQP 版本依然是0.9.1。
1.1 RabbitMQ 的概念非常清晰、簡潔
其基本概念參見下圖:
RabbitMQ 官網 和其它網站上有很多文章來描述其基本概念。簡單說明如下:
- Message (訊息):RabbitMQ 轉發的二進位制物件,包括Headers(頭)、Properties (屬性)和 Data (資料),其中資料部分不是必要的。具體見 1.2 部分的描述。
- Producer(生產者): 訊息的生產者,負責產生訊息並把訊息發到交換機 Exhange的應用。
- Consumer (消費者):使用佇列 Queue 從 Exchange 中獲取訊息的應用。
- Exchange (交換機):負責接收生產者的訊息並把它轉到到合適的佇列 Queue 。下面有 1.3 部分描述。
-
Queue (佇列):一個儲存Exchange 發來的訊息的緩衝,並將訊息主動傳送給Consumer,或者 Consumer 主動來獲取訊息。見 1.4 部分的描述。
- Binding (繫結):佇列 和 交換機 之間的關係。Exchange 根據訊息的屬性和 Binding 的屬性來轉發訊息。繫結的一個重要屬性是 binding_key。
- Connection (連線)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連線。一些應用需要多個connection,為了節省TCP 連線,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連線的連線。連線需要使用者認證,並且支援 TLS (SSL)。連線需要顯式關閉。
- Virtual Host (虛擬主機) :RabbitMQ 用來進行資源隔離的機制。一個虛機主機會隔離使用者、exchange、queue 等。預設的虛擬主機為 "/"。
1.2 關於訊息 message
訊息結構:
訊息的幾個重要屬性:
- routing_key:Direct 和 Topic 型別的 exchange 會根據本屬性來轉發訊息。
-
delivery_mode: 將其值設定為 2 將用於訊息的持久化,持久化的訊息會被儲存到磁碟上來防止其丟失。下面章節 3 有描述。
- reply_to:一般用來表示RPC實現中客戶端的回撥佇列的名字。下面章節 4 有描述。
- correlation_id:用於使用 RabbitMQ 來實現 RPC的情形。下面章節 4 有描述。
- content_type:表示訊息data的編碼格式名稱。實際上RabbitMQ只負責原樣傳送訊息因此不會使用該屬性,該屬性只被 Publisher 和 Consumer 使用。
訊息的確認/刪除機制:
Consumer 處理訊息可能會失敗,那麼 RabbitMQ 怎麼知道什麼時候來刪除 queue 中的訊息呢?它使用兩種機制:
- 當 RabbitMQ 主動將訊息發給 Consumer 以後,它會刪除訊息
- 當 Consumer 發回一個確認後,RabbitMQ 會刪除訊息。
第二種情況下,如果 RabbitMQ 沒收到確認,它會把訊息重新放進佇列(re-queued)並新增標識 'redelivered' 表明該訊息之前已經發送過 ,如果沒有Consumer的話,訊息將保持到有下一個 Consumer 為止。
Consumer 可以主動告訴 RabbitMQ 訊息處理失敗了(拒絕訊息),並告知RabbitMQ 是刪除訊息還是重新放進佇列。
1.3 exchange 交換機
exchange 有幾個重要的屬性:
- Name 名字:交換機名字。空字串名字的exchange為預設的exchange。
- Type 型別:Direct, Fanout, Topic, Headers。型別決定 exchange 的訊息轉發能力。下面 章節2 有描述。
- durable:值為 True/False。值為 true 的 exchange 在 rabbitmq 重啟後會被自動建立。OpenStack 使用的exchange的該值都為false。
- auto_delete:值為 True/False。設定為 true 的話,當所有消費者的連線都關閉後,該 exchange 會被自動刪除。OpenStack 使用的exchange的該值都為false。
- exclusive:值為 True/False。設定為 true 的話,該 exchange 只允許被建立的connection使用,並且在該 connection 關閉後它會被自動刪除。
RabbitMQ 預設會為每一種型別生成一個或者兩個的預設的 exchange:
- Fanout 型別:名字為 amq.fanout
- Topic 型別: 名字為 amq.topic
- Headers 型別:名字為 amq.match 和 amq.headers
- Direct 型別:名字為空字串的exchange 以及 amq.direct。其中名字為空的exchange比較特殊。在一個 Queue 被建立後,RabbitMQ 會自動建立它和該 exchange 之間的binding,並且設定其 binding_key 為該queue 的名字。這樣,該語句 channel.basic_publish(exchange='', routing_key='hello', body=message) 會讓該預設的 exchange 將該 message 轉發到名字為 'hello' 的佇列中。
1.4 佇列 Queue
佇列同樣有類似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,並且含義相同。
Exchange 會將訊息分發(copy)到符合要求的所有佇列中。
Consumer 可以主動獲取或者被動接受Queue裡面的訊息:
- 被動接收訊息(訂閱訊息 "push API"):使用 basic.consume(short reserved-1, queue-name queue, consumer-tag consumer-tag,no-local no-local, no-ack no-ack, bit exclusive, no-wait no-wait,table arguments)
方法。見 5.1 示例程式碼。 - 主動獲取訊息 ("pull API"): 使用 basic.get(short reserved-1, queue-name queue, no-ack no-ack) 方法。
一個 Queue 允許有多個 Consumer,比如利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,訊息會在這些 Consumer 之間根據 channel 的 prefetch level 做分發(請參見AQMP: QoS or message prefetching),如果該值一樣的話,訊息會被平均分發給這些Consumer。
1.5 rabbitmqctl Cli
RabbitMQ 提供Cli rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。常用到的命令有:
- stop/start_app
- add/delete/list_vhosts
- list_queues/exchanges/bindings/connections/channels
- trace_on/off
2 訊息轉發機制
Exchange 根據它自身的型別 type、訊息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發訊息。
Exchange 的型別 Type | 使用的訊息屬性 | 使用的Binding 屬性 | 轉發模式 |
Fanout | - (忽略訊息的轉發屬性) | - (忽略binding的轉發屬性) | Exchange 將訊息轉發到所有與它有 binding 關係的佇列中。 這種方法轉發效率較高。OpenStack 大量使用這種型別的 exchange。 |
Direct | routing_key (任意的字串,比如 "abc") | binding_key (任意的字串,比如 "abc") | Exchange 只將訊息轉到 binding 的 binding_key 等於訊息的 routing_key 的佇列中。 |
Topic | routing_key (以 "." 分割的多單詞字串,比如 abc.efg.hij) | binding_key (包含 "#" 和 "*" 的以 “.” 分割的多單詞字串,比如 *.efg.*) | Exchange 只將訊息轉到訊息的 routing_key 和 binding 的 binding_key 匹配的佇列中。匹配規則如下: (1)兩者以"."分割的單詞數目相同 (2)"*"可代表一個單詞 (3)"#“可代表零個或多個單詞 |
Headers | headers (訊息頭) | binding_key | Exchange 只將訊息轉到訊息的 headers 和 binding 的 binding_key 匹配的佇列中。匹配規則待研究。 OpenStack不使用該型別的exchange。 |
參考文件:
https://www.rabbitmq.com/getstarted.html 這裡有詳細的闡述和示例原始碼。
http://www.cnblogs.com/starof/p/4173413.html 這裡有官網文件的中文版。
3 持久化
訊息的持久化意味著在 RabbitMQ 被重啟後,訊息依然還在。要實現持久化,得實現幾個相關元件的持久化:
(1). 交換機的持久化,需要將其 durable 屬性設為 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)
(2). 佇列的持久化,需要將其 durable 屬性設定為 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
(3). 訊息的持久化,需要將其 Delivery Mode 屬性設定成2 。msg.properties["delivery_mode"] = 24 RPC
可以使用 RabbitMQ 來實現 RPC 機制,這裡說說其實現原理:
過程:
(1). 客戶端 Client 設定訊息的 routing key 為 Service 的佇列 op_q;設定訊息的 reply-to 屬性為返回的 response 的目標佇列 reponse_q,設定其 correlation_id 為以隨機UUID,然後將訊息發到 exchange。比如 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
(2). Exchange 將訊息轉發到 Service 的 op_q
(3). Service 收到該訊息後進行處理,然後將response 發到 exchange,並設定訊息的 routing_key 為原訊息的 reply_to 屬性,以及設定其 correlation_id 為原訊息的 correlation_id 。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))
(4). Exchange 將訊息轉發到 reponse_q
(5). Client 逐一接受 response_q 中的訊息,檢查訊息的 correlation_id 是否為等於它發出的訊息的correlation_id,是的話表明該訊息為它需要的response。
這裡有詳細的闡述。
5 Python AMQP SDK
常用的Python AMQP SDK包括:
- py-amqplib (支援 AMQP 0.8): http://barryp.org/software/py-amqplib/
- pika (支援 AMQP 0.9.1,Python 2.6 和 2.7):https://github.com/pika/pika
- txamqp: https://launchpad.net/txamqp
5.1 一個簡單的使用 py-amqplib 的 Consumer 實現
#建立Connection和Channel連線到 RabbitMQ 伺服器 conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel() #建立 queue result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False) #建立 exchange result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,) #建立 binding result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug") #回撥函式,當有 message 到達 queue 後,該函式會被呼叫 def recv_callback(msg): print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag) #如果no_ack=False的話,可以需要發回一個確認
#啟動一個 consumer,consumer_tag 是該 consumer 的一個唯一識別符號
#no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有訊息發到 queue while True: chan.wait()
#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()
5.2 一個簡單的使用 py-amqplib 的 Producer 實現程式碼
from amqplib import client_0_8 as amqp import sys
#建立 connection 和 channel conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#建立 message msg = amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2
#傳送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel chan.close() conn.close()
5.3 使用 pika
5.3.1 安裝 pika
wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c tar zxvf pika-0.9.14.tar.gz cd pika-0.9.14 python setup.py install
5.3.2 使用 pika 程式設計(來源)
#!/usr/bin/env python ''' rabbitmq trace scripts. require (rabbitmq_tracing): $ sudo rabbitmq-plugins enable rabbitmq_tracing usage: $ sudo rabbitmqctl trace_on $ ./rabbitmqtrace.py << output >> ''' import sys import time from optparse import OptionParser import pika __AUTHOR__ = 'smallfish' __VERSION__ = '0.0.1' def _out(args): print time.strftime('%Y-%m-%d %H:%M:%S'), args def _run(host, port, vhost, user, password): conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost, credentials=pika.PlainCredentials(user, password))) chan = conn.channel() def _on_message(ch, method, properties, body): ret = {} ret['routing_key'] = method.routing_key ret['headers'] = properties.headers ret['body'] = body _out(ret) _out('start subscribe amq.rabbitmq.trace') ret = chan.queue_declare(exclusive=False, auto_delete=True) queue = ret.method.queue chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#') chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#') chan.basic_consume(_on_message, queue=queue, no_ack=True) chan.start_consuming() def main(): parser = OptionParser('usage: %prog') parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default') parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default') parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default') parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default') parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default') (options, args) = parser.parse_args() _run(options.host, options.port, options.vhost, options.user, options.password) if __name__ == '__main__': main()
6 外掛和訊息追蹤
RabbitMQ 支援使用外掛來支援 Management, Federation, Shovel 和 STOMP。所有的外掛都在這裡。
6.1 rabbitmq-management 外掛
它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還可以通過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。
6.2 RabbitMQ 的 firehose 機制
該機制提供了一個檢視被轉發的訊息的途徑。當開啟 firehose 的時候,RabbitMQ 會自動建立 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你可以程式設計建立queue 從這兩個 exchange 裡面獲取 trace 和 log,從而觀察每一個被處理的訊息。這裡有一個開原始碼實現。
6.3 rabbitmq_tracing 外掛
rabbitmq_tracing 外掛在 management 外掛增加了訊息追蹤的方法,它是從 firehose 中獲取資料。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你可以在 management GUI 中追蹤訊息:
自此,RabbitMQ 基本上算熟悉了,接下來可以開始分析 OpenStack 中是如何使用 RabbitMQ 了。