RabbitMQ基礎元件封裝實踐
1、基礎元件的實現關鍵點
基礎元件封裝設計-迅速訊息傳送
基礎元件封裝設計-確認訊息傳送
基礎元件封裝設計-延遲訊息傳送
2、基礎元件需要實現的功能
迅速、延遲、可靠
訊息非同步化、序列化
連結池化、高效能
完備的補償機制
3、建立工程
rabbit-common : 公共模組
rabbit-api: 提供給第三方使用
rabbit-core-producer: 用於傳送訊息(核心)
rabbit-task: 用於做可靠性處理
首先建立rabbit-parent工程
pom.xml 如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <modules> <module>rabbit-common</module> <module>rabbit-api</module> <module>rabbit-core-producer</module> <module>rabbit-task</module> </modules> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.16.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbit-parent</artifactId> <version>1.0-SNAPSHOT</version> <packaging>pom</packaging> <name>rabbit-parent</name> <description>Demo project for Spring Boot</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>8</java.version> <fasterxml.uuid.version>3.1.4</fasterxml.uuid.version> <org.codehaus.jackson.version>1.9.13</org.codehaus.jackson.version> <druid.version>1.0.24</druid.version> <!-- 噹噹網:分散式定時任務--> <elastic-job.version>2.1.4</elastic-job.version> <guava.version>20.0</guava.version> <commons-langs3.version>3.3.1</commons-langs3.version> <commons-io.version>2.4</commons-io.version> <commons-collections.version>3.2.2</commons-collections.version> <curator.version>2.11.0</curator.version> <fastjson.version>1.1.26</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> 
</dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>${commons-io.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <!-- 對json格式的支援--> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>${org.codehaus.jackson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>com.fasterxml.uuid</groupId> <artifactId>java-uuid-generator</artifactId> <version>${fasterxml.uuid.version}</version> </dependency> </dependencies> </project>
二、rabbit-api: 提供給第三方使用
1、建立Message類
/**
 * Message payload handed to the RabbitMQ base component.
 *
 * <p>Getters, setters, {@code equals}/{@code hashCode} and {@code toString} come from
 * Lombok's {@code @Data}.
 */
@Data
public class Message implements Serializable {

    private static final long serialVersionUID = -6142443586615984421L;

    /** Unique ID of the message. */
    private String messageId;

    /** Topic of the message. */
    private String topic;

    /** Routing rule of the message; defaults to the empty string. */
    private String routingKey = "";

    /** Additional attributes attached to the message; defaults to an empty map. */
    private Map<String, Object> attributes = new HashMap<>();

    /** Delay setting for delayed messages (presumably milliseconds, judging by the name). */
    private int delayMills;

    /** Message type; defaults to {@link MessageType#CONFIRM}. */
    private String messageType = MessageType.CONFIRM;

    /** Creates a message that carries only the field defaults declared above. */
    public Message() {
    }

    /**
     * Creates a fully specified message. Arguments are stored as given, without validation
     * or copying.
     *
     * @param messageId   unique ID of the message
     * @param topic       topic of the message
     * @param routingKey  routing rule of the message
     * @param attributes  additional attributes
     * @param delayMills  delay setting for delayed messages
     * @param messageType one of the {@link MessageType} constants
     */
    public Message(
            String messageId,
            String topic,
            String routingKey,
            Map<String, Object> attributes,
            int delayMills,
            String messageType) {
        this.messageId = messageId;
        this.messageType = messageType;
        this.topic = topic;
        this.routingKey = routingKey;
        this.delayMills = delayMills;
        this.attributes = attributes;
    }
}
2、建立訊息型別類
/**
 * String constants naming the supported message delivery modes.
 *
 * <p>The values are compile-time constants so they can be used as {@code switch} case labels.
 * The class only holds constants and cannot be instantiated.
 */
public final class MessageType {

    /** Rapid message: no reliability guarantee and no broker confirm. */
    public static final String RAPID = "0";

    /** Confirm message: no reliability guarantee, but the broker confirm is processed. */
    public static final String CONFIRM = "1";

    /**
     * Reliable message: delivery must be guaranteed and no message may be lost.
     * The database write and the message send are kept atomic (eventually consistent).
     */
    public static final String RELIANT = "2";

    /** Not instantiable: this class is a constants holder only. */
    private MessageType() {
    }
}
3、構建訊息類,採用構造者模式
/**
 * Fluent builder for {@link Message}.
 *
 * <p>Obtain an instance with {@link #create()}, chain the {@code with...} setters and finish
 * with {@link #build()}. Only the topic is mandatory; a missing message ID is generated.
 */
public class MessageBuilder {

    /** Unique ID of the message. */
    private String messageId;

    /** Topic of the message. */
    private String topic;

    /** Routing rule of the message. */
    private String routingKey = "";

    /** Additional attributes attached to the message. */
    private Map<String, Object> attributes = new HashMap<>();

    /** Delay setting for delayed messages. */
    private int delayMills;

    /** Message type; defaults to {@link MessageType#CONFIRM}. */
    private String messageType = MessageType.CONFIRM;

    private MessageBuilder() {
    }

    /**
     * Creates a new builder.
     *
     * @return a builder carrying the default field values
     */
    public static MessageBuilder create() {
        return new MessageBuilder();
    }

    /**
     * Sets the message ID.
     *
     * @param messageId unique ID of the message
     * @return this builder
     */
    public MessageBuilder withMessageId(String messageId) {
        this.messageId = messageId;
        return this;
    }

    /**
     * Sets the topic.
     *
     * @param topic topic of the message
     * @return this builder
     */
    public MessageBuilder withTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /**
     * Sets the routing key.
     *
     * @param routingKey routing rule of the message
     * @return this builder
     */
    public MessageBuilder withRoutingKey(String routingKey) {
        this.routingKey = routingKey;
        return this;
    }

    /**
     * Replaces the attribute map. The given map is stored as-is, not copied.
     *
     * @param attributes attributes to attach to the message
     * @return this builder
     */
    public MessageBuilder withAttributes(Map<String, Object> attributes) {
        this.attributes = attributes;
        return this;
    }

    /**
     * Adds a single attribute to the current attribute map.
     *
     * @param key    attribute name
     * @param object attribute value
     * @return this builder
     */
    public MessageBuilder withAttribute(String key, Object object) {
        // withAttributes(null) may have cleared the map; recreate it instead of failing with an NPE.
        if (this.attributes == null) {
            this.attributes = new HashMap<>();
        }
        this.attributes.put(key, object);
        return this;
    }

    /**
     * Sets the delay for delayed messages.
     *
     * @param delayMills delay setting
     * @return this builder
     */
    public MessageBuilder withDelayMills(int delayMills) {
        this.delayMills = delayMills;
        return this;
    }

    /**
     * Sets the delay for delayed messages. Despite its name this overload has always set the
     * delay, never the message ID; it is kept only so existing callers keep compiling.
     *
     * @param delayMills delay setting
     * @return this builder
     * @deprecated misnamed; use {@link #withDelayMills(int)} instead
     */
    @Deprecated
    public MessageBuilder withMessageId(int delayMills) {
        return withDelayMills(delayMills);
    }

    /**
     * Sets the message type.
     *
     * @param messageType one of the {@link MessageType} constants
     * @return this builder
     */
    public MessageBuilder withMessageType(String messageType) {
        this.messageType = messageType;
        return this;
    }

    /**
     * Builds the message. When no message ID was supplied a random UUID is generated and
     * remembered by this builder.
     *
     * @return the assembled message
     * @throws MessageRunTimeException if no topic was set
     */
    public Message build() {
        if (messageId == null) {
            messageId = UUID.randomUUID().toString();
        }
        if (topic == null) {
            throw new MessageRunTimeException("this topic is null");
        }
        return new Message(messageId, topic, routingKey, attributes, delayMills, messageType);
    }
}
4、建立異常類
MessageException類
/**
 * Checked exception of the messaging API. Each constructor forwards its arguments unchanged
 * to the matching {@link Exception} constructor.
 */
public class MessageException extends Exception {

    private static final long serialVersionUID = -8283764568495174322L;

    /** Creates an exception without message or cause. */
    public MessageException() {
        super();
    }

    /**
     * @param message detail message
     */
    public MessageException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public MessageException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public MessageException(String message, Throwable cause) {
        super(message, cause);
    }
}
建立MessageRunTimeException 類
/**
 * Unchecked exception of the messaging API. Each constructor forwards its arguments unchanged
 * to the matching {@link RuntimeException} constructor.
 */
public class MessageRunTimeException extends RuntimeException {

    private static final long serialVersionUID = -2591307228826723236L;

    /** Creates an exception without message or cause. */
    public MessageRunTimeException() {
        super();
    }

    /**
     * @param message detail message
     */
    public MessageRunTimeException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public MessageRunTimeException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public MessageRunTimeException(String message, Throwable cause) {
        super(message, cause);
    }
}
5、建立訊息生產者介面
/**
 * Entry point used by client code to publish messages.
 */
public interface MessageProducer {

    /**
     * Sends a single message.
     *
     * @param message the message to send
     * @throws MessageRunTimeException declared by the contract for send failures
     */
    void send(Message message) throws MessageRunTimeException;

    /**
     * Sends a message together with a {@link SendCallback} that runs the follow-up business
     * logic.
     *
     * @param message      the message to send
     * @param sendCallback callback for the send outcome
     * @throws MessageRunTimeException declared by the contract for send failures
     */
    void send(Message message, SendCallback sendCallback) throws MessageRunTimeException;

    /**
     * Sends several messages.
     *
     * @param messages the messages to send
     * @throws MessageRunTimeException declared by the contract for send failures
     */
    void send(List<Message> messages) throws MessageRunTimeException;
}
傳送回撥介面。回撥函式處理
/**
 * Callback notified about the outcome of a send operation.
 */
public interface SendCallback {

    /** Called when the send succeeded. */
    void onSuccess();

    /** Called when the send failed. */
    void onFailure();
}
6、建立消費者監聽訊息類
/**
 * Consumer-side listener that receives messages.
 */
public interface MessageListener {

    /**
     * Handles one received message.
     *
     * @param message the received message
     */
    void onMessage(Message message);
}
三、rabbit-common : 公共模組
1、新增依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rabbit-parent</artifactId> <groupId>com.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rabbit-common</artifactId> <dependencies> <dependency> <groupId>com.example</groupId> <artifactId>rabbit-api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.10</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> </dependencies> </project>
2、建立序列化和反序列化介面
/**
 * Contract for converting objects to and from their serialized form.
 */
public interface Serializer {

    /**
     * Serializes an object to bytes.
     *
     * @param data object to serialize
     * @return serialized bytes
     */
    byte[] serializeRaw(Object data);

    /**
     * Serializes an object to a string.
     *
     * @param data object to serialize
     * @return serialized text
     */
    String serialize(Object data);

    /**
     * NOTE(review): takes an {@code Object} and returns bytes, which mirrors
     * {@link #serializeRaw(Object)} rather than a real deserialization signature. Confirm the
     * intended contract before relying on it.
     *
     * @param data input object
     * @return a byte array
     */
    byte[] deserializeRaw(Object data);

    /**
     * Deserializes an object from text.
     *
     * @param content serialized text
     * @param <T>     type expected by the caller
     * @return the deserialized object
     */
    <T> T deserialize(String content);

    /**
     * Deserializes an object from bytes.
     *
     * @param content serialized bytes
     * @param <T>     type expected by the caller
     * @return the deserialized object
     */
    <T> T deserialize(byte[] content);
}
3、建立介面SerializerFactory
/**
 * Factory that supplies {@link Serializer} instances.
 */
public interface SerializerFactory {

    /**
     * Creates a serializer.
     *
     * @return a serializer instance
     */
    Serializer create();
}
4、序列化反序列化實現
public class JacksonSerializer implements Serializer { private static final Logger LOGGER = LoggerFactory.getLogger(JacksonSerializer.class); private static final ObjectMapper mapper = new ObjectMapper(); static { mapper.disable(SerializationFeature.INDENT_OUTPUT); mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true); mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true); mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, true); mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true); mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true); mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); } private final JavaType type; private JacksonSerializer(JavaType type) { this.type = type; } public JacksonSerializer(Type type) { this.type = mapper.getTypeFactory().constructType(type); } public static JacksonSerializer createParametricType(Class<?> cls) { return new JacksonSerializer(mapper.getTypeFactory().constructType(cls)); } @Override public byte[] serializeRaw(Object data) { try { return mapper.writeValueAsBytes(data); } catch (JsonProcessingException e) { LOGGER.error("序列化出錯", e); } return null; } @Override public String serialize(Object data) { try { return mapper.writeValueAsString(data); } catch (JsonProcessingException e) { LOGGER.error("序列化出錯", e); } return null; } @Override public byte[] deserializeRaw(Object data) { return new byte[0]; } @Override public <T> T deserialize(String content) { try { return mapper.readValue(content, type); } catch (IOException e) { LOGGER.error("反序列化出錯", e); } return null; } @Override public <T> T deserialize(byte[] content) { try { return mapper.readValue(content, type); } catch (IOException e) { LOGGER.error("反序列化出錯", e); } return null; } }
5、序列化工廠實現
/**
 * {@link SerializerFactory} producing Jackson-based serializers bound to {@code Message}.
 */
public class JacksonSerializerFactory implements SerializerFactory {

    /** Shared factory instance. */
    public static final SerializerFactory INSTANCE = new JacksonSerializerFactory();

    /**
     * @return a new {@link JacksonSerializer} whose target type is {@code Message}
     */
    @Override
    public Serializer create() {
        Serializer messageSerializer = JacksonSerializer.createParametricType(Message.class);
        return messageSerializer;
    }
}
6、普通訊息Converter
public class GenericMessageConverter implements MessageConverter { private Serializer serializer; public GenericMessageConverter(Serializer serializer) { Preconditions.checkNotNull(serializer); this.serializer = serializer; } /** * 使用序列化: org.springframework.amqp.core.Message 轉為Object */ @Override public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException { return this.serializer.deserialize(message.getBody()); } /** * 使用反序列化 將自己的object轉換為org.springframework.amqp.core.Message */ @Override public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new org.springframework.amqp.core.Message(this.serializer.serializeRaw(object), messageProperties); } }
7、擴充套件訊息Converter
內部委派給GenericMessageConverter,可以理解為裝飾者模式:在轉換之前先補充訊息屬性,例如設定延遲時間(設定過期時間的程式碼目前被註解掉)。
public class RabbitMessageConverter implements MessageConverter { private GenericMessageConverter delegate; // private final String delaultExprie = String.valueOf(24 * 60 * 60 * 1000); public RabbitMessageConverter(GenericMessageConverter genericMessageConverter) { Preconditions.checkNotNull(genericMessageConverter); this.delegate = genericMessageConverter; } @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { // messageProperties.setExpiration(delaultExprie); com.example.api.Message message = (com.example.api.Message)object; messageProperties.setDelay(message.getDelayMills()); return this.delegate.toMessage(object, messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { com.example.api.Message msg = (com.example.api.Message) this.delegate.fromMessage(message); return msg; } }
四、rabbit-core-producer
1、增加依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rabbit-parent</artifactId> <groupId>com.example</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rabbit-core-producer</artifactId> <dependencies> <dependency> <groupId>com.example</groupId> <artifactId>rabbit-common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>7.0.0-beta1</version> <scope>compile</scope> </dependency> </dependencies> </project>
2、自動裝配
在META-INF資料夾下建立spring.factories檔案
內容為:
# Auto Configure
# The comment, the key and the continued value must sit on separate lines; on a single line
# the leading '#' turns the whole entry into a comment and nothing is registered.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.producer.autoconfigure.RabbitProducerAutoConfiguration
3、傳送訊息的實際實現類
@Component public class ProducerClient implements MessageProducer { @Autowired RabbitBroker rabbitBroker; @Override public void send(Message message) throws MessageRunTimeException { Preconditions.checkNotNull(message.getTopic()); String messageType = message.getMessageType(); switch (messageType){ case MessageType.RAPID: rabbitBroker.rapidSend(message); break; case MessageType.CONFIRM: rabbitBroker.confirmSend(message); break; case MessageType.RELIANT: rabbitBroker.reliantSend(message); break; default: break; } } @Override public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException { } @Override public void send(List<Message> messages) throws MessageRunTimeException { } }
4、建立介面RabbitBroker
作用: 具體傳送不同種類訊息的介面
/**
 * Sends the different kinds of messages to the broker.
 */
public interface RabbitBroker {

    /**
     * Sends a rapid message.
     *
     * @param message the message to send
     */
    void rapidSend(Message message);

    /**
     * Sends a confirm message.
     *
     * @param message the message to send
     */
    void confirmSend(Message message);

    /**
     * Sends a reliable message.
     *
     * @param message the message to send
     */
    void reliantSend(Message message);

    /** Sends messages; takes no arguments. */
    void sendMessages();
}
建立實現類RabbitBrokerImpl
@Slf4j @Component public class RabbitBrokerImpl implements RabbitBroker { @Autowired private RabbitTemplateContainer rabbitTemplateContainer; @Autowired private MessageStoreService messageStoreService; @Override public void rapidSend(Message message) { message.setMessageType(MessageType.RAPID); sendKernel(message); } /** * 傳送訊息的核心方法, 使用非同步執行緒池進行傳送訊息 * @param message */ private void sendKernel(Message message) { AsyncBaseQueue.submit(() -> { CorrelationData correlationData = new CorrelationData( String.format("%s#%s", message.getMessageId(), System.currentTimeMillis())); String topic = message.getTopic(); String routingKey = message.getRoutingKey(); RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message); rabbitTemplate.convertAndSend(topic,routingKey, message, correlationData); log.info("#RabbitBrokerImpl.sendKernel# 傳送訊息到RabbitMQ,messageId={}", message.getMessageId()); }); } @Override public void confirmSend(Message message) { message.setMessageType(MessageType.CONFIRM); sendKernel(message); } @Override public void reliantSend(Message message) { message.setMessageType(MessageType.RELIANT); BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId()); if(bm == null) { //1. 把資料庫的訊息傳送日誌先記錄好 Date now = new Date(); BrokerMessage brokerMessage = new BrokerMessage(); brokerMessage.setMessageId(message.getMessageId()); brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode()); //tryCount 在最開始傳送的時候不需要進行設定 brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT)); brokerMessage.setCreateTime(now); brokerMessage.setUpdateTime(now); brokerMessage.setMessage(message); messageStoreService.insert(brokerMessage); } //2. 執行真正的傳送訊息邏輯 sendKernel(message); } @Override public void sendMessages() { } }
建立非同步訊息佇列(使用執行緒池)
@Slf4j public class AsyncBaseQueue { private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors(); private static final int QUEUE_SIZE = 10000; private static ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(QUEUE_SIZE), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("rabbitmq_client_async_sender"); return t; } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("async sender is error rejected, runnable:{}, executor:{}", r, executor); } } ); public static void submit(Runnable runnable){ senderAsync.submit(runnable); } }
池化RabbitTemplate
/** * @description: 池化RabbitTemplate * 每一個topic對應一個RabbitTemplate的好處 * 1、提高發送的效率 * 2、可以根據不同的需求定製不同的RabbitTemplate,比如每一個topic都有自己的routingKey規則 * @author: * @create: 2020-08-01 17:22 */ @Slf4j @Component public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback { private Map<String /* TOPIC */, RabbitTemplate> rabbitTemplateMap = Maps.newConcurrentMap(); private Splitter splitter = Splitter.on("#"); private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE; @Autowired private ConnectionFactory connectionFactory; @Autowired private MessageStoreService messageStoreService; public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException { Preconditions.checkNotNull(message); String topic = message.getTopic(); RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(topic); if(rabbitTemplate != null){ return rabbitTemplate; } log.info("topic={} is not exists, create", topic); RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory); newTemplate.setExchange(topic); newTemplate.setRoutingKey(message.getRoutingKey()); newTemplate.setRetryTemplate(new RetryTemplate()); // 新增序列化反序列化和converter物件 Serializer serializer = serializerFactory.create(); GenericMessageConverter gmc = new GenericMessageConverter(serializer); RabbitMessageConverter rmc = new RabbitMessageConverter(gmc); newTemplate.setMessageConverter(rmc); String messageType = message.getMessageType(); if(!MessageType.RAPID.equals(messageType)){ newTemplate.setConfirmCallback(this); } rabbitTemplateMap.putIfAbsent(topic,newTemplate); return rabbitTemplateMap.get(topic); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //訊息應答 List<String> strings = splitter.splitToList(correlationData.getId()); String messageId = strings.get(0); long sendTime = Long.parseLong(strings.get(1)); String messageType = strings.get(2); if(ack){ log.info("傳送訊息成功,confirm messageId={}, sendTime={}" , messageId, sendTime); }else { 
log.info("傳送訊息失敗,confirm messageId={}, sendTime={}" , messageId, sendTime); } if(ack) { // 當Broker 返回ACK成功時, 就是更新一下日誌表裡對應的訊息傳送狀態為 SEND_OK // 如果當前訊息型別為reliant 我們就去資料庫查詢並進行更新 if(MessageType.RELIANT.endsWith(messageType)) { this.messageStoreService.succuess(messageId); } log.info("傳送訊息成功,confirm messageId={}, sendTime={}" , messageId, sendTime); } else { log.info("傳送訊息失敗,confirm messageId={}, sendTime={}" , messageId, sendTime); } } }