用rocketmq構造事件中心
我們主管的想法總是很多,總是提出我之前沒聽過的東西,而且據他說他之前也都沒有做過。難道說主管是把這個專案當做試驗場了嗎?因此雖然我並沒體驗到他在程式碼上有多麼精妙(因為他從來不寫程式碼),但是我還是很佩服他的,至少我受益匪淺。而且他總能在業務中提出很多問題,提出各種複雜的架構,過去我覺得太煩了,增加了很多無謂的工作量,老闆需要什麼我們就做什麼不就行了嗎,但是現在我漸漸體會到這也是程式設計師一種能力的體現。扯遠了。
總之主管就是提出一種事件驅動的框架,比如某一張表新增資料,就觸發一個新增事件,然後觸發另一個模組的監聽程式碼,更新或新增另一張表格的資料,當然,要分散式的。所有的事件要集中發到一個事件中心,然後微服務的所有監聽器都能監聽到這個事件(所有框架都是為微服務準備的,然而我們直到現在都沒有對服務拆分)。關鍵點是事件集中,然後監聽是分散式的。他讓我們自己去找合適的工具,或者直接用訊息佇列做。
因為專案比較急,我就用rocketmq實現了,為此我還發現了rocketmq的一個特性。rocketmq有兩種消費模式,訂閱模式和普通的消費模式,主管想用訂閱模式做,他覺得所有訂閱統一topic的消費者都能收到訊息,但實際上是不行的,因為所有的消費者叢集都會收到訊息,那麼這個事件訊息就會被所有的消費者叢集消費一遍,這明顯是不行的。所以只能用普通消費的模式,剛開始我是用笨辦法,所有的消費者叢集訂閱不同的topic,然後所有觸發的事件往所有訂閱的消費者叢集傳送訊息,比如機器觸發更新時間,然後班次和產品需要監聽這個事件,我就傳送兩條不同topic的訊息。這實際上是很low的,如果增加新的監聽者,我就要改變訊息傳送端的程式碼,侵入性太高了。
但是後來主管在網上查閱到資料,訂閱同一個topic的消費者叢集,在發出訊息時,所有的消費者叢集會各消費一次訊息。這是我之前沒有發現的特性,因為我之前只用過一個叢集。不過這也給我一個警告,那就是不要把訂閱同一個topic的相同業務放到兩個叢集中,那樣就會重複消費了。
總之這樣就簡單了,讓不同的模組都屬於不同的消費叢集,這樣只要訂閱同一個topic,只要傳送一條訊息,所有的模組都能消費一次。原理上就是這樣。
然後我建立了兩個列舉,訊息型別列舉和監聽器型別列舉
public enum MessageTypeEnum {//訊息型別 COMPANYADD("companyAdd","公司新增"),COMPANYEDIT("companyEdit","公司編輯"),COMPANYDELETE("companyDelete","公司刪除") ,USERREGIST("userRegist","使用者註冊"),USERUNREGIST("userUnregist","使用者登出"),USERLOGIN("userLogin","使用者登出") ,STATIONUPDATE("stationUpdate","主站更新") ,MACHINEDELETE("machineDelete","機器刪除"),MACHINEUPDATE("machineUpdate","機器更新"),MACHINEADD("machineAdd","機器新增") ,ALARMDATADELIVERY("alarmDataDelivery","警報資料到達"),ALARMDATARELIEVE("alarmDataRelieve","警報資料解除") ,MONITORDATAADD("monitorDataAdd","每模資料新增"),MONITORDATA("monitorData","每模資料發出"),MACHINESTATUS("machineStatus","硬體介面機器狀態"),TOURCHANGE("tourChange","班次切換") ,TOURDAYCHANGE("tourDayChange","天切換"),UDPDATA("udpData","警報產生,警報消除,引數修改,工藝備份") ,MONITORHOURCOUNT("monitorHourCount","每模資料小時統計"),MACHINEUNLINK("machineUnlink","機器失去連線"); private String code; private String desc; private MessageTypeEnum(String code, String desc) { this.code = code; this.desc = desc; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } }
public enum ListenTypeEnum {//監聽器型別
MONITORCOUNTDAY("MonitorCountDay","MonitorCountDayGroup","每模資料每日統計"),MONITORCOUNTHOUR("MonitorCountHour","MonitorCountHourGroup","每模資料小時統計"),MONITORCOUNTTIME("MonitorCountTime","MonitorCountTimeGroup","每模資料時間段統計")
,TOUR("tour","tourGroup","班次"),MACHINE("machine","machineGroup","機器"),MONITORDATA("monitorData","monitorDataGroup","每模資料")
,ABOX("abox","aboxGroup","硬體介面除每模資料外"),MACHINECONCERN("machineConCern","machineConcernGroup","關注管理"),MACHINETOURRATE("machineRateTour","machineRateGroup","班次利用率")
,MACHINERATEDAY("machineRateDay","machineRateDayGroup","統計機器的每天的利用率"),MONITORCOUNTTOUR("monitorCountTour","monitorCountGroup","統計每班生產數量")
,ALARMDATA("alarmData","alarmDataGroup","機器警報");
private String code;
private String groupName;
private String desc;
private ListenTypeEnum(String code, String groupName, String desc) {
this.code = code;
this.groupName = groupName;
this.desc = desc;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getGroupName() {
return groupName;
}
public void setGroupName(String groupName) {
this.groupName = groupName;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
然後用一個訊息中心統一發送訊息,
@Service
public class MessageSenderServiceImpl implements IMessageSenderService{
Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
DefaultMQProducer producer;
/**
* 傳送訊息到mq
* @param type 訊息型別
* @param message 訊息內容
* @throws Exception
*/
@Override
public void sendMessage(String type, String message) throws Exception {
String uuid = UUID.randomUUID().toString();
if(MessageTypeEnum.MONITORDATA.getCode().equals(type) //每模資料傳送
|| MessageTypeEnum.MONITORDATAADD.getCode().equals(type)//每模資料新增
|| MessageTypeEnum.MACHINEADD.getCode().equals(type)//機器新增
|| MessageTypeEnum.MACHINEUPDATE.getCode().equals(type)//機器編輯
|| MessageTypeEnum.MACHINEDELETE.getCode().equals(type)//機器刪除
|| MessageTypeEnum.USERUNREGIST.getCode().equals(type)//使用者賬號登出
|| MessageTypeEnum.TOURCHANGE.getCode().equals(type)//班次切換
|| MessageTypeEnum.TOURDAYCHANGE.getCode().equals(type)//班次天切換
|| MessageTypeEnum.STATIONUPDATE.getCode().equals(type)//基站編輯
|| MessageTypeEnum.MONITORHOURCOUNT.getCode().equals(type)//每模資料小時統計
|| MessageTypeEnum.MACHINEUNLINK.getCode().equals(type)//機器失去連線
|| MessageTypeEnum.MACHINESTATUS.getCode().equals(type)//機器心跳資料
){
sendMessage(type, type, uuid, message);
}
}
/**
* 傳送訊息到mq
* @param topic 主題
* @param tag 頻道
* @param key 訊息key
* @param message 訊息內容
* @throws Exception
*/
@Override
public void sendMessage(String topic, String tag, String key, String message) throws Exception {
Message msg = new Message(topic// topic
, tag// tag
, key// key
, (message).getBytes()) ;// body
SendResult sendResult = producer.send(msg);
logger.info(sendResult.toString());
}
}
這樣訊息就能統一處理而不需要在每個傳送訊息的類中修改程式碼,雖然寫得時候可能不會省多少力,但是我越發體會到將相同功能的模組整合統一處理的好處,不僅結構更加清晰明瞭,對程式碼的侵入性下降,而且修改的效率更高。
最後是每個模組專門寫的監聽器,這裡就寫一個例子
@WebListener
public class TourListener implements ServletContextListener{
Logger logger = LoggerFactory.getLogger(this.getClass());
Map<String, Integer> map = new HashMap<>();
ITourRuntimeService tourRuntimeService;
/**
* 初始化事件的監聽
* @Description
* @param
* @return
* @throws
* @date 2018年8月17日
*/
@Override
public void contextInitialized(ServletContextEvent event) {
ServletContext application = event.getServletContext();
WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(application);
Environment environment = appctx.getEnvironment();
String enabled = environment.getProperty("mq.cumsumer.enabled");
if("0".equals(enabled))
return;
String namesrvAddr = environment.getProperty("mq.cumsumer.namesrvAddr");
//消費叢集名字
String groupName = ListenTypeEnum.TOUR.getGroupName();
//訂閱主題
String[] topic = new String[] {MessageTypeEnum.MACHINEADD.getCode()
,MessageTypeEnum.MACHINEDELETE.getCode()};
logger.info(" * * * * * * * * * * * * * * * * * * * * * * 消費監聽器啟動 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * ");
logger.info("namesrvAddr="+namesrvAddr);
tourRuntimeService = appctx.getBean(ITourRuntimeService.class);
startConsumer(topic, groupName, namesrvAddr,1);
}
/**
* 開啟消費者
* @param topic 話題
* @param groupName 組名稱
* @param namesrvAddr 地址
* @param maxSize 最大數量
* @date 2018年8月17日
*/
public void startConsumer(String[] topic, String groupName,String namesrvAddr, int maxSize) {
BaseListenerProxy.commonMessageListenrOrderly(map, logger, topic, groupName, namesrvAddr, maxSize, new BaseListenrCallBack() {
@Override
public void process(String topic, String tag, String key, String body) throws Exception {
if(tag.equals(MessageTypeEnum.MACHINEADD.getCode())) {
String machineId = StringUtils.split(body, ",")[0];
tourRuntimeService.onEventMachineAdd(key, machineId);
}else if(tag.equals(MessageTypeEnum.MACHINEDELETE.getCode())) {
tourRuntimeService.onEventMachineDelete(key, body);
}
}
});
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
logger.info("Destoryed");
}
}
BaseListenerProxy是我寫的一個匿名代理類模式,自從學會了這個用法,我真是越來越喜歡用了,只要有通用的程式碼我都會寫一個匿名代理類
public interface BaseListenrCallBack {
void process(String topic, String tag, String key, String body) throws Exception;
}
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.kq.highnet2.framework.base.common.exception.DiyMessageException;
public class BaseListenerProxy {
/**
* 有序消費模式
* @param map
* @param logger
* @param topic
* @param groupName
* @param namesrvAddr
* @param maxSize
* @param callback
*/
public static void commonMessageListenrOrderly(Map<String, Integer> map, Logger logger, String[] topics, String groupName,String namesrvAddr, int maxSize, BaseListenrCallBack callback) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeMessageBatchMaxSize(maxSize);
try {
consumer.setConsumeMessageBatchMaxSize(maxSize);
for(String topic:topics) {
consumer.subscribe(topic, "*");
}
/**
* Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可 < br >
*/
consumer.registerMessageListener(
new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
logger.info("==========CONSUME_START===========");
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
Date begin = new Date();
String spikeTrace = UUID.randomUUID().toString().replace("-", "");
try {
logger.info("0001 "+spikeTrace+" MQ消費-SIZE("+msgs.size()+")");
for(MessageExt msg:msgs) {
callback.process(msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
}
logger.info("0001 "+(new Date().getTime()-begin.getTime())+" "+spikeTrace+" MQ消費成功");
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeOrderlyStatus.SUCCESS;
} catch(DiyMessageException e) {
logger.info("0001 "+(new Date().getTime()-begin.getTime())+" "+spikeTrace+" MQ消費結束");
logger.info("==========CONSUME_EXCEPTION===========");
logger.error(e.getMessage(),e);
return ConsumeOrderlyStatus.SUCCESS;
}catch (Exception e) {
MessageExt msg = msgs.get(0);
if(map.containsKey(msg.getKeys())) {
map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
}else {
map.put(msg.getKeys(), 1);
}
logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
if(map.get(msg.getKeys())>3) {
map.remove(msg.getKeys());
return ConsumeOrderlyStatus.SUCCESS;
}
logger.info("0001 "+(new Date().getTime()-begin.getTime())+" "+spikeTrace+" MQ消費失敗");
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
);
consumer.start();
logger.info("ConsumerStarted.");
} catch (MQClientException e) {
logger.error(e.getMessage(),e);
logger.info("==========ConsumerStartedFailed===========");
e.printStackTrace();
}
}
/**
* 併發消費模式
* @param map
* @param logger
* @param topics
* @param groupName
* @param namesrvAddr
* @param maxSize
* @param callback
*/
public static void commonMessageListenerConcurrently(Map<String, Integer> map, Logger logger, String[] topics, String groupName,String namesrvAddr, int maxSize, BaseListenrCallBack callback) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeMessageBatchMaxSize(maxSize);
try {
consumer.setConsumeMessageBatchMaxSize(maxSize);
for(String topic:topics) {
consumer.subscribe(topic, "*");
}
/**
* Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可 < br >
*/
consumer.registerMessageListener(
new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
logger.info(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
try {
for(MessageExt msg:msgs) {
callback.process(msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
}
logger.info("==========CONSUME_SUCCESS===========");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.info("==========RECONSUME_LATER===========");
logger.error(e.getMessage(),e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
}
);
consumer.start();
logger.info("ConsumerStarted.");
} catch (MQClientException e) {
logger.error(e.getMessage(),e);
logger.info("==========ConsumerStartedFailed===========");
e.printStackTrace();
}
}
}
我主要寫了兩種模式,有序消費模式和併發消費模式,這麼長的程式碼使用時只要短短几行,真是太爽了。