1. 程式人生 > >rabbit MQ的rpc功能詳解

rabbit MQ的rpc功能詳解

Rabbit MQ的rpc功能:
Rabbit MQ在spring中使用主要是在傳送時使用convertAndSendRecieve函式實現RPC功能,此時Rabbit在傳送訊息的同時會建立另外一個訊息佇列在consumer接受到訊息並處理完成後向新建立的佇列中傳送訊息。
若不使用spring來管理queue和exchange,則是通過傳送訊息的
replyTo: 指定回撥佇列的名稱
correlationId: 使用correlationId來判斷訊息佇列和回撥佇列,correlationId必須唯一。
值得注意的是在MQ的rpc功能中若訊息處理錯誤,在consumer一端就會不停的迴圈傳送處理錯誤的訊息,該佇列就會一直處於阻塞狀態。
這裡寫圖片描述


Rabbit MQ中消費者和傳送者可以是多執行緒的,其中設定多個消費者從而達到多執行緒的目的,但是很多情況下我們只需要一個消費者就可以了,但是單個消費者若發生阻塞情況會影響整個系統的穩定性,主要可以發現在rabbit RPC中。
在spring中可以設定listener中的concurrency可以設定consumer的個數,此時在listener中會配置一個執行緒池,線上程池中初始化一定的consumer個數,主要在task-exector中配置執行緒池。需要注意的是在沒有配置task-executor時,預設使用SimpleAsyncTaskExcutor來適應concurrency的數量。後來又發現在使用spring boot時配置中只有
spring.rabbitmq.listener.concurrency=2
spring.rabbitmq.listener.max-concurrency=10
2個配置項,其中concurrency和spring中的配置項的意思一樣,索性跟蹤程式碼發現和spring中一樣都是使用的預設的SimpleAsyncTaskExcutor,測試時發現雖然設定了max-concurrency但是在訊息錯誤,佇列阻塞時並沒有建立新的consumer執行緒而是對應的佇列一直處於阻塞狀態。
在spring中該問題的解決方法是設定別的執行緒池而不是使用SimpleAsyncTaskExcutor,SimpleAsyncTaskExcutor並不是執行緒池主要是在設定concurrency時決定建立幾個執行緒,所以可以通過建立別的執行緒池來解決問題。在spring boot中建立執行緒池的方法還在研究中。