Rabbitmq學習記錄
優點
- 實現了
AMQP
標準的訊息伺服器 - 可靠性,使用持久化、傳輸確認、釋出確認等保證
- 靈活的路由
- 叢集部署簡單
- 支援多種協議,以及多種語言客戶端
- 易用的使用者介面
...
MAC
下的安裝
採用Homebrew
進行安裝:
➜ ~ brew update
➜ ~ brew install rabbitmq
通過brew
安裝的檔案位於/usr/local/Cellar/rabbitmq/3.7.14/sbin
中。開啟外掛:
➜ /usr/local/Cellar/rabbitmq/3.7.14 sudo sbin/rabbitmq-plugins enable rabbitmq_management
直接執行rabbitmq-server
➜ ~ rabbitmq-server
zsh: command not found: rabbitmq-server
因為使用的是zsh
,所以需要將路徑新增至.zshrc
中:
➜ ~ cat .zshrc # If you come from bash you might have to change your $PATH. # export PATH=$HOME/bin:/usr/local/bin:$PATH # Path to your oh-my-zsh installation. export PATH=$PATH:/usr/local/opt/rabbitmq/sbin ...
再source .zshrc
使它生效。這下rabbit-server
是有用了,但是啟動卻報錯了:
2019-04-25 15:18:09.525358
args: []
format: "Error when reading /Users/littlemay/.erlang.cookie: eacces"
label: {error_logger,error_msg}
原因是/Users/littlemay/.erlang.cookie
這個檔案沒有許可權,需要更改檔案所有者以及許可權:
➜ ~ ls -l /Users/littlemay/.erlang.cookie -r-------- 1 root staff 20 4 25 00:00 /Users/littlemay/.erlang.cookie ➜ ~ sudo chown littlemay:staff /Users/littlemay/.erlang.cookie ➜ ~ sudo chmod u+x /Users/littlemay/.erlang.cookie
折騰了半個小時,此時終於可以不使用sudo
啟動了。
可以使用rabbitmqctl start_app/stop_app
啟動/停止rabbitmq
。網頁端的使用者介面訪問:localhost:15762
,預設使用者名稱密碼都為guest
。
基本概念
Message
:訊息,AMQP
中預定了14
個屬性,即後文pika
中會用到的BasicProperties
Producer
:訊息的生產者,也是一個向交換機發布訊息的客戶端應用程式Consumer
:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。Exchange
:交換機,用來接收生產者傳送的訊息並根據規則將這些訊息路由給伺服器中的佇列Queue
:訊息佇列,用來儲存訊息直到傳送給消費者。一個訊息可投入一個或多個佇列。如果一個佇列沒有被任何消費者佇列,那麼它將一直在佇列裡面,等待消費者連線到這個佇列將其取走。多個消費者可以訂閱同一個佇列,此時佇列的訊息會被平均分攤(round-robin
)給多個消費者處理。Binding
:繫結,將Exchange
和Queue
按照路由規則繫結Channel
:通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP
連線內的虛擬連線,AMQP
命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬TCP
都是非常昂貴的開銷,所以引入了通道的概念,以複用一條TCP
連線Virtual Host
:虛擬主機,用作不同使用者的許可權分離。預設的vhost
是/
。Broker
:表示訊息佇列伺服器實體
Routing Key
與Binding Key
Binding Key
是在Exchange
與Queue
進行Binding
時使用的路由規則。多個Queue
允許使用同一個Binding Key
與Exchange
進行繫結。
Routing Key
是當生產者將訊息傳送給Exchange
時,指定的路由規則。它會依據Exchange Type
及Binding Key
聯合使用來決定將訊息投放到哪個佇列中。
Routing Key
的長度限制為255 bytes
。
Exchange Types
通常使用的有四種:
fanout
:將所有傳送到該Exchange
的訊息路由到所有與它繫結的Queue
中,此時Binding Key
與Routing Key
不起作用direct
:將所有傳送到該Exchange
的訊息路由到Binding Key
與Routing Key
完全匹配的Queue
中topic
:將所有傳送到該Exchange
的訊息路由到Binding Key
與Routing Key
相匹配的Queue
中。採用.
進行分隔(每分隔開的一段獨立字串成為一個單詞),*
表示匹配一個單詞,#
表示匹配0
個或者多個。headers
:不依賴於Routing Key
與Bingding Key
的匹配規則路由訊息,而是根據傳送訊息內容中的headers
屬性進行匹配,它與direct
功能一致,但是效能十分低,一般不使用。
Message Acknowledgment
消費者接收到了訊息,RabbitMQ
並不會立即將訊息從佇列中丟棄,而是在接收到消費者的ACK
響應之後,才進行丟棄。
如果RabbitMQ
沒有收到回執並檢測到消費者的RabbitMQ
斷開,將會傳遞給其他消費者直到佇列接收到了ACK
,以此保證每一個訊息都能被有效傳遞。因此,如果忘記回覆ACK
,那麼佇列中的訊息會越來越多,RabbitMQ server
不會再發送資料給它,可以起到限流的作用。
Prefetch Count
允許消費者每次從佇列中獲取任意數量的訊息。針對任務粒度小,執行時間短的消費者,可以設定更大的預讀取數。
RPC
遠過程呼叫
除了非同步通訊之外,RabbitMQ
也支援同步呼叫。實現原理如圖:
- 生產者在生產請求訊息時,會在請求訊息的屬性中設定兩個
replyTo
值:一個為Queue Name
,用於告知消費者將處理完成後的通知訊息返回到該佇列;另一個為correlationId
,是請求訊息的唯一標示,隨著請求訊息一同傳送給消費者,也會隨著響應訊息返回給生產者,生產者就能夠通過correlationId
來判定來判定請求是否被成功執行,最終實現請求和響應的一一配對。 - 生產者只有在接收到響應訊息之後才會繼續發生下一次請求訊息,以此實現同步的效果
Python3
實踐
首先需要安裝pika
,分別建立消費者和生產者:
生產者
# rabbit_p.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
for i in range(20):
body = '%s Hello rabbitmq' % i
channel.basic_publish(exchange='',
routing_key='hello',
body=body)
connection.close()
消費者
# rabbit_c1.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 為了防止queue不存在,最好消費者和生產者都建立
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(body)
# 如果不開啟ack,訊息會一直顯示unacked,每當消費者重啟又會重複消費
channel.basic_consume(on_message_callback=callback,
queue='hello',
auto_ack=True)
channel.start_consuming()
使用
basic_consume
而不是迴圈呼叫basic_get
如果使用exchange=''
,會繫結在RabbitMQ
預設的Exchange
下,它綁定了所有的queue
,routing key
使用queue name
。
如果再開啟一個消費者,那麼他們會輪流處理訊息。
prefetch_count
為了防止有的消費者空閒,所以可以設定prefetch_count
引數(使用basic_qos
來指定),當RabbitMQ
發現有個消費者遲遲沒有返回ACK
,便不會再將訊息分配給它,此時需要我們採用主動ACK
的方式,消費者1
不進行ack
,消費者2
進行ack
:
# rabbit_c1.py
...
def callback(ch, method, properties, body):
print(body)
# basic_qos需要放在basic_consume前才有效
channel.basic_qos(prefetch_count=3)
channel.basic_consume(on_message_callback=callback,
queue='hello',
)
...
# rabbit_c2.py
...
def callback(ch, method, properties, body):
print(body)
# 主動使用ack告知呼叫方
ch.basic_ack(delivery_tag=method.delivery_tag)
...
可以發現,消費者1
只處理了三條資料(並且網頁端顯示一直是unacked
),其他的資料都由消費者2
處理了。接著此時關閉消費者1
,會發現這三條資料又被分配給消費者2
所處理了。
Exchange
fanout型別:
需要在消費者和生產者上面都加上對exchange
的宣告,兩個消費者分別訂閱了hello1
與hello2
佇列,這兩個佇列都連向hello_fanout
這個exchange
:
# rabbit_c1.py
...
channel.exchange_declare(exchange='hello_fanout', exchange_type='fanout')
channel.queue_declare(queue='hello1')
channel.queue_bind(exchange='hello_fanout', queue='hello1', routing_key='hello.1')
...
# rabbit_c2.py
...
channel.exchange_declare(exchange='hello_fanout', exchange_type='fanout')
channel.queue_declare(queue='hello2')
channel.queue_bind(exchange='hello_fanout', queue='hello1', routing_key='hello.2')
...
# rabbit_p.py
channel.exchange_declare(exchange='hello_fanout', exchange_type='fanout')
channel.queue_declare(queue='hello1')
channel.queue_declare(queue='hello2')
for i in range(20):
body = '%s Hello rabbitmq' % i
channel.basic_publish(exchange='hello_fanout',
routing_key='world',
body=body)
在fanout
模式下,routing_key
即使不匹配也沒有關係。消費者1
和消費者2
的輸出結果一樣。
direct
將消費者修改為:
# rabbit_c1.py
...
# 修改exchange的type為direct
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
channel.queue_declare(queue='hello1')
# 指定路由規則
channel.queue_bind(exchange='hello_direct', queue='hello1', routing_key='hello.1')
...
channel.basic_consume(on_message_callback=callback,
queue='hello1',
)
...
生產者:
# rabbit_p.y
...
channel.exchange_declare(exchange='hello', exchange_type='direct')
channel.queue_declare(queue='hello1')
channel.queue_declare(queue='hello2')
for i in range(20):
body = '%s Hello rabbitmq' % i
if i < 10:
channel.basic_publish(exchange='hello',
routing_key='hello.1',
body=body)
else:
channel.basic_publish(exchange='hello',
routing_key='hello.2',
body=body)
...
此時i<10
的訊息會由消費者1
處理。
topic
Binding Key
可以存在*
與#
,但是生產者的Routing Key
應當是精確的。修改消費者與生產者分別為:
#rabbit_c1.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
channel.queue_declare(queue='hello1')
channel.queue_bind(exchange='hello_topic', queue='hello1', routing_key='hello.*')
...
#rabbit_c2.py 綁定了兩種規則
channel.queue_bind(exchange='hello_topic', queue='hello2', routing_key='hello.*')
channel.queue_bind(exchange='hello_topic', queue='hello2', routing_key='*.hello.*')
#rabbit_p.py
...
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
for i in range(20):
body = '%s Hello rabbitmq' % i
if 10 < i < 20:
channel.basic_publish(exchange='hello_topic',
routing_key='hello.2',
body=body)
else:
channel.basic_publish(exchange='hello_topic',
routing_key='2.hello.2',
body=body)
...
一個佇列可以使用多種路由規則與同一exchange
繫結。此時消費者2
可以收到全部訊息,而消費者1
只能收到11-19
之間的訊息。
同樣的,一個佇列可以與多個exchange
繫結,讓消費者2
訂閱的佇列同時與hello_topic
和hello_direct
相連:
# rabbit_c2.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
channel.queue_declare(queue='hello2')
channel.queue_bind(exchange='hello_direct', queue='hello2', routing_key='hello.2')
channel.queue_bind(exchange='hello_topic', queue='hello2', routing_key='hello.*')
# rabbit_p.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
for i in range(20):
body = '%s Hello rabbitmq' % i
if 10 < i < 20:
channel.basic_publish(exchange='hello_topic',
routing_key='hello.1',
body=body)
else:
channel.basic_publish(exchange='hello_direct',
routing_key='hello.2',
body=body)
此時,消費者2
可以收到全部訊息。
exclusive
前面測試的時候,如果不重啟RabbitServer
,如果沒有刪除交換機或者佇列的話,以前的路由規則仍然存在,經常會出現匪夷所思的問題。這是因為持久化的問題,留到下節介紹。
實際上可以使用臨時佇列的方式進行處理,exclusive
引數指明這個佇列為排他性佇列,即只有自己可以訪問,此時生產者不能再進行生命,當連線斷掉,這個queue
會被刪除(這點與auto_delete
引數功能一致):
# rabbit_c3.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
# 如果queue為空,會自動建立一個名字唯一的佇列,想要連線斷開自動刪除,可以使用exclusive,也可以使用auto_delete
# 佇列名採用r.method.queue獲取
r = channel.queue_declare(queue='', exclusive=True)
#r = channel.queue_declare(queue='', auto_delete=True)
channel.queue_bind(exchange='hello_direct', queue=r.method.queue, routing_key='hello.2')
def callback(ch, method, properties, body):
print(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(on_message_callback=callback,
queue=r.method.queue)
channel.start_consuming()
這樣可以實現一個動態建立佇列的消費者。
持久化
當將RabbitMQ
停掉,會發現不論是exchange
還是queue
的所有資訊都丟失了,因此如果需要恢復exchange
和queue
,需要在宣告交換機和佇列時就提供持久化引數(durable=True
):
# rabbit_c1.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic', durable=True)
channel.queue_declare(queue='hello1', durable=True)
需要注意的是,如果消費者聲明瞭durable=True
,那麼生產者在再次宣告時,也必須將其宣告為durable=True
,否則會報錯。或者也可以刪除後再次宣告:
channel.queue_delete(queue='hello1')
channel.exchange_delete(exchange='hello_topic')
但是想要使得佇列裡的訊息不丟失,需要額外在basic_publish
時也指定訊息的引數(delivery_mode=2
):
# rabbit_p.py
channel.basic_publish(exchange='hello_topic',
routing_key='hello.1',
properties=pika.BasicProperties(delivery_mode=2),
body=body)
這樣從交換機、佇列層面保證了訊息只要進入了,就不會丟失,但是如果在投遞的過程中丟失,比如訊息並未到達交換機或者沒有對應的佇列(訊息會被丟棄),此時應當使用RabbitMQ
提供的confirm mode
。
以上測試是在最新的pika==1.0.1
之下,但是這個版本下channel.basic_publish
並沒有返回了,所以切回pika=0.13.1
下測試,接下來的環境都為Python=3.7 pika=0.13.1
:
# rabbit_p.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='test', exchange_type='fanout')
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)
# 開啟確認
channel.confirm_delivery()
res = channel.basic_publish(exchange='test',
routing_key='test',
body='Hello World!',
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=2),
mandatory=True)
此時由於沒有消費者繫結名為test
的exchange
,因此會返回False
。
每一個版本的
pika
函式引數都有很大不同T_T
confirm mode
的確認結果表示,一條persisting
的訊息投向durable
佇列成功,並且成功寫到磁碟。
basic_reject
與basic_ack
相對應的是basic_reject
,它可以拒絕一條訊息,如果要拒絕多條訊息,使用basic_nack
:
# rabbit_c1.py
def callback(ch, method, properties, body):
global count
if count % 2 == 0:
ch.basic_reject(delivery_tag=method.delivery_tag)
else:
print(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
count += 1
預設情況下,requeue=True
,表示訊息被拒絕後可以由其他消費者處理。
RPC
呼叫
首先定義生產者端:
import pika
import uuid
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='rpc_p', exchange_type='fanout')
ch.exchange_declare(exchange='rpc_r', exchange_type='direct')
ch.queue_declare(queue='rpc_p')
ch.queue_declare(queue='rpc_r')
ch.queue_bind(exchange='rpc_p', queue='rpc_p')
ch.queue_bind(exchange='rpc_r', queue='rpc_r')
def callback(channel, method, properties, body):
print(body, properties.correlation_id)
channel.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(callback, queue='rpc_r')
correlation_id = uuid.uuid4().hex
print(correlation_id)
ch.basic_publish(exchange='rpc_p',
routing_key='',
body='1,2,3,4',
properties=pika.BasicProperties(
reply_to='rpc_r',
correlation_id=correlation_id
))
ch.start_consuming()
生產者指定自己發出訊息的交換機為rpc_p
,需要接受回覆使用的交換機為rpc_r
,並分別繫結佇列rpc_p
和rpc_r
,在釋出訊息時,指定reply_to
引數以及標識這條訊息的唯一id
。
消費者:
import pika
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='rpc_r', exchange_type='direct')
ch.exchange_declare(exchange='rpc_p', exchange_type='fanout')
ch.queue_declare(queue='rpc_p')
ch.queue_bind(exchange='rpc_p', queue='rpc_p')
def callback(channel, method, properties, body):
print(body)
body = body.decode('utf-8')
s = sum(int(x) for x in body.split(','))
channel.basic_publish(exchange='rpc_r',
routing_key=properties.reply_to,
body=str(s),
properties=pika.BasicProperties(
correlation_id=properties.correlation_id
))
channel.basic_ack(delivery_tag=method.delivery_tag)
ch.basic_consume(callback, queue='rpc_p')
ch.start_consuming()
消費者只需要一直監聽來自佇列rpc_p
的訊息,並且在接收到訊息後將確認資訊傳送至reply_to
所指向的佇列,同時指明自己所收到的是哪條訊息。
輸出結果:
# 生產者
7bfd8db775144308945b679ed4c5a2ed
b'10' 7bfd8db775144308945b679ed4c5a2ed
# 消費者
b'1,2,3,4'
References
: