1. 程式人生 > 實用技巧 >Rabbitmq學習記錄

Rabbitmq學習記錄

優點

  • 實現了AMQP標準的訊息伺服器
  • 可靠性,使用持久化、傳輸確認、釋出確認等保證
  • 靈活的路由
  • 叢集部署簡單
  • 支援多種協議,以及多種語言客戶端
  • 易用的使用者介面

...

MAC下的安裝

採用Homebrew進行安裝:

➜  ~ brew update
➜  ~ brew install rabbitmq

通過brew安裝的檔案位於/usr/local/Cellar/rabbitmq/3.7.14/sbin中。開啟外掛:

➜  /usr/local/Cellar/rabbitmq/3.7.14 sudo sbin/rabbitmq-plugins enable rabbitmq_management

直接執行rabbitmq-server

會提示找不到命令:

➜  ~ rabbitmq-server
zsh: command not found: rabbitmq-server

因為使用的是zsh,所以需要將路徑新增至.zshrc中:

➜  ~ cat .zshrc
# If you come from bash you might have to change your $PATH.
# export PATH=$HOME/bin:/usr/local/bin:$PATH
# Path to your oh-my-zsh installation.
export PATH=$PATH:/usr/local/opt/rabbitmq/sbin
...

source .zshrc使它生效。這下rabbit-server是有用了,但是啟動卻報錯了:

2019-04-25 15:18:09.525358
    args: []
    format: "Error when reading /Users/littlemay/.erlang.cookie: eacces"
    label: {error_logger,error_msg}

原因是/Users/littlemay/.erlang.cookie這個檔案沒有許可權,需要更改檔案所有者以及許可權:

➜  ~ ls -l /Users/littlemay/.erlang.cookie
-r--------  1 root  staff  20  4 25 00:00 /Users/littlemay/.erlang.cookie
➜  ~ sudo chown littlemay:staff /Users/littlemay/.erlang.cookie
➜  ~ sudo chmod u+x /Users/littlemay/.erlang.cookie

折騰了半個小時,此時終於可以不使用sudo啟動了。

可以使用rabbitmqctl start_app/stop_app啟動/停止rabbitmq。網頁端的使用者介面訪問:localhost:15762,預設使用者名稱密碼都為guest

基本概念

  • Message:訊息,AMQP中預定了14個屬性,即後文pika中會用到的BasicProperties
  • Producer:訊息的生產者,也是一個向交換機發布訊息的客戶端應用程式
  • Consumer:訊息的消費者,表示一個從訊息佇列中取得訊息的客戶端應用程式。
  • Exchange:交換機,用來接收生產者傳送的訊息並根據規則將這些訊息路由給伺服器中的佇列
  • Queue:訊息佇列,用來儲存訊息直到傳送給消費者。一個訊息可投入一個或多個佇列。如果一個佇列沒有被任何消費者佇列,那麼它將一直在佇列裡面,等待消費者連線到這個佇列將其取走。多個消費者可以訂閱同一個佇列,此時佇列的訊息會被平均分攤(round-robin)給多個消費者處理。
  • Binding:繫結,將ExchangeQueue按照路由規則繫結
  • Channel:通道,多路複用連線中的一條獨立的雙向資料流通道。通道是建立在真實的TCP連線內的虛擬連線,AMQP 命令都是通過通道發出去的,不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。因為對於作業系統來說建立和銷燬TCP都是非常昂貴的開銷,所以引入了通道的概念,以複用一條TCP連線
  • Virtual Host:虛擬主機,用作不同使用者的許可權分離。預設的vhost/
  • Broker:表示訊息佇列伺服器實體

Routing KeyBinding Key

Binding Key是在ExchangeQueue進行Binding時使用的路由規則。多個Queue允許使用同一個Binding KeyExchange進行繫結。

Routing Key是當生產者將訊息傳送給Exchange時,指定的路由規則。它會依據Exchange TypeBinding Key聯合使用來決定將訊息投放到哪個佇列中。

Routing Key的長度限制為255 bytes

Exchange Types

通常使用的有四種:

  • fanout:將所有傳送到該Exchange的訊息路由到所有與它繫結的Queue中,此時Binding KeyRouting Key不起作用
  • direct:將所有傳送到該Exchange的訊息路由到Binding KeyRouting Key完全匹配的Queue
  • topic:將所有傳送到該Exchange的訊息路由到Binding KeyRouting Key相匹配的Queue中。採用.進行分隔(每分隔開的一段獨立字串成為一個單詞),*表示匹配一個單詞,#表示匹配0個或者多個。
  • headers:不依賴於Routing KeyBingding Key的匹配規則路由訊息,而是根據傳送訊息內容中的headers屬性進行匹配,它與direct功能一致,但是效能十分低,一般不使用。

Message Acknowledgment

消費者接收到了訊息,RabbitMQ並不會立即將訊息從佇列中丟棄,而是在接收到消費者的ACK響應之後,才進行丟棄。

如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ斷開,將會傳遞給其他消費者直到佇列接收到了ACK,以此保證每一個訊息都能被有效傳遞。因此,如果忘記回覆ACK,那麼佇列中的訊息會越來越多,RabbitMQ server不會再發送資料給它,可以起到限流的作用。

Prefetch Count

允許消費者每次從佇列中獲取任意數量的訊息。針對任務粒度小,執行時間短的消費者,可以設定更大的預讀取數。

RPC遠過程呼叫

除了非同步通訊之外,RabbitMQ也支援同步呼叫。實現原理如圖:

  1. 生產者在生產請求訊息時,會在請求訊息的屬性中設定兩個replyTo 值:一個為Queue Name,用於告知消費者將處理完成後的通知訊息返回到該佇列;另一個為correlationId,是請求訊息的唯一標示,隨著請求訊息一同傳送給消費者,也會隨著響應訊息返回給生產者,生產者就能夠通過correlationId來判定來判定請求是否被成功執行,最終實現請求和響應的一一配對。
  2. 生產者只有在接收到響應訊息之後才會繼續發生下一次請求訊息,以此實現同步的效果

Python3實踐

首先需要安裝pika,分別建立消費者和生產者:

生產者

# rabbit_p.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

for i in range(20):
    body = '%s Hello rabbitmq' % i
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=body)

connection.close()

消費者

# rabbit_c1.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 為了防止queue不存在,最好消費者和生產者都建立
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(body)
# 如果不開啟ack,訊息會一直顯示unacked,每當消費者重啟又會重複消費
channel.basic_consume(on_message_callback=callback,
                       queue='hello',
                     auto_ack=True)

channel.start_consuming()

使用basic_consume而不是迴圈呼叫basic_get

如果使用exchange='',會繫結在RabbitMQ預設的Exchange下,它綁定了所有的queuerouting key使用queue name

如果再開啟一個消費者,那麼他們會輪流處理訊息。

prefetch_count

為了防止有的消費者空閒,所以可以設定prefetch_count引數(使用basic_qos來指定),當RabbitMQ發現有個消費者遲遲沒有返回ACK,便不會再將訊息分配給它,此時需要我們採用主動ACK的方式,消費者1不進行ack,消費者2進行ack:

# rabbit_c1.py
...
def callback(ch, method, properties, body):
    print(body)
# basic_qos需要放在basic_consume前才有效
channel.basic_qos(prefetch_count=3)
channel.basic_consume(on_message_callback=callback,
                       queue='hello',
                      )
...
# rabbit_c2.py
...
def callback(ch, method, properties, body):
    print(body)
    # 主動使用ack告知呼叫方
    ch.basic_ack(delivery_tag=method.delivery_tag)
...

可以發現,消費者1只處理了三條資料(並且網頁端顯示一直是unacked),其他的資料都由消費者2處理了。接著此時關閉消費者1,會發現這三條資料又被分配給消費者2所處理了。

Exchange

fanout型別:

需要在消費者和生產者上面都加上對exchange的宣告,兩個消費者分別訂閱了hello1hello2佇列,這兩個佇列都連向hello_fanout這個exchange

# rabbit_c1.py
...
channel.exchange_declare(exchange='hello_fanout', exchange_type='fanout')
channel.queue_declare(queue='hello1')
channel.queue_bind(exchange='hello_fanout', queue='hello1', routing_key='hello.1')
...
# rabbit_c2.py 
...
channel.exchange_declare(exchange='hello_fanout', exchange_type='fanout')
channel.queue_declare(queue='hello2')
channel.queue_bind(exchange='hello_fanout', queue='hello1', routing_key='hello.2')
...
# rabbit_p.py
channel.exchange_declare(exchange='hello_fanout', exchange_type='fanout')
channel.queue_declare(queue='hello1')
channel.queue_declare(queue='hello2')

for i in range(20):
    body = '%s Hello rabbitmq' % i
    channel.basic_publish(exchange='hello_fanout',
                          routing_key='world',
                          body=body)

fanout模式下,routing_key即使不匹配也沒有關係。消費者1和消費者2的輸出結果一樣。

direct

將消費者修改為:

# rabbit_c1.py
...
# 修改exchange的type為direct
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
channel.queue_declare(queue='hello1')
# 指定路由規則
channel.queue_bind(exchange='hello_direct', queue='hello1', routing_key='hello.1')
...
channel.basic_consume(on_message_callback=callback,
                       queue='hello1',
                      )

...

生產者:

# rabbit_p.y
...
channel.exchange_declare(exchange='hello', exchange_type='direct')
channel.queue_declare(queue='hello1')
channel.queue_declare(queue='hello2')

for i in range(20):
    body = '%s Hello rabbitmq' % i
    if i < 10:
        channel.basic_publish(exchange='hello',
                              routing_key='hello.1',
                              body=body)
    else:
        channel.basic_publish(exchange='hello',
                              routing_key='hello.2',
                              body=body)
...

此時i<10的訊息會由消費者1處理。

topic

Binding Key可以存在*#,但是生產者的Routing Key應當是精確的。修改消費者與生產者分別為:

#rabbit_c1.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
channel.queue_declare(queue='hello1')
channel.queue_bind(exchange='hello_topic', queue='hello1', routing_key='hello.*')
...
#rabbit_c2.py 綁定了兩種規則
channel.queue_bind(exchange='hello_topic', queue='hello2', routing_key='hello.*')
channel.queue_bind(exchange='hello_topic', queue='hello2', routing_key='*.hello.*')

#rabbit_p.py
...
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
for i in range(20):
    body = '%s Hello rabbitmq' % i
    if 10 < i < 20:
        channel.basic_publish(exchange='hello_topic',
                              routing_key='hello.2',
                              body=body)
    else:
        channel.basic_publish(exchange='hello_topic',
                              routing_key='2.hello.2',
                              body=body)
...


一個佇列可以使用多種路由規則與同一exchange繫結。此時消費者2可以收到全部訊息,而消費者1只能收到11-19之間的訊息。

同樣的,一個佇列可以與多個exchange繫結,讓消費者2訂閱的佇列同時與hello_topichello_direct相連:

# rabbit_c2.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
channel.queue_declare(queue='hello2')
channel.queue_bind(exchange='hello_direct', queue='hello2', routing_key='hello.2')
channel.queue_bind(exchange='hello_topic', queue='hello2', routing_key='hello.*')

# rabbit_p.py

channel.exchange_declare(exchange='hello_topic', exchange_type='topic')
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')

for i in range(20):
    body = '%s Hello rabbitmq' % i
    if 10 < i < 20:
        channel.basic_publish(exchange='hello_topic',
                              routing_key='hello.1',
                              body=body)
    else:
        channel.basic_publish(exchange='hello_direct',
                              routing_key='hello.2',
                              body=body)

此時,消費者2可以收到全部訊息。

exclusive

前面測試的時候,如果不重啟RabbitServer,如果沒有刪除交換機或者佇列的話,以前的路由規則仍然存在,經常會出現匪夷所思的問題。這是因為持久化的問題,留到下節介紹。

實際上可以使用臨時佇列的方式進行處理,exclusive引數指明這個佇列為排他性佇列,即只有自己可以訪問,此時生產者不能再進行生命,當連線斷掉,這個queue會被刪除(這點與auto_delete引數功能一致):

# rabbit_c3.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='hello_direct', exchange_type='direct')
# 如果queue為空,會自動建立一個名字唯一的佇列,想要連線斷開自動刪除,可以使用exclusive,也可以使用auto_delete
# 佇列名採用r.method.queue獲取
r = channel.queue_declare(queue='', exclusive=True)
#r = channel.queue_declare(queue='', auto_delete=True)
channel.queue_bind(exchange='hello_direct', queue=r.method.queue, routing_key='hello.2')

def callback(ch, method, properties, body):
    print(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(on_message_callback=callback,
                       queue=r.method.queue)

channel.start_consuming()

這樣可以實現一個動態建立佇列的消費者。

持久化

當將RabbitMQ停掉,會發現不論是exchange還是queue的所有資訊都丟失了,因此如果需要恢復exchangequeue,需要在宣告交換機和佇列時就提供持久化引數(durable=True):

# rabbit_c1.py
channel.exchange_declare(exchange='hello_topic', exchange_type='topic', durable=True)
channel.queue_declare(queue='hello1', durable=True)

需要注意的是,如果消費者聲明瞭durable=True,那麼生產者在再次宣告時,也必須將其宣告為durable=True,否則會報錯。或者也可以刪除後再次宣告:

channel.queue_delete(queue='hello1')
channel.exchange_delete(exchange='hello_topic')

但是想要使得佇列裡的訊息不丟失,需要額外在basic_publish時也指定訊息的引數(delivery_mode=2):

# rabbit_p.py
channel.basic_publish(exchange='hello_topic',
                      routing_key='hello.1',
                      properties=pika.BasicProperties(delivery_mode=2),
                      body=body)

這樣從交換機、佇列層面保證了訊息只要進入了,就不會丟失,但是如果在投遞的過程中丟失,比如訊息並未到達交換機或者沒有對應的佇列(訊息會被丟棄),此時應當使用RabbitMQ提供的confirm mode

以上測試是在最新的pika==1.0.1之下,但是這個版本下channel.basic_publish並沒有返回了,所以切回pika=0.13.1下測試,接下來的環境都為Python=3.7 pika=0.13.1

# rabbit_p.py
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='test', exchange_type='fanout')
channel.queue_declare(queue="test", durable=True, exclusive=False, auto_delete=False)

# 開啟確認
channel.confirm_delivery()
res = channel.basic_publish(exchange='test',
                         routing_key='test',
                         body='Hello World!',
                         properties=pika.BasicProperties(content_type='text/plain',
                                                         delivery_mode=2),
                            mandatory=True)

此時由於沒有消費者繫結名為testexchange,因此會返回False

每一個版本的pika函式引數都有很大不同T_T

confirm mode的確認結果表示,一條persisting的訊息投向durable佇列成功,並且成功寫到磁碟。

basic_reject

basic_ack相對應的是basic_reject,它可以拒絕一條訊息,如果要拒絕多條訊息,使用basic_nack

# rabbit_c1.py
def callback(ch, method, properties, body):
    global count
    if count % 2 == 0:
        ch.basic_reject(delivery_tag=method.delivery_tag)
    else:
        print(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    count += 1

預設情況下,requeue=True,表示訊息被拒絕後可以由其他消費者處理。

RPC呼叫

首先定義生產者端:

import pika
import uuid

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='rpc_p', exchange_type='fanout')
ch.exchange_declare(exchange='rpc_r', exchange_type='direct')
ch.queue_declare(queue='rpc_p')
ch.queue_declare(queue='rpc_r')
ch.queue_bind(exchange='rpc_p', queue='rpc_p')
ch.queue_bind(exchange='rpc_r', queue='rpc_r')


def callback(channel, method, properties, body):
    print(body, properties.correlation_id)
    channel.basic_ack(delivery_tag=method.delivery_tag)

ch.basic_consume(callback, queue='rpc_r')
correlation_id = uuid.uuid4().hex
print(correlation_id)
ch.basic_publish(exchange='rpc_p',
                 routing_key='',
                 body='1,2,3,4',
                 properties=pika.BasicProperties(
                     reply_to='rpc_r',
                     correlation_id=correlation_id
                 ))
ch.start_consuming()

生產者指定自己發出訊息的交換機為rpc_p,需要接受回覆使用的交換機為rpc_r,並分別繫結佇列rpc_prpc_r,在釋出訊息時,指定reply_to引數以及標識這條訊息的唯一id

消費者:

import pika

conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
ch = conn.channel()
ch.exchange_declare(exchange='rpc_r', exchange_type='direct')
ch.exchange_declare(exchange='rpc_p', exchange_type='fanout')
ch.queue_declare(queue='rpc_p')
ch.queue_bind(exchange='rpc_p', queue='rpc_p')

def callback(channel, method, properties, body):
    print(body)
    body = body.decode('utf-8')
    s = sum(int(x) for x in body.split(','))
    channel.basic_publish(exchange='rpc_r',
                          routing_key=properties.reply_to,
                          body=str(s),
                          properties=pika.BasicProperties(
                              correlation_id=properties.correlation_id
                          ))
    channel.basic_ack(delivery_tag=method.delivery_tag)

ch.basic_consume(callback, queue='rpc_p')
ch.start_consuming()

消費者只需要一直監聽來自佇列rpc_p的訊息,並且在接收到訊息後將確認資訊傳送至reply_to所指向的佇列,同時指明自己所收到的是哪條訊息。

輸出結果:

# 生產者
7bfd8db775144308945b679ed4c5a2ed
b'10' 7bfd8db775144308945b679ed4c5a2ed
# 消費者
b'1,2,3,4'

References

RabbitMQ 使用參考

訊息佇列之 RabbitMQ

RabbitMq的整理 exchange、route、queue關係

一篇全面透徹的RabbitMQ指南!