RabbitMQ實戰教程(五)-路由模式
1. 路由模式
跟訂閱模式類似,只不過在訂閱模式的基礎上加上路由,訂閱模式是分發到所有繫結到該交換機的佇列,路由模式只分發到繫結在該交換機上面指定的路由鍵佇列.
2. Direct 直接交換機
直連交換機(Direct Exchange
)是一種帶路由功能的交換機,它將訊息中的Routing Key
與該交換機關聯的所有Binding
中的Routing Key
進行比較,如果完全相等
將訊息傳送到Binding
對應的佇列中 .
適用場景 : 根據任務的優先順序把訊息傳送到對應的佇列中,分配更多資源處理優先順序高的佇列.
生產者宣告一個direct型別的交換機,然後傳送訊息到這個交換機指定路由鍵. 消費者指定消費交換機的路由鍵,即可以接到到訊息,其他消費者接收不到 .
在Fanout
交換機中
生產者 :
// 第二個引數就是路由鍵 channel.basicPublish(EXCHANE_NAME,"",null,message.getBytes());
消費者 :
// 第三個引數就是路由鍵 channel.queueBind(QUEUE,EXCAHNGE_NAME,"");
3. 程式碼演示
同樣的,只是交換機型別改為
driect
,加了個路由鍵而已 .這裡演示3個,即表示一個日誌事件,根據日誌型別進行處理
3.1 工具類
package com.makesailing.neo.utils;
import com.rabbitmq.client. Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* # RabbitMQ連線工具類
*
* @Author: jamie.li
* @Date: Created in 2018/9/16 14:38
*/
public class ConnectionUtils {
public static final String host = "127.0.0.1";
public static final Integer port = 5672;
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
// 如果有 使用者名稱 密碼 vhost 配置即可
connectionFactory.setUsername("jamie");
connectionFactory.setPassword("123456");
connectionFactory.setVirtualHost("/simple");
return connectionFactory.newConnection();
}
}
3.2 日誌生產者
package com.makesailing.neo.provider;
import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* # 日誌訊息 提供者
*
* @Author: jamie.li
* @Date: Created in 2018/9/16 22:41
*/
public class LogSend {
private static final String EXCHANGE_NAME = "test_exchange_direct";
private static final String INTO_ROUTING_NAME= "info";
private static final String WARN_ROUTING_NAME= "warn";
private static final String ERROR_ROUTING_NAME= "error";
public static void main(String[] args) throws IOException, TimeoutException {
// 建立連線
Connection connection = ConnectionUtils.getConnection();
// 獲取通道
Channel channel = connection.createChannel();
// 宣告一個direct 路由交換機
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 傳送info路由鍵訊息
String infoMessage = "Hello RabbitMQ Info Log";
channel.basicPublish(EXCHANGE_NAME, INTO_ROUTING_NAME, null, infoMessage.getBytes());
System.out.println(" LogSend routing info message : '" + infoMessage + "'");
// 傳送warn路由鍵訊息
String warnMessage = "Hello RabbitMQ Warn Log";
channel.basicPublish(EXCHANGE_NAME, WARN_ROUTING_NAME, null, warnMessage.getBytes());
System.out.println(" LogSend routing warn message : '" + warnMessage + "'");
// 傳送info路由鍵訊息
String errorMessage = "Hello RabbitMQ Error Log";
channel.basicPublish(EXCHANGE_NAME, ERROR_ROUTING_NAME, null, errorMessage.getBytes());
System.out.println(" LogSend routing error message : '" + errorMessage + "'");
channel.close();
connection.close();
}
}
3.3 error日誌消費者
package com.makesailing.neo.consumer;
import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* # 錯誤日誌 消費者
*
* @Author: jamie.li
* @Date: Created in 2018/9/16 23:09
*/
public class ErrorReceive {
private static final String EXCHANGE_NAME = "test_exchange_direct";
// info日誌佇列
private static final String QUEUE_NAME = "test_queue_routing_error";
private static final String ERROR_ROUTING_NAME= "error";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連線
Connection connection = ConnectionUtils.getConnection();
// 開啟通道
Channel channel = connection.createChannel();
// 申明要消費的佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ERROR_ROUTING_NAME);
// 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
channel.basicQos(1);
// 建立一個回撥的消費者處理類
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 接收到的訊息
String message = new String(body);
System.out.println(" ErrorReceive '" + message + "' , 任務處理中");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" ErrorReceive done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消費訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
3.4 info日誌消費者
package com.makesailing.neo.consumer;
import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* # info 日誌消費者
*
* @Author: jamie.li
* @Date: Created in 2018/9/16 23:02
*/
public class InfoReceive {
private static final String EXCHANGE_NAME = "test_exchange_direct";
// info日誌佇列
private static final String QUEUE_NAME = "test_queue_routing_info";
private static final String INTO_ROUTING_NAME= "info";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連線
Connection connection = ConnectionUtils.getConnection();
// 開啟通道
Channel channel = connection.createChannel();
// 申明要消費的佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, INTO_ROUTING_NAME);
// 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
channel.basicQos(1);
// 建立一個回撥的消費者處理類
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 接收到的訊息
String message = new String(body);
System.out.println(" InfoReceive '" + message + "' , 任務處理中");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" InfoReceive done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消費訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
3.5 warn日誌消費者
package com.makesailing.neo.consumer;
import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* # 警告日誌 消費者
*
* @Author: jamie.li
* @Date: Created in 2018/9/16 23:09
*/
public class WarnReceive {
private static final String EXCHANGE_NAME = "test_exchange_direct";
// info日誌佇列
private static final String QUEUE_NAME = "test_queue_routing_warn";
private static final String INTO_ROUTING_NAME= "info";
private static final String WARN_ROUTING_NAME= "warn";
private static final String ERROR_ROUTING_NAME= "error";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連線
Connection connection = ConnectionUtils.getConnection();
// 開啟通道
Channel channel = connection.createChannel();
// 申明要消費的佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, WARN_ROUTING_NAME);
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, INTO_ROUTING_NAME);
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ERROR_ROUTING_NAME);
// 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
channel.basicQos(1);
// 建立一個回撥的消費者處理類
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 接收到的訊息
String message = new String(body);
System.out.println("WarnReceive '" + message + "' , 任務處理中");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" WarnReceive done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消費訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
4. 測試
提前在RabbitMQ Management
建立一個direct
交換機,或者先執行一次生產者(執行時會判斷交換機是否存在,不存在則建立交換機),這樣保證交換機存在,不然直接啟動消費者會提示交換機不存在。
注意點
如果在沒有佇列繫結在交換機上面時,往交換機發送訊息會丟失,之後繫結在交換機上面的佇列接收不到之前的訊息,也就是先執行第一次傳送,建立了交換機,但是還沒有佇列繫結在交換機上面,如果這次傳送的訊息就會丟失。
然後再啟動3上消費者,最後在啟動生產者.
執行結果 :
5. 多繫結情況
5.1 同一消費者繫結佇列多個路由鍵
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warn");
在`warn消費者中佇列繫結多個路由鍵 :
package com.makesailing.neo.consumer;
import com.makesailing.neo.utils.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* # 警告日誌 消費者
*
* @Author: jamie.li
* @Date: Created in 2018/9/16 23:09
*/
public class WarnReceive {
private static final String EXCHANGE_NAME = "test_exchange_direct";
// info日誌佇列
private static final String QUEUE_NAME = "test_queue_routing_warn";
private static final String INTO_ROUTING_NAME= "info";
private static final String WARN_ROUTING_NAME= "warn";
private static final String ERROR_ROUTING_NAME= "error";
public static void main(String[] args) throws IOException, TimeoutException {
// 獲取連線
Connection connection = ConnectionUtils.getConnection();
// 開啟通道
Channel channel = connection.createChannel();
// 申明要消費的佇列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 繫結佇列到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, WARN_ROUTING_NAME);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, INTO_ROUTING_NAME);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ERROR_ROUTING_NAME);
// 這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。
channel.basicQos(1);
// 建立一個回撥的消費者處理類
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 接收到的訊息
String message = new String(body);
System.out.println("WarnReceive '" + message + "' , 任務處理中");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" WarnReceive done ");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 消費訊息
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
執行結果:
如果一個消費者綁定了這3個路由鍵,那麼當生產者傳送其中一個路由鍵訊息時,該消費者都能接收到訊息.
5.2 多個消費者繫結相同的路由鍵
即消費者1繫結info,消費者2繫結 info、error .
那麼生產者傳送info路由鍵訊息時,消費者1 、2都能接收到訊息,傳送error路由鍵訊息時,只有消費者2能接收到訊息.