RabbitMQ拉模式批量消費訊息
實現RabbitMQ的消費者有兩種模式,推模式(Push)和拉模式(Pull)。
實現推模式推薦的方式是繼承 DefaultConsumer 基類,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。
推模式是最常用的,但是有些情況下推模式並不適用的,比如說:
-
由於某些限制,消費者在某個條件成立時才能消費訊息
-
需要批量拉取訊息進行處理
實現拉模式
RabbitMQ的Channel提供了 basicGet 方法用於拉取訊息。
/** * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get} * @see com.rabbitmq.client.AMQP.Basic.Get * @see com.rabbitmq.client.AMQP.Basic.GetOk * @see com.rabbitmq.client.AMQP.Basic.GetEmpty * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @return a {@link GetResponse} containing the retrieved message data * @throws java.io.IOException if an error is encountered */ GetResponse basicGet(String queue, boolean autoAck) throws IOException;
basicGet 返回 GetResponse 類。
public class GetResponse {
private final Envelope envelope;
private final BasicProperties props;
private final byte[] body;
private final int messageCount;
// ...
rabbitmq-client版本4.0.3
使用 basicGet 拉取訊息需要注意:
basicGet
DefaultConsumer
示例程式碼:
private void consume(Channel channel) throws IOException, InterruptedException { while (true) { if (!isConditionSatisfied()) { TimeUnit.MILLISECONDS.sleep(1); continue; } GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false); if (response == null) { TimeUnit.MILLISECONDS.sleep(1); continue; } String data = new String(response.getBody()); logger.info("Get message <= {}", data); channel.basicAck(response.getEnvelope().getDeliveryTag(), false); } }
批量拉取訊息
RabbitMQ支援客戶端批量拉取訊息,客戶端可以連續呼叫 basicGet 方法拉取多條訊息,處理完成之後一次性ACK。需要注意:
basicGet
basicAck
示例程式碼:
String bridgeQueueName = extractorProperties.getBridgeQueueName();
int batchSize = extractorProperties.getBatchSize();
List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
long tag = 0;
while (responseList.size() < batchSize) {
GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
if (getResponse == null) {
break;
}
responseList.add(getResponse);
tag = getResponse.getEnvelope().getDeliveryTag();
}
if (responseList.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(1);
} else {
logger.info("Get <{}> responses this batch", responseList.size());
// handle messages
channel.basicAck(tag, true);
}
關於QueueingConsumer
QueueingConsumer 在客戶端本地使用 BlockingQueue 緩衝訊息,其nextDelivery方法也可以用於實現拉模式(其本質上是 BlockingQueue.take ),但是 QueueingConsumer 現在已經標記為Deprecated。
歡迎工作一到五年的Java工程師朋友們加入Java架構開發: 855835163
群內提供免費的Java架構學習資料(裡面有高可用、高併發、高效能及分散式、Jvm效能調優、Spring原始碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!