rabbitmq常見面試題
1、使用RabbitMQ有什麼好處?
1.解耦,系統A在程式碼中直接呼叫系統B和系統C的程式碼,如果將來D系統接入,系統A還需要修改程式碼,過於麻煩!
2.非同步,將訊息寫入訊息佇列,非必要的業務邏輯以非同步的方式執行,加快響應速度
3.削峰,併發量大的時候,所有的請求直接懟到資料庫,造成資料庫連線異常
2、RabbitMQ 中的 broker 是指什麼?cluster 又是指什麼?
broker 是指一個或多個 erlang node 的邏輯分組,且 node 上執行著 RabbitMQ 應用程式。cluster 是在 broker 的基礎之上,增加了 node 之間共享元資料的約束。
3、RabbitMQ 概念裡的 channel、exchange 和 queue 是邏輯概念,還是對應著程序實體?分別起什麼作用?
queue 具有自己的 erlang 程序;exchange 內部實現為儲存 binding 關係的查詢表;channel 是實際進行路由工作的實體,即負責按照 routing_key 將 message 投遞給 queue 。由 AMQP 協議描述可知,channel 是真實 TCP 連線之上的虛擬連線,所有 AMQP 命令都是通過 channel 傳送的,且每一個 channel 有唯一的 ID。一個 channel 只能被單獨一個作業系統執行緒使用,故投遞到特定 channel 上的 message 是有順序的。但一個作業系統執行緒上允許使用多個 channel 。
4、vhost 是什麼?起什麼作用?
vhost 可以理解為虛擬 broker ,即 mini-RabbitMQ server。其內部均含有獨立的 queue、exchange 和 binding 等,但最最重要的是,其擁有獨立的許可權系統,可以做到 vhost 範圍的使用者控制。當然,從 RabbitMQ 的全域性角度,vhost 可以作為不同許可權隔離的手段(一個典型的例子就是不同的應用可以跑在不同的 vhost 中)。
5、訊息基於什麼傳輸?
由於TCP連線的建立和銷燬開銷較大,且併發數受系統資源限制,會造成效能瓶頸。RabbitMQ使用通道的方式來傳輸資料。通道是建立在真實的TCP連線內的虛擬連線,且每條TCP連線上的通道數量沒有限制。
6、訊息如何分發?
若該佇列至少有一個消費者訂閱,訊息將以迴圈(round-robin)的方式傳送給消費者。每條訊息只會分發給一個訂閱的消費者(前提是消費者能夠正常處理訊息並進行確認)。
7、訊息怎麼路由?
從概念上來說,訊息路由必須有三部分:交換器、路由、繫結。生產者把訊息釋出到交換器上;繫結決定了訊息如何從路由器路由到特定的佇列;訊息最終到達佇列,並被消費者接收。
訊息釋出到交換器時,訊息將擁有一個路由鍵(routing key),在訊息建立時設定。
通過佇列路由鍵,可以把佇列繫結到交換器上。
訊息到達交換器後,RabbitMQ會將訊息的路由鍵與佇列的路由鍵進行匹配(針對不同的交換器有不同的路由規則)。如果能夠匹配到佇列,則訊息會投遞到相應佇列中;如果不能匹配到任何佇列,訊息將進入 “黑洞”。
常用的交換器主要分為一下三種:
direct:如果路由鍵完全匹配,訊息就被投遞到相應的佇列
fanout:如果交換器收到訊息,將會廣播到所有繫結的佇列上
topic:可以使來自不同源頭的訊息能夠到達同一個佇列。 使用topic交換器時,可以使用萬用字元,比如:“*” 匹配特定位置的任意文字, “.” 把路由鍵分為了幾部分,“#” 匹配所有規則等。特別注意:發往topic交換器的訊息不能隨意的設定選擇鍵(routing_key),必須是由"."隔開的一系列的識別符號組成。
8、什麼是元資料?元資料分為哪些型別?包括哪些內容?與 cluster 相關的元資料有哪些?元資料是如何儲存的?元資料在 cluster 中是如何分佈的?
在非 cluster 模式下,元資料主要分為 Queue 元資料(queue 名字和屬性等)、Exchange 元資料(exchange 名字、型別和屬性等)、Binding 元資料(存放路由關係的查詢表)、Vhost 元資料(vhost 範圍內針對前三者的名字空間約束和安全屬性設定)。在 cluster 模式下,還包括 cluster 中 node 位置資訊和 node 關係資訊。元資料按照 erlang node 的型別確定是僅保存於 RAM 中,還是同時儲存在 RAM 和 disk 上。元資料在 cluster 中是全 node 分佈的。
下圖所示為 queue 的元資料在單 node 和 cluster 兩種模式下的分佈圖。
9、在單 node 系統和多 node 構成的 cluster 系統中宣告 queue、exchange ,以及進行 binding 會有什麼不同?
答:當你在單 node 上宣告 queue 時,只要該 node 上相關元資料進行了變更,你就會得到 Queue.Declare-ok 迴應;而在 cluster 上宣告 queue ,則要求 cluster 上的全部 node 都要進行元資料成功更新,才會得到 Queue.Declare-ok 迴應。另外,若 node 型別為 RAM node 則變更的資料僅儲存在記憶體中,若型別為 disk node 則還要變更儲存在磁碟上的資料。
死信佇列&死信交換器:DLX 全稱(Dead-Letter-Exchange),稱之為死信交換器,當訊息變成一個死信之後,如果這個訊息所在的佇列存在x-dead-letter-exchange引數,那麼它會被髮送到x-dead-letter-exchange對應值的交換器上,這個交換器就稱之為死信交換器,與這個死信交換器繫結的佇列就是死信佇列。
10、如何確保訊息正確地傳送至RabbitMQ?
RabbitMQ使用傳送方確認模式,確保訊息正確地傳送到RabbitMQ。傳送方確認模式:將通道設定成confirm模式(傳送方確認模式),則所有在通道上釋出的訊息都會被指派一個唯一的ID。一旦訊息被投遞到目的佇列後,或者訊息被寫入磁碟後(可持久化的訊息),通道會發送一個確認給生產者(包含訊息唯一ID)。如果RabbitMQ發生內部錯誤從而導致訊息丟失,會發送一條nack(not acknowledged,未確認)訊息。傳送方確認模式是非同步的,生產者應用程式在等待確認的同時,可以繼續傳送訊息。當確認訊息到達生產者應用程式,生產者應用程式的回撥方法就會被觸發來處理確認訊息。
11、如何確保訊息接收方消費了訊息?
接收方訊息確認機制:消費者接收每一條訊息後都必須進行確認(訊息接收和訊息確認是兩個不同操作)。只有消費者確認了訊息,RabbitMQ才能安全地把訊息從佇列中刪除。這裡並沒有用到超時機制,RabbitMQ僅通過Consumer的連線中斷來確認是否需要重新發送訊息。也就是說,只要連線不中斷,RabbitMQ給了Consumer足夠長的時間來處理訊息。
下面羅列幾種特殊情況:
如果消費者接收到訊息,在確認之前斷開了連線或取消訂閱,RabbitMQ會認為訊息沒有被分發,然後重新分發給下一個訂閱的消費者。(可能存在訊息重複消費的隱患,需要根據bizId去重)
如果消費者接收到訊息卻沒有確認訊息,連線也未斷開,則RabbitMQ認為該消費者繁忙,將不會給該消費者分發更多的訊息。
12、如何避免訊息重複投遞或重複消費?
在訊息生產時,MQ內部針對每條生產者傳送的訊息生成一個inner-msg-id,作為去重和冪等的依據(訊息投遞失敗並重傳),避免重複的訊息進入佇列;在訊息消費時,要求訊息體中必須要有一個bizId(對於同一業務全域性唯一,如支付ID、訂單ID、帖子ID等)作為去重和冪等的依據,避免同一條訊息被重複消費。
這個問題針對業務場景來答分以下幾點:
1.比如,你拿到這個訊息做資料庫的insert操作。那就容易了,給這個訊息做一個唯一主鍵,那麼就算出現重複消費的情況,就會導致主鍵衝突,避免資料庫出現髒資料。
2.再比如,你拿到這個訊息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。
3.如果上面兩種情況還不行,上大招。準備一個第三方介質,來做消費記錄。以redis為例,給訊息分配一個全域性id,只要消費過該訊息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。
13、如何解決丟資料的問題?
1.生產者丟資料
生產者的訊息沒有投遞到MQ中怎麼辦?從生產者弄丟資料這個角度來看,RabbitMQ提供transaction和confirm模式來確保生產者不丟訊息。
transaction機制就是說,傳送訊息前,開啟事物(channel.txSelect()),然後傳送訊息,如果傳送過程中出現什麼異常,事物就會回滾(channel.txRollback()),如果傳送成功則提交事物(channel.txCommit())。
然而缺點就是吞吐量下降了。因此,按照博主的經驗,生產上用confirm模式的居多。一旦channel進入confirm模式,所有在該通道上面釋出的訊息都將會被指派一個唯一的ID(從1開始),一旦訊息被投遞到所有匹配的佇列之後,rabbitMQ就會發送一個Ack給生產者(包含訊息的唯一ID),這就使得生產者知道訊息已經正確到達目的隊列了.如果rabiitMQ沒能處理該訊息,則會發送一個Nack訊息給你,你可以進行重試操作。
2.訊息佇列丟資料
處理訊息佇列丟資料的情況,一般是開啟持久化磁碟的配置。這個持久化配置可以和confirm機制配合使用,你可以在訊息持久化磁碟後,再給生產者傳送一個Ack訊號。這樣,如果訊息持久化磁碟之前,rabbitMQ陣亡了,那麼生產者收不到Ack訊號,生產者會自動重發。
那麼如何持久化呢,這裡順便說一下吧,其實也很容易,就下面兩步
①、將queue的持久化標識durable設定為true,則代表是一個持久的佇列
②、傳送訊息的時候將deliveryMode=2
這樣設定以後,rabbitMQ就算掛了,重啟後也能恢復資料。在訊息還沒有持久化到硬碟時,可能服務已經死掉,這種情況可以通過引入mirrored-queue即映象佇列,但也不能保證訊息百分百不丟失(整個叢集都掛掉)
3.消費者丟資料
啟用手動確認模式可以解決這個問題
①自動確認模式,消費者掛掉,待ack的訊息迴歸到佇列中。消費者丟擲異常,訊息會不斷的被重發,直到處理成功。不會丟失訊息,即便服務掛掉,沒有處理完成的訊息會重回佇列,但是異常會讓訊息不斷重試。
②手動確認模式,如果消費者來不及處理就死掉時,沒有響應ack時會重複傳送一條資訊給其他消費者;如果監聽程式處理異常了,且未對異常進行捕獲,會一直重複接收訊息,然後一直拋異常;如果對異常進行了捕獲,但是沒有在finally裡ack,也會一直重複傳送訊息(重試機制)。
③不確認模式,acknowledge="none" 不使用確認機制,只要訊息傳送完成會立即在佇列移除,無論客戶端異常還是斷開,只要傳送完就移除,不會重發。
14、死信佇列和延遲佇列的使用
死信訊息:
訊息被拒絕(Basic.Reject或Basic.Nack)並且設定 requeue 引數的值為 false
訊息過期了
佇列達到最大的長度
過期訊息:
在 rabbitmq 中存在2種方可設定訊息的過期時間,第一種通過對佇列進行設定,這種設定後,該佇列中所有的訊息都存在相同的過期時間,第二種通過對訊息本身進行設定,那麼每條訊息的過期時間都不一樣。如果同時使用這2種方法,那麼以過期時間小的那個數值為準。當訊息達到過期時間還沒有被消費,那麼那個訊息就成為了一個 死信 訊息。
佇列設定:在佇列申明的時候使用 x-message-ttl 引數,單位為 毫秒
單個訊息設定:是設定訊息屬性的 expiration 引數的值,單位為 毫秒
延時佇列:在rabbitmq中不存在延時佇列,但是我們可以通過設定訊息的過期時間和死信佇列來模擬出延時佇列。消費者監聽死信交換器繫結的佇列,而不要監聽訊息傳送的佇列。
有了以上的基礎知識,我們完成以下需求:
需求:使用者在系統中建立一個訂單,如果超過時間使用者沒有進行支付,那麼自動取消訂單。
分析:
1、上面這個情況,我們就適合使用延時佇列來實現,那麼延時佇列如何建立
2、延時佇列可以由 過期訊息+死信佇列 來時間
3、過期訊息通過佇列中設定 x-message-ttl 引數實現
4、死信佇列通過在佇列申明時,給佇列設定 x-dead-letter-exchange 引數,然後另外申明一個佇列繫結x-dead-letter-exchange對應的交換器。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告一個接收被刪除的訊息的交換機和佇列
String EXCHANGE_DEAD_NAME = "exchange.dead";
String QUEUE_DEAD_NAME = "queue_dead";
channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead");
String EXCHANGE_NAME = "exchange.fanout";
String QUEUE_NAME = "queue_name";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Map<String, Object> arguments = new HashMap<String, Object>();
// 統一設定佇列中的所有訊息的過期時間
arguments.put("x-message-ttl", 30000);
// 設定超過多少毫秒沒有消費者來訪問佇列,就刪除佇列的時間
arguments.put("x-expires", 20000);
// 設定佇列的最新的N條訊息,如果超過N條,前面的訊息將從佇列中移除掉
arguments.put("x-max-length", 4);
// 設定佇列的內容的最大空間,超過該閾值就刪除之前的訊息
arguments.put("x-max-length-bytes", 1024);
// 將刪除的訊息推送到指定的交換機,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同時設定
arguments.put("x-dead-letter-exchange", "exchange.dead");
// 將刪除的訊息推送到指定的交換機對應的路由鍵
arguments.put("x-dead-letter-routing-key", "routingkey.dead");
// 設定訊息的優先順序,優先順序大的優先被消費
arguments.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
String message = "Hello RabbitMQ: ";
for(int i = 1; i <= 5; i++) {
// expiration: 設定單條訊息的過期時間
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder()
.priority(i).expiration( i * 1000 + "");
channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8"));
}
channel.close();
connection.close();
15、使用了訊息佇列會有什麼缺點?
1.系統可用性降低:你想啊,本來其他系統只要執行好好的,那你的系統就是正常的。現在你非要加個訊息佇列進去,那訊息佇列掛了,你的系統不是呵呵了。因此,系統可用性降低
2.系統複雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證訊息不被重複消費,如何保證保證訊息可靠傳輸。因此,需要考慮的東西更多,系統複雜性增大。
轉 https://www.cnblogs.com/qingfenglin/p/12027815.html