rabbitmq4-工作佇列及公平分發模式
阿新 • • 發佈:2018-11-16
建議大家如果沒有看前一篇文章的時候,還是看一看第一篇文章,因為上篇文章的確把很多的概念都講解的比較清楚。我發現有很多東西在單獨使用rabbitmq是做不了的,例如自定義message投遞的id,所以我希望快速的把這幾篇介紹的博文寫完,然後進入springboot的整合篇,但是我不建議新手一上來就開始使用springboot的整合,就想我在群裡面聽到的,不知道channel為何物更別提其他的概念了,只有一個穩紮穩打的基礎在往高階的地方學習的時候才不費力。
一、簡單工作佇列
我想大概這種模式的應用場景也就剩下了應用層面的解耦了吧,話不多話,下面直接用程式碼展示
二、生產者程式碼:
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException{
final Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
boolean durable = true;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 這個目前在單獨使用rabbitmq的時候沒有辦法找到自定義這個訊息標識的辦法,但是在和springboot整合之後會提供這樣的方法
System.out.println(multiple);
System.out.println("wtf 需要這麼熱嗎:::::"+deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("啊哈哈,你被拒絕了……");
}
});
// 這個地方也可以搞一個執行緒來進行傳送
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 ".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +1".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +2".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +3".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +4".getBytes());
channel.basicPublish("",QUEUE_NAME,null,"fuck 真他媽的熱 +5".getBytes());
channel.close();
conn.close();
}
}
三、兩個消費者(只需要把程式碼拷貝一份就可以了)
public class Consumer01 {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnUtils.getConn();
final Channel channel = conn.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recv001"+"message == "+new String(body,"utf-8"));
channel.basicAck(deliveryTag,false);
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
}
}
先啟動兩個消費者,因為訊息太少,如果先啟動生產者,在啟動消費者,一個消費者立馬就消費完了。
四、結果分析
我們發現兩個消費者總是已奇偶的形式出現的,加入兩個消費者的消費能力不一樣,消費者1消費能力比較高,但是以這種模式的話,那麼整個系統的消費能力的上線就有比較弱的消費者2來決定了。所以下面介紹一種公平分發模式:公平指的是能者多勞
我們在channel申明的下面加一行程式碼:我們分別設定consumer1的消費能力為3,consumer2的消費者能力為1
/**
* prefetchCount:告訴MQ不要同時給一個消費者推送超過prefetchCount個訊息,
* 即一點prefetchCount個訊息沒有應答,該消費者就會發生阻塞
* global:指的是該設定是針對該consumer還是針對channel級別
*/
channel.basicQos(3,false);
下面我們在觀察結果:
我們可以看到奇偶的模式不見了,而且消費者1的吞吐量是大於消費者2的
本節到這裡就結束了,有很多的介紹希望大家多去看看前面的文章。