RocketMQ訊息佇列還沒入門就想放棄(轉)
https://www.jianshu.com/p/dd7ca2d10767
RocketMQ訊息佇列還沒入門就想放棄
2018.10.19 20:32* 字數 1182 閱讀 616評論 0喜歡 7
20170712182011089.gif
題外話
什麼情況下的非同步操作需要使用訊息佇列而不是多執行緒?
- 訊息佇列和多執行緒兩者並不衝突,多執行緒可以作為佇列的生產者和消費者。
使用外部的訊息佇列時,第一是可以提高應用的穩定性,當程式fail後,已經寫入外部訊息佇列的資料依舊是儲存的,如果使用兩步commit的佇列的話,可以更加提高這個專案。 - 用執行緒的話,會佔用主伺服器資源, 訊息佇列的話, 可以放到其他機器上執行, 讓主伺服器儘量多的服務其他請求。我個人認為, 如果使用者不急著知道結果的操作, 用訊息佇列, 否則再考慮用不用執行緒。
- 解耦更充分,架構更合理
- 多執行緒是在程式語言層面解決問題
- 訊息佇列是在架構層面解決問題
我認為架構層面解決問題是“覺悟比較高的方式“,理想情況下應該限制語言層面濫用多執行緒,能不用就不用
- 不關心執行結果的都可以放到訊息佇列,不需要及時到達,放到訊息佇列中慢慢消化
- 批量傳送郵件時,資料量龐大,如果使用多執行緒對系統不安全
訊息佇列能解決什麼問題
- 非同步處理
- 應用解耦
- 流量削鋒
- 日誌處理
- 訊息通訊
# 環境介紹
注意儘量將rocketmq的1.應用版本,2.jar包依賴,3.recketmq-console-ng的jar包依賴版本保持一致,不然可能會出現非常詭異的問題
此專案所使用版本: rocketmq:4.3.0
,OS: win10
- jar包依賴
compile group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.3.0'
-
下載 rocketmq應用
-
windows下rocketmq環境配置與啟動
參考 https://www.jianshu.com/p/4a275e779afa- 在rocketmq的bin目錄下啟動NAMESERVER(相當於服務註冊中心)
start mqnamesrv.cmd
- 啟動 broker(真正工作的伺服器,儲存訊息的伺服器)
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
3.1 mac
Build from Release
- 在rocketmq的bin目錄下啟動NAMESERVER(相當於服務註冊中心)
> unzip rocketmq-all-4.3.2-source-release.zip
> cd rocketmq-all-4.3.2/
> mvn -Prelease-all -DskipTests clean install -U
> cd distribution/target/apache-rocketmq
Start Name Server
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
Start Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
- 視覺化rocketmq管理專案下載
https://github.com/apache/rocketmq-externals.git- 將這個專案裡面
rocketmq-console-ng
裡的rocketmq依賴修改成與你專案依賴的版本一致,次專案是4.3.0image.png
- 將這個專案裡面
- 第三步已經把rocketmq的nameServer與broker啟動起來
- 啟動rocket-console-ng視覺化管理專案,該專案是基於springboot的
-
訪問rocket-console-ng的服務地址
image.png
到此環境搭建完成!!!
回到自己的程式↓↓↓
# 配置資訊
###producer
#該應用是否啟用生產者
rocketmq:
producer:
isOnOff: on
#傳送同一類訊息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用[email protected](pid代表jvm名字)作為唯一標示
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#訊息最大長度 預設1024*4(4M)
maxMessageSize: 4096
#傳送訊息超時時間,預設3000
sendMsgTimeout: 3000
#傳送訊息失敗重試次數,預設2
retryTimesWhenSendFailed: 2
###consumer
##該應用是否啟用消費者
consumer:
isOnOff: on
groupName: ${spring.application.name}
#mq的nameserver地址
namesrvAddr: 127.0.0.1:9876
#該消費者訂閱的主題和tags("*"號表示訂閱該主題下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*;
topics: futaotopic~*;
consumeThreadMin: 20
consumeThreadMax: 64
#設定一次消費訊息的條數,預設為1條
consumeMessageBatchMaxSize: 1
reConsumerTimes: 3
# 生產者Bean
package com.futao.springmvcdemo.service.impl
import com.futao.springmvcdemo.foundation.LogicException
import com.futao.springmvcdemo.model.entity.constvar.ErrorMessage
import com.futao.springmvcdemo.model.system.SystemConfig
import com.futao.springmvcdemo.service.RocketMqService
import org.apache.commons.lang3.StringUtils
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.consumer.ConsumeFromWhere
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Service
import java.nio.charset.Charset
/**
* @author futao
* Created on 2018/10/18.
*/
@Service
open class RocketMqServiceImpl : RocketMqService {
private val logger = LoggerFactory.getLogger(RocketMqServiceImpl::class.java)
/**
* 傳送同一類訊息的設定為同一個group,保證唯一,預設不需要設定,rocketmq會使用[email protected](pid代表jvm名字)作為唯一標示
*/
@Value("\${rocketmq.producer.groupName}")
private lateinit var producerGroupName: String
@Value("\${rocketmq.producer.namesrvAddr}")
private lateinit var producerNamesrvAddr: String
/**
* 訊息最大大小,預設4M
*/
@Value("\${rocketmq.producer.maxMessageSize}")
private var maxMessageSize: Int = 0
/**
* 訊息傳送超時時間,預設3秒
*/
@Value("\${rocketmq.producer.sendMsgTimeout}")
private var sendMsgTimeout: Int = 0
/**
* 訊息傳送失敗重試次數,預設2次
*/
@Value("\${rocketmq.producer.retryTimesWhenSendFailed}")
private var retryTimesWhenSendFailed: Int = 0
/**
* 生產者Bean
*/
@Bean
override fun producer(): DefaultMQProducer {
if (this.producerGroupName.isEmpty()) {
throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_GROUP_NAME_EMPTY)
}
if (this.producerNamesrvAddr.isEmpty()) {
throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
}
val defaultMQProducer = DefaultMQProducer(producerGroupName)
defaultMQProducer.namesrvAddr = producerNamesrvAddr
defaultMQProducer.maxMessageSize = maxMessageSize
defaultMQProducer.sendMsgTimeout = sendMsgTimeout
defaultMQProducer.isVipChannelEnabled = false
//訊息傳送到mq伺服器失敗重試次數
defaultMQProducer.retryTimesWhenSendFailed = retryTimesWhenSendFailed
try {
defaultMQProducer.start()
logger.info("rocketMq Producer start success; nameServer:{},producerGroupName:{}", producerNamesrvAddr, producerGroupName)
} catch (e: Exception) {
logger.error("rocketMq Producer start fail;{}", e.message, e)
}
return defaultMQProducer
}
}
# 消費者
@Value("\${rocketmq.consumer.namesrvAddr}")
private lateinit var consumerNamesrvAddr: String
@Value("\${rocketmq.consumer.groupName}")
private lateinit var consumerGroupName: String
@Value("\${rocketmq.consumer.consumeThreadMin}")
private var consumeThreadMin: Int = 0
@Value("\${rocketmq.consumer.consumeThreadMax}")
private var consumeThreadMax: Int = 0
@Value("\${rocketmq.consumer.topics}")
private lateinit var topics: String
@Value("\${rocketmq.consumer.consumeMessageBatchMaxSize}")
private var consumeMessageBatchMaxSize: Int = 0
// @Resource
// private lateinit var mqMessageListenerProcessor: MQConsumeMsgListenerProcessor
@Value("\${reConsumerTimes}")
private var reConsumerTimes: Int = 0
/**
* 消費者Bean
*/
@Bean
override fun consumer(): DefaultMQPushConsumer {
val topic = SystemConfig.ROCKET_MQ_TOPIC_MAIL
val tag = SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER
if (this.consumerGroupName.isEmpty()) {
throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_GROUP_NAME_EMPTY)
}
if (this.consumerNamesrvAddr.isEmpty()) {
throw LogicException.le(ErrorMessage.ROCKET_MQ_PRODUCER_NAME_SERVER_EMPTY)
}
if (this.topics.isEmpty()) {
throw LogicException.le(ErrorMessage.ROCKET_MQ_CONSUMER_TOPICS_EMPTY)
}
try {
//DefaultMQPushConsumer DefaultMQPullConsumer
val defaultMQPushConsumer = DefaultMQPushConsumer(consumerGroupName)
defaultMQPushConsumer.namesrvAddr = consumerNamesrvAddr
defaultMQPushConsumer.consumeThreadMin = consumeThreadMin
defaultMQPushConsumer.isVipChannelEnabled = false
// defaultMQPushConsumer.createTopic()
defaultMQPushConsumer.consumeThreadMax = consumeThreadMax
//消費模式 叢集還是廣播,預設為叢集(自動負載均衡)
//廣播消費: 訊息會發給Consume Group中的每一個消費者進行消費,如果設定為廣播訊息會導致NOT_ONLINE異常,https://github.com/apache/rocketmq/issues/296
defaultMQPushConsumer.messageModel = MessageModel.CLUSTERING
// 設定消費模型,
//consumer.setMessageModel(MessageModel.CLUSTERING);
// * 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
// * 如果非第一次啟動,那麼按照上次消費的位置繼續消費
defaultMQPushConsumer.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
//設定一次消費訊息的條數,預設為1條
defaultMQPushConsumer.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize
//訂閱topic
defaultMQPushConsumer.subscribe(topic, tag)
// defaultMQPushConsumer.registerMessageListener(mqMessageListenerProcessor)
defaultMQPushConsumer.registerMessageListener(
MessageListenerConcurrently { msgs, _ ->
if (msgs == null || msgs.isEmpty()) {
logger.info("接受到的訊息為空,不處理,直接返回成功")
[email protected] ConsumeConcurrentlyStatus.CONSUME_SUCCESS
}
val msg = msgs[0]
logger.info("接收到的訊息為:" + msg.toString())
if (msg.topic == topic && msg.tags == tag) {
//判斷該訊息是否重複消費(RocketMQ不保證訊息不重複,如果你的業務需要保證嚴格的不重複訊息,需要你自己在業務端去重)
//獲取該訊息重試次數
if (msg.reconsumeTimes >= reConsumerTimes) {
//訊息已經重試了3次,如果不需要再次消費,則返回成功
//TODO("如果重試了三次還是失敗則執行對於失敗的業務邏輯")
logger.error("訊息重試消費失敗:", msg)
[email protected] ConsumeConcurrentlyStatus.CONSUME_SUCCESS
} else {
//如果失敗重試次數還沒到三次則繼續重試
ConsumeConcurrentlyStatus.RECONSUME_LATER
}
//TODO("開始正常的業務邏輯")
println(StringUtils.repeat(":", 30) + String(msg.body, Charset.forName(SystemConfig.UTF8_ENCODE)))
}
[email protected] ConsumeConcurrentlyStatus.CONSUME_SUCCESS //消費成功
}
)
defaultMQPushConsumer.start()
logger.info("rocketMq Consumer start success; namesrvAddr:{},groupName:{},topics:{}", consumerNamesrvAddr, consumerGroupName, topics)
return defaultMQPushConsumer
} catch (e: Exception) {
logger.error("rocketMq Consumer start fail;{}", e.message, e)
return DefaultMQPushConsumer()
}
}
# 簡單測試
- 傳送註冊郵件的topic與tag配置
- 個人理解的topic: 一類業務可以歸為一個topic,比如所有的發郵件功能
- 個人理解的tag: 某類業務下的細分,比如傳送郵件業務下的傳送註冊郵件可以使用一個tag,傳送忘記密碼郵件可以再使用一個tag
/**
* rocket mq 傳送郵件的 topic
*/
public static final String ROCKET_MQ_TOPIC_MAIL = "topic_mail";
/**
* rocket mq 傳送郵件-註冊郵件的tag
*/
public static final String ROCKET_MQ_TAG_MAIL_REGISTER = "tag_mail_register";
- 傳送郵件訊息佇列Service
@Resource
lateinit var producer: DefaultMQProducer
/**
* 通過訊息佇列傳送郵件
*/
override fun sendMq(mailM: MailM) {
val message = Message(SystemConfig.ROCKET_MQ_TOPIC_MAIL, SystemConfig.ROCKET_MQ_TAG_MAIL_REGISTER, JSON.toJSONString(mailM).toByteArray(Charset.forName(SystemConfig.UTF8_ENCODE)))
try {
producer.send(message)
} catch (e: Exception) {
logger.error(e.message, e)
}
}
- 請求controller
@GetMapping("sendMailMq")
open fun sendMailMq() {
val mailM = MailM().apply {
to = arrayOf("[email protected]")
cc = arrayOf("[email protected]")
subject = "訊息佇列"
content = "<h1>您好,RocketMq</h1>"
}
mailService.sendMq(mailM)
}
- 在請求了controller之後可以在rocketmq-console-ng控制檯檢視到相應的topic與訊息資訊
-
topic
image.png
-
已傳送到rocketmq伺服器上的訊息
image.png
-
檢視訊息狀態
image.png
-
檢視控制檯
image.png
-
# 坑:
image.png
- 訊息不能被消費使用RocketMq控制檯resend提示NOT_CONSUME_YET:檢查rocketmq應用版本,rocketmq-console-ng依賴版本,自己的專案依賴jar包版本是否一致
- Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException:檢查rocketmq應用版本,rocketmq-console-ng依賴版本,自己的專案依賴jar包版本是否一致
- Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message:嘗試把消費者的消費模式改成叢集模式
- NOT_CONSUME_YET:如果還是不能解決請不要使用公司的網路,公司的網路可能會有很多的限制,用自己的手機進行測試(我被這個網路給坑慘了)
# 資源:
Windows下安裝RocketMq:https://www.jianshu.com/p/4a275e779afa
RocketMq名詞解釋: https://my.oschina.net/javamaster/blog/2051703
解釋Push與Pull區別: https://www.jianshu.com/p/f071d5069059?utm_source=oschina-app
官網:http://rocketmq.apache.org/
windows下rocketmq的訊息資訊儲存在 C:\Users\user\store
資料夾下,刪除該資料夾即可刪除所有的訊息