8.原始碼分析---從設計模式中看SOFARPC中的EventBus?
我們在前面分析客戶端引用的時候會看到如下這段程式碼:
// 產生開始呼叫事件
if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
EventBus.post(new ClientStartInvokeEvent(request));
}
這裡用EventBus呼叫了一下post方法之後就什麼也沒做了,就方法名來看是傳送了一個post請求,也不知道發給誰,到底有什麼用。
所以這一節我們來分析一下EventBus這個類的作用。
首先我們來看一下這個類的方法
從EventBus的方法中我們是不是應該想到了這是使用了什麼設計模式?
沒錯,這裡用到的是訂閱釋出模式(Subscribe/Publish)。訂閱釋出模式定義了一種一對多的依賴關係,讓多個訂閱者物件同時監聽某一個主題物件。這個主題物件在自身狀態變化時,會通知所有訂閱者物件,使它們能夠自動更新自己的狀態。
我們先分析原始碼,分析完原始碼之後再來總結一下。
EventBus傳送事件
根據上面的示例,我們先看EventBus#post是裡面是怎麼做的。
EventBus#post
private final static ConcurrentMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP = new ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>>(); public static void post(final Event event) { //是否開啟匯流排 if (!isEnable()) { return; } //根據傳入得event獲取到相應的Subscriber CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass()); if (CommonUtils.isNotEmpty(subscribers)) { for (final Subscriber subscriber : subscribers) { //如果事件訂閱者是同步的,那麼直接呼叫 if (subscriber.isSync()) { handleEvent(subscriber, event); } else { // 非同步 final RpcInternalContext context = RpcInternalContext.peekContext(); //使用執行緒池啟動一個執行緒一部執行任務 final ThreadPoolExecutor asyncThreadPool = AsyncRuntime.getAsyncThreadPool(); try { asyncThreadPool.execute( new Runnable() { @Override public void run() { try { RpcInternalContext.setContext(context); //呼叫訂閱者的event事件 handleEvent(subscriber, event); } finally { RpcInternalContext.removeContext(); } } }); } catch (RejectedExecutionException e) { LOGGER .warn("This queue is full when post event to async execute, queue size is " + asyncThreadPool.getQueue().size() + ", please optimize this async thread pool of eventbus."); } } } } } private static void handleEvent(final Subscriber subscriber, final Event event) { try { subscriber.onEvent(event); } catch (Throwable e) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("Handle " + event.getClass() + " error", e); } } }
這個post方法主要做了這麼幾件事:
- 根據傳入的Event獲取對應的訂閱列表subscribers
- 遍歷subscribers
- 如果訂閱者是非同步的,那麼就使用執行緒池啟動執行任務
4, 如果是同步的那麼就呼叫handleEvent方法向訂閱者釋出訊息
我們再來看看訂閱者是怎樣的:
Subscriber
public abstract class Subscriber { /** * 接到事件是否同步執行 */ protected boolean sync = true; /** * 事件訂閱者 */ protected Subscriber() { } /** * 事件訂閱者 * * @param sync 是否同步 */ protected Subscriber(boolean sync) { this.sync = sync; } /** * 是否同步 * * @return 是否同步 */ public boolean isSync() { return sync; } /** * 事件處理,請處理異常 * * @param event 事件 */ public abstract void onEvent(Event event); }
Subscriber是一個抽象類,預設是同步的方式進行訂閱。總共有下面四個實現類:
LookoutSubscriber
FaultToleranceSubscriber
RestTracerSubscriber
SofaTracerSubscriber
這裡我不打算每個都進行分析,到時候打算用到了再詳細說明,這樣不會那麼抽象。
由於我們前面講到了,在客戶端引用的時候會發送一個產生開始呼叫事件給匯流排,那一定要有訂閱者這個傳送事件才有意義。所以我們接下來看看是在哪裡進行事件的註冊的。
訂閱者註冊到EventBus
通過上面的繼承關係圖可以看到,在ConsumerConfig是AbstractIdConfig的子類,所以在初始化ConsumerConfig的時候AbstractIdConfig靜態程式碼塊也會被初始化。
public abstract class AbstractIdConfig<S extends AbstractIdConfig> implements Serializable {
static {
RpcRuntimeContext.now();
}
}
在呼叫RpcRuntimeContext#now方法的時候,會呼叫到RpcRuntimeContext的靜態程式碼塊
RpcRuntimeContext
public class RpcRuntimeContext {
static {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID);
}
put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION);
// 初始化一些上下文
initContext();
// 初始化其它模組
ModuleFactory.installModules();
// 增加jvm關閉事件
if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now.");
}
destroy(false);
}
}, "SOFA-RPC-ShutdownHook"));
}
}
public static long now() {
return System.currentTimeMillis();
}
}
在RpcRuntimeContext靜態程式碼塊裡主要做了以下幾件事:
- 初始化一些上下文的東西,例如:應用Id,應用名稱,當前所在資料夾地址等
- 初始化一些模組,等下分析
- 增加jvm關閉時的鉤子
我們直接看installModules方法就好了,其他的方法和主流程無關。
ModuleFactory#installModules
public static void installModules() {
//通過SPI載入Module模組
ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class);
//moduleLoadList 預設是 *
String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST);
for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) {
String moduleName = o.getKey();
Module module = o.getValue().getExtInstance();
// judge need load from rpc option
if (needLoad(moduleLoadList, moduleName)) {
// judge need load from implement
if (module.needLoad()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Install Module: {}", moduleName);
}
//安裝模板
module.install();
INSTALLED_MODULES.put(moduleName, module);
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " does not need to be loaded.");
}
}
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("The module " + moduleName + " is not in the module load list.");
}
}
}
}
這個方法裡面一開始獲取Module的擴充套件類,Module的擴充套件類有如下幾個:
FaultToleranceModule
LookoutModule
RestTracerModule
SofaTracerModule- 然後會去獲取MODULE_LOAD_LIST配置類,多個配置用“;”分割。
- 呼叫loader.getAllExtensions()獲取所有的擴充套件類。遍歷擴充套件類。
- 接著呼叫needLoad方法:
static boolean needLoad(String moduleLoadList, String moduleName) {
//用;拆分
String[] activatedModules = StringUtils.splitWithCommaOrSemicolon(moduleLoadList);
boolean match = false;
for (String activatedModule : activatedModules) {
//ALL 就是 *
if (StringUtils.ALL.equals(activatedModule)) {
match = true;
} else if (activatedModule.equals(moduleName)) {
match = true;
} else if (match && (activatedModule.equals("!" + moduleName)
|| activatedModule.equals("-" + moduleName))) {
match = false;
break;
}
}
return match;
}
這個方法會傳入配置的moduleLoadList和當前遍歷到的moduleName,moduleLoadList預設是*
所以會返回true,如果配置了moduleLoadList不為*
的話,如果moduleName是配置中的之一便會返回true。
- 呼叫module的install進行模板的裝配
這裡我們進入到SofaTracerModule#install中
SofaTracerModule#install
public void install() {
Tracer tracer = TracerFactory.getTracer("sofaTracer");
if (tracer != null) {
subscriber = new SofaTracerSubscriber();
EventBus.register(ClientStartInvokeEvent.class, subscriber);
EventBus.register(ClientBeforeSendEvent.class, subscriber);
EventBus.register(ClientAfterSendEvent.class, subscriber);
EventBus.register(ServerReceiveEvent.class, subscriber);
EventBus.register(ServerSendEvent.class, subscriber);
EventBus.register(ServerEndHandleEvent.class, subscriber);
EventBus.register(ClientSyncReceiveEvent.class, subscriber);
EventBus.register(ClientAsyncReceiveEvent.class, subscriber);
EventBus.register(ClientEndInvokeEvent.class, subscriber);
}
}
這裡我們可以看到文章一開始被髮送的ClientStartInvokeEvent在這裡被註冊了。訂閱者的實現類是SofaTracerSubscriber。
訂閱者被呼叫
在上面我們分析到在註冊到EventBus之後,會發送一個post請求,然後EventBus會遍歷所有的Subscriber,呼叫符合條件的Subscriber的onEvent方法。
SofaTracerSubscriber#onEvent
public void onEvent(Event originEvent) {
if (!Tracers.isEnable()) {
return;
}
Class eventClass = originEvent.getClass();
if (eventClass == ClientStartInvokeEvent.class) {
ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;
Tracers.startRpc(event.getRequest());
}
else if (eventClass == ClientBeforeSendEvent.class) {
ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;
Tracers.clientBeforeSend(event.getRequest());
}
.....
}
這個方法裡面主要就是對不同的event做出不同的反應。ClientStartInvokeEvent所做的請求就是呼叫一下Tracers#startRpc,Tracers是用來做鏈路追蹤的,這篇文章不涉及。
總結
我們首先上一張圖,來說明一下訂閱釋出模式整體的結構。
在我們這個例子裡EventBus的職責就是排程中心,subscriber的具體實現註冊到EventBus中後,會儲存到EventBus的SUBSCRIBER_MAP集合中。
釋出者在釋出訊息的時候會呼叫EventBus的post方法傳入一個具體的event來呼叫訂閱者的事件。一個事件有多個訂閱者,訊息的釋出者不會直接的去呼叫訂閱者來發布訊息,而是通過EventBus來進行觸發。
通過EventBus來觸發不同的訂閱者的事件可以在觸發事件之前同一的為其做一些操作,比如是同步還是非同步,要不要過濾部分訂閱者等。
SOFARPC原始碼解析系列:
1. 原始碼分析---SOFARPC可擴充套件的機制SPI
2. 原始碼分析---SOFARPC客戶端服務引用
3. 原始碼分析---SOFARPC客戶端服務呼叫
4. 原始碼分析---SOFARPC服務端暴露
5.原始碼分析---SOFARPC呼叫服務
6.原始碼分析---和dubbo相比SOFARPC是如何實現負載均衡的?
7.原始碼分析---SOFARPC是如何實現連線管理與心跳