第4篇 rabbitmq可靠確認模式的java封裝及示例
對rabbitmq的封裝,有幾個目標:
1 提供send介面
2 提供consume介面
3 保證訊息的事務性處理
所謂事務性處理,是指對一個訊息的處理必須嚴格可控,必須滿足原子性,只有兩種可能的處理結果:
(1) 處理成功,從佇列中刪除訊息
(2) 處理失敗(網路問題,程式問題,服務掛了),將訊息重新放回佇列
為了做到這點,我們使用rabbitmq的手動ack模式,這個後面細說。
1 send介面
public interface MessageSender {
DetailRes send(Object message);
}
send介面相對簡單,我們使用spring的RabbitTemplate來實現,程式碼如下:
//1 構造template, exchange, routingkey等
//2 設定message序列化方法
//3 設定傳送確認
//4 構造sender方法
public MessageSender buildMessageSender(final String exchange, final String routingKey, final String queue) throws IOException, TimeoutException {
Connection connection = connectionFactory.createConnection();
//1
buildQueue(exchange, routingKey, queue, connection);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
//2
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//3
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
}
}
});
//4
return new MessageSender() {
@Override
public DetailRes send(Object message) {
try {
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException e) {
e.printStackTrace();
log.info("send failed " + e);
try {
//retry
rabbitTemplate.convertAndSend(message);
} catch (RuntimeException error) {
error.printStackTrace();
log.info("send failed again " + error);
return new DetailRes(false, error.toString());
}
}
return new DetailRes(true, "");
}
};
}
2 consume介面
public interface MessageConsumer {
DetailRes consume();
}
在consume介面中,會呼叫使用者自己的MessageProcess,介面定義如下:
public interface MessageProcess<T> {
DetailRes process(T message);
}
consume的實現相對來說複雜一點,程式碼如下:
//1 建立連線和channel
//2 設定message序列化方法
//3 構造consumer
public <T> MessageConsumer buildMessageConsumer(String exchange, String routingKey,
final String queue, final MessageProcess<T> messageProcess) throws IOException {
final Connection connection = connectionFactory.createConnection();
//1
buildQueue(exchange, routingKey, queue, connection);
//2
final MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
final MessageConverter messageConverter = new Jackson2JsonMessageConverter();
//3
return new MessageConsumer() {
QueueingConsumer consumer;
{
consumer = buildQueueConsumer(connection, queue);
}
@Override
//1 通過delivery獲取原始資料
//2 將原始資料轉換為特定型別的包
//3 處理資料
//4 手動傳送ack確認
public DetailRes consume() {
QueueingConsumer.Delivery delivery = null;
Channel channel = consumer.getChannel();
try {
//1
delivery = consumer.nextDelivery();
Message message = new Message(delivery.getBody(),
messagePropertiesConverter.toMessageProperties(delivery.getProperties(), delivery.getEnvelope(), "UTF-8"));
//2
@SuppressWarnings("unchecked")
T messageBean = (T) messageConverter.fromMessage(message);
//3
DetailRes detailRes = messageProcess.process(messageBean);
//4
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
log.info("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
return detailRes;
} catch (InterruptedException e) {
e.printStackTrace();
return new DetailRes(false, "interrupted exception " + e.toString());
} catch (IOException e) {
e.printStackTrace();
retry(delivery, channel);
log.info("io exception : " + e);
return new DetailRes(false, "io exception " + e.toString());
} catch (ShutdownSignalException e) {
e.printStackTrace();
try {
channel.close();
} catch (IOException io) {
io.printStackTrace();
} catch (TimeoutException timeout) {
timeout.printStackTrace();
}
consumer = buildQueueConsumer(connection, queue);
return new DetailRes(false, "shutdown exception " + e.toString());
} catch (Exception e) {
e.printStackTrace();
log.info("exception : " + e);
retry(delivery, channel);
return new DetailRes(false, "exception " + e.toString());
}
}
};
}
3 保證訊息的事務性處理
rabbitmq預設的處理方式為auto ack,這意味著當你從訊息佇列取出一個訊息時,ack自動傳送,mq就會將訊息刪除。而為了保證訊息的正確處理,我們需要將訊息處理修改為手動確認的方式。
(1) sender的手工確認模式
首先將ConnectionFactory的模式設定為publisherConfirms,如下
connectionFactory.setPublisherConfirms(true);
之後設定rabbitTemplate的confirmCallback,如下:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.info("send message failed: " + cause); //+ correlationData.toString());
throw new RuntimeException("send error " + cause);
}
}
});
(2) consume的手工確認模式
首先在queue建立中指定模式
channel.exchangeDeclare(exchange, "direct", true, false, null);
/**
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
channel.queueDeclare(queue, true, false, false, null);
只有在訊息處理成功後傳送ack確認,或失敗後傳送nack使資訊重新投遞
if (detailRes.isSuccess()) {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} else {
log.info("send message failed: " + detailRes.getErrMsg());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
4 自動重連機制
為了保證rabbitmq的高可用性,我們使用rabbitmq Cluster模式,並配合haproxy。這樣,在一臺機器down掉時或者網路發生抖動時,就會發生當前連線失敗的情況,如果不對這種情況做處理,就會造成當前的服務不可用。
在spring-rabbitmq中,已實現了connection的自動重連,但是connection重連後,channel的狀態並不正確。因此我們需要自己捕捉ShutdownSignalException異常,並重新生成channel。如下:
catch (ShutdownSignalException e) {
e.printStackTrace();
channel.close();
//recreate channel
consumer = buildQueueConsumer(connection, queue);
}
5 consumer執行緒池
在對訊息處理的過程中,我們期望多執行緒並行執行來增加效率,因此對consumer做了一個執行緒池的封裝。
執行緒池通過builder模式構造,需要準備如下引數:
//執行緒數量
int threadCount;
//處理間隔(每個執行緒處理完成後休息的時間)
long intervalMils;
//exchange及queue資訊
String exchange;
String routingKey;
String queue;
//使用者自定義處理介面
MessageProcess<T> messageProcess;
核心迴圈也較為簡單,程式碼如下:
public void run() {
while (!stop) {
try {
//2
DetailRes detailRes = messageConsumer.consume();
if (infoHolder.intervalMils > 0) {
try {
Thread.sleep(infoHolder.intervalMils);
} catch (InterruptedException e) {
e.printStackTrace();
log.info("interrupt " + e);
}
}
if (!detailRes.isSuccess()) {
log.info("run error " + detailRes.getErrMsg());
}
} catch (Exception e) {
e.printStackTrace();
log.info("run exception " + e);
}
}
}
6 使用示例
最後,我們還是用一個例子做結。
(1) 定義model
//參考lombok
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserMessage {
int id;
String name;
}
(2) rabbitmq配置
配置我們使用@Configuration實現,如下:
@Configuration
public class RabbitMQConf {
@Bean
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true); // enable confirm mode
return connectionFactory;
}
}
(3) sender示例
@Service
public class SenderExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionFactory;
private MessageSender messageSender;
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageSender = mqAccessBuilder.buildMessageSender(EXCHANGE, ROUTING, QUEUE);
}
public DetailRes send(UserMessage userMessage) {
return messageSender.send(userMessage);
}
}
(4) MessageProcess(使用者自定義處理介面)示例,本例中我們只是簡單的將資訊打印出來
public class UserMessageProcess implements MessageProcess<UserMessage> {
@Override
public DetailRes process(UserMessage userMessage) {
System.out.println(userMessage);
return new DetailRes(true, "");
}
}
(5) consumer示例
@Service
public class ConsumerExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionFactory;
private MessageConsumer messageConsumer;
@PostConstruct
public void init() throws IOException, TimeoutException {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
messageConsumer = mqAccessBuilder.buildMessageConsumer(EXCHANGE, ROUTING, QUEUE, new UserMessageProcess());
}
public DetailRes consume() {
return messageConsumer.consume();
}
}
(6) 執行緒池consumer示例
在main函式中,我們使用一個獨立執行緒傳送資料,並使用執行緒池接收資料。
@Service
public class PoolExample {
private static final String EXCHANGE = "example";
private static final String ROUTING = "user-example";
private static final String QUEUE = "user-example";
@Autowired
ConnectionFactory connectionFactory;
private ThreadPoolConsumer<UserMessage> threadPoolConsumer;
@PostConstruct
public void init() {
MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
MessageProcess<UserMessage> messageProcess = new UserMessageProcess();
threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder<UserMessage>()
.setThreadCount(Constants.THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS)
.setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE)
.setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess)
.build();
}
public void start() throws IOException {
threadPoolConsumer.start();
}
public void stop() {
threadPoolConsumer.stop();
}
public static void main(String[] args) throws IOException {
ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
PoolExample poolExample = ac.getBean(PoolExample.class);
final SenderExample senderExample = ac.getBean(SenderExample.class);
poolExample.start();
new Thread(new Runnable() {
int id = 0;
@Override
public void run() {
while (true) {
senderExample.send(new UserMessage(id++, "" + System.nanoTime()));
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
7 github地址