1. 程式人生 > 實用技巧 >Spring 事件監聽器示例及原始碼解析

Spring 事件監聽器示例及原始碼解析

Spring 事件監聽器示例及原始碼解析


原始碼解析


ApplicationEvent

public abstract class ApplicationEvent extends EventObject {
    //序列化
    private static final long serialVersionUID = 7099057708183571937L;
    //時間戳
    private final long timestamp = System.currentTimeMillis();
    //source事件推送源
    public ApplicationEvent(Object source) {
        super(source);
    }

    public final long getTimestamp() {
        return this.timestamp;
    }
}

ApplicationContext - publishEvent()

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
        //事件非null
        Assert.notNull(event, "Event must not be null");
        Object applicationEvent;
        //是否事件繼承自ApplicationEvent
        if (event instanceof ApplicationEvent) {
            applicationEvent = (ApplicationEvent)event;
        } else {
            applicationEvent = new PayloadApplicationEvent(this, event);
            if (eventType == null) {
                eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
            }
        }
		
	    //早期事件是否不為空
        if (this.earlyApplicationEvents != null) {
            this.earlyApplicationEvents.add(applicationEvent);
        } else {
            //事件儲存
            this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);
        }

        if (this.parent != null) {
            if (this.parent instanceof AbstractApplicationContext) {
                ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
            } else {
                this.parent.publishEvent(event);
            }
        }

    }

SimpleApplicationEventMulticaster

public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
        ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
        //獲取執行緒池
        Executor executor = this.getTaskExecutor();
        //獲取指定事件監聽器的迭代器
        Iterator var5 = this.getApplicationListeners(event, type).iterator();
        
        while(var5.hasNext()) {
            ApplicationListener<?> listener = (ApplicationListener)var5.next();
            //執行緒池不為null
            if (executor != null) {
                //執行緒非同步執行
                executor.execute(() -> {
                    this.invokeListener(listener, event);
                });
            } else {
                //同步執行
                this.invokeListener(listener, event);
            }
        }

    }

protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
        //異常處理器
        ErrorHandler errorHandler = this.getErrorHandler();
        if (errorHandler != null) {
            try {
                //事件處理
                this.doInvokeListener(listener, event);
            } catch (Throwable var5) {
                errorHandler.handleError(var5);
            }
        } else {
            //事件處理
            this.doInvokeListener(listener, event);
        }

    }

 private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
        try {
            //呼叫事件監聽器處理事件
            listener.onApplicationEvent(event);
        } catch (ClassCastException var6) {
            String msg = var6.getMessage();
            if (msg != null && !this.matchesClassCastMessage(msg, event.getClass())) {
                throw var6;
            }

            Log logger = LogFactory.getLog(this.getClass());
            if (logger.isTraceEnabled()) {
                logger.trace("Non-matching event type for listener: " + listener, var6);
            }
        }

    }

程式碼


RedisPubSubConfig:Redis 訂閱釋出配置

@Configuration
public class RedisPubSubConfig {
    public final static String tokenChannel = "ztx";
    @Bean
    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(pubSubRefreshListener(), new ChannelTopic(tokenChannel));
        return redisMessageListenerContainer;
    }

    @Autowired
    EventPubSubService eventPubSubService;
    MessageListener pubSubRefreshListener() {
        //事件監聽器
        return new MessageListener() {
            @SneakyThrows
            @Override
            public void onMessage(Message message, byte[] pattern) {
                //將message中的資料進行反序列化
                try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))) {
                    //將事件強轉為父介面型別
                    EventBase event = (EventBase) in.readObject();
                    //getEvent()獲取的型別是子型別介面 此處泛型
                    eventPubSubService.subEvent(event.getEvent());
                }
            }
        };
    }

EventPubSubService:訊息釋出與事件釋出

@Service
public class EventPubSubService {
    @Autowired
    RedisTemplate redisTemplate;
    @Autowired
    ApplicationContext applicationContext;
    //訊息推送 將訊息推送到redis
    public void pubEvent(ApplicationEvent event) {
        redisTemplate.convertAndSend(RedisPubSubConfig.tokenChannel, event);
    }
	
    //將redis釋出的事件進行釋出
    public <T> void subEvent(T event) {
        applicationContext.publishEvent(event);
    }

}

EventBase:事件介面

//泛型 用於統一發布事件 不同監聽器各自監聽
public interface EventBase<T> {
    T getEvent();
}

ZTXEvent:事件類

@Getter
public class ZTXEvent extends ApplicationEvent implements EventBase<ZTXEvent>, Serializable {
    private String carId;
    private Integer type;
    private byte param;


    public ZTXEvent() {
        super("ztx");
    }

    public ZTXEvent(String carId, Integer type, byte param) {
        super("ztx");
        this.carId = carId;
        this.type = type;
        this.param = param;
    }


    @Override
    public ZTXEvent getEvent() {
        return this;
    }
}

ZTXEventListener:事件監聽器

@Service
public class ZTXEventListener implements ApplicationListener<ZTXEvent> {
    @Autowired
    EquipmentServiceImpl equipmentService;
    
    //處理事件
    @Override
    public void onApplicationEvent(ZTXEvent ztxEvent) {
        ChannelHandlerContext ctx = equipmentService.carCtx.get(ztxEvent.getCarId()).getCtx();
        ProtocolOfZTX protocolOfZTX = new ProtocolOfZTX();
        protocolOfZTX.setAgvId(Byte.parseByte(ztxEvent.getCarId()));
        switch (ztxEvent.getType()){
            case 1:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{5,2});  //前
                break;
            case 2:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{5,3});  //後
                break;
            case 3:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{7,2});  //左
                break;
            case 4:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{7,3});  //右
                break;
            case 5:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{7,3});  //右
                break;
            case 6:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{6,0});  //停止
                break;
            case 7:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{8,ztxEvent.getParam()});  //速度
                break;
            case 8:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{0x0B,1,0,0,0});  //急停
                break;
            case 9:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{0x0B,0,1,0,0});  //準備
                break;
            case 10:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{0x0B,0,0,1,0});  //故障清除
                break;
            case 11:
                setProtocolOfZTXData(protocolOfZTX,new byte[]{0x0B,0,0,0,1});  //驅動提升
                break;
            default:
                break;
        }
        ctx.writeAndFlush(protocolOfZTX);
    }

    private void setProtocolOfZTXData(ProtocolOfZTX protocolOfZTXData,byte[] bytes){
        byte[] data = new byte[20];
        System.arraycopy(bytes, 0, data, 0, bytes.length);
        protocolOfZTXData.setData(data);
    }
}

流程

類分佈

  • A系統包含類
    • ZTXEvent
    • RedisPubSubConfig
    • EventBase
  • B系統包含類
    • EventBase
    • ZTXEvent
    • ZTXEventListener

工作流程

  • A系統
    • 介面按鈕點選
    • redis推送時間ZTXEvent(自定義事件)
  • B系統
    • 接受到Redis釋出的時間
    • 釋出事件ZTXEvent
    • ZTXEventListener對ZTXEvent進行事件處理

設計原理

A系統需要和B系統進行通訊,A系統推送事件後,B系統收到事件並將該事件釋出,B系統實現了該事件的監聽器,C系統沒有實現該事件的監聽器,所以B該事件只對B系統有效。

在這種情況下,所有需要相互互動的系統,都只需要通過redis釋出事件即可,若某個系統對該事件感興趣則只需要實現該事件的監聽器即可。