1. 程式人生 > >我就吃瓜(Guava): 事件匯流排EventBus

我就吃瓜(Guava): 事件匯流排EventBus

EventBus : 事件匯流排

  • EventBus 是Guava的事件處理機制,是觀察者模式的優雅實現。

  • 使用Guava中的EventBus時,不需要再實現指定介面,只需要在指定的方法上實現@Subscribe註解即可。

  • 小栗子:

public class EventListener { //訂閱者

    //@Subscribe保證有且只有一個輸入引數,如果你需要訂閱某種型別的訊息,只需要在指定的方法上加上	            @Subscribe註解即可
    @Subscribe
    public void listen(OrderEvent event){
        System.out.println("receive message: "+event.getMessage());
    }

    /*
      一個subscriber也可以同時訂閱多個事件
      Guava會通過事件型別來和訂閱方法的形參來決定到底呼叫subscriber的哪個訂閱方法
     */
    @Subscribe
    public void listen(String message){
        System.out.println("receive message: "+message);
    }
}

public class MultiEventListener { //訂閱者

    @Subscribe
    public void listen(OrderEvent event){
        System.out.println("MultiEventListener receive msg: "+event.getMessage());
    }

    @Subscribe
    public void listen(String message){
        System.out.println("MultiEventListener receive msg: "+message);
    }
}

//釋出-訂閱模式中傳遞的事件,是一個普通的POJO類
public class OrderEvent {
    private String message;

    public OrderEvent(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
	
	//測試方法
    public static void main(String[] args) {
        EventBus eventBus = new EventBus("jack");
        /*
         如果多個subscriber訂閱了同一個事件,那麼每個subscriber都將收到事件通知
         並且收到事件通知的順序跟註冊的順序保持一致
        */
        eventBus.register(new EventListener()); //註冊訂閱者
        eventBus.register(new MultiEventListener());
        eventBus.post(new OrderEvent("hello")); //釋出事件
        eventBus.post(new OrderEvent("world"));
        eventBus.post("!");
        //out
        /**
         * receive message: hello
         * MultiEventListener receive msg: hello
         * receive message: world
         * MultiEventListener receive msg: world
         * receive message: !
         * MultiEventListener receive msg: !
         **/
    }

}

EventBus 觀察

  • EventBus中的訂閱者們和註冊方法

private final SubscriberRegistry subscribers;

public void register(Object object) {
        this.subscribers.register(object);
}
void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = this.findAllSubscribers(listener);
    ...
}
  •  註冊時會找到 listener 中所有帶有 @Subscribe 的方法,並根據方法的引數型別進行分類
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    Iterator i$ = getAnnotatedMethods(clazz).iterator();

    while(i$.hasNext()) {
        Method method = (Method)i$.next();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(this.bus, listener, method));
    }

    return methodsInListener;
}
  • 判斷方法是不是訂閱者
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<SubscriberRegistry.MethodIdentifier, Method> identifiers = Maps.newHashMap();
    Iterator i$ = supertypes.iterator();

    while(i$.hasNext()) {
        Class<?> supertype = (Class)i$.next();
        Method[] arr$ = supertype.getDeclaredMethods();
        int len$ = arr$.length;

        for(int i$ = 0; i$ < len$; ++i$) {
            Method method = arr$[i$];
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
               ...
            }
        }
    }

    return ImmutableList.copyOf(identifiers.values());
}
  • 通知方法 , 獲取引數為event型別的全部觀察者
public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        this.dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        this.post(new DeadEvent(this, event));
    }
}
  • 將通知分發給觀察者們
void dispatch(Object event, Iterator<Subscriber> subscribers) {
    Preconditions.checkNotNull(event);
    Preconditions.checkNotNull(subscribers);
    Queue<Dispatcher.PerThreadQueuedDispatcher.Event> queueForThread = (Queue)this.queue.get();
    queueForThread.offer(new Dispatcher.PerThreadQueuedDispatcher.Event(event, subscribers));
    if (!((Boolean)this.dispatching.get()).booleanValue()) {
        this.dispatching.set(true);
        
        Dispatcher.PerThreadQueuedDispatcher.Event nextEvent;
        try {
            while((nextEvent = 	                           (Dispatcher.PerThreadQueuedDispatcher.Event)queueForThread.poll()) != null) {    
                while(nextEvent.subscribers.hasNext()) {
                    ((Subscriber)nextEvent.subscribers.next()).dispatchEvent(nextEvent.event);
                }
            }
        } finally {
            this.dispatching.remove();
            this.queue.remove();
        }
    }

}
  • 具體的執行帶有 @Subscribe 的方法,非同步執行,提高效率
final void dispatchEvent(final Object event) {
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), 			                            Subscriber.this.context(event));
            }

        }
    });
}