rabbitmq6-路由模式
阿新 • • 發佈:2018-11-16
一、前言:
大體的應用場景在前面都已經分析過,這裡面只是最基礎的rabbitmq的基礎的教程的使用方法,後面我們在實際使用的過程中是會結合springboot來開展的,所以這裡我們迅速的把基礎過萬,然後迅速進入實戰的部分。但是這裡建議還是把前面的相關的博文看一看,因為springboot封裝了rabbitmq的一些操作,但是你要知道封裝的內容是什麼,不要求完全精細的掌握底層,但是也要知道個大概,這樣有問題起碼你能有一個明確的思路,就是google或者百度你起碼能有一個關鍵詞來搜尋。
二、兩種模式:
圖上的兩種繫結模式都是合法的:
- 相同的routing key 繫結不同的佇列
- 不同的routing key 繫結同一個佇列
有人可能會說前面我話的圖routing key是在交換機和佇列之間binding key是在消費者和佇列之間,這裡申明一下這個是沒有問題的,因為按照Exchange的type來進行匹配的時候都是在佇列裡面進行匹配的,所以這在圖上是沒有問題的。
三、程式碼:
3.1、生產者:
- 這裡面我們演示圖二的情況,不同的routing key繫結同一個佇列。事實上進行這個實驗的完整做法是在傳送訊息的時候傳入routing key的,但是這裡為了簡單演示,所以直接傳送兩次訊息。
public class Producer {
public static final String EXCHANGE_NAME = "routing_model";
public static final String ROUTING_KEY1 = "routing_key1";
public static final String ROUTING_KEY2 = "routing_key2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnUtils.getConn();
Channel channel = conn.createChannel();
/**
* 注意下面我們持久化的是交換機,已經不是持久化的隊列了,交換機是不存放訊息的,只是一個訊息的搬運工
*/
String exchangeType = BuiltinExchangeType.DIRECT.getType();
boolean durable = true;
boolean autoDelete = false;
boolean internal = false;
Map<String,Object> arguments = null;
// 宣告一個交換機
channel.exchangeDeclare(EXCHANGE_NAME, exchangeType,durable,autoDelete,internal,arguments);
/**
* 繫結一個佇列,說明這個物件了對這個交換機上的資料感興趣,下面是繫結的規則:
* 1、同一個routing key 可以繫結多個佇列
* \---routing key1 ----Q1
* P --- X
* \---routing key2 ----Q2
* 2、不同的 routing key繫結一個佇列
* \----routing key1 -----Q1
* P ----- X
* \----routing kye2 ----------|
* \ Q2
* \----routing key3 ----------\
*
* 下面我們演示第二種,
*/
/**
* 下面是依據我的個人理解寫出來的,如果大家有不同的意見可以留言或者加群交流
* 在路由的模式下,生產者是不需要繫結佇列的,只是宣告交換機,同時交換機是無法存放訊息的,
* 假如我們直接宣告交換機,然後傳送訊息,但是消費者還有繫結宣告佇列,那麼這個訊息就有可能丟失,所以必須先建立交換機以及佇列
* 然後生產者傳送訊息,消費者接受
*
* 這個正常的做法應該是把 routing key作為方法的引數傳遞過來的,這裡面由於我們是做測試,就直接傳送兩個訊息以不同的routing可以
* 來發送
*/
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY1,null,"CEUIXCXI routing key1".getBytes());
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY2,null,"CEUIXCXI routing key2".getBytes());
}
}
希望大家能多看看我程式碼中的註釋資訊:
- 上面的持久化指的是交換機的持久化,現在我們接觸到的持久化有:訊息的持久化、交換機的持久化。
- 進行上面的實驗,我們要先建立交換機,所以在執行的時候一定要先執行以下上面的程式碼進行建立交換機,但是我們發現一個問題,我們執行上述的兩碼,由於我們沒有執行消費者的關係,我們發現在web控制檯我們檢視不到訊息。是的,我們檢視不到,大家記住交換機的作用,交換機只是訊息的搬運工,既不生產訊息亦不儲存訊息。
- 記住上面我們使用的模式是Exchanage的direct交換機模式
3.2、消費者:
public class Consume001 {
public static final String EXCHANGE_NAME = "routing_model";
public static final String QUEUE_NAME1 = "routing_queue1";
public static final String ROUTING_KEY1 = "routing_key1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME1,false,consumer);
}
}
public class Consume002 {
public static final String EXCHANGE_NAME = "routing_model";
public static final String QUEUE_NAME1 = "routing_queue1";
public static final String ROUTING_KEY1 = "routing_key2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME1,true,false,false,null);
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,ROUTING_KEY1);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body,"utf-8"));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(QUEUE_NAME1,false,consumer);
}
}
可以看到我們使用不同的routing key綁定了同一個佇列
大概就是這樣了,如果大家有疑問歡迎留言和加群討論。