Vert.x系列(三)--ClusteredEventBus原始碼分析
前言:因為ClusteredEventBus涉及叢集,有必產生網路問題,從而引入了NetServer、ServerID等涉及網路,埠的類。在之前的EventBusImpl中, 使用的資料結構是以address-List<Handler>作為k-v的map容器。作為EventBusImpl的子類,ClusteredEventBus的邏輯結構上一樣的。 不過把address-List<ServerID>作為k-v。
原理:在start方法中,利用第三方框架(預設hazelcast)實現的叢集同步map(變數subs) ,獲取已有的節點資訊。然後根據引數,對自身伺服器的埠實現監聽,把自身伺服器資訊放入前面的map,讓其他節點感知。呼叫consumer方法時,以address-List<ServerID>作為k-v存在一個map的容器中。呼叫send方法時,以address為k從map裡取出ServerID.然後把訊息利用TCP協議傳送給對應的伺服器。
程式碼:
public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";// 這2個欄位是為了從System.getProperty()取值,優先順序//1.System.getProperty()2.EventBusOptions public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port"; private static final Buffer PONG = Buffer.buffer(new byte[]{(byte) 1}); private static final String SERVER_ID_HA_KEY = "server_id"; private static final String SUBS_MAP_NAME = "__vertx.subs"; //叢集資料存放在叢集同步的map中,需要約定一個固定的key統一存取。 private final ClusterManager clusterManager; private final HAManager haManager; private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<>();//根據socket長連結 private final Context sendNoContext; private EventBusOptions options; // 建立時的引數 private AsyncMultiMap<String, ClusterNodeInfo> subs; // 叢集核心資料 k是address,value是HazelcastClusterNodeInfo private Set<String> ownSubs = new ConcurrentHashSet<>();// 自身訂閱(Subscribe)的addrees private ServerID serverID; // 自身伺服器資訊(IP和port) private ClusterNodeInfo nodeInfo; // 自身叢集資訊(NodeID、IP和port) private NetServer server;//
在 public void start(Handler<AsyncResult<Void>> resultHandler) 方法中。
做了很多事件,很多邏輯。
1.subs = ar2.result(); 獲取叢集資料。從叢集拉取資料,ar2.succeeded() 為前置判斷。直接排除網路、配置等錯誤的可能。
2.建立底層的埠監聽。這裡埠有大坑,有2個概念:
actualPort 和 publicPort
actualPort是值真正監聽的埠,從option傳值過來,沒有則隨機產生。
publicPort是放到共享給叢集的埠,為了通知別的節點讓它們往這裡發資料。官方的解釋是為了容器情況考慮。在容器裡執行時,和主機的埠是通過代理訪問的。對於這2個port ,因為這裡有好幾個變數可以賦值,所有裡面有優先順序:
actualPort: 1.VertxOptions 也是 EventBusOptions 的setClusterPublicHost,檢視VertxOptions.setClusterPort() / VertxOptions.setClusterPublicHost() 方法,發現其實就是對EventBusOptions操作。 2.隨機產生。
publicPort 1.系統變數CLUSTER_PUBLIC_PORT_PROP_NAME 2.VertxOptions 也是 EventBusOptions 的setClusterPublicHost 3.上面的actualPort
這因為埠直接涉及到通訊,設定不對就無法使用。如果是叢集內多節點的情況,需要設定host,不需要設定port. 因為host預設值是 "localhost",port預設值是隨機產生的可用埠(假設為51854),host和port會產生ServerID。如果不設定host,A節點就會把 "localhost:51854"傳到叢集上。其他B節想要訪問A時,會根據這個資訊去訪問 localhost:51854,結果訪問到自身去了。
下面重點就是consumer 和 send/poblish方法。
呼叫consumer方法時,會依次呼叫到addRegistration(),往叢集共享的subs中放入資訊,達到傳播的目的。
@Override protected <T> void addRegistration(boolean newAddress, String address,boolean replyHandler, boolean localOnly,Handler<AsyncResult<Void>> completionHandler) { if (newAddress && subs != null && !replyHandler && !localOnly) { // Propagate the information subs.add(address, nodeInfo, completionHandler); ownSubs.add(address); } else { completionHandler.handle(Future.succeededFuture()); } }
呼叫send/poblish方法時,會依次呼叫到sendOrPub(),
@Override protected <T> void sendOrPub(SendContextImpl<T> sendContext) { String address = sendContext.message.address(); // 這裡只是定義resultHandler,沒有執行,如果要執行,還需 //要resultHandler.handler(AsyncResult) Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> { if (asyncResult.succeeded()) { // 重要的 server ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result(); if (serverIDs != null && !serverIDs.isEmpty()) { sendToSubs(serverIDs, sendContext); } else { if (metrics != null) { metrics.messageSent(address, !sendContext.message.isSend(), true, false); } deliverMessageLocally(sendContext); } } else { log.error("Failed to send message", asyncResult.cause()); } }; // 這裡才是處理 。subs存的是k-v是 address-List<HazelcastClusterNodeInfo> // get(k)就是把List<HazelcastClusterNodeInfo>取出來,交給上面的handler if (Vertx.currentContext() == null) { // Guarantees the order when there is no current context sendNoContext.runOnContext(v -> { subs.get(address, resultHandler); }); } else { subs.get(address, resultHandler); } }
sendToSubs()方法是包含了 send/publish 的判斷,這個邏輯本來是在deliverMessageLocally(MessageImpl msg)完成的。
protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName)方法裡,單機版產生的是 MessageImpl, 叢集版產生ClusteredMessage。 ClusteredMessage此類包含了對Buffer 的操作,幫助