Spring 事件監聽器示例及原始碼解析
阿新 • • 發佈:2020-11-04
Spring 事件監聽器示例及原始碼解析
原始碼解析
ApplicationEvent
/**
 * Base class for events published through the Spring application context.
 *
 * <p>Extends {@link EventObject}, so every event carries the {@code source}
 * object it was created with, plus the creation time captured below.
 */
public abstract class ApplicationEvent extends EventObject {

    /** Serialization version identifier. */
    private static final long serialVersionUID = 7099057708183571937L;

    /** Value of {@link System#currentTimeMillis()} taken when the event was constructed. */
    private final long timestamp = System.currentTimeMillis();

    /**
     * Creates a new event.
     *
     * @param source the object the event originates from; passed to the
     *               {@link EventObject} constructor
     */
    public ApplicationEvent(Object source) {
        super(source);
    }

    /**
     * Returns the creation time of this event.
     *
     * @return the timestamp recorded at construction, in milliseconds
     */
    public final long getTimestamp() {
        return this.timestamp;
    }
}
AbstractApplicationContext - publishEvent()
protected void publishEvent(Object event, @Nullable ResolvableType eventType) { //事件非null Assert.notNull(event, "Event must not be null"); Object applicationEvent; //是否事件繼承自ApplicationEvent if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent)event; } else { applicationEvent = new PayloadApplicationEvent(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType(); } } //早期事件是否不為空 if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { //事件儲存 this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType); } if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext)this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } }
SimpleApplicationEventMulticaster
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event); //獲取執行緒池 Executor executor = this.getTaskExecutor(); //獲取指定事件監聽器的迭代器 Iterator var5 = this.getApplicationListeners(event, type).iterator(); while(var5.hasNext()) { ApplicationListener<?> listener = (ApplicationListener)var5.next(); //執行緒池不為null if (executor != null) { //執行緒非同步執行 executor.execute(() -> { this.invokeListener(listener, event); }); } else { //同步執行 this.invokeListener(listener, event); } } } protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) { //異常處理器 ErrorHandler errorHandler = this.getErrorHandler(); if (errorHandler != null) { try { //事件處理 this.doInvokeListener(listener, event); } catch (Throwable var5) { errorHandler.handleError(var5); } } else { //事件處理 this.doInvokeListener(listener, event); } } private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { //呼叫事件監聽器處理事件 listener.onApplicationEvent(event); } catch (ClassCastException var6) { String msg = var6.getMessage(); if (msg != null && !this.matchesClassCastMessage(msg, event.getClass())) { throw var6; } Log logger = LogFactory.getLog(this.getClass()); if (logger.isTraceEnabled()) { logger.trace("Non-matching event type for listener: " + listener, var6); } } }
程式碼
RedisPubSubConfig:Redis 訂閱釋出配置
@Configuration
public class RedisPubSubConfig {
public final static String tokenChannel = "ztx";
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
redisMessageListenerContainer.addMessageListener(pubSubRefreshListener(), new ChannelTopic(tokenChannel));
return redisMessageListenerContainer;
}
@Autowired
EventPubSubService eventPubSubService;
MessageListener pubSubRefreshListener() {
//事件監聽器
return new MessageListener() {
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
//將message中的資料進行反序列化
try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))) {
//將事件強轉為父介面型別
EventBase event = (EventBase) in.readObject();
//getEvent()獲取的型別是子型別介面 此處泛型
eventPubSubService.subEvent(event.getEvent());
}
}
};
}
EventPubSubService:訊息釋出與事件釋出
/**
 * Bridges Redis messages and Spring application events: publishes outgoing events to
 * the Redis channel and republishes received events inside the local context.
 */
@Service
public class EventPubSubService {

    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    RedisTemplate redisTemplate;

    /**
     * Sends the event to the Redis channel defined by {@code RedisPubSubConfig.tokenChannel}.
     *
     * @param event the event to send
     */
    public void pubEvent(ApplicationEvent event) {
        final String channel = RedisPubSubConfig.tokenChannel;
        redisTemplate.convertAndSend(channel, event);
    }

    /**
     * Publishes an event received from Redis through the local application context.
     *
     * @param event the event object to publish
     * @param <T>   the concrete event type
     */
    public <T> void subEvent(T event) {
        applicationContext.publishEvent(event);
    }
}
EventBase:事件介面
/**
 * Common interface implemented by every event sent over Redis.
 *
 * <p>The generic parameter lets all events be handled through one interface
 * while each listener still receives its own concrete event type.
 *
 * @param <T> the concrete event type returned by {@link #getEvent()}
 */
public interface EventBase<T> {
/** Returns the concrete event object. */
T getEvent();
}
ZTXEvent:事件類
/**
 * Custom application event carrying a command for a single car.
 *
 * <p>The event is serialized, sent through Redis and deserialized by another system,
 * so the class must stay serialization-compatible on both sides.
 */
@Getter
public class ZTXEvent extends ApplicationEvent implements EventBase<ZTXEvent>, Serializable {

    /**
     * Explicit serialization version. Without it each build derives its own default value,
     * and a sender and receiver compiled separately can fail with
     * {@code InvalidClassException}. Keep this value identical in every system that
     * shares the class.
     */
    private static final long serialVersionUID = 1L;

    /** Identifier of the target car. */
    private String carId;

    /** Command code; selects which command the listener sends. */
    private Integer type;

    /** Extra command argument; the listener uses it for the speed command (code 7). */
    private byte param;

    /** Creates an empty event whose source is the fixed string {@code "ztx"}. */
    public ZTXEvent() {
        super("ztx");
    }

    /**
     * Creates a fully populated event whose source is the fixed string {@code "ztx"}.
     *
     * @param carId identifier of the target car
     * @param type  command code
     * @param param extra command argument
     */
    public ZTXEvent(String carId, Integer type, byte param) {
        super("ztx");
        this.carId = carId;
        this.type = type;
        this.param = param;
    }

    /** Returns this event itself, typed as {@code ZTXEvent}. */
    @Override
    public ZTXEvent getEvent() {
        return this;
    }
}
ZTXEventListener:事件監聽器
/**
 * Listener that converts a {@code ZTXEvent} into a {@code ProtocolOfZTX} frame and writes
 * it to the channel registered for the target car.
 */
@Service
public class ZTXEventListener implements ApplicationListener<ZTXEvent> {

    /** Length of the data array placed in every frame; shorter commands are zero-padded. */
    private static final int DATA_LENGTH = 20;

    @Autowired
    EquipmentServiceImpl equipmentService;

    /**
     * Handles the event by sending the matching command to the car.
     *
     * <p>Events with an unknown command code are ignored; nothing is written to the channel.
     *
     * @param ztxEvent the received event
     * @throws IllegalArgumentException if the event has no car id or no command code
     * @throws IllegalStateException    if no channel is registered for the car
     */
    @Override
    public void onApplicationEvent(ZTXEvent ztxEvent) {
        String carId = ztxEvent.getCarId();
        Integer type = ztxEvent.getType();
        if (carId == null || type == null) {
            // The no-arg ZTXEvent constructor leaves both fields null; fail with a clear
            // message instead of an anonymous NullPointerException further down.
            throw new IllegalArgumentException(
                    "ZTXEvent requires carId and type, got carId=" + carId + ", type=" + type);
        }

        byte[] command = commandFor(type, ztxEvent.getParam());
        if (command.length == 0) {
            // Unknown command code: there is nothing meaningful to send.
            return;
        }

        // Optional lets both lookup steps be null-checked without naming the holder type.
        ChannelHandlerContext ctx = java.util.Optional
                .ofNullable(equipmentService.carCtx.get(carId))
                .map(holder -> holder.getCtx())
                .orElse(null);
        if (ctx == null) {
            throw new IllegalStateException("No channel registered for car " + carId);
        }

        ProtocolOfZTX protocolOfZTX = new ProtocolOfZTX();
        protocolOfZTX.setAgvId(Byte.parseByte(carId));
        setProtocolOfZTXData(protocolOfZTX, command);
        ctx.writeAndFlush(protocolOfZTX);
    }

    /**
     * Maps a command code to the leading bytes of the frame data.
     *
     * @param type  command code taken from the event
     * @param param extra argument, used only by the speed command
     * @return the command bytes, or an empty array when the code is unknown
     */
    private static byte[] commandFor(int type, byte param) {
        switch (type) {
            case 1:
                return new byte[]{5, 2};             // forward
            case 2:
                return new byte[]{5, 3};             // backward
            case 3:
                return new byte[]{7, 2};             // left
            case 4:
                return new byte[]{7, 3};             // right
            case 5:
                // NOTE(review): identical to code 4 -- looks like a copy-paste slip;
                // confirm the intended bytes. Behaviour kept unchanged.
                return new byte[]{7, 3};             // right
            case 6:
                return new byte[]{6, 0};             // stop
            case 7:
                return new byte[]{8, param};         // speed
            case 8:
                return new byte[]{0x0B, 1, 0, 0, 0}; // emergency stop
            case 9:
                return new byte[]{0x0B, 0, 1, 0, 0}; // ready
            case 10:
                return new byte[]{0x0B, 0, 0, 1, 0}; // fault clear
            case 11:
                return new byte[]{0x0B, 0, 0, 0, 1}; // drive lift
            default:
                return new byte[0];
        }
    }

    /**
     * Copies the command bytes to the start of a zero-filled array of
     * {@link #DATA_LENGTH} bytes and stores it in the frame.
     *
     * @param protocolOfZTXData the frame to populate
     * @param bytes             the command bytes to place at the start of the data
     */
    private void setProtocolOfZTXData(ProtocolOfZTX protocolOfZTXData, byte[] bytes) {
        byte[] data = new byte[DATA_LENGTH];
        System.arraycopy(bytes, 0, data, 0, bytes.length);
        protocolOfZTXData.setData(data);
    }
}
流程
類分佈
- A系統包含類
- ZTXEvent
- RedisPubSubConfig
- EventBase
- B系統包含類
- EventBase
- ZTXEvent
- ZTXEventListener
工作流程
- A系統
- 介面按鈕點選
- redis推送事件ZTXEvent(自定義事件)
- B系統
- 接收到Redis釋出的事件
- 釋出事件ZTXEvent
- ZTXEventListener對ZTXEvent進行事件處理
設計原理
A系統需要和B系統進行通訊:A系統推送事件後,B系統收到事件並將該事件釋出。B系統實現了該事件的監聽器,而其他系統(例如C系統)沒有實現該事件的監聽器,所以該事件只對B系統有效。
在這種情況下,所有需要相互互動的系統,都只需要通過redis釋出事件即可,若某個系統對該事件感興趣則只需要實現該事件的監聽器即可。