rabbitmq--工作佇列(單發多收)模型
上一篇,我們介紹了rabbitmq中最簡單的一種模型——收發模型,收、發都只有一方,即一個應用,並且不涉及到exchange (實際上這種模型中也是有exchange的,exchange的型別為direct,且名稱為空字串"",只不過這裡的exchange是透明的,故常被我們所忽略了,好像收、發之間是直接連通的)。
本篇我們介紹一下rabbitmq中的另一種模型——工作佇列模型,也叫單發多收模型,它是收發模型的延伸。它支援一方傳送訊息,由多方共同接收處理,並且支援給多個消費方設定不同的QOS 。 如圖:
例如,一個大系統中每天都與產生大量的任務(訊息),需要釋出給各個子系統去共同處理,這時候就可以採用工作佇列模型 。
示例程式碼:
任務生產(釋出)者:
package com.tingcream.rabbitmq.workQueues; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * (AMQP DEFAULT) * @author jelly * */ public class NewTask { private final static String QUEUE_NAME = "direct_task_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); //主機 埠 vhost 使用者名稱 密碼 factory.setHost("192.168.9.102"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq123"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable=true ; channel.queueDeclare(QUEUE_NAME, durable, false,false, null); //String message = "Hello rabbitmq"; //exchagne名稱預設為 (AMQP DEFAULT) direct //channel.basicPublish(exchange, routingKey, props, body); //channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); //System.out.println(" [x] Sent '" + message + "'"); for(int i=0;i<20;i++) { String message="hello direct task message E "+i; //釋出訊息 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } //關閉連線 channel.close(); connection.close(); } }
任務處理(消費)者A:
package com.tingcream.rabbitmq.workQueues; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class WorkA { private final static String QUEUE_NAME = "direct_task_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); //主機 埠 vhost 使用者名稱 密碼 factory.setHost("192.168.9.102"); factory.setUsername("rabbitmq"); factory.setPassword("rabbitmq123"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) boolean durable=true ; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//channel向伺服器宣告一個佇列,設定durable為true,則當rabbitmq 伺服器重啟時,佇列不會丟失 System.out.println("WorkerA Waiting for messages"); //每次從佇列獲取的message的數量 channel.basicQos(1); // prefetchCount maximum number of messages that the server will deliver, 0 if unlimited final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("WorkerA Received :" + message ); try { doWork(message); }catch (Exception e){ channel.abort(); }finally { System.out.println("WorkerA Done"); channel.basicAck(envelope.getDeliveryTag(),false);//當訊息處理完畢後 在finally中 回覆一個ack 手動ack } } }; boolean autoAck=false; //訊息消費完成確認 不自動 ack 非自動ack channel.basicConsume(QUEUE_NAME, autoAck, consumer); } private static void doWork(String task) { try { Thread.sleep(5000);// 暫停2秒鐘 } catch (InterruptedException e) { e.printStackTrace(); } } }
任務處理(消費)者B:
package com.tingcream.rabbitmq.workQueues;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class WorkB {
private final static String QUEUE_NAME = "direct_task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//主機 埠 vhost 使用者名稱 密碼
factory.setHost("192.168.9.102");
factory.setUsername("rabbitmq");
factory.setPassword("rabbitmq123");
factory.setPort(AMQP.PROTOCOL.PORT);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments)
boolean durable=true ;//channel向伺服器宣告一個佇列,設定durable為true,則當rabbitmq 伺服器重啟時,佇列不會丟失
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println("WorkerB Waiting for messages");
//每次從佇列獲取的message數量
channel.basicQos(2);
// prefetchCount maximum number of messages that the server will deliver, 0 if unlimited
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("WorkerB Received :" + message );
try {
doWork(message);
}catch (Exception e){
channel.abort();
}finally {
System.out.println("WorkerB Done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck=false;
//訊息消費完成確認 不自動 ack 非自動ack
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) {
try {
Thread.sleep(3000);// 暫停2秒鐘
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
注意:
1 、任務消費者A和B 可以共同消費處理任務生產者(NewTask)釋出的訊息。消費者A和B的qos可以根據伺服器效能的不同設定不同的qos,例如本例中消費者A的qos設定為了1,而消費者B的qos設定為了2 。
2 、consumer中有個重要的動作就是設定autoAck為false (預設為true),表示需要消費者程式手動ack。注意一旦consumer將autoAck設定為false之後,一定要記得處理完訊息之後,向伺服器傳送確認訊息,否則伺服器將會一直轉發該訊息。當設定了autoAck為false,若消費者A意外宕機,處理的訊息還沒有ack回去,則rabbitmq伺服器會將訊息轉給消費者B。
3、生產者釋出訊息時,設定了一個引數durable為true。這樣做的好處是,當這樣設定之後,伺服器收到訊息後就會立刻將訊息寫入到硬碟,就可以防止突然伺服器掛掉,而引起的資料丟失了。 但是伺服器如果剛收到訊息,還沒來得及寫入到硬碟,就掛掉了,這樣還是無法避免訊息的丟失。
4、 在本例(工作佇列模型)中,我們還是未看到exchange的身影,這一點同上一種模型——收發模型是一樣的。 實際上,在本例(工作佇列模型)中,是存在exchange的,exchange的名稱為空字串"",型別為direct ,同樣對我們是透明的,也被我們所忽略了。
實際上,我們介紹rabbitmq的收發模型和工作佇列模型所採用的exchange就是圖中縮圈出的這種。
關於exchange的更多內容會在我們後面的篇章中講到。