Spring Boot優雅使用RocketMQ的方法例項
前言
MQ,是一種跨程序的通訊機制,用於上下游傳遞訊息。在傳統的網際網路架構中通常使用MQ來對上下游來做解耦合。
舉例:當A系統對B系統進行訊息通訊,如A系統釋出一條系統公告,B系統可以訂閱該頻道進行系統公告同步,整個過程中A系統並不關係B系統會不會同步,由訂閱該頻道的系統自行處理。
什麼是RocketMQ?#
官方說明:
隨著使用越來越多的佇列和虛擬主題,ActiveMQ IO模組遇到了瓶頸。我們盡力通過節流,斷路器或降級來解決此問題,但效果不佳。因此,我們那時開始關注流行的訊息傳遞解決方案Kafka。不幸的是,Kafka不能滿足我們的要求,特別是在低延遲和高可靠性方面。
看到這裡可以很清楚的知道RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的訊息中介軟體。
具有以下特性:
- 支援釋出/訂閱(Pub/Sub)和點對點(P2P)訊息模型
- 能夠保證嚴格的訊息順序,在一個佇列中可靠的先進先出(FIFO)和嚴格的順序傳遞
- 提供豐富的訊息拉取模式,支援拉(pull)和推(push)兩種訊息模式
- 單一佇列百萬訊息的堆積能力,億級訊息堆積能力
- 支援多種訊息協議,如 JMS、MQTT 等
- 分散式高可用的部署架構,滿足至少一次訊息傳遞語義
RocketMQ環境安裝#
下載地址:https://rocketmq.apache.org/dowloading/releases/
從官方下載二進位制或者原始碼來進行使用。原始碼編譯需要Maven3.2x,JDK8
在根目錄進行打包:
mvn -Prelease-all -DskipTests clean packager -U
distribution/target/apache-rocketmq資料夾中會存在一個資料夾版,zip,tar三個可執行的完整程式。
使用rocketmq-4.6.0.zip:
- 啟動名稱服務 mqnamesrv.cmd
- 啟動資料中心 mqbroker.cmd -n localhost:9876
SpringBoot環境中使用RocketMQ#
SpringBoot 入門:https://www.jb51.net/article/177449.htm
SpringBoot 常用start:https://www.jb51.net/article/177451.htm
當前環境版本為:
- SpringBoot 2.0.6.RELEASE
- SpringCloud Finchley.RELEASE
- SpringCldod Alibaba 0.2.1.RELEASE
- RocketMQ 4.3.0
在專案工程中匯入:
<!-- MQ Begin --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <!-- MQ End -->
由於我們這邊已經有工程了所以就不在進行建立這種過程了。主要是看看如何使用RocketMQ。
建立RocketMQProperties配置屬性類,類中內容如下:
@ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { private boolean isEnable = false; private String namesrvAddr = "localhost:9876"; private String groupName = "default"; private int producerMaxMessageSize = 1024; private int producerSendMsgTimeout = 2000; private int producerRetryTimesWhenSendFailed = 2; private int consumerConsumeThreadMin = 5; private int consumerConsumeThreadMax = 30; private int consumerConsumeMessageBatchMaxSize = 1; //省略get set }
現在我們所有子系統中的生產者,消費者對應:
isEnable 是否開啟mq
namesrvAddr 叢集地址
groupName 分組名稱
設定為統一已方便系統對接,如有其它需求在進行擴充套件,類中我們已經給了預設值也可以在配置檔案或配置中心中獲取配置,配置如下:
#傳送同一類訊息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用ip@pid(pid代表jvm名字)作為唯一標示 rocketmq.groupName=please_rename_unique_group_name #是否開啟自動配置 rocketmq.isEnable=true #mq的nameserver地址 rocketmq.namesrvAddr=127.0.0.1:9876 #訊息最大長度 預設1024*4(4M) rocketmq.producer.maxMessageSize=4096 #傳送訊息超時時間,預設3000 rocketmq.producer.sendMsgTimeout=3000 #傳送訊息失敗重試次數,預設2 rocketmq.producer.retryTimesWhenSendFailed=2 #消費者執行緒數量 rocketmq.consumer.consumeThreadMin=5 rocketmq.consumer.consumeThreadMax=32 #設定一次消費訊息的條數,預設為1條 rocketmq.consumer.consumeMessageBatchMaxSize=1
建立消費者介面 RocketConsumer.java 該介面使用者約束消費者需要的核心步驟:
/** * 消費者介面 * * @author SimpleWu * */ public interface RocketConsumer { /** * 初始化消費者 */ public abstract void init(); /** * 註冊監聽 * * @param messageListener */ public void registerMessageListener(MessageListener messageListener); }
建立抽象消費者 AbstractRocketConsumer.java:
/** * 消費者基本資訊 * * @author SimpelWu */ public abstract class AbstractRocketConsumer implements RocketConsumer { protected String topics; protected String tags; protected MessageListener messageListener; protected String consumerTitel; protected MQPushConsumer mqPushConsumer; /** * 必要的資訊 * * @param topics * @param tags * @param consumerTitel */ public void necessary(String topics,String tags,String consumerTitel) { this.topics = topics; this.tags = tags; this.consumerTitel = consumerTitel; } public abstract void init(); @Override public void registerMessageListener(MessageListener messageListener) { this.messageListener = messageListener; } }
在類中我們必須指定這個topics,tags與訊息監聽邏輯
public abstract void init();
該方法是用於初始化消費者,由子類實現。
接下來我們編寫自動配置類RocketMQConfiguation.java,該類使用者初始化一個預設的生產者連線,以及載入所有的消費者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用該配置檔案
@Configuration 標註為配置類
@ConditionalOnProperty(prefix = "rocketmq",value = "isEnable",havingValue = "true") 只有當配置中指定rocketmq.isEnable = true的時候才會生效
核心內容如下:
/** * mq配置 * * @author SimpleWu */ @Configuration @EnableConfigurationProperties({ RocketMQProperties.class }) @ConditionalOnProperty(prefix = "rocketmq",havingValue = "true") public class RocketMQConfiguation { private RocketMQProperties properties; private ApplicationContext applicationContext; private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class); public RocketMQConfiguation(RocketMQProperties properties,ApplicationContext applicationContext) { this.properties = properties; this.applicationContext = applicationContext; } /** * 注入一個預設的消費者 * @return * @throws MQClientException */ @Bean public DefaultMQProducer getRocketMQProducer() throws MQClientException { if (StringUtils.isEmpty(properties.getGroupName())) { throw new MQClientException(-1,"groupName is blank"); } if (StringUtils.isEmpty(properties.getNamesrvAddr())) { throw new MQClientException(-1,"nameServerAddr is blank"); } DefaultMQProducer producer; producer = new DefaultMQProducer(properties.getGroupName()); producer.setNamesrvAddr(properties.getNamesrvAddr()); // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); // 如果需要同一個jvm中不同的producer往不同的mq叢集傳送訊息,需要設定不同的instanceName // producer.setInstanceName(instanceName); producer.setMaxMessageSize(properties.getProducerMaxMessageSize()); producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout()); // 如果傳送訊息失敗,設定重試次數,預設為2次 producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed()); try { producer.start(); log.info("producer is start ! groupName:{},namesrvAddr:{}",properties.getGroupName(),properties.getNamesrvAddr()); } catch (MQClientException e) { log.error(String.format("producer is error {}",e.getMessage(),e)); throw e; } return producer; } /** * SpringBoot啟動時載入所有消費者 */ @PostConstruct public void initConsumer() { Map<String,AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class); if (consumers == null || consumers.size() == 0) { log.info("init rocket consumer 0"); } Iterator<String> beans = consumers.keySet().iterator(); while (beans.hasNext()) { String beanName = (String) beans.next(); AbstractRocketConsumer consumer = consumers.get(beanName); consumer.init(); createConsumer(consumer); log.info("init success consumer title {},toips {},tags {}",consumer.consumerTitel,consumer.tags,consumer.topics); } } /** * 通過消費者信心建立消費者 * * @param consumerPojo */ public void createConsumer(AbstractRocketConsumer arc) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName()); consumer.setNamesrvAddr(this.properties.getNamesrvAddr()); consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin()); consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax()); consumer.registerMessageListener(arc.messageListenerConcurrently); /** * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費 如果非第一次啟動,那麼按照上次消費的位置繼續消費 */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 設定消費模型,叢集還是廣播,預設為叢集 */ // consumer.setMessageModel(MessageModel.CLUSTERING); /** * 設定一次消費訊息的條數,預設為1條 */ consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize()); try { consumer.subscribe(arc.topics,arc.tags); consumer.start(); arc.mqPushConsumer=consumer; } catch (MQClientException e) { log.error("info consumer title {}",arc.consumerTitel,e); } } }
然後在src/main/resources資料夾中建立目錄與檔案META-INF/spring.factories裡面新增自動配置類即可開啟啟動配置,我們只需要匯入依賴即可:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.xcloud.config.rocketmq.RocketMQConfiguation
接下來在服務中匯入依賴,然後通過我們的抽象類獲取所有必要資訊對消費者進行建立,該步驟會在所有消費者初始化完成後進行,且只會管理是Spring Bean的消費者。
下面我們看看如何建立一個消費者,建立消費者的步驟非常簡單,只需要繼承AbstractRocketConsumer然後再加上Spring的@Component就能夠完成消費者的建立,我們可以在類中自定義消費的主題與標籤。
在專案可以根據需求當消費者建立失敗的時候是否繼續啟動工程。
建立一個預設的消費者 DefaultConsumerMQ.java
@Component public class DefaultConsumerMQ extends AbstractRocketConsumer { /** * 初始化消費者 */ @Override public void init() { // 設定主題,標籤與消費者標題 super.necessary("TopicTest","*","這是標題"); //消費者具體執行邏輯 registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) { msgs.forEach(msg -> { System.out.printf("consumer message boyd %s %n",new String(msg.getBody())); }); // 標記該訊息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); } }
super.necessary("TopicTest","這是標題"); 是必須要設定的,代表該消費者監聽TopicTest主題下所有tags,標題那個欄位是我自己定義的,所以對於該配置來說沒什麼意義。
我們可以在這裡注入Spring的Bean來進行任意邏輯處理。
建立一個訊息傳送類進行測試
@Override public String qmtest(@PathVariable("name")String name) throws MQClientException,RemotingException,MQBrokerException,InterruptedException,UnsupportedEncodingException { Message msg = new Message("TopicTest","tags1",name.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 傳送訊息到一個Broker SendResult sendResult = defaultMQProducer.send(msg); // 通過sendResult返回訊息是否成功送達 System.out.printf("%s%n",sendResult); return null; }
我們來通過Http請求測試:
http://localhost:10001/demo/base/mq/hello consumer message boyd hello http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿 consumer message boyd 嘿嘿嘿嘿嘿
好了到這裡簡單的start算是設計完成了,後面還有一些:順序訊息生產,順序消費訊息,非同步訊息生產等一系列功能,官人可參照官方去自行處理。
- ActiveMQ 沒經過大規模吞吐量場景的驗證,社群不高不活躍。
- RabbitMQ 叢集動態擴充套件麻煩,且與當前程式語言不至於難以定製化。
- kafka 支援主要的MQ功能,功能無法達到程式需求的要求,所以不使用,且與當前程式語言不至於難以定製化。
- rocketMQ 經過全世界的女人的洗禮,已經很強大;MQ功能較為完善,還是分散式的,擴充套件性好;支援複雜MQ業務場景。(業務複雜可做首選)
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對我們的支援。