RabbitMQ客戶端開發向導
Ⅰ、高層接口
- ConnectionFactory
- Connection
- Channel
- Consumor
Ⅱ、操作流程及API
【一】創建連接工廠ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
? 我們可以為actory
設置各種參數來進行連接初始化
factory.setUsername("guest");//設置服務器登錄賬號 factory.setPassword("guest");//設置服務器登錄密碼 factory.setHost("127.0.0.1");//設置服務器IP factory.setPort("15427");//設置服務器端口 factory.setVirtualHost("/");//設置虛擬主機
【二】創建連接Connection
Connection connection = factory.newConnection();
【三】創建信道Channel
Channel channel = connection.createChannel();
知識點:
? 連接和信道的關系:連接是客戶端與服務器開啟的
TCP
連接,由於TCP
連接的開啟和銷毀都需要消耗大量的性能,所以我們在連接的基礎上使用了信道的概念,對連接進行邏輯再分,每個連接都可以創建多個信道,這些信道共用一個連接。
? 信道就是RabbitMQ
中最實用的組件了,幾乎所有針對交換器、隊列、生產者、消費者的操作全部都在這裏面執行。
【四】創建交換器Exchange
Exchange.DeclareOK exchangeDeclare( String exchange, // 聲明交換器名稱 String type, // 聲明交換器類型 boolean durable, // 聲明是否持久化 boolean autoDelete, // 聲明是否自動刪除 boolean internal, // 聲明是否內置 Map<String, Object> arguments // 創建交換器的其他參數 ) throws IOException Exchange.DeclareOK exchangeDeclare( String exchange, // 聲明交換器名稱 BuiltinExchangeType type, // 聲明交換器類型 boolean durable, // 聲明是否持久化 boolean autoDelete, // 聲明是否自動刪除 boolean internal, // 聲明是否內置 Map<String, Object> arguments // 創建交換器的其他參數 ) throws IOException
? 創建交換器的方法有上面兩種,這兩種的區別僅在於聲明交換器的類型type
參數的類型不同,第一種常用。其實上面的方法每個都有很多個重載的方法,采用一些默認的參數。這裏只列舉參數最全的方法,來介紹其各個參數的意義。
知識點:交換器類型
?
RabbitMQ
的交換器擁有四種類型,分別為fanout
、direct
、topic
、headers
。fanout
類型的交換器會將消息路由到所有與其綁定的隊列中,類似於廣播;direct
類型是默認的交換器類型,它只會將消息路由到指定的隊列中,這個隊列的綁定key必須與消息的路由key完全一致;topic
類型的交換器是最常使用的交換器類型,是一種模糊匹配的交換器,它會將消息路由到所有綁定key與消息路由key可匹配的隊列上;至於headers
類型的交換器並不常用,因為其性能較差,並不實用。? 這裏
BuiltinExchangeType
類型是一個枚舉,它裏面定義了這四種 類型的枚舉值:FANOUT
、DIRECT
、TOPIC
、HEADERS
。
知識點:交換器持久化
? 開啟交換器的持久化,那麽交換器在創建好之後會持久化到磁盤,一般對於生產環境中長期使用的交換器,最好開啟持久化功能,用以提升
RabbitMQ
的可用性(高可用要點之一),服務器異常宕機的情況下可以硬件恢復。
知識點:交換器自動刪除
? 交換器在不再使用的情況下是可以自動刪除的,只要在創建交換器的時候設置
autoDelete
屬性為true
即可開啟自動刪除功能,默認為false
。? 自動刪除功能必須要在交換器曾經綁定過隊列或者交換器的情況下,處於不再使用的時候才會自動刪除,如果是剛剛創建的尚未綁定隊列或者交換器的交換器或者早已創建只是未進行隊列或者交換器綁定的交換器是不會自動刪除的。
? 不再使用的交換器指的即使那些曾經綁定過隊列或者交換器,現在已經沒有任何綁定隊列或者交換器的情況下的交換器。
知識點:內置交換器
? 內置交換器是一種特殊的交換器,這種交換器不能直接接收生產者發送的消息,只能作為類似於隊列的方式綁定到另一個交換器,來接收這個交換器中路由的消息,內置交換器同樣可以綁定隊列和路由消息,只是其接收消息的來源與普通交換器不同。
? 交換器的創建可以在代碼中實現,也可以不再代碼中實現。推薦在代碼中實現,因為當要創建的交換器已存在於RabbitMQ
服務器中時,是不會再次創建的,而是直接返回創建成功。
【五】綁定交換器exchangeBind
Exchange.bindOK exchangeBind(
String destination, // 指定目標交換器
String source, // 指定源交換器
String routingKey, // 指定綁定Key
Map<String, Object> arguments // 其他一些結構化參數
) throws IOException
知識點:交換器綁定
? 交換器一般用來被隊列綁定,但其實它也可以被另一個交換器綁定,綁定其實就是將二者關聯起來,綁定兩個交換器,就是將兩個交換器關聯起來,這裏面有一個主動方,一個被動方,主動方是要執行綁定的交換器(目標交換器),被動方是被綁定的交換器(源交換器),綁定的過程其實就是將目標交換器的綁定key送給源交換器,這時其實可以將目標交換器看成是一個隊列,將自己綁定到源交換器上,依靠的也是綁定key,不過這裏的綁定key是沒明確的路由Key,而非
topic
類似的模糊key。源交換器會將目標交換器當做一個隊列進行看待,將接收到的路由key與目標交換器綁定key完全匹配消息路由到這個目標交換器中。
? 這個方法同樣有重載的方法,來默認化一些參數。
【六】創建隊列Queue
Queue.DeclareOK queueDeclare(
String queue, // 聲明隊列名稱
boolean durable, // 聲明是否持久化
boolean exclusive, // 聲明是否排他
boolean autoDelete, // 聲明是否自動刪除
Map<String, Object> arguments // 設置隊列的其他一些參數
) throws IOException
知識點:隊列持久化
? 創建隊列的時候也可以設置是否支持持久化到磁盤,生產環境中我們一般都會將其設置為持久化,這也保證了服務器宕機的情況下重啟後,隊列可以恢復,以保證數據安全(高可用要點之二)
知識點:排他隊列
? 創建隊列的時候有一個排他參數
exclusive
,排他隊列只對首次創建該隊列的信道所在的連接可見,並且該連接內的所有信道都可以訪問這個排他隊列,在這個連接斷開之後,該隊列自動刪除,由此可見這個隊列可以說是綁到連接上的,對同一服務器的其他連接不可見。? 這種排他優先於持久化,即使設置了隊列持久化,在連接斷開後,該隊列也會自動刪除。
? 非排他隊列不依附於連接而存在,同一服務器上的多個連接都可以訪問這個隊列。
知識點:隊列自動刪除
? 隊列的自動刪除類似於交換器的自動刪除,都必須是曾經使用過的隊列才能執行自動刪除,如果是創建之後根本就沒有用過的隊列是不會觸發自動刪除的。
? 這個方法同樣有重載的方法,來默認化一些參數。
【七】綁定隊列queueBind
Queue.BindOK queueBind(
String queue, // 指定隊列名稱
String exchange, // 指定交換器名稱
String routingKey, // 聲明綁定key
Map<String,Object> arguments // 定義綁定的一些參數
) throws IOException
? 這裏routingKey
的值需要由指定的交換器的類型累決定使用什麽形式的key,如果是fanout
類型的交換器,會忽略routingKey
的值,如果是direct
類型的交換器,使用明確的綁定Key,如果是topic
類型的交換器,則使用帶有匹配符*
或#
的模糊key。
【八】發布消息basicPublish
void basicPublish(
String exchange, // 指定目的交換器
String routingKey, // 聲明消息的路由key
boolean mandatory, // 是否為無法路由的消息進行返回處理
boolean immediate, // 是否對路由到無消費者隊列的消息進行返回處理
BasicProperties props, // 消息的一些基本屬性設置
byte[] body // 消息體
) throws IOException
知識點:
mandatory
? 消息發布的時候設置消息的
mandatory
屬性用於設置消息在發送到交換器之後無法路由到隊列的情況對消息的處理方式,設置為true
表示將消息返回到生產者,否則直接丟棄消息。? 上述無法路由的情況可以是在無法找到匹配消息路由key的隊列,導致消息無法路由到隊列中
? 當
mandatory=true
時,出現無法路由消息被返回,那麽返回的消息又回到生產者,怎麽接收呢,這就要靠返回監聽器ReturnListener
知識點:
ReturnListener
? 我們在設置消息的
mandatory=true
的時候,就需要對返回的消息進行處理了,我們可以在信道中添加返回監聽器來監聽返回的消息,一旦監聽到這些消息,我們就著手對其進行再處理,一般我們會進行重發。channel.addReturnListener(new ReturnListener(){ @Override publish void handleReturn( int repayCode, // 回應碼 String repayText, // 回應內容 String exchange, // 來源交換器 String routingKey, // 消息的路由key AMQP.BasicProperties basicProperties, // 消息的其他屬性 byte[] body // 消息體 ) throws IOException { // do some thing to handle the return message } });
【九】消費消息basicConsumer、basicGet
1、推送消息basicConsume
String basicConsume(
String queue, // 指定隊列名稱
boolean autoAck, // 是否自動回應
String consumerTag, // 聲明消費者標簽,區分不同的消費者
boolean noLocal, // 是否不能將消息推送給同一個連接內的消費者
boolean exclusive, // 是否排他
Map<String, Object> arguments, // 設置消費者的其他參數
Consumer callback // 設置消費者回調函數,處理消息
) throws IOException
知識點:自動回應
? 推送消息時設置消息自動回應,那麽消息在推送給消費者後就會自動回應服務器,服務器收到回回應就會刪除這條消息,這樣無法保證消息能被正確處理,因為消費者雖然收到了消息,但它可能無法處理、或者拒絕等,這時服務器中消息卻已被刪除,導致消息丟失。(高可用要點之三)
? 一般情況下我們將其設置為
false
,然後在回調函數中消息處理完畢之後手動進行回應。void basicAck( long deliveryTag, // 指定推送標識編號 boolean multiple // 是否批量回應 ) throws IOException
註意:推送標識
?
deliveryTag
表示的是推送編號,是一個單調遞增正整數編號,它用來標識channel
中一次消息推送,與消息綁在一起。? 當一個消費者向服務器註冊
basicConsume
之後,服務器就會使用basic.delivery
給消費者推送消息,deliveryTag
就用來標識這樣一個推送。但要註意,它只在當前信道channel
內有效。? 消費者受到消息的時候同時會收到這個推送標識,當要回應、拒絕消息的時候就需要帶著這個標識。
註意:批量回應
?
multiple
參數為boolean
值,用來表示是否進行批量回應,當值為true
時表示進行批量回應,它會對推送編號小於給定編號的所有消息進行回應。false
表示只回應指定編號的消息。
知識點:推送範圍
?
noLocal
這個參數表示是否可以將消息推送給與消息發布者同一連接內的消費者,如果noLocal=false
表示可以推送,noLocal=true
則表示不能推送,那麽就只能推送到其他連接中的消費者。
知識點:排他
知識點:回調函數
? 回調函數主要用於定義針對消息的處理邏輯,一般采用如下方式定義:
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery( String consumerTag, // 消費者標簽,可用於消費者驗證發送目標的正確性 Envelope envelope, // AMQP操作參數,可以獲取其中參數進行消息處理 Amqp.BasicProperties properties, // 消息的基本屬性 Byte[] body // 消息體,用於存放推送的消息 ){ // do something to handle deleveried message } }
知識點:AMQP操作參數
?
Envelope
是RabbitMQ
定義的用於封裝AMQP操作參數的類,裏面主要封裝了四個參數:private final long _deliveryTag;// 推送編號 private final boolean _redeliver; // 是否重發標簽 private final String _exchange; // 當前操作對應的交換器 private final String _routingKey; // 關聯的路由Key
? 其中
_redeliber=true
表示這是一個失敗的basicAck
之後的消息的重新推送。這裏面的這四個參數我們都可以在消費者客戶端處理消息的時候使用。
? 其實basicConsume
是消費者訂閱方法,目的在於訂閱某個隊列,是消息推送的前提,真正的消息推送並沒有在Channel
類中實現,因為推送操作是由RabbitMQ
服務器自動發起的,不需要生產者或者消費者手動觸發,所以也就沒用提供接口。
? 真正的消息推送是由RabbitMQ
服務器的Basic.delivery
方法實現的。
2、拉取消息basicGet
GetResponse basicGet(
String queue, // 指定隊列名稱
boolean autoAck // 是否自動回應
) throws IOException;
? 拉取消息就是指消費者主動從服務器獲取消息,每次只能獲取一條消息。推送消息是被動的獲取。這裏的autoAck
和之前推送的設置一樣,一般設置為false
,表示不主動回應,采用手動回應(高可用要點之三)
【十】拒絕消息basicReject、basicNack
1、拒絕一個消息basicReject
void basicReject(
long deliveryTag, // 消息推送編號
boolean requeue // 是否重新入隊
) throws IOException;
2、拒絕多個消息basicNack
void basicNack(
long deliveryTag, // 消息推送編號
boolean multiple, // 是否批量拒絕
boolean requeue // 是否重新入隊
)throws IOException;
知識點:重新入隊
? 當一個消息推送到某一個消費者,這個消費者無法處理時它進行了拒絕操作,如果指定
requeue
值為true
,表示被拒絕的消息還可以重新發送到隊列,可被繼續推送到其他消費者,如果設置為false
,那麽這條消息會被立刻從隊列刪除。? 如果將這兩個方法的
requeue
參數設置為false
,那麽可以啟用死信隊列功能,因為這樣的話,返回的消息會變成死信,如果服務器中設置有死信交換器DLX
,並且已關聯到該隊列,那麽這個消息就會被發送到死信交換器,從而被路由到綁定的死信隊列中得以保留,我們可以通過排查這些消息來進行服務器優化。
知識點:批量拒絕
? 消息的單個拒絕與批量拒絕使用的不是同一個方法,批量拒絕的方法
basicNack
中有個決定是否批量拒絕的參數mutiple
,如果設置為false
,表示不執行批量拒絕,那麽它的效果等同於basicReject
方法,如果設置為true
,表示拒絕小於指定推送編號的所有未被當前消費者消費的消息。
【十一】取消消費者basicCancel
void basicCancel(String consumerTag // 指定要取消的消費者標簽
) throws IOException;
【十二】關閉連接
channel.close();// 關閉信道
connection.close();// 關閉TCP連接
RabbitMQ客戶端開發向導