1. 程式人生 > 程式設計 >詳解JAVA Spring 中的事件機制

詳解JAVA Spring 中的事件機制

說到事件機制,可能腦海中最先浮現的就是日常使用的各種 listener,listener去監聽事件源,如果被監聽的事件有變化就會通知listener,從而針對變化做相應的動作。這些listener是怎麼實現的呢?說listener之前,我們先從設計模式開始講起。

觀察者模式

觀察者模式一般包含以下幾個物件:

  • Subject:被觀察的物件。它提供一系列方法來增加和刪除觀察者物件,同時它定義了通知方法notify()。目標類可以是介面,也可以是抽象類或具體類。
  • ConcreteSubject:具體的觀察物件。Subject的具體實現類,在這裡實現通知事件。
  • Observer:觀察者。這裡是抽象的觀察者,觀察者有一個或者多個。
  • ConcreteObserver:具體的觀察者。在這裡維護觀察物件的具體操作。

按照觀察者物件,我們來寫一個簡單的觀察者示例,定義一個氣象中心,釋出氣象資訊,觀察者是各個電視臺,訂閱氣象中心的資訊,有新增的氣象資訊釋出的時候,及時播報。

定義氣象中心:

public interface WeatherCenter {

  void publishWeatherInfo();

}

定義觀察者物件:

public interface Observer {

  void sendWeatherWarning();
}

定義具體的觀察者:

public class BeijingTvObserver implements Observer {
  
  @Override
  public void sendWeatherWarning(){
    System.out.println("北京衛視天氣預報開始了");
  }

}

中央電視臺:

public class CCTVObserver implements Observer {

  @Override
  public void sendWeatherWarning(){
    System.out.println("中央電視臺天氣預報開始了");
  }

}

現在釋出北京的氣象資訊:

public class BeijingWeather implements WeatherCenter {

  private List<Observer> observerArrayList = new ArrayList<>();

  @Override
  public void publishWeatherInfo() {
    for(Observer observer : observerArrayList) {
      observer.sendWeatherWarning();
    }
  }
}

這時候給所有的訂閱者推送一條氣象釋出訊息,那麼他們就收到最新的氣象預報。

總結一下觀察者模式的核心就是:事件中心持有所有的訂閱者,每當事件發生時迴圈通知所有的訂閱者。

當然上面我寫的比較簡單,你也可以在事件中心寫一個註冊訂閱者的方法,每當有新的訂閱者加入就呼叫該方法註冊。

Java 中的事件機制

Java中提供了基本的事件處理基類:

  1. EventObject:所有事件狀態物件都將從其派生的根類;
  2. EventListener:所有事件偵聽器介面必須擴充套件的標記介面;

具體使用方式可以用一個非常經典的開門案例來講解:

首先建立一個開/關門事件:

import java.util.EventObject;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc TODO
 */
public class DoorEvent extends EventObject {

  private Integer doorStatus;

  public DoorEvent(Object source) {
    super(source);
  }

  public DoorEvent(Object source,Integer status) {
    super(source);
    this.doorStatus = status;
  }

  public void setStatus(Integer status) {
    this.doorStatus = status;
  }

  public Integer getStatus() {
    return doorStatus;
  }

}

所有的事件都繼承 EventObject。

然後建立監聽器:

public interface DoorListener extends EventListener {

  void DoorEvent(DoorEvent doorEvent);
}

所有的監聽器都要實現 EventListener。

繼續建立具體的開門/關門的監聽器:

public class CloseDoorListener implements DoorListener {

  @Override
  public void DoorEvent(DoorEvent doorEvent) {
    Integer openStatus = doorEvent.getStatus();
    if(0 == openStatus) {
      System.out.println("the door is close");
    }
  }
}

開門:

public class OpenDoorListener implements DoorListener {
  @Override
  public void DoorEvent(DoorEvent doorEvent) {
    Integer openStatus = doorEvent.getStatus();
    if(1 == openStatus) {
      System.out.println("the door is open");
    }
  }
}

有了監聽器和事件之後,下一步就是用上他們。還記得上面的觀察者模式嘛,同樣的使用方式:

		/**
   * 將所有的listener儲存起來
   *
   * @return
   */
public static List<DoorListener> getAllListener() {
 List<DoorListener> list = Lists.newArrayList();
 list.add(new OpenDoorListener());
 list.add(new CloseDoorListener());
 return list;
}

public static void main(String[] args) {
 DoorEvent open = new DoorEvent("open",1);
 List<DoorListener> listeners = getAllListener();
 for (DoorListener listener : listeners) {
  listener.DoorEvent(open);
 }
}

Spring 中的事件機制

在 Spring 容器中通過 ApplicationEvent 類和 ApplicationListener 介面來處理事件,如果某個 bean實現 ApplicationListener 介面並被部署到容器中,那麼每次對應的 ApplicationEvent 被髮布到容器中都會通知該 bean,這是典型的觀察者模式。

Spring 的事件預設是同步的,即呼叫 publishEvent 方法釋出事件後,它會處於阻塞狀態,直到 onApplicationEvent 接收到事件並處理返回之後才繼續執行下去,這種單執行緒同步的好處是可以進行事務管理。

先展示一下使用方式,我們拿使用者登入來舉例。首先來建立一個事件:

import org.springframework.context.ApplicationEvent;

/**
 * @author rickiyang
 * @date 2019-12-04
 * @Desc TODO
 */
public class UserRegisterEvent extends ApplicationEvent {

  public UserRegisterEvent(Object source) {
    super(source);
  }
}

然後建立監聽器去監聽這個事件:

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc 插入使用者資訊
 */
@Component
public class UserInsertListener implements ApplicationListener<UserRegisterEvent> {


  @Override
  public void onApplicationEvent(UserRegisterEvent userRegisterEvent) {
    String source = (String)userRegisterEvent.getSource();
    User user = JSON.parseObject(source,User.class);
    //insert db
  }
}

建立一個使用者註冊成功之後插入使用者資訊的監聽器。

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc 使用者註冊成功傳送簡訊
 */
@Component
public class NotifyUserListener implements ApplicationListener<UserRegisterEvent> {


  @Override
  public void onApplicationEvent(UserRegisterEvent userRegisterEvent) {
    String source = (String)userRegisterEvent.getSource();
    User user = JSON.parseObject(source,User.class);
    //send sms
  }
}

建立註冊成功傳送通知簡訊的監聽器。

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc 使用者註冊成功給使用者生成推薦商品
 */
@Component
public class RecommendListener implements ApplicationListener<UserRegisterEvent> {


  @Override
  public void onApplicationEvent(UserRegisterEvent userRegisterEvent) {
    String source = (String)userRegisterEvent.getSource();
    User user = JSON.parseObject(source,User.class);
    // generate recommend commodity
  }
}

建立使用者註冊成功之後給使用者推薦商品的事件。

使用者註冊事件的監聽器建立完畢,那麼接下來就釋出事件等待監聽器監聽就行。在Spring中提供了 ApplicationEventPublisherAware 介面,從名稱上看就知道是 ApplicationEventPublisher 的介面卡類,用法就是你在業務類中實現該介面,然後使用 ApplicationEventPublisher#publishEvent釋出你的事件即可。

package com.rickiyang.learn.controller.test;

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;

/**
 * @author rickiyang
 * @date 2019-12-04
 * @Desc TODO
 */
@Service
public class UserRegisterPublisherService implements ApplicationEventPublisherAware {

  private ApplicationEventPublisher applicationEventPublisher;

  @Override
  public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
    this.applicationEventPublisher = applicationEventPublisher;
  }

  public void insert(User user){
    UserRegisterEvent event = new UserRegisterEvent(JSON.toJSONString(user));
    applicationEventPublisher.publishEvent(event);
  }
}

呼叫insert方法就可以釋出事件,寫一個test測試一下:

import com.rickiyang.learn.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
public class UserRegisterPublisherServiceTest {

  @Resource
  private UserRegisterPublisherService userRegisterPublisherService;


  @Test
  public void test1() {
    User build = User.builder().name("1").sex(1).phone("123456789").build();
    userRegisterPublisherService.insert(build);
  }

}

可以看到3個監聽器都打印出來了:

傳送簡訊
商品推薦
插入使用者

有個問題不知道大家發現沒,監聽器的釋出順序是按照 bean 自然裝載的順序執行的,如果我們的bean是有序的應該怎麼辦呢?別怕,Spring自然考慮到這個問題。

SmartApplicationListener實現有序的監聽

SmartApplicationListener 介面繼承了 ApplicationListener,使用全域性的 ApplicationEvent 作為監聽的事件物件。之所以 能提供順序性,是因為繼承了 Ordered 類,實現了排序的邏輯。另外添加了兩個方法#supportsEventType、#supportsSourceType 來作為區分是否是我們監聽的事件,只有這兩個方法同時返回true時才會執行onApplicationEvent方法。

package com.rickiyang.learn.controller.test;

import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc TODO
 */
@Component
public class UserInsert1Listener implements SmartApplicationListener {

  @Override
  public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
    return aClass == UserRegisterEvent.class;
  }

  @Override
  public boolean supportsSourceType(Class<?> sourceType) {
    return sourceType == User.class;
  }

  /**
   * 數字越小優先順序越高
   * 預設值為 2147483647
   * @return
   */
  @Override
  public int getOrder() {
    return 8;
  }

  @Override
  public void onApplicationEvent(ApplicationEvent applicationEvent) {
    UserRegisterEvent event = (UserRegisterEvent)applicationEvent;
    // insert to db
  }
}

如果你有對多個監聽器做排序的需求,那麼你只用在 getOrder 方法中指定當前的排序級別即可。數字越大優先順序越低,預設的排序級別是2147483647,你可以自己調整。

Spring 對事件監聽機制的註解支援

Spring4.2之後,ApplicationEventPublisher 自動被注入到容器中,不再需要顯示實現Aware介面。

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author rickiyang
 * @date 2019-12-04
 * @Desc TODO
 */
@Service
public class UserRegisterPublisher1Service {

  @Resource
  private ApplicationEventPublisher applicationEventPublisher;

  public void insert(User user){
    UserRegisterEvent event = new UserRegisterEvent(JSON.toJSONString(user));
    applicationEventPublisher.publishEvent(event);
  }
}

建立listener也就不需要顯式的繼承 ApplicationListener 或者 SmartApplicationListener,使用 @EventListener 註解即可:

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;

/**
 * @author rickiyang
 * @date 2019-12-07
 * @Desc TODO
 */
@Service
public class UserInfoCheckListener {

  @Order(8)
  @EventListener(classes = UserRegisterEvent.class)
  public void checkUserInfo(UserRegisterEvent event) {
    String source = (String) event.getSource();
    User user = JSON.parseObject(source,User.class);
    //todo check user info
  }
}

如果你想使用順序性的listener,那麼只需要使用 @Order註解就可以了。

非同步事件的支援

上面說過 Spring 事件機制預設是同步阻塞的,如果 ApplicationEventPublisher 釋出事件之後他會一直阻塞等待listener 響應,多個 listener 的情況下前面的沒有執行完後面的一直被阻塞。如果我們的應用場景是:使用者訂單完成之後非同步發貨,檢查快遞資訊,這些操作是沒有必要返回結果給使用者的。

這種情況下,我們是不是想到可以使用非同步執行緒的方式來處理。你可以把listener中的處理流程做一個非同步執行緒,或者利用 Spring 提供的執行緒池註解 @Async 來實現非同步執行緒。

要使用 @Async 之前需要先開啟執行緒池,在 啟動類上新增 @EnableAsync 註解即可。執行緒池支援配置模式,如果你不想使用預設的執行緒池配置,可以手動指定:

package com.rickiyang.learn.controller.test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * @author rickiyang
 * @date 2019-12-07
 * @Desc TODO
 */
@Configuration
@EnableAsync
public class AsyncConfig {

  @Bean("userInfoPool")
  public Executor getExecutor() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("consumer-queue-thread-%d").build();
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 執行緒池維護執行緒的最少數量
    executor.setCorePoolSize(5);
    // 執行緒池維護執行緒的最大數量
    executor.setMaxPoolSize(10);
    // 快取佇列
    executor.setQueueCapacity(25);
    //執行緒名
    executor.setThreadFactory(namedThreadFactory);
    // 執行緒池初始化
    executor.initialize();
    return executor;
  }
}

手動配置一個 bean name 為 userInfoPool 的執行緒池,接下來使用@Async註解使用執行緒池:

package com.rickiyang.learn.controller.test;

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author rickiyang
 * @date 2019-12-07
 * @Desc TODO
 */
@Service
public class UserInfoCheckListener {

  @Async("userInfoPool")
  @Order(8)
  @EventListener(classes = UserRegisterEvent.class)
  public void checkUserInfo(UserRegisterEvent event) {
    String source = (String) event.getSource();
    User user = JSON.parseObject(source,User.class);
    System.out.println("async deel");
    //todo check user info
  }
}

這樣我們就把 UserInfoCheckListener 變成了非同步任務。

Spring中的事件機制分析

上面從基本的釋出訂閱設計模式到 Java 提供的基本的事件處理基類,再拓展到 Spring 中如何使用事件機制來拓展程式碼,整條線還是很清晰。講完了我們應該如何在業務程式碼中使用釋出訂閱模式,我們也來分析一下Spring是如何實現釋出訂閱模式的,看看人家的程式碼功底。

在Spring 中提供了Event 的基類:ApplicationEvent,如果事件要想被Spring監聽那麼就必須繼承該類,同樣該類也繼承了 Java 中的事件基類:EventObject。

有了事件源,我們要定義事件監聽者用於處理事件,所有的事件監聽者都要繼承 org.springframework.context.ApplicationListener 介面:

/**
 * Interface to be implemented by application event listeners.
 * Based on the standard {@code java.util.EventListener} interface
 * for the Observer design pattern.
 *
 * <p>As of Spring 3.0,an ApplicationListener can generically declare the event type
 * that it is interested in. When registered with a Spring ApplicationContext,events
 * will be filtered accordingly,with the listener getting invoked for matching event
 * objects only.
 *
 * @author Rod Johnson
 * @author Juergen Hoeller
 * @param <E> the specific ApplicationEvent subclass to listen to
 * @see org.springframework.context.event.ApplicationEventMulticaster
 */
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

}

ApplicationListener 提供了 一個基於 ApplicationEvent 的泛型,所以你指定了某個類的監聽者只會處理該型別的event。

上面我們說了 Spring 是基於 ApplicationEventPublisher 來發布事件,那麼監聽器是如何獲取到事件呢?

注意到 ApplicationListener 上面的註釋寫到:@param <E> the specific ApplicationEvent subclass to listen to ApplicationEventMulticaster,從名稱上看這個類的作用應該是用於事件廣播。

ApplicationEventMulticaster是一個介面,提供瞭如下方法:

  • addApplicationListener(ApplicationListener<?> listener) :新增一個listener;
  • addApplicationListenerBean(String listenerBeanName):新增一個listener,引數為bean name;
  • removeApplicationListener(ApplicationListener<?> listener):刪除listener;
  • removeApplicationListenerBean(String listenerBeanName):根據bean name 刪除listener;
  • multicastEvent(ApplicationEvent event):廣播事件;
  • multicastEvent(ApplicationEvent event,@Nullable ResolvableType eventType):廣播事件,指定事件的source型別。

從介面的方法看,該類的作用就是新增監聽器然後對所有監聽器或者指定監聽器傳送事件進行處理。

ApplicationEventMulticaster 有兩個實現類:

  • SimpleApplicationEventMulticaster
  • AbstractApplicationEventMulticaster

因為 AbstractApplicationEventMulticaster 是一個抽象類,並且 SimpleApplicationEventMulticaster 也繼承了了 SimpleApplicationEventMulticaster ,所以我們直接看 SimpleApplicationEventMulticaster:

public abstract class AbstractApplicationEventMulticaster
		implements ApplicationEventMulticaster,BeanClassLoaderAware,BeanFactoryAware {

	private final ListenerRetriever defaultRetriever = new ListenerRetriever(false);

	final Map<ListenerCacheKey,ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);

 
 @Override
	public void addApplicationListener(ApplicationListener<?> listener) {
		synchronized (this.retrievalMutex) {
			// Explicitly remove target for a proxy,if registered already,// in order to avoid double invocations of the same listener.
			Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
			if (singletonTarget instanceof ApplicationListener) {
				this.defaultRetriever.applicationListeners.remove(singletonTarget);
			}
			this.defaultRetriever.applicationListeners.add(listener);
			this.retrieverCache.clear();
		}
	}
 
 ......
 ......


}

#addApplicationListener 方法用於新增監聽器,新增的邏輯主要在這一句:

defaultRetriever.applicationListeners.add(listener);

繼續看 ListenerRetriever 的實現:

private class ListenerRetriever {

 public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();

 public final Set<String> applicationListenerBeans = new LinkedHashSet<>();

 private final boolean preFiltered;

 public ListenerRetriever(boolean preFiltered) {
  this.preFiltered = preFiltered;
 }

 public Collection<ApplicationListener<?>> getApplicationListeners() {
  List<ApplicationListener<?>> allListeners = new ArrayList<>(
   this.applicationListeners.size() + this.applicationListenerBeans.size());
  allListeners.addAll(this.applicationListeners);
  if (!this.applicationListenerBeans.isEmpty()) {
   BeanFactory beanFactory = getBeanFactory();
   for (String listenerBeanName : this.applicationListenerBeans) {
    try {
     ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName,ApplicationListener.class);
     if (this.preFiltered || !allListeners.contains(listener)) {
      allListeners.add(listener);
     }
    }
    catch (NoSuchBeanDefinitionException ex) {
     // Singleton listener instance (without backing bean definition) disappeared -
     // probably in the middle of the destruction phase
    }
   }
  }
  if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) {
   AnnotationAwareOrderComparator.sort(allListeners);
  }
  return allListeners;
 }
}

看到沒,最終還是 持有了一個 applicationListeners 的集合,跟我們的釋出訂閱設計模式一樣。

剩下的邏輯就好去解釋,順著咱們前面講過的釋出訂閱模式的使用套路擼下去就行,事件廣播的方法#multicastEvent不外乎就是遍歷所有的監聽器進行匹配。

總結

這一篇講的釋出訂閱模式以及在Spring中的使用在日常開發中只要稍加註意你就會發現對改善程式碼流程的影響還是挺大。寫程式碼有90%的時間我們都是在寫同步程式碼,因為不用動腦子,順著該有的流程擼就完事。這樣帶來的後果就是你真的只是在搬磚!

有的時候停下來,從業務邏輯跳出來拿半個小時想想你應該如何讓這這一次搬磚有點技術含量。或許從此刻開始,搬磚也會與眾不同。

以上就是詳解JAVA Spring 中的事件機制的詳細內容,更多關於JAVA Spring 事件機制的資料請關注我們其它相關文章!