Just Snacking on Guava: EventBus, the Event Bus
阿新 • Published: 2018-12-11
EventBus: the event bus
-
EventBus is Guava's event-handling mechanism and an elegant implementation of the observer pattern.
-
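With Guava's EventBus you no longer have to implement a particular listener interface; you only need to put the @Subscribe annotation on the method that should receive the event.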
-
A small example:
// Each class below needs these two imports.
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

// Subscriber
public class EventListener {
    // A @Subscribe method must take exactly one parameter. To subscribe to a
    // type of message, just annotate a method that accepts it with @Subscribe.
    @Subscribe
    public void listen(OrderEvent event) {
        System.out.println("receive message: " + event.getMessage());
    }

    /*
     * A subscriber can also subscribe to several events at once.
     * Guava matches the event's type against each method's parameter type
     * to decide which subscriber method to call.
     */
    @Subscribe
    public void listen(String message) {
        System.out.println("receive message: " + message);
    }
}

// Subscriber
public class MultiEventListener {
    @Subscribe
    public void listen(OrderEvent event) {
        System.out.println("MultiEventListener receive msg: " + event.getMessage());
    }

    @Subscribe
    public void listen(String message) {
        System.out.println("MultiEventListener receive msg: " + message);
    }
}

// The event passed around in publish-subscribe is an ordinary POJO.
public class OrderEvent {
    private String message;

    public OrderEvent(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    // Test method
    public static void main(String[] args) {
        EventBus eventBus = new EventBus("jack");
        /*
         * If several subscribers subscribe to the same event, each of them
         * is notified. In this example they are notified in the order in
         * which they were registered.
         */
        eventBus.register(new EventListener());      // register a subscriber
        eventBus.register(new MultiEventListener());
        eventBus.post(new OrderEvent("hello"));      // publish an event
        eventBus.post(new OrderEvent("world"));
        eventBus.post("!");
        // Output:
        /**
         * receive message: hello
         * MultiEventListener receive msg: hello
         * receive message: world
         * MultiEventListener receive msg: world
         * receive message: !
         * MultiEventListener receive msg: !
         **/
    }
}
A look inside EventBus
-
The subscribers held by EventBus, and the register method
// In EventBus
private final SubscriberRegistry subscribers;
public void register(Object object) {
    this.subscribers.register(object);
}
// In SubscriberRegistry
void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = this.findAllSubscribers(listener);
    ...
}
- On registration, every method of the listener that carries @Subscribe is found and grouped by its parameter type
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    Iterator i$ = getAnnotatedMethods(clazz).iterator();
    while(i$.hasNext()) {
        Method method = (Method)i$.next();
        Class<?>[] parameterTypes = method.getParameterTypes();
        Class<?> eventType = parameterTypes[0];
        methodsInListener.put(eventType, Subscriber.create(this.bus, listener, method));
    }
    return methodsInListener;
}
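The grouping step can be tried out without Guava at all. The following is only a rough sketch using plain JDK reflection: `@Handles`, `DemoListener` and `findHandlers` are made-up names standing in for `@Subscribe`, a listener and `findAllSubscribers`, and a plain `Map` of lists replaces Guava's `Multimap`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RegistrySketch {
    // Hypothetical stand-in for Guava's @Subscribe (not a Guava type).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    // Hypothetical listener: two annotated methods, one plain method.
    static class DemoListener {
        @Handles public void onText(String s) {}
        @Handles public void onNumber(Integer n) {}
        public void notAHandler(String s) {}
    }

    // Group the annotated one-argument methods by their parameter type.
    static Map<Class<?>, List<Method>> findHandlers(Object listener) {
        Map<Class<?>, List<Method>> byEventType = new HashMap<>();
        for (Method m : listener.getClass().getDeclaredMethods()) {
            if (!m.isAnnotationPresent(Handles.class) || m.isSynthetic()) {
                continue;
            }
            Class<?>[] params = m.getParameterTypes();
            if (params.length != 1) {
                throw new IllegalArgumentException(m + " must take exactly one parameter");
            }
            byEventType.computeIfAbsent(params[0], k -> new ArrayList<>()).add(m);
        }
        return byEventType;
    }

    public static void main(String[] args) {
        Map<Class<?>, List<Method>> handlers = findHandlers(new DemoListener());
        System.out.println("String handlers: " + handlers.get(String.class));
        System.out.println("Integer handlers: " + handlers.get(Integer.class));
    }
}
```

The map key is the event type, which is what later lets the bus look up the right methods from nothing more than the class of the posted object.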
- Deciding whether a method is a subscriber
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<SubscriberRegistry.MethodIdentifier, Method> identifiers = Maps.newHashMap();
    Iterator i$ = supertypes.iterator();
    while(i$.hasNext()) {
        Class<?> supertype = (Class)i$.next();
        Method[] arr$ = supertype.getDeclaredMethods();
        int len$ = arr$.length;
        for(int i$ = 0; i$ < len$; ++i$) {
            Method method = arr$[i$];
            if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
                ...
            }
        }
    }
    return ImmutableList.copyOf(identifiers.values());
}
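Because the scan walks the supertypes, annotated methods inherited from a parent class are picked up too. Here is a small JDK-only sketch of that idea. The names `@Handles`, `Base`, `Child` and `annotatedMethods` are invented for illustration, only the superclass chain is walked (no interfaces), and the "first one seen wins" de-duplication by name plus parameter types is an assumption about what a scan like this would do, not a copy of the elided Guava code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HierarchyScanSketch {
    // Hypothetical stand-in for Guava's @Subscribe.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handles {}

    static class Base {
        @Handles public void onText(String s) {}
    }

    static class Child extends Base {
        @Override @Handles public void onText(String s) {}
        @Handles public void onNumber(Integer n) {}
    }

    // Walk from the class up to Object and collect annotated methods.
    // A method is identified by name + parameter types, so an override in a
    // subclass hides the superclass method with the same signature.
    static List<Method> annotatedMethods(Class<?> clazz) {
        Map<String, Method> byIdentity = new LinkedHashMap<>();
        for (Class<?> c = clazz; c != null; c = c.getSuperclass()) {
            for (Method m : c.getDeclaredMethods()) {
                if (m.isAnnotationPresent(Handles.class) && !m.isSynthetic()) {
                    String id = m.getName() + Arrays.toString(m.getParameterTypes());
                    byIdentity.putIfAbsent(id, m);
                }
            }
        }
        return new ArrayList<>(byIdentity.values());
    }

    public static void main(String[] args) {
        for (Method m : annotatedMethods(Child.class)) {
            System.out.println(m.getDeclaringClass().getSimpleName() + "." + m.getName());
        }
    }
}
```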
- The post method: look up every subscriber whose parameter type matches the event; if there is none, wrap the event in a DeadEvent and post that instead
public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = this.subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        this.dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
        this.post(new DeadEvent(this, event));
    }
}
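The fallback branch of post is easy to miss, so here is a tiny JDK-only model of it. This is a sketch, not Guava: `Unhandled` is a hypothetical wrapper playing the role of `DeadEvent`, `subscribe` takes lambdas instead of annotated methods, and matching is by exact class only (the real bus also considers the event's supertypes).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class DeadEventSketch {
    // Hypothetical wrapper, playing the role of Guava's DeadEvent.
    static final class Unhandled {
        final Object event;
        Unhandled(Object event) { this.event = event; }
    }

    private final Map<Class<?>, List<Consumer<Object>>> handlers = new HashMap<>();

    <T> void subscribe(Class<T> type, Consumer<? super T> handler) {
        handlers.computeIfAbsent(type, k -> new ArrayList<>())
                .add(e -> handler.accept(type.cast(e)));
    }

    void post(Object event) {
        List<Consumer<Object>> matching = handlers.get(event.getClass());
        if (matching != null && !matching.isEmpty()) {
            for (Consumer<Object> h : matching) {
                h.accept(event);
            }
        } else if (!(event instanceof Unhandled)) {
            // Nobody wanted it: wrap it and post the wrapper once.
            // The instanceof check prevents endless re-wrapping.
            post(new Unhandled(event));
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        DeadEventSketch bus = new DeadEventSketch();
        bus.subscribe(String.class, s -> log.add("string: " + s));
        bus.subscribe(Unhandled.class, u -> log.add("unhandled: " + u.event));
        bus.post("hello"); // has a subscriber
        bus.post(42);      // no Integer subscriber, falls through to Unhandled
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
        // prints:
        // string: hello
        // unhandled: 42
    }
}
```

In practice this is what makes a DeadEvent subscriber handy for debugging: events that reach nobody show up there instead of disappearing silently.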
- Dispatching the notification to the subscribers, through a per-thread queue
void dispatch(Object event, Iterator<Subscriber> subscribers) {
    Preconditions.checkNotNull(event);
    Preconditions.checkNotNull(subscribers);
    Queue<Dispatcher.PerThreadQueuedDispatcher.Event> queueForThread = (Queue)this.queue.get();
    queueForThread.offer(new Dispatcher.PerThreadQueuedDispatcher.Event(event, subscribers));
    if (!((Boolean)this.dispatching.get()).booleanValue()) {
        this.dispatching.set(true);
        Dispatcher.PerThreadQueuedDispatcher.Event nextEvent;
        try {
            while((nextEvent = (Dispatcher.PerThreadQueuedDispatcher.Event)queueForThread.poll()) != null) {
                while(nextEvent.subscribers.hasNext()) {
                    ((Subscriber)nextEvent.subscribers.next()).dispatchEvent(nextEvent.event);
                }
            }
        } finally {
            this.dispatching.remove();
            this.queue.remove();
        }
    }
}
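What the queue plus the `dispatching` flag buys you is easiest to see when a subscriber posts another event while it is being notified. Below is a simplified JDK-only sketch of the same pattern. `QueuedDispatchSketch` and its lambda-based `subscribe` are invented for this illustration, and every handler receives every event to keep it short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class QueuedDispatchSketch {
    private final List<Consumer<Object>> handlers = new ArrayList<>();
    // One queue and one "am I already dispatching?" flag per thread.
    private final ThreadLocal<Queue<Object>> queue =
            ThreadLocal.withInitial(() -> new ArrayDeque<>());
    private final ThreadLocal<Boolean> dispatching =
            ThreadLocal.withInitial(() -> false);

    void subscribe(Consumer<Object> handler) {
        handlers.add(handler);
    }

    void post(Object event) {
        Queue<Object> q = queue.get();
        q.offer(event);
        if (!dispatching.get()) {
            dispatching.set(true);
            try {
                Object next;
                // Drain the queue; events posted by handlers land here too.
                while ((next = q.poll()) != null) {
                    for (Consumer<Object> h : handlers) {
                        h.accept(next);
                    }
                }
            } finally {
                dispatching.remove();
                queue.remove();
            }
        }
        // If we were already dispatching, the event just waits in the queue.
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        QueuedDispatchSketch bus = new QueuedDispatchSketch();
        bus.subscribe(e -> {
            log.add("A got " + e);
            if (e.equals("first")) {
                bus.post("second"); // re-entrant post from inside a handler
            }
        });
        bus.subscribe(e -> log.add("B got " + e));
        bus.post("first");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
        // prints:
        // A got first
        // B got first
        // A got second
        // B got second
    }
}
```

Note that B sees "first" before anybody sees "second": the nested post is queued rather than run immediately, so every subscriber finishes one event before the next one starts on that thread.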
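- Actually invoking the @Subscribe method. The call is handed to the bus's executor: the plain EventBus used above runs on a direct executor, so the method executes synchronously on the posting thread; it only becomes asynchronous with AsyncEventBus, which takes an Executor of your choice
final void dispatchEvent(final Object event) {
    this.executor.execute(new Runnable() {
        public void run() {
            try {
                Subscriber.this.invokeSubscriberMethod(event);
            } catch (InvocationTargetException var2) {
                Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
            }
        }
    });
}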
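Whether the subscriber runs on the posting thread or elsewhere comes down entirely to which Executor receives the Runnable. The JDK-only sketch below shows the difference. `ExecutorChoiceSketch` and its helper methods are made up for this post; `Runnable::run` is used as a stand-in for a direct executor and a single-thread pool as a stand-in for the kind of executor you might pass to AsyncEventBus.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ExecutorChoiceSketch {
    // Run a dummy "subscriber call" through the executor and report
    // which thread actually executed it.
    static Thread handlerThread(Executor executor) {
        AtomicReference<Thread> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            seen.set(Thread.currentThread());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    // Direct executor: execute() simply calls run() on the caller's thread.
    static boolean directRunsOnCaller() {
        Executor direct = Runnable::run;
        return handlerThread(direct) == Thread.currentThread();
    }

    // Pooled executor: the work is handed to a worker thread.
    static boolean pooledRunsOnCaller() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return handlerThread(pool) == Thread.currentThread();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct executor runs on caller: " + directRunsOnCaller()); // true
        System.out.println("pooled executor runs on caller: " + pooledRunsOnCaller()); // false
    }
}
```

So the `executor.execute(...)` call in dispatchEvent is not asynchronous by itself; it is as synchronous or asynchronous as the executor the bus was built with.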