1. 程式人生 > >用rocketmq構造事件中心

用rocketmq構造事件中心

我們主管的想法總是很多,總是提出我之前沒聽過的東西,而且據他說他之前也都沒有做過。難道說主管是把這個專案當做試驗場了嗎?因此雖然我並沒體驗到他在程式碼上有多麼精妙(因為他從來不寫程式碼),但是我還是很佩服他的,至少我受益匪淺。而且他總能在業務中提出很多問題,提出各種複雜的架構,過去我覺得太煩了,增加了很多無謂的工作量,老闆需要什麼我們就做什麼不就行了嗎,但是現在我漸漸體會到這也是程式設計師一種能力的體現。扯遠了。

總之主管就是提出一種事件驅動的框架,比如某一張表新增資料,就觸發一個新增事件,然後觸發另一個模組的監聽程式碼,更新或新增另一張表格的資料,當然,要分散式的。所有的事件要集中發到一個事件中心,然後微服務的所有監聽器都能監聽到這個事件(所有框架都是為微服務準備的,然而我們直到現在都沒有對服務拆分)。關鍵點是事件集中,然後監聽是分散式的。他讓我們自己去找合適的工具,或者直接用訊息佇列做。

因為專案比較急,我就用rocketmq實現了,為此我還發現了rocketmq的一個特性。rocketmq有兩種消費模式,訂閱模式和普通的消費模式,主管想用訂閱模式做,他覺得所有訂閱統一topic的消費者都能收到訊息,但實際上是不行的,因為所有的消費者叢集都會收到訊息,那麼這個事件訊息就會被所有的消費者叢集消費一遍,這明顯是不行的。所以只能用普通消費的模式,剛開始我是用笨辦法,所有的消費者叢集訂閱不同的topic,然後所有觸發的事件往所有訂閱的消費者叢集傳送訊息,比如機器觸發更新時間,然後班次和產品需要監聽這個事件,我就傳送兩條不同topic的訊息。這實際上是很low的,如果增加新的監聽者,我就要改變訊息傳送端的程式碼,侵入性太高了。

但是後來主管在網上查閱到資料,訂閱同一個topic的消費者叢集,在發出訊息時,所有的消費者叢集會各消費一次訊息。這是我之前沒有發現的特性,因為我之前只用過一個叢集。不過這也給我一個警告,那就是不要把訂閱同一個topic的相同業務放到兩個叢集中,那樣就會重複消費了。

總之這樣就簡單了,讓不同的模組都屬於不同的消費叢集,這樣只要訂閱同一個topic,只要傳送一條訊息,所有的模組都能消費一次。原理上就是這樣。

然後我建立了兩個列舉,訊息型別列舉和監聽器型別列舉

public enum MessageTypeEnum {//訊息型別

	COMPANYADD("companyAdd","公司新增"),COMPANYEDIT("companyEdit","公司編輯"),COMPANYDELETE("companyDelete","公司刪除")
	,USERREGIST("userRegist","使用者註冊"),USERUNREGIST("userUnregist","使用者登出"),USERLOGIN("userLogin","使用者登出")
	,STATIONUPDATE("stationUpdate","主站更新")
	,MACHINEDELETE("machineDelete","機器刪除"),MACHINEUPDATE("machineUpdate","機器更新"),MACHINEADD("machineAdd","機器新增")
	,ALARMDATADELIVERY("alarmDataDelivery","警報資料到達"),ALARMDATARELIEVE("alarmDataRelieve","警報資料解除")
	,MONITORDATAADD("monitorDataAdd","每模資料新增"),MONITORDATA("monitorData","每模資料發出"),MACHINESTATUS("machineStatus","硬體介面機器狀態"),TOURCHANGE("tourChange","班次切換")
	,TOURDAYCHANGE("tourDayChange","天切換"),UDPDATA("udpData","警報產生,警報消除,引數修改,工藝備份")
	,MONITORHOURCOUNT("monitorHourCount","每模資料小時統計"),MACHINEUNLINK("machineUnlink","機器失去連線");
	private String code;
    private String desc;
	private MessageTypeEnum(String code, String desc) {
		this.code = code;
		this.desc = desc;
	}
	public String getCode() {
		return code;
	}
	public void setCode(String code) {
		this.code = code;
	}
	public String getDesc() {
		return desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
	
    
}
public enum ListenTypeEnum {//監聽器型別

	MONITORCOUNTDAY("MonitorCountDay","MonitorCountDayGroup","每模資料每日統計"),MONITORCOUNTHOUR("MonitorCountHour","MonitorCountHourGroup","每模資料小時統計"),MONITORCOUNTTIME("MonitorCountTime","MonitorCountTimeGroup","每模資料時間段統計")
	,TOUR("tour","tourGroup","班次"),MACHINE("machine","machineGroup","機器"),MONITORDATA("monitorData","monitorDataGroup","每模資料")
	,ABOX("abox","aboxGroup","硬體介面除每模資料外"),MACHINECONCERN("machineConCern","machineConcernGroup","關注管理"),MACHINETOURRATE("machineRateTour","machineRateGroup","班次利用率")
	,MACHINERATEDAY("machineRateDay","machineRateDayGroup","統計機器的每天的利用率"),MONITORCOUNTTOUR("monitorCountTour","monitorCountGroup","統計每班生產數量")
	,ALARMDATA("alarmData","alarmDataGroup","機器警報");
	private String code;
	private String groupName;
    private String desc;
	private ListenTypeEnum(String code, String groupName, String desc) {
		this.code = code;
		this.groupName = groupName;
		this.desc = desc;
	}
	public String getCode() {
		return code;
	}
	public void setCode(String code) {
		this.code = code;
	}
	
	public String getGroupName() {
		return groupName;
	}
	public void setGroupName(String groupName) {
		this.groupName = groupName;
	}
	public String getDesc() {
		return desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
	
    
}

然後用一個訊息中心統一發送訊息,

@Service
public class MessageSenderServiceImpl implements IMessageSenderService{
	Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    DefaultMQProducer producer;
	/**
	 * 傳送訊息到mq
	 * @param type 訊息型別
	 * @param message 訊息內容
	 * @throws Exception 
	 */
	@Override
	public void sendMessage(String type, String message) throws Exception {
		String uuid = UUID.randomUUID().toString();
		if(MessageTypeEnum.MONITORDATA.getCode().equals(type) //每模資料傳送
				|| MessageTypeEnum.MONITORDATAADD.getCode().equals(type)//每模資料新增
				|| MessageTypeEnum.MACHINEADD.getCode().equals(type)//機器新增
				|| MessageTypeEnum.MACHINEUPDATE.getCode().equals(type)//機器編輯
				|| MessageTypeEnum.MACHINEDELETE.getCode().equals(type)//機器刪除
				|| MessageTypeEnum.USERUNREGIST.getCode().equals(type)//使用者賬號登出
				|| MessageTypeEnum.TOURCHANGE.getCode().equals(type)//班次切換
				|| MessageTypeEnum.TOURDAYCHANGE.getCode().equals(type)//班次天切換
				|| MessageTypeEnum.STATIONUPDATE.getCode().equals(type)//基站編輯
				|| MessageTypeEnum.MONITORHOURCOUNT.getCode().equals(type)//每模資料小時統計
				|| MessageTypeEnum.MACHINEUNLINK.getCode().equals(type)//機器失去連線
				|| MessageTypeEnum.MACHINESTATUS.getCode().equals(type)//機器心跳資料
				){
			sendMessage(type, type, uuid, message);
		}
		
	}
	/**
	 * 傳送訊息到mq
	 * @param topic 主題
	 * @param tag 頻道
	 * @param key 訊息key
	 * @param message 訊息內容
	 * @throws Exception 
	 */
	@Override
	public void sendMessage(String topic, String tag, String key, String message) throws Exception {
		Message msg = new Message(topic//  topic
				, tag//  tag 
				, key//  key 
				, (message).getBytes()) ;//  body  
		SendResult sendResult = producer.send(msg);
		logger.info(sendResult.toString());
	}
}

這樣訊息就能統一處理而不需要在每個傳送訊息的類中修改程式碼,雖然寫得時候可能不會省多少力,但是我越發體會到將相同功能的模組整合統一處理的好處,不僅結構更加清晰明瞭,對程式碼的侵入性下降,而且修改的效率更高。

最後是每個模組專門寫的監聽器,這裡就寫一個例子



@WebListener
public class TourListener  implements ServletContextListener{
	Logger logger = LoggerFactory.getLogger(this.getClass());
    Map<String, Integer> map = new HashMap<>();
    ITourRuntimeService tourRuntimeService;
	/**
	 * 初始化事件的監聽
	 * @Description 
	 * @param
	 * @return
	 * @throws
	 * @date 2018年8月17日
	 */
	@Override
	public void contextInitialized(ServletContextEvent event) {
		ServletContext application = event.getServletContext();  
        WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(application); 
        Environment environment = appctx.getEnvironment();
        String enabled = environment.getProperty("mq.cumsumer.enabled");
        if("0".equals(enabled))
        	return;
        String namesrvAddr = environment.getProperty("mq.cumsumer.namesrvAddr");
        //消費叢集名字
        String groupName = ListenTypeEnum.TOUR.getGroupName();
        //訂閱主題
        String[] topic = new String[] {MessageTypeEnum.MACHINEADD.getCode()
                ,MessageTypeEnum.MACHINEDELETE.getCode()};
        logger.info(" * * * * * * * * * * * * * * * * * * * * *  * 消費監聽器啟動 * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * ");
        logger.info("namesrvAddr="+namesrvAddr);
        tourRuntimeService = appctx.getBean(ITourRuntimeService.class);
        startConsumer(topic, groupName, namesrvAddr,1);
	}
	/**
	 * 開啟消費者
	 * @param topic 話題
	 * @param groupName 組名稱
	 * @param namesrvAddr 地址
	 * @param maxSize 最大數量
	 * @date 2018年8月17日
	 */
	public void startConsumer(String[] topic, String groupName,String namesrvAddr, int maxSize) {
		BaseListenerProxy.commonMessageListenrOrderly(map, logger, topic, groupName, namesrvAddr, maxSize, new BaseListenrCallBack() {
			@Override
			public void process(String topic, String tag, String key, String body) throws Exception {
				if(tag.equals(MessageTypeEnum.MACHINEADD.getCode())) {
					String machineId = StringUtils.split(body, ",")[0];
   				 	tourRuntimeService.onEventMachineAdd(key, machineId);
   			 	}else if(tag.equals(MessageTypeEnum.MACHINEDELETE.getCode())) {
   			 		tourRuntimeService.onEventMachineDelete(key, body);
   			 	}
			}
		});
	}
	@Override
	public void contextDestroyed(ServletContextEvent sce) {
		logger.info("Destoryed");
	}

}

BaseListenerProxy是我寫的一個匿名代理類模式,自從學會了這個用法,我真是越來越喜歡用了,只要有通用的程式碼我都會寫一個匿名代理類

public interface BaseListenrCallBack {
	void process(String topic, String tag, String key, String body) throws Exception;
}
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.slf4j.Logger;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.kq.highnet2.framework.base.common.exception.DiyMessageException;

public class BaseListenerProxy {
	/**
	 * 有序消費模式
	 * @param map
	 * @param logger
	 * @param topic
	 * @param groupName
	 * @param namesrvAddr
	 * @param maxSize
	 * @param callback
	 */
	public static void commonMessageListenrOrderly(Map<String, Integer> map, Logger logger, String[] topics, String groupName,String namesrvAddr, int maxSize, BaseListenrCallBack callback) {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); 
        consumer.setNamesrvAddr(namesrvAddr);  
        consumer.setConsumeMessageBatchMaxSize(maxSize);
		try {
        	consumer.setConsumeMessageBatchMaxSize(maxSize);
        	for(String topic:topics) {
        		consumer.subscribe(topic, "*");
        	}
            /**  
             * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可 < br > 
              */  
            consumer.registerMessageListener(
            		new MessageListenerOrderly() {

						@Override
						public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
								ConsumeOrderlyContext context) {
			                logger.info("==========CONSUME_START===========");  
							logger.info(Thread.currentThread().getName()  
			                                    + " Receive New Messages: " + msgs.size());  
							Date begin = new Date();
							String spikeTrace = UUID.randomUUID().toString().replace("-", "");
			                try {
			                	logger.info("0001        "+spikeTrace+"  MQ消費-SIZE("+msgs.size()+")");
				           		 for(MessageExt msg:msgs) {
				           			callback.process(msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
				        		 }
				                logger.info("0001  "+(new Date().getTime()-begin.getTime())+"  "+spikeTrace+"  MQ消費成功");
				                logger.info("==========CONSUME_SUCCESS===========");  
				                return ConsumeOrderlyStatus.SUCCESS;  
				            } catch(DiyMessageException e) {
				                logger.info("0001  "+(new Date().getTime()-begin.getTime())+"  "+spikeTrace+"  MQ消費結束");
				                logger.info("==========CONSUME_EXCEPTION===========");  
				                logger.error(e.getMessage(),e);
				                return ConsumeOrderlyStatus.SUCCESS;  
				            }catch (Exception e) {
			                	MessageExt msg = msgs.get(0);
			                	if(map.containsKey(msg.getKeys())) {
			                		map.put(msg.getKeys(), map.get(msg.getKeys()) + 1);
			                	}else {
			                		map.put(msg.getKeys(), 1);
			                	}
			                	logger.info(msg.getKeys()+":"+map.get(msg.getKeys()));
			                	if(map.get(msg.getKeys())>3) {
			                		map.remove(msg.getKeys());
			                		return ConsumeOrderlyStatus.SUCCESS;
			                	}
				                logger.info("0001  "+(new Date().getTime()-begin.getTime())+"  "+spikeTrace+"  MQ消費失敗");
				                logger.info("==========RECONSUME_LATER===========");  
				                logger.error(e.getMessage(),e);
				                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
				            }
						}
            			
            		}
            ); 
         consumer.start();
         logger.info("ConsumerStarted.");
        } catch (MQClientException e) {
        	logger.error(e.getMessage(),e);
        	logger.info("==========ConsumerStartedFailed===========");
            e.printStackTrace();
        } 
	}
	/**
	 * 併發消費模式
	 * @param map
	 * @param logger
	 * @param topics
	 * @param groupName
	 * @param namesrvAddr
	 * @param maxSize
	 * @param callback
	 */
	public static void commonMessageListenerConcurrently(Map<String, Integer> map, Logger logger, String[] topics, String groupName,String namesrvAddr, int maxSize, BaseListenrCallBack callback) {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); 
        consumer.setNamesrvAddr(namesrvAddr);  
        consumer.setConsumeMessageBatchMaxSize(maxSize);
		try {
        	consumer.setConsumeMessageBatchMaxSize(maxSize);
        	for(String topic:topics) {
        		consumer.subscribe(topic, "*");
        	}
            /**  
             * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可 < br > 
              */  
            consumer.registerMessageListener(
            		new MessageListenerConcurrently() {
		                public ConsumeConcurrentlyStatus consumeMessage(  
		                                   List< MessageExt > msgs, ConsumeConcurrentlyContext context) {  
		
		                		logger.info(Thread.currentThread().getName()  
		                                             + " Receive New Messages: " + msgs.size()); 
		                		try {
		                			for(MessageExt msg:msgs) {
					           			callback.process(msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
					        		 }
		                            logger.info("==========CONSUME_SUCCESS===========");  
		                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
		                        } catch (Exception e) {
		                            logger.info("==========RECONSUME_LATER===========");  
		                            logger.error(e.getMessage(),e);
		                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
		//                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
		                        }
		                }  
		            }
            ); 
         consumer.start();
         logger.info("ConsumerStarted.");
        } catch (MQClientException e) {
        	logger.error(e.getMessage(),e);
        	logger.info("==========ConsumerStartedFailed===========");
            e.printStackTrace();
        } 
	}
}

我主要寫了兩種模式,有序消費模式和併發消費模式,這麼長的程式碼使用時只要短短几行,真是太爽了。