RabbitMQ學習(二):Java使用RabbitMQ要點知識
1、maven依賴
<dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.3</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>
2、RabbitMQ重要方法介紹(基本常用的)
2.1、建立連線
// 建立連線工廠 ConnectionFactory cf = new ConnectionFactory(); // 設定rabbitmq伺服器IP地址 cf.setHost("*.*.*.*"); // 設定rabbitmq伺服器使用者名稱 cf.setUsername("***"); // 設定rabbitmq伺服器密碼 cf.setPassword("***"); // 指定埠,預設5672 cf.setPort(AMQP.PROTOCOL.PORT); // 獲取一個新的連線 connection = cf.newConnection(); // 建立一個通道 channel = connection.createChannel(); //關閉管道和連線 channel.close(); connection.close();
2.2、宣告佇列
/** * 申明一個佇列,如果這個佇列不存在,將會被建立 * @param queue 佇列名稱 * @param durable 永續性:true佇列會再重啟過後存在,但是其中的訊息不會存在。 * @param exclusive 是否只能由建立者使用,其他連線不能使用。 * @param autoDelete 是否自動刪除(沒有連線自動刪除) * @param arguments 佇列的其他屬性(構造引數) * @return Queue.DeclareOk:宣告佇列的聲明確認方法已成功宣告。 * @throws java.io.IOException if an error is encountered */ channel.queueDeclare("testQueue", true, false, false, null);
此方法一般由Producer呼叫建立訊息佇列。如果由Consumer建立佇列,有可能Producer釋出訊息的時候Queue還沒有被建立好,會造成訊息丟失的情況。
2.3、宣告Exchange
/**
* 宣告一個 exchange.
* @param exchange 名稱
* @param type exchange type:direct、fanout、topic、headers
* @param durable 持久化
* @param autoDelete 是否自動刪除(沒有連線自動刪除)
* @param arguments 佇列的其他屬性(構造引數)
* @return 成功地聲明瞭一個聲明確認方法來指示交換。
* @throws java.io.IOException if an error is encountered
*/
channel.exchangeDeclare("leitao","topic", true,false,null);
2.4、將queue和Exchange進行繫結(Binding)
/**
* 將佇列繫結到Exchange,不需要額外的引數。
* @param queue 佇列名稱
* @param exchange 交換機名稱
* @param routingKey 路由關鍵字
* @return Queue.BindOk:如果成功建立繫結,則返回繫結確認方法。
* @throws java.io.IOException if an error is encountered
*/
channel.queueBind("testQueue", "leitao", "testRoutingKey");
2.5、釋出訊息
/**
* 釋出一條不用持久化的訊息,且設定兩個監聽。
* @param exchange 訊息交換機名稱,空字串將使用直接交換器模式,傳送到預設的Exchange=amq.direct。此狀態下,RoutingKey預設和Queue名稱相同
* @param routingKey 路由關鍵字
* @param mandatory 監聽是否有符合的佇列
* @param immediate 監聽符合的佇列上是有至少一個Consumer
* @param BasicProperties 設定訊息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化。
* @param body 訊息物件轉換的byte[]
* @throws java.io.IOException if an error is encountered
*/
channel.basicPublish("",queueName,true,false,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
當exchange的值為空字串或者是amq.direct時,此時的交換器型別預設是direct型別,可以不用單獨宣告Exchange,也不用單獨進行Binding,系統預設將queue名稱作為RoutingKey進行了繫結。
兩個傳入引數的含義
mandatory
當mandatory標誌位設定為true時,如果exchange根據自身型別和訊息routeKey無法找到一個符合條件的queue,那麼會呼叫basic.return方法將訊息返回給生產者(Basic.Return + Content-Header + Content-Body);當mandatory設定為false時,出現上述情形broker會直接將訊息扔掉。
immediate
當immediate標誌位設定為true時,如果exchange在將訊息路由到queue(s)時發現對於的queue上沒有消費者,那麼這條訊息不會放入佇列中。當與訊息routeKey關聯的所有queue(一個或者多個)都沒有消費者時,該訊息會通過basic.return方法返還給生產者。
概括來說,mandatory標誌告訴伺服器至少將該訊息route到一個佇列中,否則將訊息返還給生產者;immediate標誌告訴伺服器如果該訊息關聯的queue上有消費者,則馬上將訊息投遞給它,如果所有queue都沒有消費者,直接把訊息返還給生產者,不用將訊息入佇列等待消費者了。
注意:在RabbitMQ3.0以後的版本里,去掉了immediate引數的支援,傳送帶immediate=true標記的publish會返回如下錯誤:
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error;protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)。
為什麼取消支援:immediate標記會影響映象佇列效能,增加程式碼複雜性,並建議採用“TTL”和“DLX”等方式替代。
2.6、接收訊息
/**
* 設定消費批量投遞數目,一次性投遞10條訊息。當消費者未確認訊息累計達到10條時,rabbitMQ將不會向此Channel上的消費者投遞訊息,直到未確認數小於10條再投遞
* @param prefetchCount 投遞數目
* @param global 是否針對整個Channel。true表示此投遞數是給Channel設定的,false是給Channel上的Consumer設定的。
* @throws java.io.IOException if an error is encountered
*/
channel.basicQos(10,false);
//整個傳輸管道最多15條,具體分到每個消費者身上又不能大於10條
channel.basicQos(15,true);
/**
* 開始一個非區域性、非排他性消費, with a server-generated consumerTag.
* 執行這個方法會回撥handleConsumeOk方法
* @param queue 佇列名稱
* @param autoAck 是否自動應答。false表示consumer在成功消費過後必須要手動回覆一下伺服器,如果不回覆,伺服器就將認為此條訊息消費失敗,繼續分發給其他consumer。
* @param callback 回撥方法類,一般為自己的Consumer類
* @return 由伺服器生成的consumertag
* @throws java.io.IOException if an error is encountered
*/
channel.basicConsume(queueName, false, Consumer);
2.7、Consumer處理訊息
/**
* 消費者收到訊息的回撥函式
* @param consumerTag 消費者標籤
* @param envelope 訊息的包裝資料
* @param properties 訊息的內容頭資料
* @param body 訊息物件的byte[]
* @throws IOException
*/
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException;
3、Producer訊息確認機制
3.1、什麼是生產者訊息確認機制?
沒有訊息確認模式時,生產者不知道訊息是不是已經到達了Broker伺服器,這對於一些業務嚴謹的系統來說將是災難性的。訊息確認模式可以採用AMQP協議層面提供的事務機制實現(此文沒有這種實現方式),但是會降低RabbitMQ的吞吐量。RabbitMQ自身提供了一種更加高效的實現方式:confirm模式。
訊息生產者通過呼叫Channel.confirmSelect()方法將Channel通道設定成confirm模式。一旦通道被設定成confirm模式,該通道上的所有訊息都會被指派一個唯一的ID(從1開始),一旦訊息被對應的Exchange接收,Broker就會發送一個確認給生產者(其中deliveryTag就是此唯一的ID),這樣訊息生產者就知道訊息已經成功到達Broker。
confirm模式最大的好處在於他是非同步的,一旦釋出一條訊息,生產者應用程式就可以在等通道返回確認的同時繼續傳送下一條訊息,當訊息最終得到確認之後,生產者應用便可以通過回撥方法來處理該確認訊息,如果RabbitMQ因為自身內部錯誤導致訊息丟失,就會發送一條nack訊息,生產者應用程式同樣可以在回撥方法中處理該nack訊息。
在channel 被設定成 confirm 模式之後,所有被 publish 的後續訊息都將被 confirm(即 ack) 或者被nack一次。但是沒有對訊息被 confirm 的快慢做任何保證,並且同一條訊息不會既被 confirm又被nack 。
3.2、開啟confirm模式
如上所說生產者通過呼叫Channel.confirmSelect()方法將Channel通道設定成confirm模式。
注意:已經在transaction事務模式的channel是不能再設定成confirm模式的,即這兩種模式是不能共存的。
3.3、普通confirm模式
普通confirm模式是序列的,即每次傳送了一次訊息,生產者都要等待Broker的確認訊息,然後根據確認標記權衡訊息重發還是繼續發下一條。由於是序列的,在效率上是比較低下的。
(1)重點方法
/**
* 等待Broker返回訊息確認標記
* 注意,在非確定的通道,waitforconfirms丟擲IllegalStateException。
* @return 是否傳送成功
* @throws java.lang.IllegalStateException
*/
boolean waitForConfirms() throws InterruptedException;
(2)部分使用程式碼如下:
//注意:返回的時候Return在前,Confirm在後
channel.confirmSelect();
int i=1;
while (i<=50) {
//釋出訊息
channel.basicPublish("",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
//等待Broker的確認回撥
if(channel.waitForConfirms())
System.out.println("send success!");
else
System.out.println("send error!");
i++;
}
3.4、批量confirm模式
批量confirm模式是非同步的方式,效率要比普通confirm模式高許多,但是此種方式也會造成執行緒阻塞,想要進行失敗重發就必須要捕獲異常。網路上還有采用waitForConfirms()實現批量confirm模式的,但是隻要一條失敗了,就必須把這批次的訊息統統再重發一次,非常的消耗效能,因此此文不予考慮。
(1)重點程式碼
/**
* 等待直到所有訊息被確認或者某個訊息傳送失敗。如果訊息傳送確認失敗了,
* waitForConfirmsOrDie 會丟擲IOException異常。當在非確認通道上呼叫時
* ,會丟擲IllegalStateException異常。
* @throws java.lang.IllegalStateException
*/
void waitForConfirmsOrDie() throws IOException, InterruptedException;
(2)部分程式碼如下:
//注意:返回的時候Return在前,Confirm在後
channel.confirmSelect();
int i=1;
while (i<=50) {
//釋出訊息
channel.basicPublish("",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.serialize(object));
i++;
}
channel.waitForConfirmsOrDie();
3.5、ConfirmListener監聽器模式
RabbitMQ提供了一個ConfirmListener介面專門用來進行確認監聽,我們可以實現ConfirmListener介面來建立自己的訊息確認監聽。ConfirmListener介面中包含兩個回撥方法:
/**
* 生產者傳送訊息到exchange成功的回撥方法
*/
void handleAck(long deliveryTag, boolean multiple) throws IOException;
/**
* 生產者傳送訊息到伺服器broker失敗的回撥方法,伺服器丟失了此訊息。
* 注意,丟失的訊息仍然可以傳遞給消費者,但broker不能保證這一點。
*/
void handleNack(long deliveryTag, boolean multiple) throws IOException;
其中deliveryTag是Broker給每條訊息指定的唯一ID(從1開始);multiple表示是否接收所有的應答訊息,比如multiple=true時,傳送100條訊息成功過後,我們並不會收到100次handleAck方法呼叫。
(1)重要方法
//註冊訊息確認監聽器
channel.addConfirmListener(new MyConfirmListener());
(2)部分使用程式碼如下:
//注意:返回的時候Return在前,Confirm在後
channel.confirmSelect();
//註冊訊息確認監聽器
channel.addConfirmListener(new MyConfirmListener());
//註冊訊息結果返回監聽器
channel.addReturnListener(new MyReturnListener());
int i=1;
while (i<=50) {
//釋出訊息
channel.basicPublish("",queueName,true,MessageProperties.TEXT_PLAIN,SerializationUtils.
serialize(object));
i++;
}
//自定義的訊息確認監聽器
public class MyConfirmListener implements ConfirmListener{
/**
* 生產者傳送訊息到exchange成功的回撥方法
* 訊息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。但是可以設定ReturnListener監聽來監聽有沒有匹配的佇列。
* 因此handleAck執行了,並不能完全表示訊息已經進入了對應的佇列,只能表示對應的exchange成功的接收了訊息。
* 訊息被exchange接收過後,還需要通過一定的匹配規則分發到對應的佇列queue中。
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//注意:deliveryTag是broker給訊息指定的唯一id(從1開始)
System.out.println("Exchange接收訊息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
}
/**
* 生產者傳送訊息到伺服器broker失敗的回撥方法,伺服器丟失了此訊息。
* 注意,丟失的訊息仍然可以傳遞給消費者,但broker不能保證這一點。(不明白,既然丟失了,為啥還能傳送)
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Exchange接收訊息:"+deliveryTag+"(deliveryTag)失敗!伺服器broker丟失了訊息");
}
}
//自定義的結果返回監聽器
/**
* 實現此介面以通知交付basicpublish失敗時,“mandatory”或“immediate”的標誌監聽(原始碼註釋翻譯)。
* 在釋出訊息時設定mandatory等於true,監聽訊息是否有相匹配的佇列,
* 沒有時ReturnListener將執行handleReturn方法,訊息將返給傳送者
*/
public class MyReturnListener implements ReturnListener {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("訊息傳送到佇列失敗:回覆失敗編碼:"+replyCode+";回覆失敗文字:"+replyText+";失敗訊息物件:"+SerializationUtils.deserialize(body));
}
}
4、Consumer訊息確認機制
為了保證訊息從佇列可靠地到達消費者,RabbitMQ提供訊息確認機制(message acknowledgment)。消費者在註冊消費者時,可以指定noAck引數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack訊號後才從記憶體(或磁碟,如果是持久化訊息的話)中移去訊息。否則,RabbitMQ會在佇列中訊息被消費後立即刪除它。
當noAck=false時,對於RabbitMQ伺服器端而言,佇列中的訊息分成了兩部分:一部分是等待投遞給消費者的訊息(web管理介面上的Ready狀態);一部分是已經投遞給消費者,但是還沒有收到消費者ack訊號的訊息(web管理介面上的Unacked狀態)。如果伺服器端一直沒有收到消費者的ack訊號,並且消費此訊息的消費者已經斷開連線,則伺服器端會安排該訊息重新進入佇列,等待投遞給下一個消費者(也可能還是原來的那個消費者)。
(1)重要方法
/**
*1. 開始一個非區域性、非排他性消費, with a server-generated consumerTag.
* 注意:執行這個方法會回撥handleConsumeOk方法,在此方法中處理訊息。
* @param queue 佇列名稱
* @param autoAck 是否自動應答。false表示consumer在成功消費過後必須要手動回覆一下伺服器,如果不回覆,伺服器就將認為此條訊息消費失敗,繼續分發給其他consumer。
* @param callback 回撥方法類
* @return 由伺服器生成的consumertag
* @throws java.io.IOException if an error is encountered
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
/**
*2
consumer處理成功後,通知broker刪除佇列中的訊息,如果設定multiple=true,表示支援批量確認機制以減少網路流量。
例如:有值為5,6,7,8 deliveryTag的投遞
如果此時channel.basicAck(8, true);則表示前面未確認的5,6,7投遞也一起確認處理完畢。
如果此時channel.basicAck(8, false);則僅表示deliveryTag=8的訊息已經成功處理。
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
/**3
consumer處理失敗後,例如:有值為5,6,7,8 deliveryTag的投遞。
如果channel.basicNack(8, true, true);表示deliveryTag=8之前未確認的訊息都處理失敗且將這些訊息重新放回佇列中。
如果channel.basicNack(8, true, false);表示deliveryTag=8之前未確認的訊息都處理失敗且將這些訊息直接丟棄。
如果channel.basicNack(8, false, true);表示deliveryTag=8的訊息處理失敗且將該訊息重新放回佇列。
如果channel.basicNack(8, false, false);表示deliveryTag=8的訊息處理失敗且將該訊息直接丟棄。
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
/**4
相比channel.basicNack,除了沒有multiple批量確認機制之外,其他語義完全一樣。
如果channel.basicReject(8, true);表示deliveryTag=8的訊息處理失敗且將該訊息重新放回佇列。
如果channel.basicReject(8, false);表示deliveryTag=8的訊息處理失敗且將該訊息直接丟棄。
*/
void basicReject(long deliveryTag, boolean requeue) throws IOException;
(2)部分使用程式碼如下:
//this表示自己的Consumer
channel.basicConsume(queueName, false, this);
...
@Override
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
if (body == null)
return;
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
/**
* 專門處理奇數訊息的消費者
*/
int tagId = (Integer) map.get("tagId");
if (tagId % 2 != 0) {
//處理訊息
System.out.println("接收並處理訊息:"+tagId);
//通知伺服器此訊息已經被處理了
channel.basicAck(envelope.getDeliveryTag(), false);
}else{
//通知伺服器訊息處理失敗,重新放回佇列。false表示處理失敗訊息不放會佇列,直接刪除
channel.basicReject(envelope.getDeliveryTag(), true);
}
}
5、Demo專案整體程式碼
此demo就是向RabbitMQ伺服器上面傳送20個訊息,訊息體是map,裡面裝的是tagId=數字。然後註冊了兩個消費者,分別處理奇數和偶數。
5.1、連線工具類
/**
* 連線工具類
*/
public class ConnectionUtil {
Channel channel;
Connection connection;
String queueName;
public ConnectionUtil(String queueName) throws IOException {
this.queueName = queueName;
// 建立連線工廠
ConnectionFactory cf = new ConnectionFactory();
// 設定rabbitmq伺服器IP地址
cf.setHost("*.16.0.*");
// 設定rabbitmq伺服器使用者名稱
cf.setUsername("*");
// 設定rabbitmq伺服器密碼
cf.setPassword("*");
cf.setPort(AMQP.PROTOCOL.PORT);
// 獲取一個新的連線
connection = cf.newConnection();
// 建立一個通道
channel = connection.createChannel();
/**
*申明一個佇列,如果這個佇列不存在,將會被建立
* @param queue 佇列名稱
* @param durable 永續性:true佇列會再重啟過後存在,但是其中的訊息不會存在。
* @param exclusive 是否只能由建立者使用
* @param autoDelete 是否自動刪除(沒有連線自動刪除)
* @param arguments 佇列的其他屬性(構造引數)
* @return 宣告佇列的聲明確認方法已成功宣告。
* @throws java.io.IOException if an error is encountered
*/
channel.queueDeclare(queueName, true, false, false, null);
}
public void close() throws IOException{
channel.close();
connection.close();
}
}
5.2、具體生產者
/**
* 訊息生產者
*/
public class MessageProducer {
private ConnectionUtil connectionUtil;
public MessageProducer(ConnectionUtil connectionUtil){
this.connectionUtil=connectionUtil;
}
/**
* 傳送訊息到佇列中
*/
public void sendMessage(Serializable object) throws IOException{
/**
* Publish a message
* @param exchange 訊息交換機名稱,空字串將使用直接交換器模式,傳送到預設的Exchange=amq.direct
* @param routingKey 路由關鍵字
* @param mandatory 監聽是否有符合的佇列
* @param BasicProperties 設定訊息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN是持久化;MessageProperties.TEXT_PLAIN是非持久化
* @param body 訊息物件
* @throws java.io.IOException if an error is encountered
*/
connectionUtil.channel.basicPublish("", connectionUtil.queueName, true, MessageProperties.TEXT_PLAIN, SerializationUtils.serialize(object));
System.out.println("MessageProducer傳送了一條訊息:"+object);
}
}
5.3、公共消費者父類
/**
* 訊息消費者基礎類
*/
public class MessageConsumer implements Consumer {
//消費者標籤,註冊成功時由rabbitmq伺服器自動生成
protected String consumerTag;
protected ConnectionUtil connectionUtil;
public MessageConsumer(ConnectionUtil connectionUtil){
this.connectionUtil=connectionUtil;
}
public void basicConsume(){
try {
/**
* 設定消費投遞數目,一次性投遞10條訊息。當消費者未確認訊息達到10條時,rabbitMQ將不會向此消費者投遞訊息,直到未確認數小於10條再投遞
* @param prefetchCount 投遞數目
* @param global 是否針對整個Channel。true表示此投遞數是給Channel設定的,false是給Channel上的Consumer設定的。
* @throws java.io.IOException if an error is encountered
*/
connectionUtil.channel.basicQos(10,false);//表示每個消費者最多10條
connectionUtil.channel.basicQos(15,true);//整個傳輸管道最多15條,具體分到每個消費者身上又不能大於10條
/**
* 開始一個非區域性、非排他性消費, with a server-generated consumerTag.
* 執行這個方法會回撥handleConsumeOk方法
* @param queue 佇列名稱
* @param autoAck 是否自動應答。false表示consumer在成功消費過後必須要手動回覆一下伺服器,如果不回覆,伺服器就將認為此條訊息消費失敗,繼續分發給其他consumer。
* @param callback 回撥方法類
* @return 由伺服器生成的consumertag
* @throws java.io.IOException if an error is encountered
*/
connectionUtil.channel.basicConsume(connectionUtil.queueName, false, this);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 收到訊息時的回撥函式
*/
public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
//子類重寫覆蓋具體操作
}
/**
* 消費者註冊成功回撥函式
*/
public void handleConsumeOk(String consumerTag) {
this.consumerTag=consumerTag;
System.out.println("消費者:"+consumerTag+",註冊成功!");
}
/**
* 手動取消消費者註冊成功回撥函式
* 當呼叫Channel類的void basicCancel(String consumerTag) throws IOException;方法觸發此回撥函式
*/
public void handleCancelOk(String consumerTag) {
System.out.println(consumerTag+" 手動取消消費者註冊成功!");
}
/**
* 當消費者因為其他原因被動取消註冊時呼叫,比如queue被刪除了。
*/
public void handleCancel(String consumerTag) throws IOException {
System.out.println("因為外部原因消費者:"+consumerTag+" 取消註冊!");
}
/**
* 當通道或基礎連線被關閉時呼叫
*/
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
System.out.println("通道或基礎連線被關閉");
}
/**
* Called when a <code><b>basic.recover-ok</b></code> is received
* in reply to a <code><b>basic.recover</b></code>. All messages
* received before this is invoked that haven't been <i>ack</i>'ed will be
* re-delivered. All messages received afterwards won't be.
* @param consumerTag the <i>consumer tag</i> associated with the consumer
*/
public void handleRecoverOk(String consumerTag) {
}
}
5.4、具體的消費者
/**
* 專門處理偶數訊息的消費者
*/
public class EvenConsumer extends MessageConsumer {
public EvenConsumer(ConnectionUtil connectionUtil) {
super(connectionUtil);
}
@Override
public void handleConsumeOk(String consumerTag) {
this.consumerTag=consumerTag;
System.out.println("EvenConsumer消費者:"+consumerTag+",註冊成功!");
}
@Override
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
if (body == null)
return;
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
int tagId = (Integer) map.get("tagId");
if (tagId % 2 == 0) {
//處理訊息
System.out.println("EvenConsumer接收並處理訊息:"+tagId);
//通知伺服器此訊息已經被處理了
connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
}else{
//通知伺服器訊息處理失敗,重新放回佇列。false表示處理失敗訊息不放會佇列,直接刪除
connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
}
}
}
/**
* 專門處理奇數訊息的消費者
*/
public class OddConsumer extends MessageConsumer {
public OddConsumer(ConnectionUtil connectionUtil) {
super(connectionUtil);
}
@Override
public void handleConsumeOk(String consumerTag) {
this.consumerTag=consumerTag;
System.out.println("OddConsumer消費者:"+consumerTag+",註冊成功!");
}
@Override
public void handleDelivery(String arg0, Envelope envelope, BasicProperties arg2, byte[] body) throws IOException {
if (body == null)
return;
Map<String, Object> map = (Map<String, Object>) SerializationUtils.deserialize(body);
int tagId = (Integer) map.get("tagId");
if (tagId % 2 != 0) {
//處理訊息
System.out.println("OddConsumer接收並處理訊息:"+tagId);
//通知伺服器此訊息已經被處理了
connectionUtil.channel.basicAck(envelope.getDeliveryTag(), false);
}else{
//通知伺服器訊息處理失敗,重新放回佇列。false表示處理失敗訊息不放會佇列,直接刪除
connectionUtil.channel.basicReject(envelope.getDeliveryTag(), true);
}
}
}
5.5、監聽器
/**
*producer傳送確認事件。
*/
public class MyConfirmListener implements ConfirmListener{
/**
* 生產者傳送訊息到exchange成功的回撥方法
* 訊息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。但是可以設定ReturnListener監聽來監聽有沒有匹配的佇列。
* 因此handleAck執行了,並不能完全表示訊息已經進入了對應的佇列,只能表示對應的exchange成功的接收了訊息。
* 訊息被exchange接收過後,還需要通過一定的匹配規則分發到對應的佇列queue中。
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//注意:deliveryTag是broker給訊息指定的唯一id(從1開始)
System.out.println("Exchange接收訊息:"+deliveryTag+"(deliveryTag)成功!multiple="+multiple);
}
/**
* 生產者傳送訊息到伺服器broker失敗的回撥方法,伺服器丟失了此訊息。
* 注意,丟失的訊息仍然可以傳遞給消費者,但broker不能保證這一點。
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Exchange接收訊息:"+deliveryTag+"(deliveryTag)失敗!伺服器broker丟失了訊息");
}
}
/**
* 實現此介面以通知交付basicpublish失敗時,“mandatory”或“immediate”的標誌監聽(原始碼註釋翻譯)。
* 在釋出訊息時設定mandatory等於true,監聽訊息是否有相匹配的佇列,
* 沒有時ReturnListener將執行handleReturn方法,訊息將返給傳送者 。
* 由於3.0版本過後取消了支援immediate,此處不做過多的解釋。
*/
public class MyReturnListener implements ReturnListener {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
BasicProperties properties, byte[] body) throws IOException {
System.out.println("訊息傳送到佇列失敗:回覆失敗編碼:"+replyCode+";回覆失敗文字:"+replyText+";失敗訊息物件:"+SerializationUtils.deserialize(body));
}
}
5.6、客戶端
public class Client {
public static void main(String[] args) {
new Client();
}
public Client(){
try {
//發訊息
publishMessage();
//註冊消費者
addConsumer();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void publishMessage() throws IOException, InterruptedException{
ConnectionUtil connectionUtil=new ConnectionUtil("testqueue");
MessageProducer producer=new MessageProducer(connectionUtil);
connectionUtil.channel.confirmSelect();
//注意:返回的時候Return在前,Confirm在後
connectionUtil.channel.addConfirmListener(new MyConfirmListener());
connectionUtil.channel.addReturnListener(new MyReturnListener());
int i=1;
while (i<=10) {
HashMap<String, Object> map=new HashMap<String, Object>();
map.put("tagId", i);
producer.sendMessage(map);
i++;
}
}
public void addConsumer() throws IOException{
ConnectionUtil connectionUtil=new ConnectionUtil("testqueue");
OddConsumer odd=new OddConsumer(connectionUtil);
odd.basicConsume();
EvenConsumer even=new EvenConsumer(connectionUtil);
even.basicConsume();
}
}
5.7、測試結果
MessageProducer傳送了一條訊息:{tagId=1}
MessageProducer傳送了一條訊息:{tagId=2}
MessageProducer傳送了一條訊息:{tagId=3}
Exchange接收訊息:1(deliveryTag)成功!multiple=false
Exchange接收訊息:2(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=4}
Exchange接收訊息:3(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=5}
Exchange接收訊息:4(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=6}
Exchange接收訊息:5(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=7}
Exchange接收訊息:6(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=8}
Exchange接收訊息:7(deliveryTag)成功!multiple=false
Exchange接收訊息:8(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=9}
Exchange接收訊息:9(deliveryTag)成功!multiple=false
MessageProducer傳送了一條訊息:{tagId=10}
Exchange接收訊息:10(deliveryTag)成功!multiple=false
OddConsumer消費者:amq.ctag-z8s8LaSgYvo02jktCZrCYA,註冊成功!
OddConsumer接收並處理訊息:1
OddConsumer接收並處理訊息:3
OddConsumer接收並處理訊息:5
OddConsumer接收並處理訊息:7
OddConsumer接收並處理訊息:9
EvenConsumer消費者:amq.ctag-LpN6Q5VvNY3wCof2lXqS4A,註冊成功!
EvenConsumer接收並處理訊息:4
EvenConsumer接收並處理訊息:8
EvenConsumer接收並處理訊息:2
EvenConsumer接收並處理訊息:10
EvenConsumer接收並處理訊息:6
6、Demo完整原始碼下載地址
【四川樂山程式設計師聯盟,歡迎大家加群相互交流學習5 7 1 8 1 4 7 4 3】