rabbitmq學習之路(三)訊息應答、持久化以及公平轉發
阿新 • • 發佈:2018-12-30
上兩篇博文簡單介紹了下rabbitmq的使用方式,接下來,筆者再給大家介紹下rabbitmq的基礎配置:設定訊息的應答、持久化以及公平轉發。
下面,筆者簡單的來解釋下這個三個配置:
1. 訊息應答:
預設情況下,只要有消費者,訊息進去佇列後,訊息就會被全部分配好到相應的消費者進行處理,對應的訊息也會在佇列中去除。如果某個消費者處理過程中突然掛了,那麼這些訊息就沒有被處理,所以我們可以設定訊息為應答模式,也就是在消費者處理完一條訊息後,就告訴mq此訊息已被處理完,那麼未作出應答的訊息就會被轉發到其它的消費者進行處理。
2. 訊息持久化:
如果我們不設定訊息持久化,那麼在伺服器重啟後,所有的佇列以及相應的資料都會丟失,所以設定持久化,相應的資料就會儲存在磁碟中,不會丟失。
3. 公平轉發
由於佇列中的訊息的分配機制,會導致某些消費者一直處於繁忙的狀態,而讓其他處理完的消費者處於等待狀態,就算再新增一個消費者也不會把訊息分配到此消費者,所以我們可以設定公平轉發,這樣可以保證多個消費者之間公平的處理訊息,同時還可以動態新增消費者加入工作。
好了,接下來,給大家看看相應的程式碼,說明已於註釋中:
生產者:
/**
* Project Name:qyk_testJava
* File Name:Producer.java
* Package Name:com.qiyongkang.mq.rabbitMq.basic
* Date:2017年3月6日下午5:27:59
* Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
*
*/
package com.qiyongkang.mq.rabbitMq.basic;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
* ClassName:Producer <br/>
* Function: TODO ADD FUNCTION. <br/>
* Reason: TODO ADD REASON. <br/>
* Date: 2017年3月6日 下午5:27:59 <br/>
*
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class Producer {
// 佇列名稱
private final static String QUEUE_NAME = "qyk.basic";
public static void main(String[] args) throws IOException {
// 建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列, 設定佇列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 傳送10條訊息,依次在訊息後面附加1-10個點
for (int i = 0; i < 10; i++) {
String dots = "";
for (int j = 0; j <= i; j++) {
dots += ".";
}
String message = "helloworld" + dots + dots.length();
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
// 關閉頻道和資源
channel.close();
connection.close();
}
}
消費者:
/**
* Project Name:qyk_testJava
* File Name:Consumer.java
* Package Name:com.qiyongkang.mq.rabbitMq.basic
* Date:2017年3月6日下午5:27:51
* Copyright (c) 2017, Thinkive(http://www.thinkive.com/) All Rights Reserved.
*
*/
package com.qiyongkang.mq.rabbitMq.basic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* ClassName:Consumer <br/>
* Function: TODO ADD FUNCTION. <br/>
* Reason: TODO ADD REASON. <br/>
* Date: 2017年3月6日 下午5:27:51 <br/>
*
* @author qiyongkang
* @version
* @since JDK 1.6
* @see
*/
public class Consumer {
// 佇列名稱
private final static String QUEUE_NAME = "qyk.basic";
public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
// 區分不同工作程序的輸出
int hashCode = Thread.currentThread().hashCode();
// 建立連線和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 宣告佇列、設定佇列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
System.out.println(hashCode + " [*] Waiting for messages. To exit press CTRL+C");
//設定最大服務轉發訊息數量, 公平轉發
int prefetchCount = 1;
channel.basicQos(prefetchCount);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定消費佇列,開啟應答機制, 注意false才是開啟手動應對
boolean ack = false ;
channel.basicConsume(QUEUE_NAME, ack, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(hashCode + " [x] Received '" + message + "'");
doWork(message);
System.out.println(hashCode + " [x] Done");
//另外需要在每次處理完成一個訊息後,手動傳送一次應答。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
/**
* 每個點耗時1s
*
* @param task
* @throws InterruptedException
*/
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.')
Thread.sleep(1000);
}
}
}
這裡,我們把消費者執行多次,就可以模擬多個消費者了。
好了,rabbitmq的這三個配置就簡單的介紹到這兒了~