RocketMQ原理學習-- Name Server
阿新 • • 發佈:2018-12-02
Name Server作為RocketMQ的一個元件,其作用就是一個註冊中心,用於管理Broker相關的一些資訊,生產者和消費者可以從Name Server中獲取Broker中相關的Topic資訊等,Name Server可以單臺部署也可以多臺部署,相互之間不存在聯絡。
Name Server主要有以下兩個功能:
- 維護一份Broker資訊(叢集名稱、Broker名稱及相關地址資訊),Broker在啟動後會將自身資訊註冊到Name Server中
- 維護每個Topic相關的資訊,Broker心跳時會將自身的topic提交到Name Server,生產者在傳送訊息時會根據Topic名稱獲取Broker的列表,消費者在監聽Topic時也會根據Topic名稱從Name Server中獲取相關Broker的資訊。
Name Server主要在RouteInfoManager中維護了以下幾個列表(通過讀寫鎖來實現執行緒安全操作):
//每個Topic的列表資訊 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //每個Broker的地址列表 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //叢集名稱對應的Broker名稱 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //每個broker地址資訊 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //每個broker地址相關的過濾器資訊 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
Name Server 啟動的時序圖
接下來我們通過分析原始碼簡單瞭解一下:
1、請求處理器DefaultRequestProcessor:
在DefaultRequestProcessor中提供processRequest方法,會根據請求中的RequestCode值呼叫不同的處理操作。
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG: return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: return this.deleteKVConfig(ctx, request); case RequestCode.QUERY_DATA_VERSION: return queryBrokerTopicConfig(ctx, request); //註冊Broker資訊 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } //刪除Broker資訊 case RequestCode.UNREGISTER_BROKER: return this.unregisterBroker(ctx, request); //獲取Topic資訊 case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request); //獲取Broker的機器資訊 case RequestCode.GET_BROKER_CLUSTER_INFO: return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: return this.wipeWritePermOfBroker(ctx, request); //獲取所有Topic資訊 case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: return getAllTopicListFromNameserver(ctx, request); //刪除Topic case RequestCode.DELETE_TOPIC_IN_NAMESRV: return deleteTopicInNamesrv(ctx, request); //通過名稱空間獲取配置資訊 case RequestCode.GET_KVLIST_BY_NAMESPACE: return this.getKVListByNamespace(ctx, request); //獲取Topic資訊 case RequestCode.GET_TOPICS_BY_CLUSTER: return this.getTopicsByCluster(ctx, request); //獲取系統Topic case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST: return this.getHasUnitSubUnUnitTopicList(ctx, request); //更新NameServer的配置 case RequestCode.UPDATE_NAMESRV_CONFIG: return this.updateConfig(ctx, request); //獲取Name Server的配置資訊 case RequestCode.GET_NAMESRV_CONFIG: return this.getConfig(ctx, request); default: break; } return null; }
2、Broker註冊:
(1)Broker註冊請求頭資訊
(2)Broker註冊請求體資訊
根據請求頭及請求體中的資訊將相關Broker和Topic資訊新增到RouteInfoManager中。
3、Producer查詢Topic資訊:
(1)Producer請求頭資訊
根據Topic名稱TopicA-Test從 RouteInfoManager的topicQueueTable中獲取相關佇列資訊。
呼叫程式碼如下:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final GetRouteInfoRequestHeader requestHeader =
(GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
if (topicRouteData != null) {
if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
String orderTopicConf =
this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
requestHeader.getTopic());
topicRouteData.setOrderTopicConf(orderTopicConf);
}
byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
相關部落格: