1. 程式人生 > >RocketMQ原理學習-- Name Server

RocketMQ原理學習-- Name Server

       Name Server作為RocketMQ的一個元件,其作用就是一個註冊中心,用於管理Broker相關的一些資訊,生產者和消費者可以從Name Server中獲取Broker中相關的Topic資訊等,Name Server可以單臺部署也可以多臺部署,相互之間不存在聯絡。

Name Server主要有以下兩個功能:

  • 維護一份Broker資訊(叢集名稱、Broker名稱及相關地址資訊),Broker在啟動後會將自身資訊註冊到Name Server中
  • 維護每個Topic相關的資訊,Broker心跳時會將自身的topic提交到Name Server,生產者在傳送訊息時會根據Topic名稱獲取Broker的列表,消費者在監聽Topic時也會根據Topic名稱從Name Server中獲取相關Broker的資訊。

Name Server主要在RouteInfoManager中維護了以下幾個列表(通過讀寫鎖來實現執行緒安全操作):

//每個Topic的列表資訊
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//每個Broker的地址列表
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//叢集名稱對應的Broker名稱
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//每個broker地址資訊
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//每個broker地址相關的過濾器資訊
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

Name Server 啟動的時序圖 

接下來我們通過分析原始碼簡單瞭解一下:

1、請求處理器DefaultRequestProcessor:

在DefaultRequestProcessor中提供processRequest方法,會根據請求中的RequestCode值呼叫不同的處理操作。

	@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {

        if (ctx != null) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }

        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG:
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG:
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG:
                return this.deleteKVConfig(ctx, request);
            case RequestCode.QUERY_DATA_VERSION:
                return queryBrokerTopicConfig(ctx, request);
			//註冊Broker資訊
            case RequestCode.REGISTER_BROKER:
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
			//刪除Broker資訊
            case RequestCode.UNREGISTER_BROKER:
                return this.unregisterBroker(ctx, request);
			//獲取Topic資訊
            case RequestCode.GET_ROUTEINTO_BY_TOPIC:
                return this.getRouteInfoByTopic(ctx, request);
			//獲取Broker的機器資訊
            case RequestCode.GET_BROKER_CLUSTER_INFO:
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
                return this.wipeWritePermOfBroker(ctx, request);
			//獲取所有Topic資訊
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
                return getAllTopicListFromNameserver(ctx, request);
			//刪除Topic
            case RequestCode.DELETE_TOPIC_IN_NAMESRV:
                return deleteTopicInNamesrv(ctx, request);
			//通過名稱空間獲取配置資訊
            case RequestCode.GET_KVLIST_BY_NAMESPACE:
                return this.getKVListByNamespace(ctx, request);
			//獲取Topic資訊
            case RequestCode.GET_TOPICS_BY_CLUSTER:
                return this.getTopicsByCluster(ctx, request);
			//獲取系統Topic
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST:
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
			//更新NameServer的配置
            case RequestCode.UPDATE_NAMESRV_CONFIG:
                return this.updateConfig(ctx, request);
			//獲取Name Server的配置資訊
            case RequestCode.GET_NAMESRV_CONFIG:
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

2、Broker註冊:

(1)Broker註冊請求頭資訊

(2)Broker註冊請求體資訊

根據請求頭及請求體中的資訊將相關Broker和Topic資訊新增到RouteInfoManager中。

3、Producer查詢Topic資訊:

(1)Producer請求頭資訊

根據Topic名稱TopicA-Test從 RouteInfoManager的topicQueueTable中獲取相關佇列資訊。

呼叫程式碼如下: 

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetRouteInfoRequestHeader requestHeader =
            (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

        TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

        if (topicRouteData != null) {
            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                String orderTopicConf =
                    this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                        requestHeader.getTopic());
                topicRouteData.setOrderTopicConf(orderTopicConf);
            }

            byte[] content = topicRouteData.encode();
            response.setBody(content);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

        response.setCode(ResponseCode.TOPIC_NOT_EXIST);
        response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
        return response;
    }

 

相關部落格:

https://fdx321.github.io/2017/08/17/%E3%80%90RocketMQ%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E3%80%912-Namesrv/

https://blog.csdn.net/mr253727942/article/details/52637126