1. 程式人生 > >RocketMQ訊息佇列還沒入門就想放棄(轉)

RocketMQ訊息佇列還沒入門就想放棄(轉)

https://www.jianshu.com/p/dd7ca2d10767

 

RocketMQ訊息佇列還沒入門就想放棄

96 FutaoSmile丶 關注

2018.10.19 20:32* 字數 1182 閱讀 616評論 0喜歡 7

20170712182011089.gif

題外話

什麼情況下的非同步操作需要使用訊息佇列而不是多執行緒?

  • 訊息佇列和多執行緒兩者並不衝突,多執行緒可以作為佇列的生產者和消費者。
    使用外部的訊息佇列時,第一是可以提高應用的穩定性,當程式fail後,已經寫入外部訊息佇列的資料依舊是儲存的,如果使用兩步commit的佇列的話,可以更加提高這個專案。
  • 用執行緒的話,會佔用主伺服器資源, 訊息佇列的話, 可以放到其他機器上執行, 讓主伺服器儘量多的服務其他請求。我個人認為, 如果使用者不急著知道結果的操作, 用訊息佇列, 否則再考慮用不用執行緒。
  • 解耦更充分,架構更合理
    • 多執行緒是在程式語言層面解決問題
    • 訊息佇列是在架構層面解決問題
      我認為架構層面解決問題是“覺悟比較高的方式“,理想情況下應該限制語言層面濫用多執行緒,能不用就不用
  • 不關心執行結果的都可以放到訊息佇列,不需要及時到達,放到訊息佇列中慢慢消化
  • 批量傳送郵件時,資料量龐大,如果使用多執行緒對系統不安全

訊息佇列能解決什麼問題

  • 非同步處理
  • 應用解耦
  • 流量削鋒
  • 日誌處理
  • 訊息通訊

# 環境介紹

注意儘量將rocketmq的1.應用版本,2.jar包依賴,3.recketmq-console-ng的jar包依賴版本保持一致,不然可能會出現非常詭異的問題
此專案所使用版本: rocketmq: 4.3.0,OS: win10

  1. jar包依賴
compile group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.3.0'
  1. 下載 rocketmq應用

    http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

  2. windows下rocketmq環境配置與啟動
    參考 https://www.jianshu.com/p/4a275e779afa

    • 在rocketmq的bin目錄下啟動NAMESERVER(相當於服務註冊中心)
      start mqnamesrv.cmd
    • 啟動 broker(真正工作的伺服器,儲存訊息的伺服器)
      start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
      3.1 mac
      Build from Release
 > unzip rocketmq-all-4.3.2-source-release.zip
  > cd rocketmq-all-4.3.2/
  > mvn -Prelease-all -DskipTests clean install -U
  > cd distribution/target/apache-rocketmq

Start Name Server

  > nohup sh bin/mqnamesrv &
  > tail -f ~/logs/rocketmqlogs/namesrv.log
  The Name Server boot success...

Start Broker

  > nohup sh bin/mqbroker -n localhost:9876 &
  > tail -f ~/logs/rocketmqlogs/broker.log 
  The broker[%s, 172.30.30.233:10911] boot success...

  1. 視覺化rocketmq管理專案下載
    https://github.com/apache/rocketmq-externals.git
    • 將這個專案裡面rocketmq-console-ng裡的rocketmq依賴修改成與你專案依賴的版本一致,次專案是4.3.0

      image.png

  2. 第三步已經把rocketmq的nameServer與broker啟動起來
  3. 啟動rocket-console-ng視覺化管理專案,該專案是基於springboot的
  4. 訪問rocket-console-ng的服務地址

     

    image.png

     

    到此環境搭建完成!!!
    回到自己的程式↓↓↓

# 配置資訊

###producer
#該應用是否啟用生產者
rocketmq:
  producer:
    isOnOff: on
    #傳送同一類訊息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用[email protected](pid代表jvm名字)作為唯一標示
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #訊息最大長度 預設1024*4(4M)
    maxMessageSize: 4096
    #傳送訊息超時時間,預設3000
    sendMsgTimeout: 3000
    #傳送訊息失敗重試次數,預設2
    retryTimesWhenSendFailed: 2

  ###consumer
  ##該應用是否啟用消費者
  consumer:
    isOnOff: on
    groupName: ${spring.application.name}
    #mq的nameserver地址
    namesrvAddr: 127.0.0.1:9876
    #該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
    topics: futaotopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    #設定一次消費訊息的條數,預設為1條
    consumeMessageBatchMaxSize: 1

reConsumerTimes: 3

# 生產者Bean

package com.futao.springmvcdemo.service.impl

import com.futao.springmvcdemo.foundation.LogicException
import com.futao.springmvcdemo.model.entity.constvar.ErrorMessage
import com.futao.springmvcdemo.model.system.SystemConfig
import com.futao.springmvcdemo.service.RocketMqService
import org.apache.commons.lang3.StringUtils
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.consumer.ConsumeFromWhere
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Service
import java.nio.charset.Charset
/**
 * @author futao
 * Created on 2018/10/18.
 */
@Service
open class RocketMqServiceImpl : RocketMqService {
    private val logger = LoggerFactory.getLogger(RocketMqServiceImpl::class.java)
/**
     * 傳送同一類訊息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用[email protected](pid代表jvm名字)作為唯一標示
     */
    @Value("\${rocketmq.producer.groupName}")
    private lateinit var producerGroupName: String

    @Value("\${rocketmq.producer.namesrvAddr}")
    private lateinit var producerNamesrvAddr: String
    /**
     * 訊息最大大小,預設4M
     */
    @Value("\${rocketmq.producer.maxMessageSize}")
    private var maxMessageSize: Int = 0
    /**
     * 訊息傳送超時時間,預設3秒
     */
    @Value("\${rocketmq.producer.sendMsgTimeout}")
    private var sendMsgTimeout: Int = 0
    /**
     * 訊息傳送失敗重試次數,預設2次
     */
    @Value("\${rocketmq.producer.retryTimesWhenSendFailed}")
    private var retryTimesWhenSendFailed: Int = 0


    /**
     * 生產者Bean
     */
    @Bean
    override fun producer(): DefaultMQProducer {
        if (this.producerGroupName.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_GROUP_NAME_EMPTY)
        }
        if (this.producerNamesrvAddr.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
        }
        val defaultMQProducer = DefaultMQProducer(producerGroupName)
        defaultMQProducer.namesrvAddr = producerNamesrvAddr
        defaultMQProducer.maxMessageSize = maxMessageSize
        defaultMQProducer.sendMsgTimeout = sendMsgTimeout
        defaultMQProducer.isVipChannelEnabled = false
        //訊息傳送到mq伺服器失敗重試次數
        defaultMQProducer.retryTimesWhenSendFailed = retryTimesWhenSendFailed
        try {
            defaultMQProducer.start()
            logger.info("rocketMq Producer start success; nameServer:{},producerGroupName:{}", producerNamesrvAddr, producerGroupName)
        } catch (e: Exception) {
            logger.error("rocketMq Producer start fail;{}", e.message, e)
        }
        return defaultMQProducer
    }
}

# 消費者

@Value("\${rocketmq.consumer.namesrvAddr}")
    private lateinit var consumerNamesrvAddr: String

    @Value("\${rocketmq.consumer.groupName}")
    private lateinit var consumerGroupName: String

    @Value("\${rocketmq.consumer.consumeThreadMin}")
    private var consumeThreadMin: Int = 0

    @Value("\${rocketmq.consumer.consumeThreadMax}")
    private var consumeThreadMax: Int = 0

    @Value("\${rocketmq.consumer.topics}")
    private lateinit var topics: String

    @Value("\${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private var consumeMessageBatchMaxSize: Int = 0

//    @Resource
//    private lateinit var mqMessageListenerProcessor: MQConsumeMsgListenerProcessor


    @Value("\${reConsumerTimes}")
    private var reConsumerTimes: Int = 0


    /**
     * 消費者Bean
     */
    @Bean
    override fun consumer(): DefaultMQPushConsumer {
        val topic = SystemConfig.ROCKET_MQ_TOPIC_MAIL
        val tag = SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER
        if (this.consumerGroupName.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_GROUP_NAME_EMPTY)
        }
        if (this.consumerNamesrvAddr.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
        }
        if (this.topics.isEmpty()) {
            throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_TOPICS_EMPTY)
        }
        try {
            //DefaultMQPushConsumer DefaultMQPullConsumer
            val defaultMQPushConsumer = DefaultMQPushConsumer(consumerGroupName)
            defaultMQPushConsumer.namesrvAddr = consumerNamesrvAddr
            defaultMQPushConsumer.consumeThreadMin = consumeThreadMin
            defaultMQPushConsumer.isVipChannelEnabled = false
//        defaultMQPushConsumer.createTopic()
            defaultMQPushConsumer.consumeThreadMax = consumeThreadMax
            //消費模式 叢集還是廣播,預設為叢集(自動負載均衡)
            //廣播消費: 訊息會發給Consume Group中的每一個消費者進行消費,如果設定為廣播訊息會導致NOT_ONLINE異常,https://github.com/apache/rocketmq/issues/296
            defaultMQPushConsumer.messageModel = MessageModel.CLUSTERING
            // 設定消費模型,
            //consumer.setMessageModel(MessageModel.CLUSTERING);

            // * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
            // * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
            defaultMQPushConsumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
            //設定一次消費訊息的條數,預設為1條
            defaultMQPushConsumer.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize
            //訂閱topic
            defaultMQPushConsumer.subscribe(topic, tag)

            //        defaultMQPushConsumer.registerMessageListener(mqMessageListenerProcessor)
            defaultMQPushConsumer.registerMessageListener(
                    MessageListenerConcurrently { msgs, _ ->
                        if (msgs == null || msgs.isEmpty()) {
                            logger.info("接受到的訊息為空,不處理,直接返回成功")
                            [email protected] ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                        }
                        val msg = msgs[0]
                        logger.info("接收到的訊息為:" + msg.toString())
                        if (msg.topic == topic && msg.tags == tag) {
                            //判斷該訊息是否重複消費(RocketMQ不保證訊息不重複,如果你的業務需要保證嚴格的不重複訊息,需要你自己在業務端去重)
                            //獲取該訊息重試次數
                            if (msg.reconsumeTimes >= reConsumerTimes) {
                                //訊息已經重試了3次,如果不需要再次消費,則返回成功
                                //TODO("如果重試了三次還是失敗則執行對於失敗的業務邏輯")
                                logger.error("訊息重試消費失敗:", msg)
                                [email protected] ConsumeConcurrentlyStatus.CONSUME_SUCCESS
                            } else {
                                //如果失敗重試次數還沒到三次則繼續重試
                                ConsumeConcurrentlyStatus.RECONSUME_LATER
                            }
                            //TODO("開始正常的業務邏輯")
                            println(StringUtils.repeat(":", 30) + String(msg.body, Charset.forName(SystemConfig.UTF8_ENCODE)))
                        }
                        [email protected] ConsumeConcurrentlyStatus.CONSUME_SUCCESS    //消費成功
                    }
            )
            defaultMQPushConsumer.start()
            logger.info("rocketMq Consumer start success; namesrvAddr:{},groupName:{},topics:{}", consumerNamesrvAddr, consumerGroupName, topics)
            return defaultMQPushConsumer
        } catch (e: Exception) {
            logger.error("rocketMq Consumer start fail;{}", e.message, e)
            return DefaultMQPushConsumer()
        }
    }

# 簡單測試

  • 傳送註冊郵件的topic與tag配置
    • 個人理解的topic: 一類業務可以歸為一個topic,比如所有的發郵件功能
    • 個人理解的tag: 某類業務下的細分,比如傳送郵件業務下的傳送註冊郵件可以使用一個tag,傳送忘記密碼郵件可以再使用一個tag
    /**
     * rocket mq 傳送郵件的 topic
     */
    public static final String ROCKET_MQ_TOPIC_MAIL = "topic_mail";

    /**
     * rocket mq 傳送郵件-註冊郵件的tag
     */
    public static final String ROCKET_MQ_TAG_MAIL_REGISTER = "tag_mail_register";
  • 傳送郵件訊息佇列Service
    @Resource
    lateinit var producer: DefaultMQProducer
/**
     * 通過訊息佇列傳送郵件
     */
    override fun sendMq(mailM: MailM) {
        val message = Message(SystemConfig.ROCKET_MQ_TOPIC_MAIL, SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER, JSON.toJSONString(mailM).toByteArray(Charset.forName(SystemConfig.UTF8_ENCODE)))
        try {
            producer.send(message)
        } catch (e: Exception) {
            logger.error(e.message, e)
        }
    }
  • 請求controller
@GetMapping("sendMailMq")
    open fun sendMailMq() {
        val mailM = MailM().apply {
            to = arrayOf("[email protected]")
            cc = arrayOf("[email protected]")
            subject = "訊息佇列"
            content = "<h1>您好,RocketMq</h1>"
        }
        mailService.sendMq(mailM)
    }
  • 在請求了controller之後可以在rocketmq-console-ng控制檯檢視到相應的topic與訊息資訊
    • topic

       

      image.png

    • 已傳送到rocketmq伺服器上的訊息

       

      image.png

    • 檢視訊息狀態

       

      image.png

    • 檢視控制檯

       

      image.png

# 坑:

image.png

  • 訊息不能被消費使用RocketMq控制檯resend提示NOT_CONSUME_YET:檢查rocketmq應用版本,rocketmq-console-ng依賴版本,自己的專案依賴jar包版本是否一致
  • Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException:檢查rocketmq應用版本,rocketmq-console-ng依賴版本,自己的專案依賴jar包版本是否一致
  • Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message:嘗試把消費者的消費模式改成叢集模式
  • NOT_CONSUME_YET:如果還是不能解決請不要使用公司的網路,公司的網路可能會有很多的限制,用自己的手機進行測試(我被這個網路給坑慘了)

# 資源:

Windows下安裝RocketMq:https://www.jianshu.com/p/4a275e779afa
RocketMq名詞解釋: https://my.oschina.net/javamaster/blog/2051703
解釋Push與Pull區別: https://www.jianshu.com/p/f071d5069059?utm_source=oschina-app
官網:http://rocketmq.apache.org/
windows下rocketmq的訊息資訊儲存在 C:\Users\user\store資料夾下,刪除該資料夾即可刪除所有的訊息

我的完整專案:地址 https://gitee.com/FutaoSmile/springboot_framework