rabbitMQ實現可靠訊息投遞
RabbitMQ訊息的可靠性主要包括兩方面,一方面是通過實現消費的重試機制(通過@Retryable來實現重試,可以設定重試次數和重試頻率,但是要保證冪等性),另一方面就是實現訊息生產者的可靠投遞(注意消費單冪等),下面主要講下生產者實現的可靠訊息投遞。
rabbitTemplate的傳送流程是這樣的:
1 傳送資料並返回(不確認rabbitmq伺服器已成功接收)
2 非同步的接收從rabbitmq返回的ack確認資訊
3 收到ack後呼叫confirmCallback函式 注意:在confirmCallback中是沒有原message的,所以無法在這個函式中呼叫重發,confirmCallback只有一個通知的作用 在這種情況下,如果在2,3步中任何時候切斷連線,我們都無法確認資料是否真的已經成功傳送出去,從而造成資料丟失的問題。
最完美的解決方案只有1種: 使用rabbitmq的事務機制。 但是在這種情況下,rabbitmq的效率極低,每秒鐘處理的message在幾百條左右。實在不可取。
第二種解決方式,使用同步的傳送機制,也就是說,客戶端傳送資料,rabbitmq收到後返回ack,再收到ack後,send函式才返回。程式碼類似這樣:
建立channel
send message
wait for ack(or 超時)
close channel
返回成功or失敗
同樣的,由於每次傳送message都要重新建立連線,效率很低。
基於上面的分析,我們使用一種新的方式來做到資料的不丟失。
在rabbitTemplate非同步確認的基礎上
1 在redis中快取已傳送的message
2 通過confirmCallback或者被確認的ack,將被確認的message從本地刪除
3 定時掃描本地的message,如果大於一定時間未被確認,則重發
當然了,這種解決方式也有一定的問題: 想象這種場景,rabbitmq接收到了訊息,在傳送ack確認時,網路斷了,造成客戶端沒有收到ack,重發訊息。(相比於丟失訊息,重發訊息要好解決的多,我們可以在consumer端做到冪等)。 自動重試的程式碼如下:
package cn.chinotan.service.reliabletransmission;
/**
* @program: test
* @description: rabbitMq常量
* @author: xingcheng
* @create: 2018-08-12 12:30
**/
public class MyConstant {
public static final String MY_EXCHANGE = "my_exchange";
public static final String ERROR_EXCHANGE = "error_exchange";
public static final String MY_QUEUE_THREE = "my_queue_three";
public final static String KEY_PREFIX = "test:rabbitMq:";
/**
* consumer失敗後等待時間(mils)
*/
public static final int ONE_MINUTE = 1 * 60 * 1000;
/**
* MQ訊息retry時間
*/
public static final int RETRY_TIME_INTERVAL = ONE_MINUTE;
/**
* MQ訊息有效時間
*/
public static final int VALID_TIME = ONE_MINUTE;
}
package cn.chinotan.service.reliabletransmission;
import java.io.Serializable;
/**
* @program: test
* @description: 包裝訊息
* @author: xingcheng
* @create: 2018-09-24 15:32
**/
public class MessageWithTime implements Serializable {
private String id;
private long time;
private String message;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
package cn.chinotan.service.reliabletransmission;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* rabbitMQ配置
*/
@Configuration
public class ReliableRabbitConfig {
@Bean
public DirectExchange myExchange() {
return new DirectExchange(MyConstant.MY_EXCHANGE, true, false);
}
@Bean
public Queue myQueueOne() {
return new Queue(MyConstant.MY_QUEUE_THREE, true);
}
@Bean
public Binding queueOneBinding() {
return BindingBuilder.bind(myQueueOne()).to(myExchange()).withQueueName();
}
}
package cn.chinotan.service.reliabletransmission;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.UUID;
/**
* @program: test
* @description: rabbitService
* @author: xingcheng
* @create: 2018-09-24 14:28
**/
@Service
public class RabbitMQService {
Logger logger = LoggerFactory.getLogger(RabbitMQService.class);
@Autowired
StringRedisTemplate redisTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public Boolean send(String exchange, String routingKey, Object message) {
try {
String key = StringUtils.join(MyConstant.KEY_PREFIX, UUID.randomUUID().toString().replace("-", "").toLowerCase());
// 傳送前儲存訊息和時間和id到redis快取中
MessageWithTime messageWithTime = new MessageWithTime();
messageWithTime.setId(key);
messageWithTime.setMessage(JSONObject.toJSONString(message));
messageWithTime.setTime(System.currentTimeMillis());
redisTemplate.opsForValue().set(key, JSONObject.toJSONString(messageWithTime));
// 非同步回撥通知
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
logger.info("message send success--id:[{}]", correlationData.getId());
// 傳送成功後,刪除redis快取
redisTemplate.delete(correlationData.getId());
} else {
// 傳送失敗後列印日誌,進行重試
logger.error("message send fail--id:[{}]", correlationData.getId());
}
});
CorrelationData correlationData = new CorrelationData(key);
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
} catch (Exception e) {
logger.error("傳送訊息異常{}", e);
return false;
}
return true;
}
Boolean send(String exchange, String routingKey, MessageWithTime message) {
try {
// 非同步回撥通知
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
logger.info("message send success--id:[{}]", correlationData.getId());
// 傳送成功後,刪除redis快取
redisTemplate.delete(correlationData.getId());
} else {
// 傳送失敗後列印日誌,進行重試
logger.error("message send fail--id:[{}]", correlationData.getId());
}
});
CorrelationData correlationData = new CorrelationData(message.getId());
Map map = JSON.parseObject(message.getMessage(), Map.class);
rabbitTemplate.convertAndSend(exchange, routingKey, map, correlationData);
} catch (Exception e) {
logger.error("傳送訊息異常{}", e);
return false;
}
return true;
}
}
package cn.chinotan.service.reliabletransmission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Map;
/**
* 生產者
*/
@Service
public class ReliableProducr {
private static final Logger LOGGER = LoggerFactory.getLogger(ReliableProducr.class);
@Autowired
private RabbitMQService rabbitMQService;
public Boolean send(Map msg) {
return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
}
public Boolean send(MessageWithTime msg) {
return rabbitMQService.send(MyConstant.MY_EXCHANGE, MyConstant.MY_QUEUE_THREE, msg);
}
}
package cn.chinotan.service.reliabletransmission;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
/**
* @program: test
* @description: 可靠投遞監聽器
* @author: xingcheng
* @create: 2018-09-24 16:05
**/
@WebListener
public class ReliableTransContextListener implements ServletContextListener {
Logger logger = LoggerFactory.getLogger(ReliableTransContextListener.class);
private WebApplicationContext springContext;
@Override
public void contextInitialized(ServletContextEvent sce) {
logger.info("ReliableTransContextListener init start...........");
springContext = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
if (springContext != null) {
RetryCache retryCache = (RetryCache) springContext.getBean("retryCache");
new Thread(() -> retryCache.startRetry()).start();
}
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
}
}
package cn.chinotan.service.reliabletransmission;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Set;
/**
* @program: test
* @description: 快取重試
* @author: xingcheng
* @create: 2018-09-24 16:12
**/
@Component("retryCache")
public class RetryCache {
private boolean stop = false;
Logger logger = LoggerFactory.getLogger(RetryCache.class);
@Autowired
private ReliableProducr producr;
@Autowired
private StringRedisTemplate redisTemplate;
private final String STAR = "*";
public void startRetry() {
while (!stop) {
try {
Thread.sleep(MyConstant.RETRY_TIME_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
long now = System.currentTimeMillis();
Set<String> keys = redisTemplate.keys(StringUtils.join(MyConstant.KEY_PREFIX, STAR));
if (keys != null && !keys.isEmpty()) {
List<String> list = redisTemplate.opsForValue().multiGet(keys);
list.forEach(value -> {
MessageWithTime messageWithTime = JSON.parseObject(value, MessageWithTime.class);
if (null != messageWithTime) {
if (messageWithTime.getTime() + 3 * MyConstant.VALID_TIME < now) {
logger.error("send message {} failed after 3 min ", messageWithTime);
redisTemplate.delete(messageWithTime.getId());
} else if (messageWithTime.getTime() + MyConstant.VALID_TIME < now) {
Boolean send = producr.send(messageWithTime);
logger.info("進行重新投遞訊息");
if (!send) {
logger.error("retry send message failed {}", messageWithTime);
}
}
}
});
}
}
}
}
package cn.chinotan.service.reliabletransmission;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
* queueThree消費者
*/
@Component
public class MyQueueThreeConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(MyQueueThreeConsumer.class);
/**
* 消費者做好冪等
*
* @param content
*/
@RabbitListener(queues = MyConstant.MY_QUEUE_THREE)
@RabbitHandler
public void process(Map content) {
LOGGER.info("消費者,queueThree開始執行 {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
LOGGER.info("消費者,queueThree消費內容:[{}]", JSON.toJSONString(content));
}
}
import cn.chinotan.service.reliabletransmission.MyConstant;
import cn.chinotan.service.reliabletransmission.RabbitMQService;
import cn.chinotan.service.reliabletransmission.ReliableProducr;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
/**
* @program: test
* @description: 可靠投遞測試
* @author: xingcheng
* @create: 2018-09-24 15:57
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class)
public class ReliableTransmissionTest {
@Autowired
private ReliableProducr producr;
@Autowired
private RabbitMQService rabbitMQService;
/**
* 正常情況測試
* @throws Exception
*/
@Test
public void reliableTransmissionTest() throws Exception {
Map<String, String> map = new HashMap<>();
map.put("name", "xingheng");
producr.send(map);
}
/**
* 異常情況測試
* @throws Exception
*/
@Test
public void reliableTransmissionFailTest() throws Exception {
Map<String, String> map = new HashMap<>();
map.put("name", "xingheng");
rabbitMQService.send(MyConstant.ERROR_EXCHANGE, MyConstant.MY_QUEUE_THREE, map);
}
}
注意事項:
1.配置中要開啟發布者確認,類似這樣:
spring: rabbitmq: publisher-confirms: true
2.如果要測試異常情況只需要將訊息傳送到一個不存在的交換機即可
3.注意消費端冪等
簡單測試結果:
在重試一次後,會將它傳送到正確的交換機,於是傳送成功