1. 程式人生 > >OpenStack 中 RabbitMQ 的使用 OpenStack 中 RabbitMQ 的使用

OpenStack 中 RabbitMQ 的使用 OpenStack 中 RabbitMQ 的使用

OpenStack 中 RabbitMQ 的使用

本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什麼。第二部分將介紹其在 OpenStack 中的使用。

 

1 RabbitMQ 的基本概念

RabbitMQ 是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(亦稱面向訊息的中介軟體)。

AMQP 是一個定義了在應用或者組織之間傳送訊息的協議的開放標準 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送訊息存在的下列問題:

  • 網路是不可靠的 =>訊息需要儲存後再轉發並有出錯處理機制
  • 與本地呼叫相比,網路速度慢 =>得非同步呼叫
  • 應用之間是不同的(比如不同語言實現、不同作業系統等) =>得與應用無關
  • 應用會經常變化 =>同上

AMQP 使用非同步的、應用對應用的、二進位制資料通訊來解決這些問題。

RabbitMQ 是 AMQP 的一種實現,它包括Server (伺服器端)、Client (客戶端) 和 Plugins (外掛)。RabbitMQ 伺服器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)釋出的 3.4.4

,而 OpenStack Juno 中使用的 Server 是 2014年3月釋出的 3.2.4 版本。現在 RabbitMQ 支援的 AMQP 版本依然是0.9.1。

1.1 RabbitMQ 的概念非常清晰、簡潔

其基本概念參見下圖:

 

RabbitMQ 官網 和其它網站上有很多文章來描述其基本概念。簡單說明如下:

  • Message (訊息):RabbitMQ 轉發的二進位制物件,包括Headers(頭)、Properties (屬性)和 Data (資料),其中資料部分不是必要的。具體見 1.2 部分的描述。
  • Producer(生產者)
    : 訊息的生產者,負責產生訊息並把訊息發到交換機 Exhange的應用。
  • Consumer (消費者):使用佇列 Queue 從 Exchange 中獲取訊息的應用。
  • Exchange (交換機):負責接收生產者的訊息並把它轉到到合適的佇列 Queue 。下面有 1.3 部分描述。
  • Queue (佇列):一個儲存Exchange 發來的訊息的緩衝,並將訊息主動傳送給Consumer,或者 Consumer 主動來獲取訊息。見 1.4 部分的描述。

  • Binding (繫結):佇列 和 交換機 之間的關係。Exchange 根據訊息的屬性和 Binding 的屬性來轉發訊息。繫結的一個重要屬性是 binding_key。
  • Connection (連線)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連線。一些應用需要多個connection,為了節省TCP 連線,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連線的連線。連線需要使用者認證,並且支援 TLS (SSL)。連線需要顯式關閉。
  • Virtual Host (虛擬主機) :RabbitMQ 用來進行資源隔離的機制。一個虛機主機會隔離使用者、exchange、queue 等。預設的虛擬主機為 "/"。

1.2 關於訊息 message

訊息結構:

訊息的幾個重要屬性:

  • routing_key:Direct 和 Topic 型別的 exchange 會根據本屬性來轉發訊息。
  • delivery_mode: 將其值設定為 2 將用於訊息的持久化,持久化的訊息會被儲存到磁碟上來防止其丟失。下面章節 3 有描述。

  • reply_to:一般用來表示RPC實現中客戶端的回撥佇列的名字。下面章節 4 有描述。
  • correlation_id:用於使用 RabbitMQ 來實現 RPC的情形。下面章節 4 有描述。
  • content_type:表示訊息data的編碼格式名稱。實際上RabbitMQ只負責原樣傳送訊息因此不會使用該屬性,該屬性只被 Publisher 和 Consumer 使用。

訊息的確認/刪除機制:

Consumer 處理訊息可能會失敗,那麼 RabbitMQ 怎麼知道什麼時候來刪除 queue 中的訊息呢?它使用兩種機制:

  • 當 RabbitMQ 主動將訊息發給 Consumer 以後,它會刪除訊息
  • 當 Consumer 發回一個確認後,RabbitMQ 會刪除訊息。

第二種情況下,如果 RabbitMQ 沒收到確認,它會把訊息重新放進佇列(re-queued)並新增標識 'redelivered' 表明該訊息之前已經發送過 ,如果沒有Consumer的話,訊息將保持到有下一個 Consumer 為止。

Consumer 可以主動告訴 RabbitMQ 訊息處理失敗了(拒絕訊息),並告知RabbitMQ 是刪除訊息還是重新放進佇列。

1.3 exchange 交換機

exchange 有幾個重要的屬性:

  • Name 名字:交換機名字。空字串名字的exchange為預設的exchange。
  • Type 型別:Direct, Fanout, Topic, Headers。型別決定 exchange 的訊息轉發能力。下面 章節2 有描述。
  • durable:值為 True/False。值為 true 的 exchange 在 rabbitmq 重啟後會被自動建立。OpenStack 使用的exchange的該值都為false。
  • auto_delete:值為 True/False。設定為 true 的話,當所有消費者的連線都關閉後,該 exchange 會被自動刪除。OpenStack 使用的exchange的該值都為false。
  • exclusive:值為 True/False。設定為 true 的話,該 exchange 只允許被建立的connection使用,並且在該 connection 關閉後它會被自動刪除。

RabbitMQ 預設會為每一種型別生成一個或者兩個的預設的 exchange:

  • Fanout 型別:名字為 amq.fanout
  • Topic 型別: 名字為 amq.topic
  • Headers 型別:名字為 amq.match 和 amq.headers
  • Direct 型別:名字為空字串的exchange 以及 amq.direct。其中名字為空的exchange比較特殊。在一個 Queue 被建立後,RabbitMQ 會自動建立它和該 exchange 之間的binding,並且設定其 binding_key 為該queue 的名字。這樣,該語句 channel.basic_publish(exchange=''routing_key='hello',             body=message) 會讓該預設的 exchange 將該 message 轉發到名字為 'hello' 的佇列中。

1.4 佇列 Queue

佇列同樣有類似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,並且含義相同。

Exchange 會將訊息分發(copy)到符合要求的所有佇列中。

Consumer 可以主動獲取或者被動接受Queue裡面的訊息:

一個 Queue 允許有多個 Consumer,比如利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,訊息會在這些 Consumer 之間根據 channel 的 prefetch level 做分發(請參見AQMP: QoS or message prefetching),如果該值一樣的話,訊息會被平均分發給這些Consumer。

1.5 rabbitmqctl  Cli

RabbitMQ 提供Cli  rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。常用到的命令有:

  • stop/start_app
  • add/delete/list_vhosts
  • list_queues/exchanges/bindings/connections/channels
  • trace_on/off

2 訊息轉發機制

Exchange 根據它自身的型別 type、訊息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發訊息。

Exchange 的型別 Type 使用的訊息屬性 使用的Binding 屬性 轉發模式
Fanout - (忽略訊息的轉發屬性) - (忽略binding的轉發屬性)

Exchange 將訊息轉發到所有與它有 binding 關係的佇列中。

這種方法轉發效率較高。OpenStack 大量使用這種型別的 exchange。

Direct routing_key (任意的字串,比如 "abc") binding_key (任意的字串,比如 "abc") Exchange 只將訊息轉到 binding 的 binding_key 等於訊息的 routing_key 的佇列中。
Topic routing_key (以 "." 分割的多單詞字串,比如 abc.efg.hij) binding_key (包含 "#" 和 "*" 的以 “.” 分割的多單詞字串,比如 *.efg.*)

Exchange 只將訊息轉到訊息的 routing_key 和 binding 的 binding_key 匹配的佇列中。匹配規則如下:

(1)兩者以"."分割的單詞數目相同

(2)"*"可代表一個單詞

(3)"#“可代表零個或多個單詞

Headers headers (訊息頭) binding_key

Exchange 只將訊息轉到訊息的 headers 和 binding 的 binding_key 匹配的佇列中。匹配規則待研究。

OpenStack不使用該型別的exchange。

參考文件:

https://www.rabbitmq.com/getstarted.html 這裡有詳細的闡述和示例原始碼。

http://www.cnblogs.com/starof/p/4173413.html 這裡有官網文件的中文版。

3 持久化

訊息的持久化意味著在 RabbitMQ 被重啟後,訊息依然還在。要實現持久化,得實現幾個相關元件的持久化:

(1). 交換機的持久化,需要將其 durable 屬性設為 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)

(2). 佇列的持久化,需要將其 durable 屬性設定為 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)

(3). 訊息的持久化,需要將其 Delivery Mode 屬性設定成2 。msg.properties["delivery_mode"] = 2

4 RPC

可以使用 RabbitMQ 來實現 RPC 機制,這裡說說其實現原理:

過程:

(1). 客戶端 Client 設定訊息的 routing key 為 Service 的佇列 op_q;設定訊息的 reply-to 屬性為返回的 response 的目標佇列 reponse_q,設定其 correlation_id 為以隨機UUID,然後將訊息發到 exchange。比如 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)

(2). Exchange 將訊息轉發到 Service 的 op_q

(3). Service 收到該訊息後進行處理,然後將response 發到 exchange,並設定訊息的 routing_key 為原訊息的 reply_to 屬性,以及設定其 correlation_id 為原訊息的 correlation_id 。

ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))

(4). Exchange 將訊息轉發到 reponse_q

(5). Client 逐一接受 response_q 中的訊息,檢查訊息的 correlation_id 是否為等於它發出的訊息的correlation_id,是的話表明該訊息為它需要的response。

 這裡有詳細的闡述。

5 Python AMQP SDK

常用的Python AMQP SDK包括:

5.1 一個簡單的使用 py-amqplib 的 Consumer 實現

複製程式碼
#建立Connection和Channel連線到 RabbitMQ 伺服器
conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)
chan = conn.channel()

#建立 queue
result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False)

#建立 exchange
result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,)

#建立 binding
result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug")

#回撥函式,當有 message 到達 queue 後,該函式會被呼叫
def recv_callback(msg):
    print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag) #如果no_ack=False的話,可以需要發回一個確認

#啟動一個 consumer,consumer_tag 是該 consumer 的一個唯一識別符號
#no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有訊息發到 queue
while True:
    chan.wait()

#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()
複製程式碼

5.2 一個簡單的使用 py-amqplib 的 Producer 實現程式碼

複製程式碼
from amqplib import client_0_8 as amqp
import sys

#建立 connection 和 channel conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#建立 message msg = amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2

#傳送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel chan.close() conn.close()
複製程式碼

5.3 使用 pika

5.3.1 安裝 pika

 

wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c  
tar zxvf pika-0.9.14.tar.gz  
cd pika-0.9.14
python setup.py install

 

5.3.2 使用 pika 程式設計(來源

複製程式碼
#!/usr/bin/env python
'''
rabbitmq trace scripts.
require (rabbitmq_tracing):
    $ sudo rabbitmq-plugins enable rabbitmq_tracing
usage:
    $ sudo rabbitmqctl trace_on
    $ ./rabbitmqtrace.py
    << output >>
'''
import sys
import time
from optparse import OptionParser
import pika

__AUTHOR__  = 'smallfish'
__VERSION__ = '0.0.1'

def _out(args):
    print time.strftime('%Y-%m-%d %H:%M:%S'), args

def _run(host, port, vhost, user, password):
    conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost,
        credentials=pika.PlainCredentials(user, password)))
    chan = conn.channel()
    def _on_message(ch, method, properties, body):
        ret = {}
        ret['routing_key'] = method.routing_key
        ret['headers'] = properties.headers
        ret['body'] = body
        _out(ret)
    _out('start subscribe amq.rabbitmq.trace')
    ret = chan.queue_declare(exclusive=False, auto_delete=True)
    queue = ret.method.queue
    chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#')
    chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#')
    chan.basic_consume(_on_message, queue=queue, no_ack=True)
    chan.start_consuming()

def main():
    parser = OptionParser('usage: %prog')
    parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default')
    parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default')
    parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default')
    parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default')
    parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default')
    (options, args) = parser.parse_args()
    _run(options.host, options.port, options.vhost, options.user, options.password)

if __name__ == '__main__':
    main()
複製程式碼

 

6 外掛和訊息追蹤

RabbitMQ 支援使用外掛來支援 Management, Federation, Shovel  和 STOMP。所有的外掛都在這裡

6.1 rabbitmq-management 外掛

它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還可以通過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。

6.2 RabbitMQ 的 firehose 機制

該機制提供了一個檢視被轉發的訊息的途徑。當開啟 firehose 的時候,RabbitMQ 會自動建立 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你可以程式設計建立queue 從這兩個 exchange 裡面獲取 trace 和 log,從而觀察每一個被處理的訊息。這裡有一個開原始碼實現

6.3 rabbitmq_tracing 外掛

rabbitmq_tracing 外掛在 management 外掛增加了訊息追蹤的方法,它是從 firehose 中獲取資料。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你可以在 management GUI  中追蹤訊息:

 

自此,RabbitMQ 基本上算熟悉了,接下來可以開始分析 OpenStack 中是如何使用 RabbitMQ 了。

本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什麼。第二部分將介紹其在 OpenStack 中的使用。

 

1 RabbitMQ 的基本概念

RabbitMQ 是實現了高階訊息佇列協議(AMQP)的開源訊息代理軟體(亦稱面向訊息的中介軟體)。

AMQP 是一個定義了在應用或者組織之間傳送訊息的協議的開放標準 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送訊息存在的下列問題:

  • 網路是不可靠的 =>訊息需要儲存後再轉發並有出錯處理機制
  • 與本地呼叫相比,網路速度慢 =>得非同步呼叫
  • 應用之間是不同的(比如不同語言實現、不同作業系統等) =>得與應用無關
  • 應用會經常變化 =>同上

AMQP 使用非同步的、應用對應用的、二進位制資料通訊來解決這些問題。

RabbitMQ 是 AMQP 的一種實現,它包括Server (伺服器端)、Client (客戶端) 和 Plugins (外掛)。RabbitMQ 伺服器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)釋出的 3.4.4,而 OpenStack Juno 中使用的 Server 是 2014年3月釋出的 3.2.4 版本。現在 RabbitMQ 支援的 AMQP 版本依然是0.9.1。

1.1 RabbitMQ 的概念非常清晰、簡潔

其基本概念參見下圖:

 

RabbitMQ 官網 和其它網站上有很多文章來描述其基本概念。簡單說明如下:

  • Message (訊息):RabbitMQ 轉發的二進位制物件,包括Headers(頭)、Properties (屬性)和 Data (資料),其中資料部分不是必要的。具體見 1.2 部分的描述。
  • Producer(生產者): 訊息的生產者,負責產生訊息並把訊息發到交換機 Exhange的應用。
  • Consumer (消費者):使用佇列 Queue 從 Exchange 中獲取訊息的應用。
  • Exchange (交換機):負責接收生產者的訊息並把它轉到到合適的佇列 Queue 。下面有 1.3 部分描述。
  • Queue (佇列):一個儲存Exchange 發來的訊息的緩衝,並將訊息主動傳送給Consumer,或者 Consumer 主動來獲取訊息。見 1.4 部分的描述。

  • Binding (繫結):佇列 和 交換機 之間的關係。Exchange 根據訊息的屬性和 Binding 的屬性來轉發訊息。繫結的一個重要屬性是 binding_key。
  • Connection (連線)和 Channel (通道):生產者和消費者需要和 RabbitMQ 建立 TCP 連線。一些應用需要多個connection,為了節省TCP 連線,可以使用 Channel,它可以被認為是一種輕型的共享 TCP 連線的連線。連線需要使用者認證,並且支援 TLS (SSL)。連線需要顯式關閉。
  • Virtual Host (虛擬主機) :RabbitMQ 用來進行資源隔離的機制。一個虛機主機會隔離使用者、exchange、queue 等。預設的虛擬主機為 "/"。

1.2 關於訊息 message

訊息結構:

訊息的幾個重要屬性:

  • routing_key:Direct 和 Topic 型別的 exchange 會根據本屬性來轉發訊息。
  • delivery_mode: 將其值設定為 2 將用於訊息的持久化,持久化的訊息會被儲存到磁碟上來防止其丟失。下面章節 3 有描述。

  • reply_to:一般用來表示RPC實現中客戶端的回撥佇列的名字。下面章節 4 有描述。
  • correlation_id:用於使用 RabbitMQ 來實現 RPC的情形。下面章節 4 有描述。
  • content_type:表示訊息data的編碼格式名稱。實際上RabbitMQ只負責原樣傳送訊息因此不會使用該屬性,該屬性只被 Publisher 和 Consumer 使用。

訊息的確認/刪除機制:

Consumer 處理訊息可能會失敗,那麼 RabbitMQ 怎麼知道什麼時候來刪除 queue 中的訊息呢?它使用兩種機制:

  • 當 RabbitMQ 主動將訊息發給 Consumer 以後,它會刪除訊息
  • 當 Consumer 發回一個確認後,RabbitMQ 會刪除訊息。

第二種情況下,如果 RabbitMQ 沒收到確認,它會把訊息重新放進佇列(re-queued)並新增標識 'redelivered' 表明該訊息之前已經發送過 ,如果沒有Consumer的話,訊息將保持到有下一個 Consumer 為止。

Consumer 可以主動告訴 RabbitMQ 訊息處理失敗了(拒絕訊息),並告知RabbitMQ 是刪除訊息還是重新放進佇列。

1.3 exchange 交換機

exchange 有幾個重要的屬性:

  • Name 名字:交換機名字。空字串名字的exchange為預設的exchange。
  • Type 型別:Direct, Fanout, Topic, Headers。型別決定 exchange 的訊息轉發能力。下面 章節2 有描述。
  • durable:值為 True/False。值為 true 的 exchange 在 rabbitmq 重啟後會被自動建立。OpenStack 使用的exchange的該值都為false。
  • auto_delete:值為 True/False。設定為 true 的話,當所有消費者的連線都關閉後,該 exchange 會被自動刪除。OpenStack 使用的exchange的該值都為false。
  • exclusive:值為 True/False。設定為 true 的話,該 exchange 只允許被建立的connection使用,並且在該 connection 關閉後它會被自動刪除。

RabbitMQ 預設會為每一種型別生成一個或者兩個的預設的 exchange:

  • Fanout 型別:名字為 amq.fanout
  • Topic 型別: 名字為 amq.topic
  • Headers 型別:名字為 amq.match 和 amq.headers
  • Direct 型別:名字為空字串的exchange 以及 amq.direct。其中名字為空的exchange比較特殊。在一個 Queue 被建立後,RabbitMQ 會自動建立它和該 exchange 之間的binding,並且設定其 binding_key 為該queue 的名字。這樣,該語句 channel.basic_publish(exchange=''routing_key='hello',             body=message) 會讓該預設的 exchange 將該 message 轉發到名字為 'hello' 的佇列中。

1.4 佇列 Queue

佇列同樣有類似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,並且含義相同。

Exchange 會將訊息分發(copy)到符合要求的所有佇列中。

Consumer 可以主動獲取或者被動接受Queue裡面的訊息:

一個 Queue 允許有多個 Consumer,比如利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,訊息會在這些 Consumer 之間根據 channel 的 prefetch level 做分發(請參見AQMP: QoS or message prefetching),如果該值一樣的話,訊息會被平均分發給這些Consumer。

1.5 rabbitmqctl  Cli

RabbitMQ 提供Cli  rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。常用到的命令有:

  • stop/start_app
  • add/delete/list_vhosts
  • list_queues/exchanges/bindings/connections/channels
  • trace_on/off

2 訊息轉發機制

Exchange 根據它自身的型別 type、訊息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發訊息。

Exchange 的型別 Type 使用的訊息屬性 使用的Binding 屬性 轉發模式
Fanout - (忽略訊息的轉發屬性) - (忽略binding的轉發屬性)

Exchange 將訊息轉發到所有與它有 binding 關係的佇列中。

這種方法轉發效率較高。OpenStack 大量使用這種型別的 exchange。

Direct routing_key (任意的字串,比如 "abc") binding_key (任意的字串,比如 "abc") Exchange 只將訊息轉到 binding 的 binding_key 等於訊息的 routing_key 的佇列中。
Topic routing_key (以 "." 分割的多單詞字串,比如 abc.efg.hij) binding_key (包含 "#" 和 "*" 的以 “.” 分割的多單詞字串,比如 *.efg.*)

Exchange 只將訊息轉到訊息的 routing_key 和 binding 的 binding_key 匹配的佇列中。匹配規則如下:

(1)兩者以"."分割的單詞數目相同

(2)"*"可代表一個單詞

(3)"#“可代表零個或多個單詞

Headers headers (訊息頭) binding_key

Exchange 只將訊息轉到訊息的 headers 和 binding 的 binding_key 匹配的佇列中。匹配規則待研究。

OpenStack不使用該型別的exchange。

參考文件:

https://www.rabbitmq.com/getstarted.html 這裡有詳細的闡述和示例原始碼。

http://www.cnblogs.com/starof/p/4173413.html 這裡有官網文件的中文版。

3 持久化

訊息的持久化意味著在 RabbitMQ 被重啟後,訊息依然還在。要實現持久化,得實現幾個相關元件的持久化:

(1). 交換機的持久化,需要將其 durable 屬性設為 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)

(2). 佇列的持久化,需要將其 durable 屬性設定為 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)

(3). 訊息的持久化,需要將其 Delivery Mode 屬性設定成2 。msg.properties["delivery_mode"] = 2

4 RPC

可以使用 RabbitMQ 來實現 RPC 機制,這裡說說其實現原理:

過程:

(1). 客戶端 Client 設定訊息的 routing key 為 Service 的佇列 op_q;設定訊息的 reply-to 屬性為返回的 response 的目標佇列 reponse_q,設定其 correlation_id 為以隨機UUID,然後將訊息發到 exchange。比如 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)

(2). Exchange 將訊息轉發到 Service 的 op_q

(3). Service 收到該訊息後進行處理,然後將response 發到 exchange,並設定訊息的 routing_key 為原訊息的 reply_to 屬性,以及設定其 correlation_id 為原訊息的 correlation_id 。

ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))

(4). Exchange 將訊息轉發到 reponse_q

(5). Client 逐一接受 response_q 中的訊息,檢查訊息的 correlation_id 是否為等於它發出的訊息的correlation_id,是的話表明該訊息為它需要的response。

 這裡有詳細的闡述。

5 Python AMQP SDK

常用的Python AMQP SDK包括:

5.1 一個簡單的使用 py-amqplib 的 Consumer 實現

複製程式碼
#建立Connection和Channel連線到 RabbitMQ 伺服器
conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)
chan = conn.channel()

#建立 queue
result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False)

#建立 exchange
result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,)

#建立 binding
result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug")

#回撥函式,當有 message 到達 queue 後,該函式會被呼叫
def recv_callback(msg):
    print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag) #如果no_ack=False的話,可以需要發回一個確認

#啟動一個 consumer,consumer_tag 是該 consumer 的一個唯一識別符號
#no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有訊息發到 queue
while True:
    chan.wait()

#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()
複製程式碼

5.2 一個簡單的使用 py-amqplib 的 Producer 實現程式碼

複製程式碼
from amqplib import client_0_8 as amqp
import sys

#建立 connection 和 channel conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#建立 message msg = amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2

#傳送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel chan.close() conn.close()
複製程式碼

5.3 使用 pika

5.3.1 安裝 pika

 

wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c  
tar zxvf pika-0.9.14.tar.gz  
cd pika-0.9.14
python setup.py install

 

5.3.2 使用 pika 程式設計(來源

複製程式碼
#!/usr/bin/env python
'''
rabbitmq trace scripts.
require (rabbitmq_tracing):
    $ sudo rabbitmq-plugins enable rabbitmq_tracing
usage:
    $ sudo rabbitmqctl trace_on
    $ ./rabbitmqtrace.py
    << output >>
'''
import sys
import time
from optparse import OptionParser
import pika

__AUTHOR__  = 'smallfish'
__VERSION__ = '0.0.1'

def _out(args):
    print time.strftime('%Y-%m-%d %H:%M:%S'), args

def _run(host, port, vhost, user, password):
    conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost,
        credentials=pika.PlainCredentials(user, password)))
    chan = conn.channel()
    def _on_message(ch, method, properties, body):
        ret = {}
        ret['routing_key'] = method.routing_key
        ret['headers'] = properties.headers
        ret['body'] = body
        _out(ret)
    _out('start subscribe amq.rabbitmq.trace')
    ret = chan.queue_declare(exclusive=False, auto_delete=True)
    queue = ret.method.queue
    chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#')
    chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#')
    chan.basic_consume(_on_message, queue=queue, no_ack=True)
    chan.start_consuming()

def main():
    parser = OptionParser('usage: %prog')
    parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default')
    parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default')
    parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default')
    parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default')
    parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default')
    (options, args) = parser.parse_args()
    _run(options.host, options.port, options.vhost, options.user, options.password)

if __name__ == '__main__':
    main()
複製程式碼

 

6 外掛和訊息追蹤

RabbitMQ 支援使用外掛來支援 Management, Federation, Shovel  和 STOMP。所有的外掛都在這裡

6.1 rabbitmq-management 外掛

它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還可以通過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。

6.2 RabbitMQ 的 firehose 機制

該機制提供了一個檢視被轉發的訊息的途徑。當開啟 firehose 的時候,RabbitMQ 會自動建立 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你可以程式設計建立queue 從這兩個 exchange 裡面獲取 trace 和 log,從而觀察每一個被處理的訊息。這裡有一個開原始碼實現

6.3 rabbitmq_tracing 外掛

rabbitmq_tracing 外掛在 management 外掛增加了訊息追蹤的方法,它是從 firehose 中獲取資料。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你可以在 management GUI  中追蹤訊息:

 

自此,RabbitMQ 基本上算熟悉了,接下來可以開始分析 OpenStack 中是如何使用 RabbitMQ 了。