1. 程式人生 > >EventBus3.0原始碼分析

EventBus3.0原始碼分析

簡述:

    在專案中,我們大多數開發者可能都使用過EventBus,即使沒有使用過但我可以確定Android開發者也聽說過這個牛X的庫,從誕生到目前EventBus已經更新到3.X版本,可見生命力極強呀。那麼這篇博文就從EventBus3.0原始碼的角度分析一下其內部處理流程。

使用流程:

    註冊:

EventBus.getDefault().register(obj)
    訂閱(訊息接收):
@Subscribe
public void receive(Object event){
}
    釋出訊息:
EventBus.getDefault().post(event)
    登出:
EventBus.getDefault().unregister(obj)

原始碼分析:

註冊:

EventBus.getDefault().register(obj) 

   這段程式碼做了兩件事情:① EventBus.getDefault() 建立EventBus物件;② register(obj) 方法為obj該類物件註冊EventBus。 那這兩個方法究竟在EventBus中究竟做了哪些工作呢?我們開啟EventBus的原始碼看一下:

   1、EventBus.getDefault()     原始碼如下:
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}
    看到了吧,EventBus採用單例模式建立EventBus物件,接下來它在構造方法中又做了什麼事情呢?
public EventBus() {
    this(DEFAULT_BUILDER);
}
    在構造方法中其呼叫了有參構造方法:EventBus(EventBusBuilder builder ),我們再跟進去看一看:
EventBus(EventBusBuilder builder) {
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();

    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);

    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;

    //預設情況下引數為(null,false,false)
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);

    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}
    這段程式碼對一些變數進行了初始化,現在就挑重要的變數解釋一下。首先,初始化了3個Map,這3個Map有什麼用呢?我們再來一一詳細說一下:     ① Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType:key:事件型別(如:String型別或者自定義的事件型別),value:該事件的訂閱者的list集合。當傳送event訊息的時候,都是去這裡找對應的訂閱者。     ② Map<Object, List<Class<?>>> typesBySubscriber:key:事件的訂閱者(如XXXActivity),value:事件型別的執行時類的list集合。register()unregister()的時候都是操作這個Map     ③ Map<Class<?>, Object> stickyEvents:維護的是粘性事件的集合,粘性事件也就是當event傳送出去之後再註冊粘性事件的話,該粘性事件也能接收到之前傳送出去的event訊息。     其次,初始化3個訊息傳送器如下:     mainThreadPoster :該類繼承自Handler,而且在EventBus中mainThreadPoster屬於主執行緒Handler,這是因為mainThreadPoster就是為處理“訊息接收方法在主執行緒而訊息傳送在子執行緒 ”這個問題而設計的,所以子執行緒向主執行緒傳送訊息必須使用主執行緒的Handler。mainThreadPoster繼承Handler也是為了效率考慮的。     backgroundPoster:該類繼承自Runnable,重寫了run()方法。在run()方法中將子執行緒中的訊息通過EventBus傳送到主執行緒。所以這個訊息傳送器作用就是處理“訊息接收方法在子執行緒接而訊息的釋出在主執行緒”這樣的問題。     asyncPoster:該類繼承自Runnable,也重寫了run()方法,不過就像名字一樣“非同步”,也就是說不管訂閱者是不是在主執行緒,訊息接收方法都會另外開啟一個執行緒處理訊息。     然後,一個重要的初始化物件為subscriberMethodFinder,這個物件利用反射的方法查詢每一個接收訊息者的方法(也即是添加了“@Subscribe ”註解的方法)。     最後就是一些對EventBusBuilder的一些配置資訊。其中eventInheritanceexecutorService在接下來分析原始碼時會經常碰到:① eventInheritance表示我們自定義的待發布訊息事件是否允許繼承,,預設情況下eventInheritance==true。它的作用就是處理業務時後來新增加業務後不必再修改程式碼,只需要繼承就OK啦(這也符合程式的“開閉原則”)如下:
public class MessageEvent {
    public final String message;

    public MessageEvent(String message) {
        this.message = message;
    }
}

public class SubMessageEvent extends MessageEvent {
    public SubMessageEvent(String message) {
        super(message);
    }
}
    ②executorService:這個執行緒池是給backgroundPoster和asyncPoster用來處理訊息傳送的。這樣做也能夠提高訊息傳送的效率。 2、註冊register(Object subscriber )     EventBus的初始化工作已經完畢,我們繼續看一下EventBus是怎麼進行註冊的,在註冊過程中又搞了哪些事情?
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
            .findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}
     在該方法中首先取得註冊者的執行時類物件,拿到執行時類物件後通過註冊者註冊方法查詢器SubscriberMethodFinder利用反射的方法找到註冊者類中所有的接收訊息的方法,也即是所有添加了註解“Subscribe ”的方法。最後進行通過方法subscribe(subscriber, subscriberMethod)為每一個接收訊息的方法進行註冊。流程大致就是這樣的,首先 我們先看一下findSubscriberMethods這個方法:
 /**
     * 方法描述:獲取該執行時類的所有@Subscribe註解的所有方法
     *
     * @param subscriberClass @Subscribe註解所屬類的執行時類物件
     * @return 註冊EventBus類中@Subscribe註解的所有方法的List集合
     */
    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //先從快取中查詢是否存在訊息接收的方法
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            //使用反射方法拿到訂閱者中的訂閱方法
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //使用apt處理器拿到訂閱者中的訂閱方法
            subscriberMethods = findUsingInfo(subscriberClass);
        }
            //如果該訊息事件沒有被訂閱則丟擲異常
       if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //將查到的方法放到快取中,以便下次使用
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
    這個方法很重要,在初次使用EventBus3.0的時候也容易出錯的一個點(3.0新增註解):就是在訂閱事件的方法上沒有新增@Subscribe 註解,所以會碰到下面這個異常:
Caused by: org.greenrobot.eventbus.EventBusException: Subscriber class XXX and its super classes 
have no public methods with the @Subscribe annotation
    說到這我們還是沒有最終看到EventBus是怎麼進行註冊的,OK,回過頭來我們繼續看註冊
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Log.e(TAG, eventType.getSimpleName());
    //將訂閱類Object物件subscriber封裝為EventBus的訂閱類Subscription
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    /**
    * 方法描述:對CopyOnWriteArrayList中的Subscription根據優先順序的高低重新進行排序
    */
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    /**
    * 方法描述:將開發者註冊EventBus的類的執行時類新增到subscribedEvents中,並且把該執行時類新增到
    * typesBySubscriber中
    */
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    Log.e(TAG, "typesBySubscriber的");
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}
    上面的這段程式碼雖然很多,但主要做了幾件事情:① 將註冊的訂閱者封裝為新的Subscription類 ②將訂閱者儲存到Map集合subscriptionsByEventType當中 ③對訊息事件接收者根據優先順序進行重排序 ④新增粘性訊息事件

釋出訊息:

    我們已經分析完了EventBus的註冊過程,接下來我們再來分析一下EventBus的事件傳送過程。

EventBus.getDefault().post(event);
    那麼這段程式碼是如何實現訊息的傳送呢?繼續原始碼看一下:
public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);//將該事件新增到事件隊列當中
    //事件沒有分發則開始分發
    if (!postingState.isPosting) {
        //判斷訊息接收者是否在主執行緒 
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();  
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            //迴圈傳送訊息
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}
    從上面的程式碼中可以得知,待發送的訊息首先儲存到一個訊息list集合當中,然後再不斷的迴圈傳送訊息。傳送訊息時利用的方法是postSingleEvent(Object event, PostingThreadState postingState ),OK,我們繼續跟進:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    //預設情況下Event事件允許繼承,即預設情況下eventInheritance==true
    if (eventInheritance) {
        //查詢event事件及event子類事件
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
      ...
    }
}
   在這個方法中並沒有真正的看到訊息的分發,而是查找了待分發事件訊息及其子類或者是待分發訊息介面及其子類的所有事件(預設情況下我們定義的訊息事件是允許繼承的。 我們在專案中起初可能考慮的不是很全面,再到後來不可預料的需求到來時我們可能會繼續改事件的一種情況,看到這不得不說EventBus真心考慮周全呀)。然後呼叫postSingleEventForEventType(event, postingState, eventClass)方法查詢該事件及其子類事件的訂閱者,如果沒有找到就傳送空訊息並列印日誌。好吧,很失望,到現在依然沒有看到對訊息事件進行分發。那我們繼續跟進:postSingleEventForEventType(event, postingState, eventClass);
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
                                            Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //從Map中取出所有訂閱了eventClass事件的所有訂閱者
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    //如果該事件的訂閱者存在則向每一個訂閱者釋出訊息事件
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}
    好吧,小眼一瞄仍然沒有對訊息進行分發,而是查詢事件的所有訂閱者然後對所有訂閱者進行了一層封裝,封裝成PostingThreadState。那我們還是繼續吧,我們跟進postToSubscription(subscription, event, postingState.isMainThread)這個方法:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}
    看到這是不是有一種“復行數十步,豁然開朗”的感覺。是的,在這個方法中我們終於看到了對訊息事件進行4中不同情況下的分發了。根據訊息接收的threadMode分別進行了不同的處理:     POSTING:EventBus預設情況下的threadMode型別,這裡意思就是如果訊息釋出和訊息接收在同一執行緒情況下就直接呼叫invokeSubscriber(subscription, event)對訊息進行傳送。這種情況下事件傳遞是同步完成的,事件傳遞完成時,所有的訂閱者將已經被呼叫一次了。這個ThreadMode意味著最小的開銷,因為它完全避免了執行緒的切換。     MAIN:訊息接收在主執行緒中進行(此種情況適合進行UI操作),如果訊息釋出也在主執行緒就直接呼叫invokeSubscriber(subscription, event)對訊息進行傳送(這種情況是POSTING的情況),如果訊息釋出不在主執行緒中進行,那麼呼叫mainThreadPoster.enqueue(subscription, event)進行處理。他是怎麼處理的呢?我們跟進去瞧瞧:
final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    //預設情況下EventBus建立HandlerPoster的Looper為MainLooper,最大maxMillisInsideHandleMessage==10ms
    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    /**
    * 方法描述:將訂閱者與訊息實體之間的對映存到佇列PendingPostQueue當中
    *
    * @param subscription 訂閱者
    * @param event        訊息事件
    */
    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                //同步取出訊息佇列
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                //訊息傳送超時
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}
    為了說明問題,我們整個類貼出來:由於是主執行緒向子執行緒傳送訊息所以Looper採用的是主執行緒Looper,Handler也就是主執行緒Handler,其內部維護了一個PendingPost的物件池,這樣做也是為了提高記憶體利用率,這也不是重點,我們直接看重點,在enqueue(Subscription subscription, Object event)方法中利用HandlerPoster 傳送空訊息,HandlerPoster也重寫了handleMessage方法,在handleMessage方法中又呼叫eventBus.invokeSubscriber(pendingPost)進行訊息傳送,我們跟進去之後發現最終還是呼叫了invokeSubscriber(subscription, event)對訊息進行傳送。      BACKGROUND:這種情況是訊息接收在子執行緒(此種模式下適合在接收者方法中做IO等耗時操作)。那麼如果訊息釋出也在某個子執行緒中進行的就直接呼叫invokeSubscriber(subscription, event)對訊息進行傳送,如果訊息釋出在主執行緒當中應該儘可能快的將訊息傳送出去以免造成主執行緒阻塞,所以這時候就交給backgroundPoster去處理。它是怎麼處理的呢?我們進去看一看:
final class BackgroundPoster implements Runnable {

    ....
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
                ...
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
             while (true) {
                    ...
             eventBus.invokeSubscriber(pendingPost);
            }
        } finally {
            executorRunning = false;
        }
    }
}
     backgroundPoster對Runnable進行了重寫,而且和HandlerPoster一樣也採用了物件池提高效率,當然重點是其開啟了執行緒池處理訊息的傳送,這也是避免阻塞主執行緒的舉措。當然其最終還是呼叫了invokeSubscriber()-----》invokeSubscriber(subscription, event)方法。     ASYNC:這種情況是訊息接收在子執行緒(如果訊息釋出在子執行緒中進行,那麼該子執行緒既不同於訊息釋出的子執行緒,又不在主執行緒,而是接收訊息是一個獨立於主執行緒又不同於訊息釋出的子執行緒)。由於在這種模式下每一個新新增的任務都會線上程池中開闢一個新執行緒執行,所以併發量更高效。而且最終還是會呼叫invokeSubscriber(subscription, event)方法對訊息進行分發。     既然4種模式下均是呼叫了invokeSubscriber(subscription, event)方法,那我們最後再看一下這個方法:
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}
   看到了吧,這個方法中就是利用反射將訊息傳送給每一個訊息的訂閱者。到此我們就完整的看完了EventBus的工作流程及主要程式碼的分析過程。真心不容易呀,已經被累跪啦!