RocketMQ之NameServer學習筆記
org.apache.rocketmq.namesrv.NamesrvController
NameserController,NameServer的核心控制類。
1.1 NamesrvConfig
NamesrvConfig,主要指定nameserver的相關配置目錄屬性
1)kvConfigPath(kvConfig.json)
2)mqhome/namesrv/namesrv.properties
3)orderMessageEnable,是否開啟順序消息功能,默認為false
1.2 ScheduledExecutorService scheduledExecutorService
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryIm pl("NSScheduledThread"));
NameServer 定時任務執行線程池,一個線程,默認定時執行兩個任務:
任務1、每隔10s掃描broker,維護當前存活的Broker信息
任務2、每隔10s打印KVConfig信息。
1.3 KVConfigManager
讀取或變更NameServer的配置屬性,加載NamesrvConfig中配置的配置文件到內存,此類一個亮點就是使用輕量級的非線程安全容器,再結合讀寫鎖對資源讀寫進行保護。盡最大程度提高線程的並發度。
1.4 RouteInfoManager
NameServer數據的載體,記錄Broker,Topic等信息。
//NameServer 與 Broker 空閑時長,默認2分鐘,在2分鐘內Nameserver沒有收到Broker的心跳包,則關閉該連接。
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
//讀寫鎖,用來保護非線程安全容器HashMap
private final ReadWriteLock lock = new ReentrantReadWriteLock();
//topicQueueTable,主題與隊列關系,記錄一個主題的隊列分布在哪些Broker上,每個Broker上存在該主題的隊列個數
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//brokerAddrTable,所有Broker信息,使用brokerName當key,BrokerData信息描述每一個broker信息。
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//clusterAddrTable,broker集群信息,每個集群包含哪些Broker
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//brokerLiveTable,當前存活的Broker,該信息不是實時的,NameServer每10S掃描一次所有的broker,根據心跳包的時間得知broker的狀態,該機制也是導致當一個master Down掉後,消息生產者無法感知,可能繼續向Down掉的Master發送消息,導致失敗
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
/**
* Broker信息;key為brokerName,value為BrokerData
*/
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
public class BrokerData implements Comparable<BrokerData> {
/**
* Cluster名稱
*/
private String cluster;
/**
* broker名稱
*/
private String brokerName;
/**
* 0->ip:port
*/
private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}
- 註冊topic信息topicQueueTable
/**
* 消息隊列路由信息;key為topic,value為QueueData
*/
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
public class QueueData implements Comparable<QueueData> {
/**
* Broker名稱
*/
private String brokerName;
/**
* 讀隊列個數,默認4個
*/
private int readQueueNums;
/**
* 寫隊列個數,默認4個
*/
private int writeQueueNums;
/**
* 隊列權限
*/
private int perm;
/**
* 配置的,同步復制還是異步復制標記,對應TopicConfig.topicSysFlag
*
*/
private int topicSynFlag;
}
/**
* Broker狀態信息,NameServer每次收到心跳包會替換該信息,每隔30秒更新一次
* brokerAddr: ip:port->{}
*/
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
class BrokerLiveInfo {
/**
* 存儲上次收到心跳包的時間,每隔30秒更新一次
*/
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
}
- NamesrvStartup.java 啟動入口類,NameServer 啟動默認端口9876
nettyServerConfig.setListenPort(9876)
- 每10秒鐘掃描一次,移除失效的broker,同時刪除緩存元數據信息
//初始化NameServer
boolean initResult = controller.initialize();
public boolean initialize() {
//加載KV配置
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
//每10秒鐘掃描一次,移除失效的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//每隔10秒鐘打印一次KV配置信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
return true;
}
- 失效時間為2分鐘,即:Broker在2分鐘內未上報心跳會被移除
/**
* 失效時間為2分鐘,即:Broker在2分鐘內未上報心跳會被移除
*/
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
2.DefaultRequestProcessor
- 用於響應客戶端、Broker的請求。主要向NameServer發送心跳包、獲取Cluster、Broker、Topic元數據信息。
- 調用鏈:
在NameServer啟動時註冊,NamesrvController.initialize()->registerProcessor()
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG://增加NameServer配置信息;由DefaultMQAdminExt使用
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG://根據NameSpace和key獲取NameServer配置信息;由DefaultMQAdminExt使用
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG: //據NameSapce和Key刪除NameServerr配置信息
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER: //註冊Broker信息;由BrokerOuterAPI.registerBroker使用,在BrokerController啟動時調用
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER://移除註銷broker信息;由BrokerOuterAPI.unregisterBroker使用,在BrokerController.shutdown時調用
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC: //獲取Topic路由信息 TopicRouteData
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO://獲取Cluster及Broker信息
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER: //去除該broker上所有topic的寫權限
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: //獲取所有的Topic列表
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV: //從nameServer中刪除topic
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE: //獲取配置信息 configTable
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER: //獲取該集群下的所有topic list
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: // 此處意思為:系統會將集群名稱、broker名稱作為默認topic創建。現在獲取這類topic
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST: //暫無使用
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: //暫無使用
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST://暫無使用
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG: //更新properties請求
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG: //獲取properties內容
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
註冊broker信息
- Broker每隔30秒向所有的NameServer上報Topic註冊信息
- Broker調用鏈
BrokerController.start()->this.registerBrokerAll()->this.brokerOuterAPI.registerBrokerAll()
//每隔30秒向所有的NameServer上報Topic註冊信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false);
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
- 服務端處理主要包括:註冊集群信息clusterAddrTable、註冊broker信息brokerAddrTable、
註冊topic信息topicQueueTable、broker心跳包brokerLiveTable - NameServer處理鏈
DefaultRequestProcessor->processRequest->RequestCode.REGISTER_BROKER->this.registerBroker->RouteInfoManager.registerBroker()
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
//加寫鎖,防止並發修改
this.lock.writeLock().lockInterruptibly();
//註冊集群信息
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
//註冊broker信息
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
//Topic配置變化了;Master Broker第一次註冊或者Topic dataVersion不相同時更新路由信息
//有Topic新增時dataVersion會遞增
if (null != topicConfigWrapper //
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue()); //更新topicQueueTable
}
}
}
}
//更新broker心跳信息
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
//新broker註冊時會有日誌輸出
if (null == prevBrokerLiveInfo) {
log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
}
//更新filterServer信息
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
//Slave設置MasterAddr和HaServerAddr
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
RocketMQ之NameServer學習筆記