1. 程式人生 > >RabbitMQ 釋出訂閱

RabbitMQ 釋出訂閱

 

網際網路公司對訊息佇列是深度使用者,因此需要我們瞭解訊息佇列的方方面面,良好的設計及深入的理解,更有利於我們對訊息佇列的規劃。

當前我們使用訊息佇列中發現一些問題

1、實際上是非同步無返回遠端呼叫,由釋出者定義佇列,消費者訂閱已定義的佇列。

2、並沒有體現解耦設計,而且開發人員間依然要像單體專案開發那樣針對同一個功能不斷溝通互動,提高了開發時間以及成本。

3、沒有訊息版本的實現,導致釋出者服務和消費者服務必須一起更新。如果沒有保持一致可能導致批量的合法訊息被丟到死信佇列,甚至可能要啟動舊服務將舊版本的訊息消費掉才能更新服務。並且開發人員要進入釋出流程指導各服務的釋出順序。

4、訂閱者的物件定義在“叢集”,但現實中的確存在“節點”的訂閱需求,如節點的配置更新、本地快取的重新整理等。

快速、高效的使用訊息佇列,不需要過多的溝通成本,是我們不斷的追求。因此釋出訂閱映入我們眼簾,公司內部稱為分散式事件。

RabbitMQ的釋出訂閱能解決我們的什麼問題呢?

  1. 不需要傳送端和接收端同時釋出。相比較普通的佇列方式,如果傳送端釋出,而消費端未釋出,那麼就會有大量的訊息積壓。
  2. 實現一次釋出,多次處理。

RabbitMQ釋出訂閱模式:像使用郵箱一樣,不需要傳送端和接收端共同釋出。

釋出訂閱基礎概念:

Exchange在定義的時候是有型別的,以決定到底是哪些Queue符合條件,可以接收訊息:

  1. fanout:所有bind到此exchange的queue都可以接收訊息
  2. direct:通過routingKey和exchange決定的那個唯一的queue可以接收訊息
  3. topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息
  4. headers:通過headers 來決定把訊息發給哪些queue(這個很少用)

fanout訂閱釋出模式(廣播模式)

direct訂閱釋出模式(廣播模式)

 

topic定義釋出模式(廣播模式)

實現釋出訂閱模式下訊息的釋出訂閱主要有以下幾個步驟:

  1. 建立訊息會話IMQSession(前提連線IMQConnection已經建立好了)
  2. 宣告要訂閱的訊息主題
  3. 宣告訊息主題訂閱者
  4. 宣告訊息傳送者
  5. 傳送訊息
  6. 主題訂閱者接收訊息
  7. 關閉主題訂閱者
  8. 關閉會話

生產者傳送廣播是實時的,消費者需要提前等待生產者發生訊息,這個又叫訂閱釋出,收音機模式,就像只有收音機打開了才能聽到鎖定的FM頻道,但是如果在節目開始一段時間,再開啟收音機的話,之前的節目就收聽不到了。即訂閱之前的訊息都是收不到的。

釋出訂閱相關的執行命令:

rabbitmqctl list_exchanges  列出所有exchange

臨時佇列:

    我們需要每次連線至mq的時候使用一個新佇列,使用完了就銷燬,這裡可使用臨時佇列:

result = channel.queue_declare()

   然後就可以通過result.method.queue獲取臨時佇列名稱,提供給消費者使用。 另外消費者用完後需要銷燬,可新增一個exclusive選項:result = channel.queue_declare(exclusive=True)  代表該佇列是排他性佇列。

繫結:Binding

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

檢視系統所有的繫結命令

rabbitmqctl list_binding

釋出訂閱相關的概念主要包括exchange、binding、routingkey、及queue。我們只需要按照步驟,使用合適的交換器型別,即可實現釋出訂閱的一對多訊息處理。