Vert.x系列(二)--EventBusImpl原始碼分析
前言:Vert.x 實現了2種完成不同的eventBus:
EventBusImpl(A local event bus implementation)和 它的子類 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。這裡介紹下EventBusImpl
EventBusImpl 原理:呼叫consumer方法時,以address-handler作為k-v存在一個map的容器中。接著呼叫send方法時,把message,DeploymentOptions等內容封裝成物件(MessageIml,命令模式),從以address為k從map裡取出handler.把MessageIml作為引數傳遞給handler執行。
一.初始化:
初始化過程就是new EventBusImpl,並修改狀態變數started。
首先,在VertxImpl的構造方法
VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)
中進行初始化。以 options.isClustered()為判斷條件,呼叫createAndStartEventBus(options, resultHandler);
其次createAndStartEventBus中做了2件事
1.以options.isClustered()判斷條件,new出了ClusteredEventBus/ EventBusImpl. new時並沒有業務邏輯。(額外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互擁有對方的引用,是很常見的寫法。)
2.呼叫EventBusImpl的初始化方法start(),並返回結果給最外層resultHandler的。start()更沒做什麼事,只是EventBusImpl裡面有個狀態變數started。把它置為true.
二. consumer訂閱
EventBusImpl維護了
protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>();
成員變數。
Handlers 是一個handler的List的封裝類,上面可以理解為 ConcurrentMap<String, List<Handler>>這種資料結構。consumer方法以address為k,以handler作v的list的一員,存放在handlerMap中。
所以重點關注對handlerMap的操作。
呼叫vertx.eventBus().consumer("Address1", ar -> {});發生了什麼?
檢視程式碼發現,先new HandlerRegistration(這裡也有相互引用)。再呼叫HandlerRegistration .handler,那裡面又會呼叫eventBusImpl.addRegistration()。在HandlerRegistration這個類兜了一圈,又回到eventBusImpl裡。
(相關程式碼截斷如下: EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);)
核心邏輯在addRegistration() 和 addLocalRegistration()中。我的理解是,前個方法明顯有問題。最後一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的引數都沒有使用,應該可以省略,修改為addRegistration(registration::setResult);就可以。很少在Vert.x框架中看到這樣不合規範的程式碼。如果讀者有好的見解,歡迎留言。
// 呼叫 addLocalRegistration
// 註冊完成
protected <T> void addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(registration.getHandler(), "handler"); boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly); addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult); }
/** *
* 初始化 或 獲取原 Contex
初始化 或 獲取原 Handlers
* 新建 HandlerHolder
* Handlers 裡新增 HandlerHolder
**/
protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(address, "address"); Context context = Vertx.currentContext(); boolean hasContext = context != null; if (!hasContext) { // Embedded context = vertx.getOrCreateContext(); } registration.setHandlerContext(context); boolean newAddress = false; HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context); Handlers handlers = handlerMap.get(address); if (handlers == null) { handlers = new Handlers(); Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers); if (prevHandlers != null) { handlers = prevHandlers; } newAddress = true; } handlers.list.add(holder); if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); } return newAddress; }
新出現的幾個類的作用:
Context 執行緒排程--Vert.x框架的優點是執行緒安全,就是通過Context實現。
HandlerHolder--對HandlerRegistration的封裝,外加Context。
Handlers--上面HandlerHolder 的集合封裝,外加平衡輪詢邏輯。
handlers.list.add(holder);這句作為壓軸(戲曲名詞,指一場摺子戲演出的倒數第二個劇目)出場完成整個功能的核心註冊操作。
至於後面的那段程式碼,我覺得有點問題。
if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); }
作用是在context上註冊關閉事件,由DeploymentManager在unploy的時候呼叫,對應的核心邏輯在 CloseHooks.run()方法中。但這個這個判斷條件案例只有第2次新增consumer的時候才有效果。或者是上面的程式碼boolean hasContext = context != null;給人的誤導? 以上consumer的流程還被reply方法使用。
三. Send/Publish傳送
多個send過載方法最後定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但這個核心方法的最終卻呼叫了一個
名為sendOrPubInternal的方法,不由得在讓人想起寫程式最難的事之一是命名。正如開頭說的這個使用了設計模式中的命令模式,把引數封裝成MessageImpl物件傳送到後面的方法。
sendOrPubInternal做了3個事情,
1.createReplyHandlerRegistration -- 有replyHandler.reply()這步才有意義
2.new SendContextImpl -- 從Context類判斷,SendContextImpl可以繫結執行緒
3.sendContext.next(); -- 在執行方法前,執行攔截器。攔截器極大地豐富開發人員的自定義使用。
本來應該1,2,3順序介紹程式碼,但是訊息流程一般是:
Sender----( message )--->customer;
Sender<---(reply message)---customer;
根據這個流程,得先介紹2.new SendContextImpl 和3.sendContext.next();
再回頭介紹 1.createReplyHandlerRegistration
先說 2.new SendContextImpl
這個類是整個Send相關類的大封裝。
3.sendContext.next();
根據程式碼流程
sendOrPub--》deliverMessageLocally--》deliverMessageLocally
進入到deliverMessageLocally(),這個方法做了2個大事情。
- 獲取address所對應的所有handlers
- 根據isSend()區分 send (平衡輪詢發一個handler)/publish(遍歷handlers發給所有)
方法的第一句話msg.setBus(this);和reply邏輯有關係。在這個local eventbus下,是重複賦值,沒有作用的。
然後Handlers handlers = handlerMap.get(msg.address());
這句根據以address為k,取出Handlers。sender的messageImpl 終於和consumer的HandlerHold見面
Handler.choose()方法實現了輪詢傳送message, 個人認為這個方法叫做 balanceChoose()更好。
程式碼如下:
public HandlerHolder choose() { while (true) { int size = list.size(); if (size == 0) { return null; } int p = pos.getAndIncrement(); if (p >= size - 1) { pos.set(0); } try { return list.get(p); } catch (IndexOutOfBoundsException e) { // Can happen pos.set(0); } } }
當時我使用Vert.x的時候,就很好奇eventBus的輪詢功能怎麼實現。現在看到其實非常簡單。維護一個 AtomicInteger 的變數,每次呼叫累加一次。如果超過List的長度,則重置為0,方法永遠返回 list.get(p)。巧妙!
最後在deliverToHandler()方法裡,在Context的執行緒控制下,完成message和handler的最終互動。
那麼,回到最開始的問題,
Sender----( message )--->Customer;
Sender<---(reply message)---Customer;
在上面的流程中,Sender根據address找到Customer從而傳送message,那麼Customer的reply是怎麼找到Sender的呢?
答案是一個臨時的replyAddress。通過以 replyAddress為key,把Sender作為handler註冊到eventBusImpl上,處理後直接登出。replyAddress的規律是從1開始的步長為1的自增數列,所以開發者不應該使用純數字作為自身業務的Address,避免衝突。
最後說說1.createReplyHandlerRegistration
如果sender在傳送訊息時使用了
send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。
vertx.eventBus().send("address1", "測試訊息", ar -> {
if (ar.succeeded()) {
System.out.println("我是producer1:" + ar.result().body());
}
});
並且consumer在接受訊息到後,呼叫了 reply();
vertx.eventBus().consumer("address1", ar -> {
System.out.println("consumer:" + ar.body());
ar.reply("consumer reply message ");
});
則會進入createReplyHandlerRegistration的處理邏輯。
使用
protected String generateReplyAddress() {
return Long.toString(replySequence.incrementAndGet());
}
這裡產生從1開始的步長為1的自增數列address。
Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
HandlerRegistration<T> registration =
new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);
裡面的this是eventBusImpl,並在handler()方法裡把 boolean replyHander的值置為true.
這樣,eventBusImpl的handlerMap變數裡,就多了<replyAddress, replyHander>。
在cuomser處呼叫reply()後,會在eventBusImpl的內部類ReplySendContextImpl<T> extends SendContextImpl 的參與下,走類似send()的流程。區別是最後在deliverToHandler()方法裡,會判斷boolean replyHander的值,如果是true呼叫完畢就登出.
錯誤程式碼測驗:
vertx.eventBus().consumer("1", ar -> {
System.out.println("我不應該在這裡" + ar.body());
ar.reply("對不起,其實我是阿杜。");
});
vertx.eventBus().consumer("address1", ar -> {
System.out.println("consumer:" + ar.body());
ar.reply("我是高帥富");
});
vertx.eventBus().send("address1", "測試訊息", ar -> {
if (ar.succeeded()) {
System.out.println("sender:接收收到的迴應是:"+ar.result().body());
}else{
System.out.println("傳送失敗");
}
});
存在consumer("1", ar -> {})的Console:
consumer:測試訊息 我不應該在這裡我是高帥富 20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 傳送失敗
可以看到上面的輸出完全不是設想的結果。
如果不存在consumer("1", ar -> {})address為1的Console:
consumer:測試訊息
sender:接收收到的迴應是:我是高帥富
最後,再次提醒:使用eventBus時,不要使用純數字作為自身業務的addr