rabbitmq3-剖析原始碼講解簡單佇列
一、簡單佇列的模型:
對文中有些術語不懂的,或者簡稱的不懂的,建議讀讀前面的系列的文章,因為有很多的概念在裡面不可能每一篇博文都去講這個,這樣部落格也就會太臃腫,這一系列的博文是博主在整體的學習了一遍之後自我係統化總結的,希望能幫助初學者有一個系統化的學習依據。
很多時候我並沒有想到這個模式的應用場景,的確這個模式真的是有點太簡單了,但是最近通過別人所提出的問題的時候我發現在某些場景下這個模式也是必不可缺的。
首先rabbitmq不提供順序訊息,但是通過某些方式我們可以做到,例如簡單佇列,在不設定優先佇列和延遲佇列的情況是有可能實現的(注意是有可能,因為這個條件是比較嚴苛的,需要單執行緒的P和單執行緒的C)
總之這篇博文中我們先開個頭講講各個方法的引數以及他們的意義。
二、具體的程式碼的實現:
1、先建立連線工具類
建立一個連線的工具類,在這裡大家可以回憶一下jdbc的連線的過程,以及jdbc的連線池,大體的思想是一致的不過rabbitmq的客戶端採用的機制不一樣,這裡有一篇博文分析的非常的不錯提供給大家。RabbitMQ客戶連線池的實現。後面博文的程式碼都會基於這個來寫。
public class ConnUtils {
public static Connection getConn() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
// 設定rabbitmq的伺服器地址
factory.setHost("192.168.0.200");
// 設定rabbitmq的使用者名稱和密碼,預設都是guest,但是在前面的[rabbitmq1-概述及其使用docker安裝](https://blog.csdn.net/weixin_42849915/article/details/81977968)中有截圖講如何設定這個東西,不設定也是可以的,但是還是建議大家設並設定許可權
factory.setUsername("fkxuexi");
factory.setPassword("fkxuexi" );
factory.setVirtualHost("spring_cloud");
factory.setPort(AMQP.PROTOCOL.PORT);
Connection connection = factory.newConnection();
return connection;
}
}
其中大家可以關注一下ConnectionFactory ,從下面的程式碼看ConnectionFactory或許是維護了一個Channel的連線池,這裡博主沒有深究過,因為大體的原理都是差不多的,無非就維護一個列表然後通過加鎖和執行緒通訊來實現,如果大家有興趣可以去深究一下jdbc的連線池和這個的區別。
public static final int DEFAULT_CHANNEL_MAX = 0;
private int requestedChannelMax = DEFAULT_CHANNEL_MAX; 預設的channel的連線數,預設的為0在以後的配置中注意下
2、建立P:
這個裡面有大量對應引數的註釋,以及對應生產端的應答以保證可靠性的操作,同時也有部分的原始碼的註釋分析
/**
* rabbitmq 是3.7.7的版本
*/
public class Producer {
/**
* 在這裡一步小心看到一到面試題,routing_key 和 binding_key 的最大長度是多少?
* 同樣也是255
*/
// 設定queue的名稱,注意這個名字的長度是有限制的 if(queue.length() > 255) 是要小於255的
public static final String QUEUE_NAME = "simple_worker";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取一個連線
Connection conn = ConnUtils.getConn();
// 建立一個channel,和exchange進行打交道的一直是這個貨
Channel channel = conn.createChannel();
// 開啟持久化
boolean durable = true ;
// 關閉排他,如果這個地方為true的話將會開啟一個獨佔佇列,只對首次申明他的連線可以,且在連線斷開時自動刪除
boolean exclusive = false;
/**
* true if we are declaring an autodelete queue (server will delete it when no longer in use),
* 當這個佇列長時間沒有使用的話將會被刪除
*/
boolean autoDelete = false;
/**
* 最後一個引數是 Map<String, Object> arguments 這個引數我們在這裡先不做說明,後面說道rabbitmq的高階特性的時候會說道,
* 可以提前預告一下,可以設定哪些東西:①:優先順序佇列;②:延遲佇列……
*
* 這裡有一個疑問:我們每次重複執行這條語句,會不會把佇列給覆蓋了呀,這裡是不會的我們看一下相關程式碼
* talk is cheap,show me the code -- linus Torvalds
* void recordQueue(AMQP.Queue.DeclareOk ok, RecordedQueue q) {
* this.recordedQueues.put(ok.getQueue(), q);
* }
* private final Map<String, RecordedQueue> recordedQueues = Collections.synchronizedMap(new LinkedHashMap<String, RecordedQueue>());
* 這是一個加了鎖的map,現在不用擔心重複的申明隊列了吧
*/
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,null);
/**
* rabbitmq的可靠性的一個實現方式,訊息傳送到達broker的可靠性的實現
* 生產者的應答 這裡我們不講事務模式,開啟事務模式的話,效能將會降低250倍
* http://www.rabbitmq.com/confirms.html#publisher-confirms-ordering
*In most cases, RabbitMQ will acknowledge messages to publishers in the same order they were published
* (this applies for messages published on a single channel). However, publisher acknowledgements
* are emitted asynchronously and can confirm a single message or a group of messages. The exact moment
* when a confirm is emitted depends on the delivery mode of a message (persistent vs. transient) and
* the properties of the queue(s) the message was routed to (see above). Which is to say that different
* messages can be considered ready for acknowledgement at different times. This means that acknowledgements
* can arrive in a different order compared to their respective messages. Applications should not depend
* on the order of acknowledgements when possible.
* 上面的一段話的意思是:大多數情況下,mq將以與釋出時間相同的順序進行確認,但是這適用於單頻道上釋出的訊息,但是
* 不同的訊息可以在不同的時刻進行確認,所以應用程式不應該儘可能的依賴預確認的順序
*/
channel.confirmSelect();// 將channel 置為confirm 模式
/**
* When in confirm mode, returns the sequence number of the next message to be published.
* 當時confirm模式的時候,我們可以拿到傳送的訊息的序列號
* 其實這個就是deliverTag
* if (nextPublishSeqNo > 0) {
* // 在這裡維護了一個未應答的set,維護未應答訊息的狀態以及生成SeqNo(deliveryTay)
* unconfirmedSet.add(getNextPublishSeqNo());
* nextPublishSeqNo++;
* }
* 上面也說過了,非同步的confirm,響應的順序並不一定是嚴格的按照訊息的投遞的順序的,同時如果訊息長時間沒有響應,
* 也可能是訊息沒有投遞到,這個時候我們就可以在記憶體中維護一份訊息id的狀態表,當然這個表肯定不會太大,太大則意味著這要麼你的
* 系統的mq的吞吐量不行,要麼網路延遲大,系統都這樣了,多了也就不提了。
*
* 當然這樣會造成訊息的重複消費,在我的前一篇的部落格中關於可靠性的分析當中,我提到了就算這邊沒有訊息的重複投遞,在C
* 端依舊是有可能造成訊息的重複的,因為存在機率C在消費了訊息之後,傳送ack的工程中網路中斷,那麼這個訊息將會被重入佇列
* 在requeue為true的情況下
*/
long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("我倒要看看你和deliverTay到底是不是同一個傢伙,nextPublishSeqNo = "+nextPublishSeqNo);
String msg = "我就一個測試訊息,你想咋地";
// 這個地方我們還沒有使用到路由,下面我們會一一說明五種模式中的幾種路由的使用的方法
String exchange = "";
// routing key 我們直接指定為佇列的名字
/**順便提一提,這rabbitmq java客戶端沒有註釋這是真蛋疼
* {@link com.rabbitmq.client.AMQP.BasicProperties} 具體可以檢視這個裡面的設定
*/
channel.basicPublish(exchange,QUEUE_NAME,null,msg.getBytes());
channel.addConfirmListener(new ConfirmListener() {
/**
* 第一個引數是在同一個channel中是唯一的,防止重複投遞
* @param deliveryTag
* @param multiple 如果為true則表名小於當前的deliverTag的都被確認
* @throws IOException
*/
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("訊息已經送達到broker deliverTay:"+ deliveryTag );
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("這個在broken異常,或者無法投遞訊息時出現 deliverTay:"+ deliveryTag );
}
});
// 這裡我們休眠5秒鐘否則當channel和connection關閉後,無法接受到應答的訊息
Thread.sleep(5000);
channel.close();
conn.close();
}
2、建立C:
這個裡面也有對於消費者端的應答以保證訊息的可靠性,同時對於訊息的重複消費問題的解答
/**
* 注意這個地方不能如果叫Consumer的話,會和com.rabbitmq.client.Consumer衝突,所以這個地方我們叫consume
*/
public class Consume {
/**
* 在這裡一步小心看到一到面試題,routing_key 和 binding_key 的最大長度是多少?
* 同樣也是255
*/
// 設定queue的名稱,注意這個名字的長度是有限制的 if(queue.length() > 255) 是要小於255的
public static final String QUEUE_NAME = "simple_worker";
public static void main(String[] args) throws IOException, TimeoutException {
// 這個裡面我就不寫那麼多的註釋了
Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
/**
* Called when a <code><b>basic.deliver</b></code> is received for this consumer.
* 即處理投遞過來的訊息的
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 這個是和我們在productor中獲取的是一樣的
long deliveryTag = envelope.getDeliveryTag();
/**
* TODO 假裝這個地方是我們的邏輯處理 ,
*
* doSomething();
*/
String msg = new String(body,"utf-8");
System.out.println("我是訊息,這個地方我們獲取到了訊息並不代表,我們進行了應答,訊息為:"+msg);
/**
* 這裡我們分別測試 成功應答和拒絕應答,我們通過rabbitmq的web控制檯來檢視
*
* 注意:rabbitmq的消費端的可靠性的保證,1、當consumer掛掉了(channel斷掉)則將訊息重回佇列並投遞給其他的消費者
* 但是這裡面是有可能造成重複的消費的,
* 假若我們考慮這種場景:當consumer1在處理完邏輯之後,傳送應答由於網路中斷,這個應答並沒有到達broker那麼channel
* 斷開,所以這個時候訊息會重回佇列,會被投遞給其他的消費者進行消費,所以這個時候的機制我們仍然可以維護一份訊息id的
* 狀態表,同理這個表依舊不可能太大
*/
/**
* multiple 如果為true的話,則代表只要deliverTag小於當前的一律都被確認,deliveryTay是在同一個channel是主鍵遞增的
* 如果為false的話,那麼則代表只確定當前的
*/
boolean multiple = false;
// 1、成功應答
channel.basicAck(deliveryTag,false);
}
};
/**
* 這個地方我們關閉自動應答,自動應答模式:如果訊息投遞到了,不管你consumer 處理是否完成,則broker任務訊息已經被消費了,然後
* 就會刪除訊息,所以這裡我們開啟手動應答,這又這樣我們在handleDelivery中的應答才能生效
*/
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
三、指標觀察:
1、執行生產者是broker的狀態
我們可以看到,nextPublishSeqNo和deliveryTay是同一個東西
Features 中的D:則代表durable持久化的意思,Ready則是代表訊息等待表消費,Unacked則表明有多少條訊息是已經被投遞,但是沒有應答的,後面我們在應答哪裡做休眠來測試。
2、執行消費者進行應答:
為了過程更直觀,在上面的程式中加入下列語句:
// 1、成功應答
try {
Thread.sleep(60000);//休眠一分鐘,來觀察控制檯的指標有什麼不同
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(deliveryTag,false);
從上圖可以看到訊息已經被消費。其中經歷了從 Ready – Unacked 到最中的被消費然後broker完全的刪除掉
3、下面檢視消費者進行否定應答
依舊需要修改下列的程式碼
// 1、成功應答
try {
Thread.sleep(60000);//休眠一分鐘,來觀察控制檯的指標有什麼不同
} catch (InterruptedException e) {
e.printStackTrace();
}
// 這個引數可以使得,被拒絕的訊息重回佇列
boolean requeue = true;
channel.basicNack(deliveryTag,false,requeue);
這裡不管你等多久,一直會保持這個狀態,原因是隻有一個消費者,那麼當訊息被拒絕後立即又會被投遞給這個消費者,一直會持久Unacked的狀態,從下面的列印情況可以佐證我們上面的說法。
正常的狀態下應該回歸到Ready的狀態的,現在我們關閉消費者,檢視狀態,當我們關閉消費者後,這個訊息就重回隊列了。當然我們也可以設定requeue為false進行測試,這個就有大家自行測試了。
ps:如果控制檯有些不太會用的話,可以檢視一下我的第一篇博文rabbitmq1-概述及其使用docker安裝裡面有一點點的小介紹,後面會專門寫一關於控制檯的使用的博文。