RabbitMQ 入門【精+轉】
rabbitmq可以用一本書取講,這裏只是介紹一些使用過程中,常用到的基本的知識點。
官方文檔覆蓋的內容,非常全面:http://www.rabbitmq.com/documentation.html 。
1. 介紹
RabbitMQ,即消息隊列系統,它是一款開源消息隊列中間件,采用Erlang語言開發,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的標準實現。
AMQP是一個公開發布的異步消息的規範,是提供統一消息服務的應用層標準高級消息隊列協議,為面向消息的中間件設計.消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
https://www.rabbitmq.com/tutorials/amqp-concepts.html
相對於JMS(Java Message Service)規範來說,JMS使用的是特定語言的APIs,而消息格式可自由定義,而AMQP對消息的格式和傳輸是有要求的,但實現不會受操作系統、開發語言以及平臺等的限制。
JMS和AMQP還有一個較大的區別:JMS有隊列(Queues)和主題(Topics)兩種消息傳遞模型,發送到 JMS隊列 的消息最多只能被一個Client消費,發送到 JMS主題 的消息可能會被多個Clients消費;AMQP只有隊列(Queues),隊列的消息只能被單個接受者消費,發送者並不直接把消息發送到隊列中,而是發送到Exchange中,該Exchage會與一個或多個隊列綁定,能夠實現與JMS隊列和主題同樣的功能。
另外還有一種 MQTT協議,意為消息隊列遙測傳輸,是IBM開發的一個即時通訊協議。由於其維護一個長連接以輕量級低消耗著稱,所以常用於移動端消息推送服務開發。MQTT是基於TCP的應用層協議封裝,實現了異步Pub/Sub,在物聯網(IoT)應用廣泛。
RabbitMQ可通過庫、插件的形式,支持JMS和MQTT協議。參考:http://geek.csdn.net/news/detail/71894
1.1 主要概念
-
Broker
接收和分發消息的應用,RabbitMQ Server就是Message Broker -
Exchange
message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct, topic, fanout。 -
Queue
消息隊列載體,每個消息都會被投入到一個或多個隊列 -
Binding
在exchange和queue之間建立關系就叫Binding,消費者聲明隊列的時候一般會指定routing_key,也可以叫binding_key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。 -
Routing Key
這裏區分一下binding和routing: binding是一個將exchange和queue關聯起來的動作,routing_key可以理解成隊列的一個屬性,表示這個隊列接受符合該routing_key的消息,routing_key需要在發送消息的時候指定。 -
Vhost
於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,類似於網絡中的namespace概念。當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange/queue等 -
Producer
消息生產者,就是投遞消息的程序。只負責把消息發送exchange,附帶一些消息屬性。 -
Consumer
消息消費者,就是接受消息的程序。 -
Channel
如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。
Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。
1.2 對比
rabbitmq
activemq
rocketmq
kafka
zeromq
redis
celery
待續
2. 安裝配置
CentOS 6.7,安裝3.6.14最新穩定版本:
1
|
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
|
如果機器上有epel源,先把它禁用掉:enabled=0,否則會默認從這個源按照低版本rabbitmq 。
如果已安裝老版本,可能需要卸載 rpm -qa|grep erlang|awk ‘{print "yum remove -y "$1}‘|sh
。
繼續
1
|
wget http://packages.erlang-solutions.com/rpm/centos/6/x86_64/erlang-20.1-1.el6.x86_64.rpm
|
確保本地主機名能夠正常解析出自己的ip,或 127.0.0.1. (ping rabbitmq-01)
1
|
ulimit -S -n 4096
|
默認用戶名密碼 guest
/guest
, 具有vhost /
的所有權限,只能在本地訪問。
隊列元數據及內容信息,默認在目錄 /var/lib/rabbitmq/mnesia
下。
2.1 配置
1
|
# 啟用管理插件
|
%%
是Erlang的註釋符號。
- vm_memory_high_watermark
RabbitMQ在使用當前機器的40%以上內存時候,會發出內存警告,並阻止RabbitMQ所有連接(producer連接)。這個閾值便由vm_memory_high_watermark
控制 - vm_memory_high_watermark_paging_ratio
當內存中的數據達到一定數量後,他需要被page out出來。比如默認這個ratio=0.5,機器內存8G,於是 memory watermark=0.4 8G幾即 3.2G。3.2G paging_raio = 1.6G,當消息擠壓的量達到1.6G後,開始paging到磁盤上。
一搬不去改它。 - hipe_compile
開啟Erlang HiPE編譯選項(相當於Erlang的jit技術),能夠提高性能20%-50%。在Erlang R17後HiPE已經相當穩定,RabbitMQ官方也建議開啟此選項。
開啟之後,每次啟動 rabbitmq-server,需要多花1分鐘左右。
看下 rabbitmqctl status
信息,混個眼熟:
1
|
|
2.2 命令行
1
|
# 添加新的 vhost
|
在開始介紹概念之前,先可以從Web UI上來認識一下rabbitmq:
rabbitmq overview 首頁監控面板:
rabbitmq 客戶端的連接信息:
某個channel的詳情:
exchanges信息:
queues信息:
策略定義:
3. Exchange類型
AMQP 0-9-1 定義了四種內置類型的exchange type: direct, fanout, topic, header。exchange除了類型以外,還可以指定一些屬性:
- Name: 交換器名字。一般以
.
號分隔以作區分 - Durability: 持久化的exchange在broker重啟之後依然存在。相對應是 transient exchange
- Auto-delete: 如果設置了該屬性,在最後一個隊列unbound之後,exchange會自動刪除
-
Arguments: 可以用在滿足插件擴展上
-
alternate-exchange
RabbitMQ自己擴展的功能,不是AMQP協議定義的。
Alternate Exchange屬性的作用,創建Exchange指定該x-arguments
的alternate-exchange
屬性,發送消息的時候根據route key沒有找到可以投遞的隊列,這就會將此消息路由到 Alternate Exchange 屬性指定的 Exchange (就是一個普通的exchange)上了。比如把MySQL的binlog訂閱出來,因為裏面有許多表,每個表的dml行數有多有少。我們可以將變更量多的表單獨放到一個隊列,其它表一起放到一個隊列,就可以為原始的exchange添加 alternate-exchange 屬性,將其它表的數據重新投遞到另一個exchange。
-
3.1 fanout
fanout類型的exchange是最容易理解的,它會把來自生產者的消息廣播到所有綁定的queues上。這種情況一般會把消息的routing_key設置為空‘‘
,甚至不關心隊列的名字。如下圖:
amq.gen-RQ6...
和amq.gen-As8...
是消費者隨機生成了兩個隊列,綁定到fanout exchange上,C1,C2會各自收到一模一樣的消息。
3.2 direct
direct類型的exchange轉發消息到隊列裏,是直接基於消息的routing key。
C1在聲明隊列的時候,指定routing_key=error。C2的隊列上綁定了info,error,warning三個key。
於是error類型的消息會被同時發送到C1,C2(準確的說是兩個隊列上),而info,warning類型的消息只發送到隊列amqp.gen-Agl...
。
如果要達到Round-Robin輪詢效果,即兩個Consumer依次從同一個隊列裏取消息,那麽可以在聲明隊列的時候指定相同的 queue name,rabbitmq會自動均衡的發送消息給多個Consumer,可水平擴展消費者的處理能力(如果要保證處理順序,得設置prefetch_count=1)。
3.3 topic
topic類型的exchange大大提升了消息路由的靈活性。不像fanout那樣無腦的全部轉發,也不像direct那樣指定所有的routing_key,否則不匹配的key的消息就會被丟棄。
比如有一個收集日誌的系統,模塊包括auth/cron/kernel/app1/app2,日誌級別包括error,info,warning。現在要把所有模塊的error日誌規整在一起,可以設計routing_key: \<module>.\<severity> (auth.error, auth.info, …, app1.error, app1.info…),然後設置queue的binding_key=’*.error’
topic exchange 會根據 .
劃分word,有以下兩種正則符號用於匹配routing_key:
*
: 代表一個word#
: 代表0個或多個word
拿官網的例圖來說:<敏捷度>.<顏色>.<物種>
上圖創建了3個bindings:
- 隊列Q1的binding_key=
*.orange.*
,即對所有橙色的動物感興趣 - 隊列Q2綁定了
*.*.rabbit
和lazy.#
,即訂閱了所有和兔子相關的消息,以及反應遲鈍的動物
於是:
- routing_key為
quick.orange.rabbit
的消息,會被發送到兩個隊列 - routing_key為
lazy.orange.elephant
的消息,也會被發送到兩個隊列 - routing_key為
quick.orange.fox
的消息,只會發送到Q1 - routing_key為
lazy.brown.fox
的消息,只會發送到Q2 - routing_key為
lazy.pink.rabbit
的消息,只會發送到Q2。雖然匹配到了lazy.#
和*.*.rabbit
,但只會發送一次 - routing_key為
quick.brown.fox
的消息,會被丟棄,因為沒有任何綁定的隊列得到匹配 - routing_key為
lazy.orange.male.rabbit
的消息,還是會發送到Q2,因為lazy.#
然而orange
、quick.orange.male.rabbit
,也破壞了約定,但沒得到匹配,消息丟棄。 - routing_key為
#
,接受所有消息,相當於fanout exchange - routing_key沒有
*
和#
時,相當於direct exchange
3.4 headers
header類型的exchange用的不多,是在routing_key不能滿足使用場景的情況下(如routing_key必須是字符串),在消息的頭部加入一個或多個key/value,然後在聲明隊列的時候也指定要綁定的header。
binding的時候有個參數x-match
,指定headers所有的k/v都要匹配成功(all
)還是任意一個匹配則接受(any
)。
3.5 x-consistent-hash
這是個第三方插件形式存在的exchange,目前已內置於rabbitmq:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange
x-consistent-hash
類型的exchange可以根據routing_key,用一致性哈希算法,將消息路由到不同的隊列上。它可以盡可能的保證每個隊列上的消息數量相同,也可以隨時添加更多的隊列來“分流”,並且能保證同一個routing_key會進入相同的queue。
要達到這樣的效果,queue routing key必須是一個字符串類型的數字。比如Q1:routing_key=’10’, Q2:routing_key=’20’,那麽消息就會按照1:2的比例,發送到Q1,Q2。
3.6 x-modulus-hash
第三方插件形成存在的exchange,從3.6.0版本開始,也內置到了rabbitmq發行版:https://github.com/rabbitmq/rabbitmq-sharding
x-modulus-hash
類型的exchange與 x-consistent-hash
很像,也叫 sharding exchange,即將message在多個隊列之間進行分區發送。它的實現方法是根據 routing_key 先獲得hash,再用 Hash mod N
得到隊列,N就是綁定到exchange上的隊列個數。
4. Queue屬性
Queue 要先於 Exchange 創建,否則生產者發布的消息,在沒有綁定隊列之前,會丟失。
已存在的Queue可以重復declare,但前提是屬性要相同。
- Name: 隊列名稱。可以在應用裏面指定,或者交給broker生成
- Durable:持久化的Queue在broker重啟之後,依然存在。
註意,這裏的持久化與消息持久無關。是個 property - Exclusive: 為True時,表示當Consumer的Connection端口之後,隊列自動刪除。一般由broker生成的隨機隊列名,指定這個選項 。
排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的 -
Auto-delete: 當最後一個consumer取消訂閱之後,隊列自動刪除
-
Arguments: 設置可選的一些參數,如
-
x-message-ttl
消息在隊裏裏最大存活時間,超過這個ttl就會被丟棄。單位毫秒 -
x-max_length
隊列裏最多容納的消息個數,超過這個值,則會從隊列頭部drop掉消息 -
x-max-priority
設置了這個參數,就表示這是一個具有優先級的隊列。它的值是可定義的優先級最大值,一般10以內就夠了。
在生產商Publish消息的時候,消息Property上可設置Priority -
x-queue-mode
這個參數是控制是否為”延遲隊列”,Lazy Queue是在3.6.0引入的,它會盡量把消息存在磁盤上,節省內存
RabbitMQ一開始的設計初衷,是做異步、解耦,所以會把消息放在內存裏面,以便快速的發送給消費者(持久化類型的消息會同時存在於磁盤和內存緩存中)。如果用它來暫時存放大量消息,而不消費或者消費太慢,會導致性能明顯下降,因為為了釋放內存,消息得swap到磁盤上 —— 會阻塞隊列接收新消息。如果內存使用達到broker設置的 water-mark,也會拒絕接收新消息。
Lazy Queue(x-queue-mode=lazy
)的作用就是一接收到新消息,馬上存到文件系統,完全避免了前面提到的內存占用。這會增加磁盤I/O(順序的),與處理持久化類型的消息很相似。 -
x-dead-letter-exchange
死信。當消息在一個隊列中變成死信後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。消息變成死信一向有以下幾種情況:- 消息被拒絕(basic.reject or basic.nack)並且requeue=false
- 消息TTL過期
- 隊列達到最大長度
DLX也是一下正常的Exchange同一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange中去,進而被路由到另一個隊列。
死信被重新 requeue 時,可以改變它的routing_key,以便新的隊列處理,routing_key用x-dead-letter-routing-key
指定,如果不指定則繼續使用消息原來的routing_key。
-
5. Message屬性
- routing_key
路由關鍵字,exchange根據這個關鍵字進行消息投遞 - delivery_mode
- 1: Non-persistent,消息不持久化到磁盤,盡快被消費掉。重啟broker之後消息丟失
- 2: Persistent,消息持久化。當然被取走的消息,也就不存在了
- headers
消息頭信息,key/value形式,可以認為給消息打上了各種各樣的標簽。可用於代替 routing_key 去路由(結合headers來下的exchange),或者第三方插件使用。 - properties
實際上 headers 和 delivery_mode 也是properties的一部分,因為使用較多,所以單獨拿出去。這裏也只提幾個:- priority
消息優先級。數字,優先級高的消息會排在隊列頭部 - correlation_id 和 reply_to
這兩個一般用於實現服務間RPC調用, 即生產者發起請求到rabbitmq隊列,等待處理結果返回,消費者處理完消息後返回結果給調用方。
reply_to 在消息裏面告訴消費者,處理完的結果放到哪個隊列,調用方根據 correlation_id 找到結果。詳情參考 https://www.rabbitmq.com/tutorials/tutorial-six-python.html - expiration
消息自身的Time-To-Live,用的較少,也叫 Per-Message TTL In Publisher.
前面提到,隊列的arguemnts可以設置 x-message-ttl ,也叫 Per-Queue Message TTL In Queues.消息是否過期以兩者的最小值為準,並且消息自身過期時間到了之後,不會自動從隊列刪除,而是在發送給消費者的時候丟棄。
隊列自身也有個x-expires
,它指的是隊列在多久沒有消費者連上來,超過這個時間後隊列自動刪除。
- priority
- payload: 消息正文
6. 插件
RabbitMQ支持插件式的來擴展功能。
1
|
列舉server上安裝的所有插件
|
下面是幾個常用插件:
-
rabbitmq_management
管理 rabbitmq server 的插件,提供給予HTTP的API和 WebUI,提供管理exchanges、管理queues、管理users、管理policies,監控,發布/接收消息。功能強大,基本是必定開啟的插件。
開啟管理插件後,也可以選擇不使用Web界面,從http://localhost:15672/cli/rabbitmqadmin
下載rabbitmqadmin
命令行工具,它用在一些腳本裏面會很方便。(提示: rabbitmqctl 是不能創建exchange和queue,但rabbitmqadmin可以) -
rabbitmq_federation
與MySQL Federated 存儲引擎很相似,可以認為 federated exchange 是其它exchange(也叫upstream exchange)的“軟連接”、“流量復制”。消息是被publish到上遊exchange,然後消費者是從其它broker上的federated exchange訂閱消息。
Federated exchanges/queues 是通過 AMQP 協議的Erlang客戶端從真實broker裏面取數據(不會消費源數據),可以實現跨網絡的消息提取,或者將不同地方的消息匯總到一處。應用場景有 broker / cluster 數據遷移,模仿真實數據的線下測試。 -
rabbitmq_shovel
shovel插件就是一個 消費者 + 生產者:從一個queue消費內容,發送到另一個exchange上,甚至可以對消息做些轉換。你可以自己實現將消息從源broker消費,重新publish到另一個exchange,但shovel幫我們做好了。 -
rabbitmq_mqtt
實現了 MQTT 3.1 協議的adapter,如文章開頭所述。 -
rabbitmq_consistent_hash_exchange
一致性hash exchange,如前文所述。
6. 策略 Policy
首先為什麽rabbitmq會有策略這個東西。
前面我們講到,queue和exchange有一些固定屬性,如durable
、exclusive
、auto-delete
等,還有一些可選參數,也叫x-arguments
,如x-max-length
、x-queue-mode
。這些都是客戶端在定義隊列和交換器時指定的。
如果事後想修改 TTL 或者 queue length limit ,那麽得修改應用、重新部署,甚至涉及到刪除隊列,重新declare。Policy就是解決這個痛點的,在服務端對匹配的 exchanges 或者 queues 設置參數,無需動應用。更多請參考 https://www.rabbitmq.com/parameters.html
一個 policy 包含以下內容:
- name: 策略名字
- pattern: 對哪些queues(exchanges)的應用策略,正則表達式
- definition: 策略內容定義,key/value形式(也可以認為是JSON格式)
- apply-to: 策略應用在什麽身上,
queues
、exchanges
、all
。默認是all - priority: 策略優先級,默認0
每個exchange/queue只能“註入”一個policy,所以如果要設置多個策略,把key/value組合成json,定義在一起。設置完成會馬上生效,包括後面新創建的exchange、queues。
1
|
將exchange設置為 alternate exchange:(策略名:AE)
|
7. 消息可靠性
有的系統要保證消息不允許丟失,甚至不允許重復,有的系統追求的是高性能,所以要在性能和可靠性之間權衡。rabbitmq在多個層面提供消息可靠性保證。
7.1 持久化
聲明持久化的exchange: channel.exchange_delcare(exchange_name, durable)
聲明持久化的隊列:channel.queueDeclare(queue_name, durable, exclusive, auto_delete, arguments)
發布的持久化消息,投遞模式為2: delivery_mode=2
http://www.rabbitmq.com/reliability.html
persistent
7.1 ack & confirm
持久化保證了在broker或者機器出現異常的時候,消息不會丟失,要保證發送者在pub消息、接收sub消息時出現網絡異常,客戶端也應該有相應的處理。
Consumer Delivery Acknowledgements
rabbitmq對Consumer處理消息提供 acknowledgements 確認機制,客戶端通過basic.consume
註冊到broker(push),或者通過basic.get
pull 消息,都可以在指定是否開啟ack
。
delivery tags是實現 ack 的關鍵,RabbitMQ會用 basic.deliver
方法向消費者推送消息,這個方法攜帶了一個 delivery tag,它是單調遞增的正整數,在一個channel中唯一代表了一次投遞。
確認模式包括自動確認和手動確認。
自動確認就是rabbitmq一旦把消息發送出去後,就認為成功,完成確認。此模式性能最高,只要消費者能處理的過來,但自然降低了消息到達處理的可靠性,比如一個消息還在路上,消費者的TCP連接或者channel就關閉了,那麽消息也就丟失。如果消費者處理不過來,可能會導致消息在客戶端擠壓,內存過載,引發異常。所以自動確認一般用在消息比較平穩、客戶端能處理的來的系統。
手動確認,就是客戶端需要自己發送確認命令,包括:
- basic.ack —— 確認成功,客戶端成功處理
- basic.nack —— 確認失敗,客戶端處理失敗,但依然刪掉消息
- basic.reject —— 確認失敗,客戶端處理失敗,消息不刪除,可重新發送。
手動確認模式,可以控制消息處理的速度(流控QoS),通過 prefetch 設置該channel上最大沒有確認的消息數,server會等待有空閑的配額時才繼續發送給消費者。
手動確認模式如果不設置 prefetch_count,那麽消費者可能會接收許多的消息但未ack,從而導致內存耗盡,所以這點需要小心。正常來說,100-300是個比較可控的範圍。(當然如果是 pull 模式,就不存在QoS一說)
basic.ack
和basic.nack
可以設置 multiple
字段,批量確認來減少網絡傳輸。比如說在信道 ch
上有 delivery tags 5, 6, 7, 8 沒有確認,當客戶端發回的確認幀是8並且 multiple=true,那麽5-8的tags都被ack。
在啟用手動確認時,發生網絡連接斷開或者消費者崩潰,而無法返回 ack/nack 命令時,(檢測方法是 heartbead)rabbitmq會自動將沒有確認的消息 requeue,所以客戶端處理消息時,最好能滿足冪等性,即能夠重復處理這些消息。
Publisher Confirms
rabbitmq對Producer發布消息提供 confirm 機制:客戶端可以發送一個 confirm.select
命令將channel設置成confirm
工作模式。
所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,broker就會發送一個確認給生產者(basic.ack),這就使得生產者知道消息已經正確到達目的隊列了。如果消息和隊列是可持久化的,那麽確認消息會在將消息寫入磁盤之後發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號。
如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息,確保消息不會再發送之前就丟失。
然後對於需要持久化的消息的確認,不能完全保證數據被刷到磁盤上,因為每個消息調用 fsync 的帶來的IO代價太高,rabbitmq會每隔幾百毫秒,批量將消息從文件系統緩存 fsync 刷到磁盤。(了解MySQL的話對這個應該不陌生)
7.2 事務
RabbitMQ 實現了AMQP 0-9-1協議裏的事務,這樣說唯一能確保消息不丟失的方式,信道可以設置成 transaction 模式:發布消息,commit/rollback消息。
但是事務在這裏太重了,而且會極大的降低性能。不用。
7.3 rabbitmq分布式
待聊
5. python使用示例
https://pika.readthedocs.io/en/0.10.0/intro.html
下面的示例是使用Maxwell或者MySQL binlog增量流,json數據進入rabbitmq,然後通過 pika —— python版本的rabbitmq client,重新組裝成sql,達到數據增量同步的效果。
1
|
def binlog_sync(self):
|
參考
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
- http://www.rabbitmq.com/admin-guide.html
- https://geewu.gitbooks.io/rabbitmq-quick/content/index.html
- http://blog.csdn.net/anzhsoft/article/details/19607841
- http://dbaplus.cn/news-141-1464-1.html
原文連接地址:http://seanlook.com/2018/01/06/rabbitmq-introduce/
RabbitMQ 入門【精+轉】