5.rabbitmq的work queues模式
阿新 • • 發佈:2021-02-16
1.模型圖
適用場景:消費者消費速度慢導致訊息在佇列中堆積
2.實現程式碼
工具類
public class MqUtil {
// 建立連線工廠
private static final ConnectionFactory factory = new ConnectionFactory();
static {
// 設定連線mq的主機
factory.setHost("192.168.175.159");
// 設定連線埠號
factory. setPort(5672);
// 設定連線的虛擬主機
factory.setVirtualHost("javatest");
// 設定訪問虛擬主機的使用者名稱
factory.setUsername("javatest");
// 設定訪問虛擬主機的密碼
factory.setPassword("javatest");
}
// 獲取連線
public static Connection getConnection(){
try {
return factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
// 關閉資源
public static void close(Channel channel,Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
生產者
public class Send {
public static void main(String[] args) throws IOException {
// 獲取連線
Connection connection = MqUtil.getConnection();
// 建立通道
Channel channel = connection.createChannel();
// 將通道與佇列繫結
channel.queueDeclare("work",true,false,false,null);
// 釋出多條訊息
for (int i=0;i<20;i++) {
channel.basicPublish("","work",null,("work" + i).getBytes());
}
// 關閉資源
MqUtil.close(channel,connection);
}
}
消費者1
public class Recv1 {
public static void main(String[] args) throws IOException {
Connection connection = MqUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
消費者2
public class Recv2 {
public static void main(String[] args) throws IOException {
Connection connection = MqUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
}
}
結果展示
消費者2
消費者1
3.消費者的自動確認機制
詳情連線:link
從上面的最終結果可以看出,rabbitmq將訊息已迴圈的方式分配給兩個消費者進行消費。這是rabbitmq的一個預設機制。在生產實踐中存在兩個問題:
- 設定消費為自動確認,當消費者出現異常時會導致資料的丟失
- 沒有進行通道的預取設定,將佇列中所有的訊息全部分配給消費者,導致記憶體 的消耗增加
程式碼展示
public class Recv1 {
public static void main(String[] args) throws IOException {
Connection connection = MqUtil.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare("work",true,false,false,null);
// 設定通道的預取數量為1,官方推薦100到300,資料會影響其吞吐量
channel.basicQos(4);
// 關閉訊息的自動確認機制
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
// 在處理完訊息後手動進行確認
/*
* 引數1: 訊息標籤
* 引數2: 是否批量進行確認
* */
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}