從spring原始碼汲取營養:模仿spring事件釋出機制,解耦業務程式碼
前言
最近在專案中做了一項優化,對業務程式碼進行解耦。我們部門做的是警用系統,通俗的說,可理解為110報警。一條警情,會先後經過接警員、處警排程員、一線警員,警情是需要記錄每一步的日誌,是要可追溯的,比如報警人張小三在2019-12-02 00:02:01時間報警,接警員A在1分鐘後,將該警情記錄完成,並分派給處警排程員B,排程員B在5分鐘後,分派給一線警員C,C趕到現場後,花了1個小時處理完成。
這中間,每一個介面,需要做的事情,可能就包括了:警情日誌記錄;警員當前任務數統計,包括待處置的任務和已經處置完成的任務;我們其實還有一個操作,就是發mq,去通知其他相關人,比如接警員A接警完成後,要發mq通知其主管。
以前的程式碼可能是這樣的:
## 介面1裡, 接收警情service裡完成以下操作
void 接收警情(xxxReqVo reqVo){
1:寫庫
2:記錄警情跟蹤日誌
3:增加當前接警員的接警數
4:發mq通知其他相關人
}
##介面2裡,分派警情的service裡完成以下操作
void 分派警情(xxxReqVo reqVo){
1:寫庫
2:記錄警情跟蹤日誌
3:增加當前處警排程警員的處警數
4:發mq通知其他相關人
}
這樣的問題是什麼呢?
- 在每一個相關接口裡,都要“顯式”呼叫:記錄跟蹤日誌的相關方法、統計相關的方法、發mq相關的方法;但凡有一個地方忘記了,都會導致問題,比如統計數量不準,mq忘發,跟蹤日誌遺漏等。
- 業務邏輯和這類通用業務揉在一起,假設下次又需要給報警人發個簡訊,豈不是又得去改核心程式碼?這不符合我們“對修改關閉,對擴充套件開放”的開閉原則啊;假設腦殘的產品經理,這次說要給報警人發簡訊,過兩天又不要了,難道每個介面,挨個挨個改嗎,想想都想打死產品經理,但是這個又犯法,還是想想其他辦法?
這個問題,我們可以用類似mq的方法來解決,即,傳送訊息,各個消費者去消費。一般,mq的方式適用於微服務之間,而我們這裡,將使用事件-釋出機制來解決這個問題。
原始碼地址(直接dubug跟一下,很簡單,比看文章來得快):
https://gitee.com/ckl111/spring-event-publish-demo
先說說ApplicationListener
在spring boot
之前的spring
時代,想必一些同學用過org.springframework.context.ApplicationListener
,正好我手裡有一個老專案,也用到了這個東西,我就拿這個舉個例子:
在我們的專案中,需要在啟動後,初始化一些東西,比如預熱快取,最早的程式碼呢,可能是大家各自實現org.springframework.beans.factory.InitializingBean
,但是這樣呢,初始化程式碼散落在各個service中;還有一些直接使用@PostContruct
註解,然後在對應方法裡去完成一些初始化操作。但是總體來說,這些方式,在spring的啟動過程中,被呼叫的時機比較靠前,有些候某些bean可能還沒初始化完成,而導致一些奇怪的問題。
所以,我們後來統一去掉了這些初始化程式碼,全部採用以下機制來實現:
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
@Service
public class InitRunner implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private InitService initService;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
//root application context,因為是web專案,
if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
initService.init();
}
}
在這個類中,我們實現了org.springframework.context.ApplicationListener<ContextRefreshedEvent>
,這個 listener的定義如下:
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/**
* Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event);
}
介面 EventListener 是 jdk 的一個 marker interface:
package java.util;
/**
- A tagging interface that all event listener interfaces must extend.
- @since JDK1.1
*/
public interface EventListener {
}
我們在實現listener時,指定了本listener感興趣的事件:ContextRefreshedEvent
,這個事件的類繼承關係如下:
那麼,這個事件是什麼意思呢?
/**
* Event raised when an {@code ApplicationContext} gets initialized or refreshed.
*
* @author Juergen Hoeller
* @since 04.03.2003
* @see ContextClosedEvent
*/
@SuppressWarnings("serial")
public class ContextRefreshedEvent extends ApplicationContextEvent {
/**
* Create a new ContextRefreshedEvent.
* @param source the {@code ApplicationContext} that has been initialized
* or refreshed (must not be {@code null})
*/
public ContextRefreshedEvent(ApplicationContext source) {
super(source);
}
}
註釋說:Event raised when an {@code ApplicationContext} gets initialized or refreshed.,那麼意思就是,該事件,是在上下文初始化完成後被髮布。
這樣的話,就能保證,在我們listener監聽到這個事件的時候,整個應用上下文已經可以使用了。
一覽Spring事件監聽機制
我們再通過debug,來看看其真實的呼叫時機:
上圖紅框處,對spring上下文進行refresh,refresh就是spring 最核心的部分了,基本上,看懂了這個函式,就懂了一半:
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
// 例項化beanFactory
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
// 對beanFactory進行處理
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
// BeanFactoryPostProcessor開始作用的地方,這裡會呼叫所有的beanFactory後置處理器
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
// 註冊 bean的後置處理器到beanFactory,注意,截止目前,還沒開始例項化bean(除了少數幾個內部bean)
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
// 註冊國際化相關bean
initMessageSource();
// Initialize event multicaster for this context.
// 註冊事件釋出器,這個和本文主題大有關係
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
//注意上面這行註釋,這個類是交給子類覆蓋的,比如,在 org.springframework.web.context.support.AbstractRefreshableWebApplicationContext中,例項化了 org.springframework.ui.context.ThemeSource
onRefresh();
// Check for listener beans and register them.
// 從spring容器上下文中,查詢ApplicationListener型別的監聽器,新增到前兩步,初始化的事件釋出器中
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
//注意:截止到目前為止,beanFactory裡面基本還是空空如也,沒有bean,只有BeanDefinition,在這一步才會 //根據那些BeanDefinition來例項化那些:非lazy-init的bean
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
// 釋出:容器完成初始化的事件
finishRefresh();
}
上面基本都加了註釋,比較容易懂,需要重點關注的是:
- 事件釋出器初始化
initApplicationEventMulticaster();
這一步會生成一個org.springframework.context.event.ApplicationEventMulticaster
,儲存在org.springframework.context.support.AbstractApplicationContext#applicationEventMulticaster
該事件釋出器的介面主要有(去除了無關方法):
/**
* Add a listener to be notified of all events.
* @param listener the listener to add
*/
void addApplicationListener(ApplicationListener<?> listener);
/**
* Multicast the given application event to appropriate listeners.
* <p>Consider using {@link #multicastEvent(ApplicationEvent, ResolvableType)}
* if possible as it provides a better support for generics-based events.
* @param event the event to multicast
*/
void multicastEvent(ApplicationEvent event);
從上面可以看出,該介面主要是維護監聽器ApplicationListener
,以及進行事件釋出。
註冊監聽器
// 註冊listeners protected void registerListeners() { // Register statically specified listeners first. for (ApplicationListener<?> listener : getApplicationListeners()) { getApplicationEventMulticaster().addApplicationListener(listener); } // Do not initialize FactoryBeans here: We need to leave all regular beans // uninitialized to let post-processors apply to them! //這裡的這句註釋也很魔性,哈哈,側面說明了,截至目前,beanFactory都是沒有bean例項存在的,bean還沒 //有例項化 String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false); for (String listenerBeanName : listenerBeanNames) { getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName); }
beanFactory初始化完成後,釋出事件
protected void finishRefresh() { // Publish the final event. // 釋出上下文refresh完畢的事件,通知listener publishEvent(new ContextRefreshedEvent(this)); }
這裡,publishEvent實現如下:
protected void publishEvent(Object event, ResolvableType eventType) { // Decorate event as an ApplicationEvent if necessary ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); }
方案1:參考spring,實現自己的事件監聽機制,解耦業務程式碼
專案原始碼地址:https://gitee.com/ckl111/spring-event-publish-demo.git
專案結構如下:
定義與實現事件釋出器
package com.ceiec.base.event; import org.springframework.context.event.ApplicationEventMulticaster; /** * desc: * 參考spring的設計 * {@link ApplicationEventMulticaster} * @author : ckl * creat_date: 2019/11/16 0016 * creat_time: 10:40 **/ public interface ICommonApplicationEventMulticaster { /** * Add a listener to be notified of all events. * @param listener the listener to add */ void addApplicationListener(ICommonApplicationEventListener<?> listener); /** * Multicast the given application event to appropriate listeners. * @param event the event to multicast */ void multicastEvent(CommonApplicationEvent event); }
package com.ceiec.base.event; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.SimpleApplicationEventMulticaster; import org.springframework.stereotype.Component; import java.util.LinkedHashSet; import java.util.Set; /** * desc: * 參考spring * {@link SimpleApplicationEventMulticaster} * * @author : ckl * creat_date: 2019/11/16 0016 * creat_time: 10:40 **/ @Slf4j @Component public class CommonApplicationEventMulticaster implements ICommonApplicationEventMulticaster { public final Set<ICommonApplicationEventListener<?>> applicationListeners = new LinkedHashSet<>(); @Override public void addApplicationListener(ICommonApplicationEventListener<?> listener) { applicationListeners.add(listener); } @Override public void removeApplicationListener(ICommonApplicationEventListener<?> listener) { applicationListeners.remove(listener); } @Override public void removeAllListeners() { applicationListeners.clear(); } @Override @SuppressWarnings({"rawtypes", "unchecked"}) public void multicastEvent(CommonApplicationEvent event) { try { for (ICommonApplicationEventListener applicationListener : applicationListeners) { //判斷listener是否支援處理該事件,如果支援,則丟給listener處理 if (applicationListener.supportsEventType(event)) { applicationListener.onApplicationEvent(event); } } } catch (Exception e) { log.error("{}",e); } } }
定義listener
package com.ceiec.base.event; import org.springframework.context.ApplicationListener; import java.util.EventListener; /** * desc: * 參考spring * {@link ApplicationListener} * @author : ckl * creat_date: 2019/11/16 0016 * creat_time: 10:45 **/ public interface ICommonApplicationEventListener<E extends CommonApplicationEvent> extends EventListener { boolean supportsEventType(E event ); /** * Handle an application event. * @param event the event to respond to */ void onApplicationEvent(E event); }
定義事件類
package com.ceiec.base.event; import lombok.AllArgsConstructor; import lombok.Data; import lombok.experimental.Accessors; import java.util.EventObject; /** * desc: * 參考spring的設計 * {@link org.springframework.context.ApplicationEvent} **/ @Data @AllArgsConstructor @Accessors(chain = true) public class CommonApplicationEvent<T>{ /** * 事件型別 */ private IEventType iEventType; /** * 事件攜帶的資料 */ private T data; }
listener的樣例實現,下面的實現,用於警情的跟蹤日誌記錄
package com.ceiec.base.listener; import com.ceiec.base.applicationevent.SystemEventType; import com.ceiec.base.event.CommonApplicationEvent; import com.ceiec.base.event.ICommonApplicationEventListener; import com.ceiec.base.eventmsg.*; import com.ceiec.base.service.IIncidentTraceService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * desc: * 接警統計listener * @author : ckl * creat_date: 2019/11/16 0016 * creat_time: 9:56 **/ @Component @Slf4j public class IncidentTraceListener implements ICommonApplicationEventListener{ @Autowired private IIncidentTraceService iIncidentTraceService; @Override public boolean supportsEventType(CommonApplicationEvent event) { return true; } @Override public void onApplicationEvent(CommonApplicationEvent event) { log.info("{}",event); Object data = event.getData(); if (event.getIEventType() == SystemEventType.FINISH_INCIDENT_APPEAL) { FinishIncidentDisposalEventMsg msg = (FinishIncidentDisposalEventMsg) data; iIncidentTraceService.finishIncidentDisposal(msg); } } }
啟動程式時,註冊listener到事件釋出器
package com.ceiec.base.init; import com.ceiec.base.event.CommonApplicationEventMulticaster; import com.ceiec.base.event.ICommonApplicationEventListener; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Collection; import java.util.Map; /** * desc: * * @author : ckl * creat_date: 2019/11/11 0011 * creat_time: 15:46 **/ @Component @Slf4j public class InitRunner implements CommandLineRunner,ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private CommonApplicationEventMulticaster commonApplicationEventMulticaster; @Override public void run(String... args) throws Exception { Map<String, ICommonApplicationEventListener> map = applicationContext.getBeansOfType(ICommonApplicationEventListener.class); Collection<ICommonApplicationEventListener> listeners = map.values(); for (ICommonApplicationEventListener listener : listeners) { /** * 註冊事件listener到事件釋出器 */ log.info("register listener:{}",listener); commonApplicationEventMulticaster.addApplicationListener(listener); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
定義endpoint,在service中進行事件釋出
controller:
@Autowired private IIncidentService iIncidentService; @RequestMapping("/test.do") public String finishIncident() { iIncidentService.finishIncident(); return "success"; }
service:
@Slf4j @Service public class IIncidentServiceImpl implements IIncidentService { @Autowired private CommonApplicationEventMulticaster commonApplicationEventMulticaster; @Override public void finishIncident() { FinishIncidentDisposalEventMsg msg = new FinishIncidentDisposalEventMsg(); msg.setIncidentInformationId(1111L); msg.setDesc("處置完成"); CommonApplicationEvent event = new CommonApplicationEvent(SystemEventType.FINISH_INCIDENT_APPEAL,msg); commonApplicationEventMulticaster.multicastEvent(event); } }
效果展示
啟動時,註冊listener:
2019-12-03 16:49:47.477 INFO 493432 --- [ main] com.ceiec.base.BootStrap : Started BootStrap in 1.436 seconds (JVM running for 2.22) 2019-12-03 16:49:47.478 INFO 493432 --- [ main] com.ceiec.base.init.InitRunner : register listener:com.ceiec.base.listener.IncidentStatisticsListener@c6b2dd9 2019-12-03 16:49:47.479 INFO 493432 --- [ main] com.ceiec.base.init.InitRunner : register listener:com.ceiec.base.listener.IncidentTraceListener@3f985a86 2019-12-03 16:49:47.479 INFO 493432 --- [ main] com.ceiec.base.init.InitRunner : register listener:com.ceiec.base.listener.MqListener@57a2ed35
瀏覽器中,請求http://localhost:8081/test.do,日誌如下:
方案2:直接使用spring內建的事件釋出器,解耦業務程式碼
原始碼:https://gitee.com/ckl111/spring-event-publish-demo/tree/master/spring-event-use-builtin-multicaster
這部分,比上面的方案相比,少了很多東西,只包含如下部分:
總的來說,listener直接繼承spring的ApplicationListener,事件釋出器直接使用spring的org.springframework.context.ApplicationEventPublisher
核心程式碼:
package com.ceiec.base.service.impl;
import com.ceiec.base.applicationevent.SystemEventType;
import com.ceiec.base.event.CommonApplicationEvent;
import com.ceiec.base.eventmsg.FinishIncidentDisposalEventMsg;
import com.ceiec.base.service.IIncidentService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
/**
* desc:
* 釋出事件的業務程式碼示例
* @author : ckl
* creat_date: 2019/12/2 0002
* creat_time: 14:27
**/
@Slf4j
@Service
public class IIncidentServiceImpl implements IIncidentService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Override
public void finishIncident() {
FinishIncidentDisposalEventMsg msg = new FinishIncidentDisposalEventMsg();
msg.setIncidentInformationId(1111L);
msg.setDesc("處置完成");
CommonApplicationEvent event = new CommonApplicationEvent(SystemEventType.FINISH_INCIDENT_APPEAL,msg);
applicationEventPublisher.publishEvent(event);
}
}
package com.ceiec.base.listener;
import com.ceiec.base.applicationevent.SystemEventType;
import com.ceiec.base.event.CommonApplicationEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
/**
* desc:
* 這裡,直接繼承 spring 的listener
* @author : ckl
* creat_date: 2019/11/16 0016
* creat_time: 9:56
**/
@Component
@Slf4j
public class IncidentStatisticsListener implements ApplicationListener<CommonApplicationEvent> {
@Override
public void onApplicationEvent(CommonApplicationEvent event) {
log.info("receive event:{}",event);
}
}
總結
以上兩種都可以用,一個是自己仿的,定製性強一點;一個直接用spring的。大家自由選擇即可。
通過這樣的方式,我們的業務程式碼,可以做到解耦,大體和mq其實是類似的