1. 程式人生 > 實用技巧 >RabbitMQ基礎元件封裝實踐

RabbitMQ基礎元件封裝實踐

1、基礎元件實現的關鍵點

基礎元件封裝設計-迅速訊息傳送

基礎元件封裝設計-確認訊息傳送

基礎元件封裝設計-延遲訊息傳送

2、基礎元件需要實現的功能

迅速、延遲、可靠

訊息非同步化序列化

連結池化、高效能

完備的補償機制

3、建立工程

rabbit-common : 公共模組

rabbit-api: 提供給第三方使用

rabbit-core-producer: 用於傳送訊息(核心)

rabbit-task: 用於做可靠性處理

首先建立rabbit-parent工程

pom.xml 如下

<?xml version="1.0" encoding="UTF-8"?>
<!-- Parent/aggregator POM for the rabbit-* modules. It inherits from spring-boot-starter-parent,
     lists the four child modules, and declares dependencies that every child module inherits. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Child modules built as part of this aggregator. -->
    <modules>
        <module>rabbit-common</module>
        <module>rabbit-api</module>
        <module>rabbit-core-producer</module>
        <module>rabbit-task</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.16.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rabbit-parent</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>rabbit-parent</name>
    <description>Demo project for Spring Boot</description>
    <!-- Version properties. NOTE(review): druid.version, elastic-job.version, commons-langs3.version,
         commons-collections.version and curator.version are not referenced anywhere in this POM;
         they may be consumed by child modules - confirm before removing. -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>8</java.version>
        <fasterxml.uuid.version>3.1.4</fasterxml.uuid.version>
        <org.codehaus.jackson.version>1.9.13</org.codehaus.jackson.version>
        <druid.version>1.0.24</druid.version>
        <!-- Dangdang elastic-job: distributed scheduled tasks -->
        <elastic-job.version>2.1.4</elastic-job.version>
        <guava.version>20.0</guava.version>
        <commons-langs3.version>3.3.1</commons-langs3.version>
        <commons-io.version>2.4</commons-io.version>
        <commons-collections.version>3.2.2</commons-collections.version>
        <curator.version>2.11.0</curator.version>
        <!-- NOTE(review): fastjson 1.1.26 is a very old release; check it against current
             security advisories before using it on untrusted input. -->
        <fastjson.version>1.1.26</fastjson.version>
    </properties>
    <!-- Declared under <dependencies> (not <dependencyManagement>), so every child module inherits all of these. -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <!-- No explicit version here; the commons-langs3.version property above is not applied to this dependency. -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons-io.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
       <!-- JSON format support -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>${org.codehaus.jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.uuid</groupId>
            <artifactId>java-uuid-generator</artifactId>
            <version>${fasterxml.uuid.version}</version>
        </dependency>

    </dependencies>
</project>

  

二、rabbit-api: 提供給第三方使用

1、建立Message類

/**
 * Message payload handed to the producer API.
 *
 * <p>Getters, setters, {@code equals}, {@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 */
@Data
public class Message implements Serializable {

    private static final long serialVersionUID = -6142443586615984421L;

    /** Unique ID of the message. */
    private String messageId;

    /** Topic of the message. */
    private String topic;

    /** Routing rule of the message; defaults to the empty string. */
    private String routingKey = "";

    /** Additional attributes attached to the message; defaults to an empty map. */
    private Map<String, Object> attributes = new HashMap<>();

    /** Delay setting for delayed messages (presumably milliseconds, judging by the name - confirm). */
    private int delayMills;

    /** Message type; defaults to {@link MessageType#CONFIRM}. */
    private String messageType = MessageType.CONFIRM;

    /**
     * Creates a message with every field supplied by the caller. The given values replace the
     * field defaults as-is, including {@code null}.
     *
     * @param messageId   unique ID of the message
     * @param topic       topic of the message
     * @param routingKey  routing rule
     * @param attributes  additional attributes (stored by reference, not copied)
     * @param delayMills  delay setting
     * @param messageType one of the {@link MessageType} constants
     */
    public Message(String messageId, String topic, String routingKey, Map<String, Object> attributes, int delayMills, String messageType) {
        this.messageType = messageType;
        this.delayMills = delayMills;
        this.attributes = attributes;
        this.routingKey = routingKey;
        this.topic = topic;
        this.messageId = messageId;
    }

    /** Creates a message with the field defaults declared above. */
    public Message() {
    }
}

  

2、建立訊息型別類

/**
 * String constants naming the delivery mode of a message.
 *
 * <p>The values are compile-time constants so they can be used as {@code switch} case labels.
 * This class only holds constants and cannot be instantiated.
 */
public final class MessageType {

    /**
     * Rapid message: message reliability is not guaranteed and no confirm is performed.
     */
    public static final String RAPID = "0";

    /**
     * Confirm message: message reliability is not guaranteed, but a broker confirm is performed.
     */
    public static final String CONFIRM = "1";

    /**
     * Reliant message: delivery must be guaranteed; no message loss is allowed.
     * PS: the database write and the message send are kept atomic (eventually consistent).
     */
    public static final String RELIANT = "2";

    /** Constants holder; not meant to be instantiated. */
    private MessageType() {
    }
}

  

3、構建訊息類,採用構造者模式

/**
 * Fluent builder for {@link Message}.
 *
 * <p>Obtain an instance with {@link #create()}, chain the {@code with...} setters, then call
 * {@link #build()}. Only the topic is mandatory; a message ID is generated when none is given.
 */
public class MessageBuilder {

    /** Unique ID of the message; generated in {@link #build()} when left {@code null}. */
    private String messageId;

    /** Topic of the message; mandatory. */
    private String topic;

    /** Routing rule of the message. */
    private String routingKey = "";

    /** Additional attributes of the message. */
    private Map<String, Object> attributes = new HashMap<>();

    /** Delay setting for delayed messages. */
    private int delayMills;

    /** Message type; defaults to {@link MessageType#CONFIRM}. */
    private String messageType = MessageType.CONFIRM;

    private MessageBuilder() {
    }

    /**
     * @return a new, empty builder
     */
    public static MessageBuilder create() {
        return new MessageBuilder();
    }

    /**
     * @param messageId unique ID to use for the message
     * @return this builder
     */
    public MessageBuilder withMessageId(String messageId) {
        this.messageId = messageId;
        return this;
    }

    /**
     * @param topic topic of the message (mandatory)
     * @return this builder
     */
    public MessageBuilder withTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /**
     * @param routingKey routing rule of the message
     * @return this builder
     */
    public MessageBuilder withRoutingKey(String routingKey) {
        this.routingKey = routingKey;
        return this;
    }

    /**
     * Replaces the whole attribute map. The map is stored by reference, so later
     * {@link #withAttribute(String, Object)} calls write into the caller's map.
     *
     * @param attributes attribute map to use
     * @return this builder
     */
    public MessageBuilder withAttributes(Map<String, Object> attributes) {
        this.attributes = attributes;
        return this;
    }

    /**
     * Adds a single attribute.
     *
     * @param key    attribute name
     * @param object attribute value
     * @return this builder
     */
    public MessageBuilder withAttribute(String key, Object object) {
        // withAttributes(null) may have cleared the map; recreate it instead of throwing an NPE.
        if (this.attributes == null) {
            this.attributes = new HashMap<>();
        }
        this.attributes.put(key, object);
        return this;
    }

    /**
     * Sets the delay of a delayed message.
     *
     * @param delayMills delay setting copied into {@link Message}
     * @return this builder
     */
    public MessageBuilder withDelayMills(int delayMills) {
        this.delayMills = delayMills;
        return this;
    }

    /**
     * Misnamed delay setter, kept only so existing callers keep compiling. Despite its name it
     * sets the delay, not the message ID.
     *
     * @param delayMills delay setting
     * @return this builder
     * @deprecated use {@link #withDelayMills(int)}
     */
    @Deprecated
    public MessageBuilder withMessageId(int delayMills) {
        return withDelayMills(delayMills);
    }

    /**
     * @param messageType one of the {@link MessageType} constants
     * @return this builder
     */
    public MessageBuilder withMessageType(String messageType) {
        this.messageType = messageType;
        return this;
    }

    /**
     * Builds the message.
     *
     * @return a new {@link Message} populated from this builder
     * @throws MessageRunTimeException if no topic was set
     */
    public Message build() {
        if (messageId == null) {
            messageId = UUID.randomUUID().toString();
        }
        if (topic == null) {
            throw new MessageRunTimeException("this topic is null");
        }
        return new Message(messageId, topic, routingKey, attributes, delayMills, messageType);
    }

}

  

4、建立異常類

MessageException類

/**
 * Checked exception for message-related failures.
 *
 * <p>Offers the four standard exception constructors and adds no state of its own.
 */
public class MessageException extends Exception {

    private static final long serialVersionUID = -8283764568495174322L;

    /** Creates an exception with no detail message and no cause. */
    public MessageException() {
    }

    /**
     * @param message detail message
     */
    public MessageException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public MessageException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public MessageException(String message, Throwable cause) {
        super(message, cause);
    }
}

  

建立MessageRunTimeException 類

/**
 * Unchecked exception for message-related failures.
 *
 * <p>Offers the four standard exception constructors and adds no state of its own.
 */
public class MessageRunTimeException extends RuntimeException {

    private static final long serialVersionUID = -2591307228826723236L;

    /** Creates an exception with no detail message and no cause. */
    public MessageRunTimeException() {
    }

    /**
     * @param message detail message
     */
    public MessageRunTimeException(String message) {
        super(message);
    }

    /**
     * @param cause underlying cause
     */
    public MessageRunTimeException(Throwable cause) {
        super(cause);
    }

    /**
     * @param message detail message
     * @param cause   underlying cause
     */
    public MessageRunTimeException(String message, Throwable cause) {
        super(message, cause);
    }
}

  

5、建立訊息生產者介面

/**
 * Producer-side entry point for sending {@link Message} objects.
 */
public interface MessageProducer {

    /**
     * Sends a single message.
     *
     * @param message the message to send
     * @throws MessageRunTimeException declared for send failures
     */
    void send(Message message) throws MessageRunTimeException;

    /**
     * Sends a message together with a {@link SendCallback} that runs the corresponding
     * business logic.
     *
     * @param message the message to send
     * @param sendCallback callback carrying the follow-up business logic
     * @throws MessageRunTimeException declared for send failures
     */
    void send(Message message, SendCallback sendCallback) throws MessageRunTimeException;

    /**
     * Sends a list of messages.
     *
     * @param messages the messages to send
     * @throws MessageRunTimeException declared for send failures
     */
    void send(List<Message> messages) throws MessageRunTimeException;
}

  

傳送回撥介面。回撥函式處理

/**
 * Send callback passed to {@code MessageProducer.send(Message, SendCallback)}.
 *
 * <p>NOTE(review): nothing in the visible code invokes these methods yet, so exactly when each
 * one fires (hand-off to the broker vs. broker confirm) is not established here - confirm.
 */
public interface SendCallback {

    /** Presumably invoked when the send succeeded. */
    void onSuccess();

    /** Presumably invoked when the send failed. */
    void onFailure();
}

  

6、建立消費者監聽訊息類

/**
 * Consumer-side listener for received messages.
 */
public interface MessageListener {

    /**
     * Handles one message.
     *
     * @param message the message handed to the listener
     */
    void onMessage(Message message);
}

 

三、rabbit-common : 公共模組 

1、新增依賴

<?xml version="1.0" encoding="UTF-8"?>
<!-- rabbit-common module: depends on rabbit-api and pulls in AMQP, JDBC, MyBatis, Druid and the MySQL driver. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbit-parent</artifactId>
        <groupId>com.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbit-common</artifactId>

    <dependencies>
        <!-- API module of this project (Message, MessageType, ...). -->
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>rabbit-api</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <!-- NOTE(review): mybatis-spring-boot-starter 1.1.1 is pinned here while the parent is
             Spring Boot 2.1.16.RELEASE - verify that this combination is supported. -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- NOTE(review): version is hard-coded to 1.1.10, whereas rabbit-parent declares a
             druid.version property of 1.0.24 - decide which one is intended. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
    </dependencies>

</project>

  

2、建立序列化和反序列化介面

/**
 * Serialization / deserialization contract used by the message converters.
 */
public interface Serializer {

    /**
     * Serializes an object to bytes.
     *
     * @param data object to serialize
     * @return the serialized bytes
     */
    byte[] serializeRaw(Object data);

    /**
     * Serializes an object to a string.
     *
     * @param data object to serialize
     * @return the serialized text
     */
    String serialize(Object data);

    /**
     * NOTE(review): the name says "deserialize" but the method takes an {@code Object} and
     * returns {@code byte[]}; the JacksonSerializer implementation in this file just returns an
     * empty array. The intended contract is unclear - confirm or remove.
     *
     * @param data input object
     * @return a byte array
     */
    byte[] deserializeRaw(Object data);

    /**
     * Deserializes from text.
     *
     * @param content serialized text
     * @param <T> type the caller expects back
     * @return the deserialized object
     */
    <T> T deserialize(String content);

    /**
     * Deserializes from bytes.
     *
     * @param content serialized bytes
     * @param <T> type the caller expects back
     * @return the deserialized object
     */
    <T> T deserialize(byte[] content);

}

  

3、建立介面SerializerFactory

/**
 * Factory for {@link Serializer} instances.
 */
public interface SerializerFactory {
    /**
     * @return a serializer
     */
    Serializer create();
}

  

4、序列化反序列化實現

/**
 * {@link Serializer} backed by a shared Jackson {@code ObjectMapper}.
 *
 * <p>Each instance is bound to one target type, used for deserialization. Serialization and
 * deserialization failures are logged and reported to the caller as {@code null}.
 */
public class JacksonSerializer implements Serializer {

    private static final Logger LOGGER = LoggerFactory.getLogger(JacksonSerializer.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    // Compact output, tolerant of unknown properties, and lenient parsing of non-standard JSON.
    static {
        mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        mapper.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true);
        mapper.configure(JsonParser.Feature.ALLOW_COMMENTS, true);
        mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
        mapper.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, true);
        mapper.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true);
        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, true);
        mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
    }

    /** Target type used by the {@code deserialize} methods. */
    private final JavaType type;

    private JacksonSerializer(JavaType type) {
        this.type = type;
    }

    /**
     * @param type target type for deserialization
     */
    public JacksonSerializer(Type type) {
        this.type = mapper.getTypeFactory().constructType(type);
    }

    /**
     * Creates a serializer bound to the given class.
     *
     * @param cls target class for deserialization
     * @return a serializer for {@code cls}
     */
    public static JacksonSerializer createParametricType(Class<?> cls) {
        JavaType javaType = mapper.getTypeFactory().constructType(cls);
        return new JacksonSerializer(javaType);
    }

    /**
     * @param data object to serialize
     * @return the JSON bytes, or {@code null} if serialization failed
     */
    @Override
    public byte[] serializeRaw(Object data) {
        try {
            return mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            LOGGER.error("序列化出錯", e);
            return null;
        }
    }

    /**
     * @param data object to serialize
     * @return the JSON text, or {@code null} if serialization failed
     */
    @Override
    public String serialize(Object data) {
        try {
            return mapper.writeValueAsString(data);
        } catch (JsonProcessingException e) {
            LOGGER.error("序列化出錯", e);
            return null;
        }
    }

    /**
     * Not implemented: ignores its argument and always returns an empty array.
     */
    @Override
    public byte[] deserializeRaw(Object data) {
        return new byte[0];
    }

    /**
     * @param content JSON text
     * @return the object read as this serializer's target type, or {@code null} on failure
     */
    @Override
    public <T> T deserialize(String content) {
        try {
            return mapper.readValue(content, type);
        } catch (IOException e) {
            LOGGER.error("反序列化出錯", e);
            return null;
        }
    }

    /**
     * @param content JSON bytes
     * @return the object read as this serializer's target type, or {@code null} on failure
     */
    @Override
    public <T> T deserialize(byte[] content) {
        try {
            return mapper.readValue(content, type);
        } catch (IOException e) {
            LOGGER.error("反序列化出錯", e);
            return null;
        }
    }

}

  

5、序列化工廠實現

/**
 * {@link SerializerFactory} that produces Jackson serializers bound to {@link Message}.
 */
public class JacksonSerializerFactory implements SerializerFactory {

    /** Shared factory instance. */
    public static final SerializerFactory INSTANCE = new JacksonSerializerFactory();

    /**
     * @return a new {@link JacksonSerializer} whose target type is {@link Message}
     */
    @Override
    public Serializer create() {
        Serializer messageSerializer = JacksonSerializer.createParametricType(Message.class);
        return messageSerializer;
    }

}

  

6、普通訊息Converter

/**
 * {@code MessageConverter} that delegates body conversion to a {@link Serializer}.
 */
public class GenericMessageConverter implements MessageConverter {

    private final Serializer serializer;

    /**
     * @param serializer serializer used for both directions; must not be {@code null}
     */
    public GenericMessageConverter(Serializer serializer) {
        this.serializer = Preconditions.checkNotNull(serializer);
    }

    /**
     * Deserializes the body of an {@code org.springframework.amqp.core.Message} into an object.
     */
    @Override
    public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
        byte[] body = message.getBody();
        return this.serializer.deserialize(body);
    }

    /**
     * Serializes the given object and wraps the bytes, together with the supplied properties,
     * in an {@code org.springframework.amqp.core.Message}.
     */
    @Override
    public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] body = this.serializer.serializeRaw(object);
        return new org.springframework.amqp.core.Message(body, messageProperties);
    }

}

  

  

7、擴充套件訊息Converter

引用了GenericMessageConverter,可以理解為裝飾者模式。比如設定過期時間。

/**
 * {@code MessageConverter} that wraps a {@link GenericMessageConverter} and additionally copies
 * the API message's delay setting onto the outgoing AMQP message properties.
 */
public class RabbitMessageConverter implements MessageConverter {

    private final GenericMessageConverter delegate;

    /**
     * @param genericMessageConverter converter that performs the body conversion; must not be
     *                                {@code null}
     */
    public RabbitMessageConverter(GenericMessageConverter genericMessageConverter) {
        Preconditions.checkNotNull(genericMessageConverter);
        this.delegate = genericMessageConverter;
    }

    /**
     * Converts an API message into an AMQP message, setting the delay property from
     * {@code getDelayMills()} before delegating.
     *
     * @param object            payload; must be a {@code com.example.api.Message}
     * @param messageProperties properties of the outgoing AMQP message
     * @return the AMQP message produced by the delegate
     * @throws MessageConversionException if {@code object} is not a {@code com.example.api.Message}
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        // Fail with the converter's own exception type instead of a bare ClassCastException.
        if (!(object instanceof com.example.api.Message)) {
            throw new MessageConversionException("unsupported payload type: "
                    + (object == null ? "null" : object.getClass().getName()));
        }
        com.example.api.Message message = (com.example.api.Message) object;
        messageProperties.setDelay(message.getDelayMills());
        return this.delegate.toMessage(object, messageProperties);
    }

    /**
     * Converts an AMQP message back into an API message via the delegate.
     *
     * @param message incoming AMQP message
     * @return the deserialized {@code com.example.api.Message}
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return (com.example.api.Message) this.delegate.fromMessage(message);
    }

}

  

四、rabbit-core-producer

1、增加依賴

<?xml version="1.0" encoding="UTF-8"?>
<!-- rabbit-core-producer module: message sending core, built on top of rabbit-common. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbit-parent</artifactId>
        <groupId>com.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbit-core-producer</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>rabbit-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
        <!-- NOTE(review): a test framework (and a beta release of it) is declared with compile
             scope, which puts it on the runtime classpath of every consumer. Unless main code
             imports org.testng classes, this should probably be test scope - confirm. -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>7.0.0-beta1</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

</project>

 

2、自動裝配

在META-INF資料夾下建立spring.factories檔案

內容為:

# Auto Configure
# Registers RabbitProducerAutoConfiguration under Spring Boot's EnableAutoConfiguration key.
# The trailing backslash continues the value onto the next line; keep the two lines adjacent.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.producer.autoconfigure.RabbitProducerAutoConfiguration

  

3、傳送訊息的實際實現類

/**
 * {@link MessageProducer} implementation that dispatches each message to the
 * {@link RabbitBroker} method matching its {@link MessageType}.
 */
@Component
public class ProducerClient implements MessageProducer {

    @Autowired
    RabbitBroker rabbitBroker;

    /**
     * Sends one message according to its message type.
     *
     * @param message message to send; its topic must not be {@code null}
     * @throws NullPointerException    if {@code message} or its topic is {@code null}
     * @throws MessageRunTimeException if the message type is {@code null} or not one of the
     *                                 {@link MessageType} constants
     */
    @Override
    public void send(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message, "message must not be null");
        Preconditions.checkNotNull(message.getTopic());
        String messageType = message.getMessageType();
        // switch on a null String would throw a bare NPE; report it as a message error instead.
        if (messageType == null) {
            throw new MessageRunTimeException("message type is null, messageId=" + message.getMessageId());
        }
        switch (messageType) {
            case MessageType.RAPID:
                rabbitBroker.rapidSend(message);
                break;
            case MessageType.CONFIRM:
                rabbitBroker.confirmSend(message);
                break;
            case MessageType.RELIANT:
                rabbitBroker.reliantSend(message);
                break;
            default:
                // An unknown type used to be dropped silently, i.e. the message was lost without a trace.
                throw new MessageRunTimeException("unsupported message type: " + messageType
                        + ", messageId=" + message.getMessageId());
        }
    }

    /**
     * NOTE(review): not implemented - the message is NOT sent and the callback is never invoked.
     * Wiring the callback to broker confirms needs support that is not present in this class.
     */
    @Override
    public void send(Message message, SendCallback sendCallback) throws MessageRunTimeException {

    }

    /**
     * Sends every message of the list, one by one, through {@link #send(Message)}.
     *
     * @param messages messages to send; must not be {@code null}
     * @throws NullPointerException    if {@code messages} or one of its elements is {@code null}
     * @throws MessageRunTimeException if one of the messages has an unsupported type; messages
     *                                 preceding it in the list have already been dispatched
     */
    @Override
    public void send(List<Message> messages) throws MessageRunTimeException {
        Preconditions.checkNotNull(messages, "messages must not be null");
        for (Message message : messages) {
            send(message);
        }
    }
}

  

4、建立介面RabbitBroker

作用: 具體傳送不同種類訊息的介面

/**
 * Interface that performs the actual sending for the different kinds of messages.
 */
public interface RabbitBroker {

    /** Sends a message as a rapid message (see {@code MessageType.RAPID}). */
    void rapidSend(Message message);
    /** Sends a message as a confirm message (see {@code MessageType.CONFIRM}). */
    void confirmSend(Message message);
    /** Sends a message as a reliant message (see {@code MessageType.RELIANT}). */
    void reliantSend(Message message);
    /** NOTE(review): takes no arguments; presumably a batch send of previously collected messages - confirm. */
    void sendMessages();
}

  

建立實現類RabbitBrokerImpl

/**
 * {@link RabbitBroker} implementation that sends messages through pooled
 * {@code RabbitTemplate}s on the asynchronous sender queue.
 */
@Slf4j
@Component
public class RabbitBrokerImpl implements  RabbitBroker {

    @Autowired
    private RabbitTemplateContainer rabbitTemplateContainer;


    @Autowired
    private MessageStoreService messageStoreService;

    /**
     * Marks the message as {@link MessageType#RAPID} and sends it.
     *
     * @param message message to send
     */
    @Override
    public void rapidSend(Message message) {
        message.setMessageType(MessageType.RAPID);
        sendKernel(message);
    }

    /**
     * Core send routine: submits the actual send to the asynchronous thread pool.
     *
     * <p>The correlation id is {@code messageId#sendTimeMillis#messageType}. The confirm
     * callback in {@code RabbitTemplateContainer} splits the id on {@code #} and reads all three
     * segments, so the message type must be part of the id; omitting it makes every confirm
     * fail with an {@code IndexOutOfBoundsException}.
     *
     * @param message message to send
     */
    private void sendKernel(Message message) {
        AsyncBaseQueue.submit(() -> {
            CorrelationData correlationData = new CorrelationData(
                    String.format("%s#%s#%s",
                            message.getMessageId(),
                            System.currentTimeMillis(),
                            message.getMessageType()));
            String topic = message.getTopic();
            String routingKey = message.getRoutingKey();
            RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
            rabbitTemplate.convertAndSend(topic,routingKey, message, correlationData);
            log.info("#RabbitBrokerImpl.sendKernel# 傳送訊息到RabbitMQ,messageId={}", message.getMessageId());
        });

    }

    /**
     * Marks the message as {@link MessageType#CONFIRM} and sends it.
     *
     * @param message message to send
     */
    @Override
    public void confirmSend(Message message) {
        message.setMessageType(MessageType.CONFIRM);
        sendKernel(message);
    }

    /**
     * Marks the message as {@link MessageType#RELIANT}, records a send-log row for it if none
     * exists yet, and then sends it.
     *
     * @param message message to send
     */
    @Override
    public void reliantSend(Message message) {
        message.setMessageType(MessageType.RELIANT);
        BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
        if(bm == null) {
            // 1. Record the send log in the database first.
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
            // tryCount does not need to be set on the very first send.
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setMessage(message);
            messageStoreService.insert(brokerMessage);
        }
        // 2. Perform the actual send.
        sendKernel(message);
    }

    /**
     * NOTE(review): not implemented; currently a no-op.
     */
    @Override
    public void sendMessages() {

    }
}

  

建立非同步訊息佇列(使用執行緒池)

@Slf4j
public class AsyncBaseQueue {

    private static final int THREAD_SIZE = Runtime.getRuntime().availableProcessors();

    private static final int QUEUE_SIZE = 10000;

    private static ExecutorService senderAsync = new ThreadPoolExecutor(THREAD_SIZE, THREAD_SIZE,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_SIZE),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("rabbitmq_client_async_sender");
                    return t;
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    log.error("async sender is error rejected, runnable:{}, executor:{}", r, executor);
                }
            }
    );

    public static  void submit(Runnable runnable){
        senderAsync.submit(runnable);
    }

}

  

池化RabbitTemplate

/**
 * @description: 池化RabbitTemplate
 * 每一個topic對應一個RabbitTemplate的好處
 * 1、提高發送的效率
 * 2、可以根據不同的需求定製不同的RabbitTemplate,比如每一個topic都有自己的routingKey規則
 * @author:
 * @create: 2020-08-01 17:22
 */
@Slf4j
@Component
public class RabbitTemplateContainer implements RabbitTemplate.ConfirmCallback {

    private Map<String /* TOPIC */, RabbitTemplate> rabbitTemplateMap = Maps.newConcurrentMap();

    private Splitter splitter = Splitter.on("#");

    private SerializerFactory serializerFactory = JacksonSerializerFactory.INSTANCE;

    @Autowired
    private ConnectionFactory connectionFactory;


    @Autowired
    private MessageStoreService messageStoreService;

    public RabbitTemplate getTemplate(Message message) throws MessageRunTimeException {
        Preconditions.checkNotNull(message);
        String topic = message.getTopic();
        RabbitTemplate rabbitTemplate = rabbitTemplateMap.get(topic);
        if(rabbitTemplate != null){
            return rabbitTemplate;
        }
        log.info("topic={} is not exists, create", topic);
        RabbitTemplate newTemplate = new RabbitTemplate(connectionFactory);
        newTemplate.setExchange(topic);
        newTemplate.setRoutingKey(message.getRoutingKey());
        newTemplate.setRetryTemplate(new RetryTemplate());

        // 新增序列化反序列化和converter物件
        Serializer serializer = serializerFactory.create();
        GenericMessageConverter gmc = new GenericMessageConverter(serializer);
        RabbitMessageConverter rmc = new RabbitMessageConverter(gmc);
        newTemplate.setMessageConverter(rmc);

        String messageType = message.getMessageType();
        if(!MessageType.RAPID.equals(messageType)){
            newTemplate.setConfirmCallback(this);
        }
        rabbitTemplateMap.putIfAbsent(topic,newTemplate);

        return  rabbitTemplateMap.get(topic);

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //訊息應答
       List<String> strings =  splitter.splitToList(correlationData.getId());
       String messageId = strings.get(0);
       long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
       if(ack){
           log.info("傳送訊息成功,confirm messageId={}, sendTime={}" , messageId, sendTime);
       }else {
           log.info("傳送訊息失敗,confirm messageId={}, sendTime={}" , messageId, sendTime);
       }
        if(ack) {
            //	當Broker 返回ACK成功時, 就是更新一下日誌表裡對應的訊息傳送狀態為 SEND_OK

            // 	如果當前訊息型別為reliant 我們就去資料庫查詢並進行更新
            if(MessageType.RELIANT.endsWith(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("傳送訊息成功,confirm messageId={}, sendTime={}" , messageId, sendTime);
        } else {
            log.info("傳送訊息失敗,confirm messageId={}, sendTime={}" , messageId, sendTime);

        }
    }
}