1. 程式人生 > >RabbitMQ 入門【精+轉】

RabbitMQ 入門【精+轉】

ted ldap 綁定 mode tro 分組 tag 隨機生成 tutorial

rabbitmq可以用一本書取講,這裏只是介紹一些使用過程中,常用到的基本的知識點。
官方文檔覆蓋的內容,非常全面:http://www.rabbitmq.com/documentation.html 。

1. 介紹

RabbitMQ,即消息隊列系統,它是一款開源消息隊列中間件,采用Erlang語言開發,RabbitMQ是AMQP(Advanced Message Queueing Protocol)的標準實現。

AMQP是一個公開發布的異步消息的規範,是提供統一消息服務的應用層標準高級消息隊列協議,為面向消息的中間件設計.消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

https://www.rabbitmq.com/tutorials/amqp-concepts.html

相對於JMS(Java Message Service)規範來說,JMS使用的是特定語言的APIs,而消息格式可自由定義,而AMQP對消息的格式和傳輸是有要求的,但實現不會受操作系統、開發語言以及平臺等的限制。

JMS和AMQP還有一個較大的區別:JMS有隊列(Queues)和主題(Topics)兩種消息傳遞模型,發送到 JMS隊列 的消息最多只能被一個Client消費,發送到 JMS主題 的消息可能會被多個Clients消費;AMQP只有隊列(Queues),隊列的消息只能被單個接受者消費,發送者並不直接把消息發送到隊列中,而是發送到Exchange中,該Exchage會與一個或多個隊列綁定,能夠實現與JMS隊列和主題同樣的功能。

另外還有一種 MQTT協議,意為消息隊列遙測傳輸,是IBM開發的一個即時通訊協議。由於其維護一個長連接以輕量級低消耗著稱,所以常用於移動端消息推送服務開發。MQTT是基於TCP的應用層協議封裝,實現了異步Pub/Sub,在物聯網(IoT)應用廣泛。

RabbitMQ可通過庫、插件的形式,支持JMS和MQTT協議。參考:http://geek.csdn.net/news/detail/71894

1.1 主要概念

技術分享圖片

  1. Broker
    接收和分發消息的應用,RabbitMQ Server就是Message Broker

  2. Exchange
    message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct, topic, fanout。

    如果沒有隊列綁定到exchange上,那麽該exchange上的消息都會被丟棄,因為它不存儲消息又不知道該怎麽處理消息。

  3. Queue
    消息隊列載體,每個消息都會被投入到一個或多個隊列

  4. Binding
    在exchange和queue之間建立關系就叫Binding,消費者聲明隊列的時候一般會指定routing_key,也可以叫binding_key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。

  5. Routing Key
    這裏區分一下binding和routing: binding是一個將exchange和queue關聯起來的動作,routing_key可以理解成隊列的一個屬性,表示這個隊列接受符合該routing_key的消息,routing_key需要在發送消息的時候指定。

  6. Vhost
    於多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,類似於網絡中的namespace概念。當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange/queue等

  7. Producer
    消息生產者,就是投遞消息的程序。只負責把消息發送exchange,附帶一些消息屬性。

  8. Consumer
    消息消費者,就是接受消息的程序。

  9. Channel
    如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。
    Channel是在connection內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP method包含了channel id幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。

1.2 對比

rabbitmq
activemq
rocketmq
kafka
zeromq
redis

celery
待續

2. 安裝配置

CentOS 6.7,安裝3.6.14最新穩定版本:

1
2
3
4
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
yum install -y socat

如果機器上有epel源,先把它禁用掉:enabled=0,否則會默認從這個源按照低版本rabbitmq 。
如果已安裝老版本,可能需要卸載 rpm -qa|grep erlang|awk ‘{print "yum remove -y "$1}‘|sh
繼續

1
2
3
4
wget http://packages.erlang-solutions.com/rpm/centos/6/x86_64/erlang-20.1-1.el6.x86_64.rpm
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.14/rabbitmq-server-3.6.14-1.el6.noarch.rpm

yum localinstall -y erlang-20.1-1.el6.x86_64.rpm rabbitmq-server-3.6.14-1.el6.noarch.rpm

確保本地主機名能夠正常解析出自己的ip,或 127.0.0.1. (ping rabbitmq-01)

1
2
3
4
5
6
7
8
9
10
11
12
13
ulimit -S -n 4096
ulimit -n 65534

# limits.conf
cat /etc/security/limits.conf
* soft nofile 65535
* hard nofile 65535

# 從配置文件模板創建配置文件
sudo cp -a /usr/share/doc/rabbitmq-server-3.6.14/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

# 啟動
/etc/init.d/rabbitmq-server restart

默認用戶名密碼 guest/guest, 具有vhost / 的所有權限,只能在本地訪問。
隊列元數據及內容信息,默認在目錄 /var/lib/rabbitmq/mnesia 下。

2.1 配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 啟用管理插件
rabbitmq-plugins enable rabbitmq_management

# /etc/rabbitmq/rabbitmq.config 配置
[
{rabbit,
[%%
{tcp_listeners, [5672]},
{vm_memory_high_watermark, 0.6},
%% {vm_memory_high_watermark_paging_ratio, 0.5},
{hipe_compile, true}
]},
{rabbitmq_management,
[%% Preload schema definitions from a previously exported definitions file. See
]}
].

%%是Erlang的註釋符號。

  • vm_memory_high_watermark
    RabbitMQ在使用當前機器的40%以上內存時候,會發出內存警告,並阻止RabbitMQ所有連接(producer連接)。這個閾值便由 vm_memory_high_watermark 控制
  • vm_memory_high_watermark_paging_ratio
    當內存中的數據達到一定數量後,他需要被page out出來。比如默認這個ratio=0.5,機器內存8G,於是 memory watermark=0.4 8G幾即 3.2G。3.2G paging_raio = 1.6G,當消息擠壓的量達到1.6G後,開始paging到磁盤上。
    一搬不去改它。
  • hipe_compile
    開啟Erlang HiPE編譯選項(相當於Erlang的jit技術),能夠提高性能20%-50%。在Erlang R17後HiPE已經相當穩定,RabbitMQ官方也建議開啟此選項。
    開啟之後,每次啟動 rabbitmq-server,需要多花1分鐘左右。

看下 rabbitmqctl status 信息,混個眼熟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

Status of node ‘rabbit@rabbitmq-01‘
[{pid,6232},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.14"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.14"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.14"},
{cowboy,"Small, fast, modular HTTP server.","1.0.4"},
{rabbitmq_consistent_hash_exchange,"Consistent Hash Exchange Type",
"3.6.14"},
{rabbitmq_sharding,"RabbitMQ Sharding Plugin","3.6.14"},
{rabbit,"RabbitMQ","3.6.14"},
{amqp_client,"RabbitMQ AMQP Client","3.6.14"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.6.14"},
{os_mon,"CPO CXC 138 46","2.4.3"},
{mnesia,"MNESIA CXC 138 12","4.15.1"},
{cowlib,"Support library for manipulating Web protocols.","1.0.2"},
{compiler,"ERTS CXC 138 10","7.1.2"},
{recon,"Diagnostic tools for production use","2.3.2"},
{syntax_tools,"Syntax tools","2.1.3"},
{crypto,"CRYPTO","4.1"},
{stdlib,"ERTS CXC 138 10","3.4.2"},
{kernel,"ERTS CXC 138 10","5.4"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 20 [erts-9.1] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
[{connection_readers,0},
{connection_writers,0},
{connection_channels,0},
{connection_other,8864},
{queue_procs,48686248},
{queue_slave_procs,0},
{plugins,14194848},
{other_proc,12618480},
{metrics,323944},
{mgmt_db,12627800},
{mnesia,701856},
{binary,22261264},
{msg_index,634656},
{allocated_unused,364165712},
{reserved_unallocated,0},
{total,596238336}]},
{alarms,[]},
{listeners,
[{clustering,25672,"::"},{amqp,5672,"0.0.0.0"},{http,15672,"0.0.0.0"}]},
{vm_memory_calculation_strategy,rss},
{vm_memory_high_watermark,0.6},
{vm_memory_limit,4952820940},
{disk_free_limit,50000000},
{disk_free,1626125135872},
{file_descriptors,
[{total_limit,65435},
{total_used,58},
{sockets_limit,58889},
{sockets_used,0}]},
{processes,[{limit,1048576},{used,446}]},
{run_queue,0},
{uptime,1232025},
{kernel,{net_ticktime,60}}]

2.2 命令行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 添加新的 vhost
rabbitmqctl add_vhost /some0
rabbitmqctl list_vhost

# 添加登錄用戶 admin
rabbitmqctl add_user admin admin
rabbitmqctl list_users

# 設置為管理員角色
rabbitmqctl set_user_tags admin administrator

# 設置權限
rabbitmqctl set_permissions -p /some0 admin ‘.*‘ ‘.*‘ ‘.*‘
rabbitmqctl list_permissions -p /some0
rabbitmqctl list_user_permissions admin

在開始介紹概念之前,先可以從Web UI上來認識一下rabbitmq:
rabbitmq overview 首頁監控面板:
技術分享圖片

rabbitmq 客戶端的連接信息:
技術分享圖片

某個channel的詳情:
技術分享圖片

exchanges信息:
技術分享圖片

queues信息:
技術分享圖片

策略定義:
技術分享圖片

3. Exchange類型

AMQP 0-9-1 定義了四種內置類型的exchange type: direct, fanout, topic, header。exchange除了類型以外,還可以指定一些屬性:

  • Name: 交換器名字。一般以 . 號分隔以作區分
  • Durability: 持久化的exchange在broker重啟之後依然存在。相對應是 transient exchange
  • Auto-delete: 如果設置了該屬性,在最後一個隊列unbound之後,exchange會自動刪除
  • Arguments: 可以用在滿足插件擴展上

    • alternate-exchange
      RabbitMQ自己擴展的功能,不是AMQP協議定義的。
      Alternate Exchange屬性的作用,創建Exchange指定該 x-argumentsalternate-exchange屬性,發送消息的時候根據route key沒有找到可以投遞的隊列,這就會將此消息路由到 Alternate Exchange 屬性指定的 Exchange (就是一個普通的exchange)上了。

      比如把MySQL的binlog訂閱出來,因為裏面有許多表,每個表的dml行數有多有少。我們可以將變更量多的表單獨放到一個隊列,其它表一起放到一個隊列,就可以為原始的exchange添加 alternate-exchange 屬性,將其它表的數據重新投遞到另一個exchange。

3.1 fanout

fanout類型的exchange是最容易理解的,它會把來自生產者的消息廣播到所有綁定的queues上。這種情況一般會把消息的routing_key設置為空‘‘,甚至不關心隊列的名字。如下圖:
技術分享圖片

amq.gen-RQ6...amq.gen-As8...是消費者隨機生成了兩個隊列,綁定到fanout exchange上,C1,C2會各自收到一模一樣的消息。

3.2 direct

direct類型的exchange轉發消息到隊列裏,是直接基於消息的routing key。
技術分享圖片

C1在聲明隊列的時候,指定routing_key=error。C2的隊列上綁定了info,error,warning三個key。
於是error類型的消息會被同時發送到C1,C2(準確的說是兩個隊列上),而info,warning類型的消息只發送到隊列amqp.gen-Agl...

如果要達到Round-Robin輪詢效果,即兩個Consumer依次從同一個隊列裏取消息,那麽可以在聲明隊列的時候指定相同的 queue name,rabbitmq會自動均衡的發送消息給多個Consumer,可水平擴展消費者的處理能力(如果要保證處理順序,得設置prefetch_count=1)。

3.3 topic

topic類型的exchange大大提升了消息路由的靈活性。不像fanout那樣無腦的全部轉發,也不像direct那樣指定所有的routing_key,否則不匹配的key的消息就會被丟棄。
比如有一個收集日誌的系統,模塊包括auth/cron/kernel/app1/app2,日誌級別包括error,info,warning。現在要把所有模塊的error日誌規整在一起,可以設計routing_key: \<module>.\<severity> (auth.error, auth.info, …, app1.error, app1.info…),然後設置queue的binding_key=’*.error’

topic exchange 會根據 . 劃分word,有以下兩種正則符號用於匹配routing_key:

  • *: 代表一個word
  • #: 代表0個或多個word

拿官網的例圖來說:<敏捷度>.<顏色>.<物種>
技術分享圖片
上圖創建了3個bindings:

  • 隊列Q1的binding_key=*.orange.*,即對所有橙色的動物感興趣
  • 隊列Q2綁定了*.*.rabbitlazy.#,即訂閱了所有和兔子相關的消息,以及反應遲鈍的動物

於是:

  • routing_key為quick.orange.rabbit的消息,會被發送到兩個隊列
  • routing_key為lazy.orange.elephant的消息,也會被發送到兩個隊列
  • routing_key為quick.orange.fox的消息,只會發送到Q1
  • routing_key為lazy.brown.fox的消息,只會發送到Q2
  • routing_key為lazy.pink.rabbit的消息,只會發送到Q2。雖然匹配到了lazy.#*.*.rabbit,但只會發送一次
  • routing_key為quick.brown.fox的消息,會被丟棄,因為沒有任何綁定的隊列得到匹配
  • routing_key為lazy.orange.male.rabbit的消息,還是會發送到Q2,因為 lazy.#
    然而orangequick.orange.male.rabbit,也破壞了約定,但沒得到匹配,消息丟棄。
  • routing_key為#,接受所有消息,相當於fanout exchange
  • routing_key沒有*#時,相當於direct exchange

3.4 headers

header類型的exchange用的不多,是在routing_key不能滿足使用場景的情況下(如routing_key必須是字符串),在消息的頭部加入一個或多個key/value,然後在聲明隊列的時候也指定要綁定的header。

binding的時候有個參數x-match,指定headers所有的k/v都要匹配成功(all)還是任意一個匹配則接受(any)。

3.5 x-consistent-hash

這是個第三方插件形式存在的exchange,目前已內置於rabbitmq:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange

x-consistent-hash類型的exchange可以根據routing_key,用一致性哈希算法,將消息路由到不同的隊列上。它可以盡可能的保證每個隊列上的消息數量相同,也可以隨時添加更多的隊列來“分流”,並且能保證同一個routing_key會進入相同的queue。

要達到這樣的效果,queue routing key必須是一個字符串類型的數字。比如Q1:routing_key=’10’, Q2:routing_key=’20’,那麽消息就會按照1:2的比例,發送到Q1,Q2。

3.6 x-modulus-hash

第三方插件形成存在的exchange,從3.6.0版本開始,也內置到了rabbitmq發行版:https://github.com/rabbitmq/rabbitmq-sharding

x-modulus-hash類型的exchange與 x-consistent-hash 很像,也叫 sharding exchange,即將message在多個隊列之間進行分區發送。它的實現方法是根據 routing_key 先獲得hash,再用 Hash mod N 得到隊列,N就是綁定到exchange上的隊列個數。

4. Queue屬性

Queue 要先於 Exchange 創建,否則生產者發布的消息,在沒有綁定隊列之前,會丟失。
已存在的Queue可以重復declare,但前提是屬性要相同。

  • Name: 隊列名稱。可以在應用裏面指定,或者交給broker生成
  • Durable:持久化的Queue在broker重啟之後,依然存在。
    註意,這裏的持久化與消息持久無關。是個 property
  • Exclusive: 為True時,表示當Consumer的Connection端口之後,隊列自動刪除。一般由broker生成的隨機隊列名,指定這個選項 。
    排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一個連接創建的排他隊列的
  • Auto-delete: 當最後一個consumer取消訂閱之後,隊列自動刪除

  • Arguments: 設置可選的一些參數,如

    • x-message-ttl
      消息在隊裏裏最大存活時間,超過這個ttl就會被丟棄。單位毫秒

    • x-max_length
      隊列裏最多容納的消息個數,超過這個值,則會從隊列頭部drop掉消息

    • x-max-priority
      設置了這個參數,就表示這是一個具有優先級的隊列。它的值是可定義的優先級最大值,一般10以內就夠了。
      在生產商Publish消息的時候,消息Property上可設置Priority

    • x-queue-mode
      這個參數是控制是否為”延遲隊列”,Lazy Queue是在3.6.0引入的,它會盡量把消息存在磁盤上,節省內存
      RabbitMQ一開始的設計初衷,是做異步、解耦,所以會把消息放在內存裏面,以便快速的發送給消費者(持久化類型的消息會同時存在於磁盤和內存緩存中)。

      如果用它來暫時存放大量消息,而不消費或者消費太慢,會導致性能明顯下降,因為為了釋放內存,消息得swap到磁盤上 —— 會阻塞隊列接收新消息。如果內存使用達到broker設置的 water-mark,也會拒絕接收新消息。
      Lazy Queue(x-queue-mode=lazy)的作用就是一接收到新消息,馬上存到文件系統,完全避免了前面提到的內存占用。這會增加磁盤I/O(順序的),與處理持久化類型的消息很相似。

    • x-dead-letter-exchange
      死信。當消息在一個隊列中變成死信後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。消息變成死信一向有以下幾種情況:

      • 消息被拒絕(basic.reject or basic.nack)並且requeue=false
      • 消息TTL過期
      • 隊列達到最大長度

      DLX也是一下正常的Exchange同一般的Exchange沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性,當這個隊列中有死信時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange中去,進而被路由到另一個隊列。
      死信被重新 requeue 時,可以改變它的routing_key,以便新的隊列處理,routing_key用x-dead-letter-routing-key指定,如果不指定則繼續使用消息原來的routing_key。

5. Message屬性

  • routing_key
    路由關鍵字,exchange根據這個關鍵字進行消息投遞
  • delivery_mode
    • 1: Non-persistent,消息不持久化到磁盤,盡快被消費掉。重啟broker之後消息丟失
    • 2: Persistent,消息持久化。當然被取走的消息,也就不存在了
  • headers
    消息頭信息,key/value形式,可以認為給消息打上了各種各樣的標簽。可用於代替 routing_key 去路由(結合headers來下的exchange),或者第三方插件使用。
  • properties
    實際上 headers 和 delivery_mode 也是properties的一部分,因為使用較多,所以單獨拿出去。這裏也只提幾個:
    • priority
      消息優先級。數字,優先級高的消息會排在隊列頭部
    • correlation_id 和 reply_to
      這兩個一般用於實現服務間RPC調用, 即生產者發起請求到rabbitmq隊列,等待處理結果返回,消費者處理完消息後返回結果給調用方。
      reply_to 在消息裏面告訴消費者,處理完的結果放到哪個隊列,調用方根據 correlation_id 找到結果。詳情參考 https://www.rabbitmq.com/tutorials/tutorial-six-python.html
    • expiration
      消息自身的Time-To-Live,用的較少,也叫 Per-Message TTL In Publisher.
      前面提到,隊列的arguemnts可以設置 x-message-ttl ,也叫 Per-Queue Message TTL In Queues.消息是否過期以兩者的最小值為準,並且消息自身過期時間到了之後,不會自動從隊列刪除,而是在發送給消費者的時候丟棄。
      隊列自身也有個 x-expires,它指的是隊列在多久沒有消費者連上來,超過這個時間後隊列自動刪除。
  • payload: 消息正文

6. 插件

RabbitMQ支持插件式的來擴展功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
列舉server上安裝的所有插件
# rabbitmq-plugins list
Configured: E = explicitly enabled; e = implicitly enabled
| Status: * = running on rabbit@rabbitmq-01
|/
[e*] amqp_client 3.6.14
[e*] cowboy 1.0.4
[e*] cowlib 1.0.2
[ ] rabbitmq_amqp1_0 3.6.14
[ ] rabbitmq_auth_backend_ldap 3.6.14
[ ] rabbitmq_auth_mechanism_ssl 3.6.14
[E*] rabbitmq_consistent_hash_exchange 3.6.14
[ ] rabbitmq_event_exchange 3.6.14
[ ] rabbitmq_federation 3.6.14
[ ] rabbitmq_federation_management 3.6.14
[ ] rabbitmq_jms_topic_exchange 3.6.14
[E*] rabbitmq_management 3.6.14
[e*] rabbitmq_management_agent 3.6.14
[ ] rabbitmq_management_visualiser 3.6.14
[ ] rabbitmq_mqtt 3.6.14
[ ] rabbitmq_random_exchange 3.6.14
[ ] rabbitmq_recent_history_exchange 3.6.14
[E*] rabbitmq_sharding 3.6.14
[ ] rabbitmq_shovel 3.6.14
[ ] rabbitmq_shovel_management 3.6.14
[ ] rabbitmq_stomp 3.6.14
[ ] rabbitmq_top 3.6.14
[ ] rabbitmq_tracing 3.6.14
[ ] rabbitmq_trust_store 3.6.14
[e*] rabbitmq_web_dispatch 3.6.14
[ ] rabbitmq_web_mqtt 3.6.14
[ ] rabbitmq_web_mqtt_examples 3.6.14
[ ] rabbitmq_web_stomp 3.6.14
[ ] rabbitmq_web_stomp_examples 3.6.14
[ ] sockjs 0.3.4

啟用插件
# rabbitmq-plugins enable plugin-name

下面是幾個常用插件:

  1. rabbitmq_management
    管理 rabbitmq server 的插件,提供給予HTTP的API和 WebUI,提供管理exchanges、管理queues、管理users、管理policies,監控,發布/接收消息。功能強大,基本是必定開啟的插件。
    開啟管理插件後,也可以選擇不使用Web界面,從 http://localhost:15672/cli/rabbitmqadmin 下載 rabbitmqadmin 命令行工具,它用在一些腳本裏面會很方便。(提示: rabbitmqctl 是不能創建exchange和queue,但rabbitmqadmin可以)

  2. rabbitmq_federation
    與MySQL Federated 存儲引擎很相似,可以認為 federated exchange 是其它exchange(也叫upstream exchange)的“軟連接”、“流量復制”。消息是被publish到上遊exchange,然後消費者是從其它broker上的federated exchange訂閱消息。
    Federated exchanges/queues 是通過 AMQP 協議的Erlang客戶端從真實broker裏面取數據(不會消費源數據),可以實現跨網絡的消息提取,或者將不同地方的消息匯總到一處。應用場景有 broker / cluster 數據遷移,模仿真實數據的線下測試。

  3. rabbitmq_shovel
    shovel插件就是一個 消費者 + 生產者:從一個queue消費內容,發送到另一個exchange上,甚至可以對消息做些轉換。你可以自己實現將消息從源broker消費,重新publish到另一個exchange,但shovel幫我們做好了。

  4. rabbitmq_mqtt
    實現了 MQTT 3.1 協議的adapter,如文章開頭所述。

  5. rabbitmq_consistent_hash_exchange
    一致性hash exchange,如前文所述。

6. 策略 Policy

首先為什麽rabbitmq會有策略這個東西。

前面我們講到,queue和exchange有一些固定屬性,如durableexclusiveauto-delete等,還有一些可選參數,也叫x-arguments,如x-max-lengthx-queue-mode。這些都是客戶端在定義隊列和交換器時指定的。

如果事後想修改 TTL 或者 queue length limit ,那麽得修改應用、重新部署,甚至涉及到刪除隊列,重新declare。Policy就是解決這個痛點的,在服務端對匹配的 exchanges 或者 queues 設置參數,無需動應用。更多請參考 https://www.rabbitmq.com/parameters.html

一個 policy 包含以下內容:

  • name: 策略名字
  • pattern: 對哪些queues(exchanges)的應用策略,正則表達式
  • definition: 策略內容定義,key/value形式(也可以認為是JSON格式)
  • apply-to: 策略應用在什麽身上,queuesexchangesall。默認是all
  • priority: 策略優先級,默認0

每個exchange/queue只能“註入”一個policy,所以如果要設置多個策略,把key/value組合成json,定義在一起。設置完成會馬上生效,包括後面新創建的exchange、queues。

1
2
3
4
5
6
7
8
將exchange設置為 alternate exchange:(策略名:AE)
rabbitmqctl set_policy -p /some0 AE "^maxwell.some3$" ‘{"alternate-exchange":"maxwell.AE"}‘ --apply-to exchanges

將vhost /some0 的所有隊列都設置成 Lazy Queue
rabbitmqctl set_policy -p /some0 Lazy "^" ‘{"queue-mode":"lazy"}‘ --apply-to queues

隊列名匹配 ‘two-messages‘ 的隊列,設置最大隊列消息數為2,超過之後的行為是 禁止接收新消息(與之對應的是 drop-head: 刪除頭部老的消息)
rabbitmqctl set_policy my-pol "^two-messages$" ‘{"max-length":2,"overflow":"reject-publish"}‘ --apply-to queues

7. 消息可靠性

有的系統要保證消息不允許丟失,甚至不允許重復,有的系統追求的是高性能,所以要在性能和可靠性之間權衡。rabbitmq在多個層面提供消息可靠性保證。

7.1 持久化

聲明持久化的exchange: channel.exchange_delcare(exchange_name, durable)
聲明持久化的隊列:channel.queueDeclare(queue_name, durable, exclusive, auto_delete, arguments)
發布的持久化消息,投遞模式為2: delivery_mode=2

http://www.rabbitmq.com/reliability.html
persistent

7.1 ack & confirm

持久化保證了在broker或者機器出現異常的時候,消息不會丟失,要保證發送者在pub消息、接收sub消息時出現網絡異常,客戶端也應該有相應的處理。

Consumer Delivery Acknowledgements

rabbitmq對Consumer處理消息提供 acknowledgements 確認機制,客戶端通過basic.consume註冊到broker(push),或者通過basic.get pull 消息,都可以在指定是否開啟ack

delivery tags是實現 ack 的關鍵,RabbitMQ會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag,它是單調遞增的正整數,在一個channel中唯一代表了一次投遞。

確認模式包括自動確認和手動確認。
自動確認就是rabbitmq一旦把消息發送出去後,就認為成功,完成確認。此模式性能最高,只要消費者能處理的過來,但自然降低了消息到達處理的可靠性,比如一個消息還在路上,消費者的TCP連接或者channel就關閉了,那麽消息也就丟失。如果消費者處理不過來,可能會導致消息在客戶端擠壓,內存過載,引發異常。所以自動確認一般用在消息比較平穩、客戶端能處理的來的系統。

手動確認,就是客戶端需要自己發送確認命令,包括:

  • basic.ack —— 確認成功,客戶端成功處理
  • basic.nack —— 確認失敗,客戶端處理失敗,但依然刪掉消息
  • basic.reject —— 確認失敗,客戶端處理失敗,消息不刪除,可重新發送。

手動確認模式,可以控制消息處理的速度(流控QoS),通過 prefetch 設置該channel上最大沒有確認的消息數,server會等待有空閑的配額時才繼續發送給消費者。
手動確認模式如果不設置 prefetch_count,那麽消費者可能會接收許多的消息但未ack,從而導致內存耗盡,所以這點需要小心。正常來說,100-300是個比較可控的範圍。(當然如果是 pull 模式,就不存在QoS一說)

basic.ackbasic.nack可以設置 multiple 字段,批量確認來減少網絡傳輸。比如說在信道 ch 上有 delivery tags 5, 6, 7, 8 沒有確認,當客戶端發回的確認幀是8並且 multiple=true,那麽5-8的tags都被ack。

在啟用手動確認時,發生網絡連接斷開或者消費者崩潰,而無法返回 ack/nack 命令時,(檢測方法是 heartbead)rabbitmq會自動將沒有確認的消息 requeue,所以客戶端處理消息時,最好能滿足冪等性,即能夠重復處理這些消息。

Publisher Confirms

rabbitmq對Producer發布消息提供 confirm 機制:客戶端可以發送一個 confirm.select 命令將channel設置成confirm工作模式。
所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之後,broker就會發送一個確認給生產者(basic.ack),這就使得生產者知道消息已經正確到達目的隊列了。如果消息和隊列是可持久化的,那麽確認消息會在將消息寫入磁盤之後發出,broker回傳給生產者的確認消息中delivery-tag域包含了確認消息的序列號。

如果RabbitMQ因為自身內部錯誤導致消息丟失,就會發送一條nack消息,生產者應用程序同樣可以在回調方法中處理該nack消息,確保消息不會再發送之前就丟失。

然後對於需要持久化的消息的確認,不能完全保證數據被刷到磁盤上,因為每個消息調用 fsync 的帶來的IO代價太高,rabbitmq會每隔幾百毫秒,批量將消息從文件系統緩存 fsync 刷到磁盤。(了解MySQL的話對這個應該不陌生)

7.2 事務

RabbitMQ 實現了AMQP 0-9-1協議裏的事務,這樣說唯一能確保消息不丟失的方式,信道可以設置成 transaction 模式:發布消息,commit/rollback消息。

但是事務在這裏太重了,而且會極大的降低性能。不用。

7.3 rabbitmq分布式

待聊

5. python使用示例

https://pika.readthedocs.io/en/0.10.0/intro.html

下面的示例是使用Maxwell或者MySQL binlog增量流,json數據進入rabbitmq,然後通過 pika —— python版本的rabbitmq client,重新組裝成sql,達到數據增量同步的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def binlog_sync(self):
logger.info("connect to rabbitmq server [%s], vhost=%s", rabbitmq_conn_info.get(‘host‘), rabbitmq_conn_info.get(‘vhost‘, ‘/‘))
## rabbitmq 用戶認證信息
credentials = pika.PlainCredentials(rabbitmq_conn_info.get(‘user‘, ‘guest‘),
rabbitmq_conn_info.get(‘password‘, ‘guest‘)
)
## rabbitmq tcp連接
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=rabbitmq_conn_info.get(‘host‘),
port=rabbitmq_conn_info.get(‘port‘, 5672),
virtual_host=rabbitmq_conn_info.get(‘vhost‘, ‘/‘),
credentials=credentials
)
)
## rabbitmq 信道,避免頻繁tcp斷連
channel = connection.channel()

# exchange_name = ‘maxwell.some‘ + str(self.corpmod)
# exchange_other = ‘maxwell.AE‘
logger.info("declare mq exchange [%s], type=[%s]", self.exchange_name, self.exchange_type)
## 創建 exchange,如果已經存在相同名字,就不會重復創建,但要求屬性要相同
## 指定exchange_type,durable, arguments 。這裏的alternate-exchange放到策略裏從創建,因為目前maxwell作為消費者,沒有支持arguemnts參數
channel.exchange_declare(exchange=self.exchange_name,
exchange_type=self.exchange_type,
durable=True,
# arguments={‘alternate-exchange‘: exchange_other}
)

"""
channel.exchange_declare(exchange=exchange_other, exchange_type=‘topic‘, durable=True) # alternative exchange
channel.queue_declare(queue=‘ae_other‘, durable=True)
channel.queue_bind(exchange=exchange_other,
queue=‘ae_other‘,
routing_key=‘d_ec_some.*‘)
"""
logger.info("declare queue name=[%s]", self.queue_name)
## 創建 queue,如果以經存在相同名字的隊列,則不會創建,但要求屬性相同,否則報錯
## 指定了 lazy queue
channel.queue_declare(queue=self.queue_name, durable=True, arguments={‘x-queue-mode‘: ‘lazy‘})

## 將routing_key 綁定到隊列上
for key in self.queue_bind_key:
logger.info("bind routing_key [%s] to queue [%s]", key, self.queue_name)
channel.queue_bind(exchange=self.exchange_name,
queue=self.queue_name,
routing_key=key)

# consume callback, internal
## 客戶端處理消息
def callback(ch, method, properties, body):
# print(" [x] Received %s" % body)
logger.debug("Received message: %s", body)
try:
data_row = json.loads(body.decode(‘utf-8‘))
self.process_data(data_row)

if ret == -2: # requeue
## 處理異常,如Ctrl+C斷開,重新排隊
logger.warning("message data: %s (requeue)", data_row)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# return
except ValueError as e:
logger.error("proces Error: %s(skip)", e)
logger.error(" received data: %s", body)
## 處理異常,但跳過
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error("proces Error: %s(skip)", e)
logger.error(" message data: %s", data_row)
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
## 發送確認成功
ch.basic_ack(delivery_tag=method.delivery_tag)

## 設置最多 50 個未確認
channel.basic_qos(prefetch_count=50)

# 開始消費,拿到的消息調用callback處理
channel.basic_consume(callback, queue=self.queue_name, no_ack=False)

# print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
logger.info("start comsuming")

參考

  • https://www.rabbitmq.com/tutorials/amqp-concepts.html
  • http://www.rabbitmq.com/admin-guide.html
  • https://geewu.gitbooks.io/rabbitmq-quick/content/index.html
  • http://blog.csdn.net/anzhsoft/article/details/19607841
  • http://dbaplus.cn/news-141-1464-1.html

原文連接地址:http://seanlook.com/2018/01/06/rabbitmq-introduce/

RabbitMQ 入門【精+轉】