一起寫RPC框架(十五)RPC註冊中心二--註冊中心的基本實現
阿新 • • 發佈:2019-02-15
註冊中心的實現方式有很多種,實現的方式也有難有易,可以使用zookeeper,consul這些第三方已有的包去幫助實現,這樣可以減少開發註冊中心的難度,卻可能加大運維的難度,當然最最原始的方法就是使用java去實現,不借助其他任何已有的第三方包去搞定,這樣做的好處其實並不明顯,不過對於開發者而言,使用這種最原始的方式最大的好處就是自己知道所有的實現細節,遇到問題,遇到bug,可以很快的定位到問題所在
本Demo RPC因為是練習,所以沒有實現zookeeper的方法,以後有機會會補上,cosul我沒有接觸過,現在基於java的記憶體的方式去完成該功能
與服務提供者一樣,註冊中心的基本結構也很清晰:
在maven多專案的管理的角度上來說,laopopo-registry中只是很簡單的定義了一個RegistryServer.java的類,因為我沒有仔細呼叫過每個方式的實現細節,所以這個介面定義的也是比較簡單:
package org.laopopo.registry;
/**
*
* @author BazingaLyn
* @description 註冊中心
* @time
* @modifytime
*/
public interface RegistryServer {
void start();
}
只是簡單的定義了一個啟動方法,我們看看其基於java記憶體的實現結構DefaultRegistryServer.java,我們截圖說明一下:
這是預設的註冊中心的定義截圖,維護了一個NettyServer的客戶端,維護了一個服務消費者的管理者ConsumerManager,維護了一個服務提供者的管理者ProviderManager,還有幾個執行緒執行器,每個類各司其職,當然他們之間還有相互協作的過程。從整體上來看,註冊中心的整體結構就是這樣,下面給出具體的程式碼:
註冊中心的基本結構就是如此,詳細程式碼可檢視:package org.laopopo.base.registry; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.laopopo.base.registry.model.RegistryPersistRecord; import org.laopopo.common.utils.NamedThreadFactory; import org.laopopo.common.utils.PersistUtils; import org.laopopo.registry.RegistryServer; import org.laopopo.remoting.netty.NettyRemotingServer; import org.laopopo.remoting.netty.NettyServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; /** * #######註冊中心###### * * 可以有多個註冊中心,所有的註冊中心之前不進行通訊,都是無狀態的 * 1)provider端與每一個註冊中心之間保持長連線,保持重連 * 2)consumer隨機選擇一個註冊中心保持長連線,如果斷了,不去主動重連,選擇其他可用的註冊中心 * * @author BazingaLyn * @description 預設的註冊中心,處理註冊端的所有事宜: * 1)處理consumer端傳送過來的註冊資訊 * 2)處理provider端傳送過來的訂閱資訊 * 3)當服務下線需要通知對應的consumer變更後的註冊資訊 * 4)所有的註冊訂閱資訊的儲存和健康檢查 * 5)接收管理者的一些資訊請求,比如 服務統計 | 某個例項的服務降級 | 通知消費者的訪問策略 | 改變某個服務例項的比重 * 6)將管理者對服務的一些資訊 例如稽核結果,負載演算法等資訊持久化到硬碟 * @time 2016年8月15日 * @modifytime */ public class DefaultRegistryServer implements RegistryServer { private static final Logger logger = LoggerFactory.getLogger(DefaultRegistryServer.class); private final NettyServerConfig nettyServerConfig; //netty Server的一些配置檔案 private RegistryServerConfig registryServerConfig; //註冊中心的配置檔案 private NettyRemotingServer remotingServer; //註冊中心的netty server端 private RegistryConsumerManager consumerManager; //註冊中心消費側的管理邏輯控制類 private RegistryProviderManager providerManager; //註冊中心服務提供者的管理邏輯控制類 private ExecutorService remotingExecutor; //執行器 private ExecutorService remotingChannelInactiveExecutor; //channel inactive的執行緒執行器 //定時任務 private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new NamedThreadFactory("registry-timer")); /** * * @param nettyServerConfig 註冊中心的netty的配置檔案 至少需要配置listenPort * @param nettyClientConfig 註冊中心連線Monitor端的netty配置檔案,至少需要配置defaultAddress值 這邊monitor是單例項,所以address一個就好 */ public DefaultRegistryServer(NettyServerConfig nettyServerConfig,RegistryServerConfig registryServerConfig) { this.nettyServerConfig = nettyServerConfig; this.registryServerConfig = registryServerConfig; consumerManager = new RegistryConsumerManager(this); providerManager = new RegistryProviderManager(this); initialize(); } private void initialize() { this.remotingServer = new NettyRemotingServer(this.nettyServerConfig); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory("RegistryCenterExecutorThread_")); this.remotingChannelInactiveExecutor = Executors.newFixedThreadPool(nettyServerConfig.getChannelInactiveHandlerThreads(), new NamedThreadFactory("RegistryCenterChannelInActiveExecutorThread_")); //註冊處理器 this.registerProcessor(); //從硬碟上恢復一些服務的資訊 this.recoverServiceInfoFromDisk(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 延遲60秒,每隔60秒開始 定時向consumer傳送消費者消費失敗的資訊 try { DefaultRegistryServer.this.getConsumerManager().checkSendFailedMessage(); } catch (Exception e) { logger.warn("schedule publish failed [{}]",e.getMessage()); } } }, 60, 60, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 延遲60秒,每隔一段時間將一些服務資訊持久化到硬碟上 try { DefaultRegistryServer.this.getProviderManager().persistServiceInfo(); } catch (Exception e) { logger.warn("schedule persist failed [{}]",e.getMessage()); } } }, 60, this.registryServerConfig.getPersistTime(), TimeUnit.SECONDS); } /** * 從硬碟上恢復一些服務的稽核負載演算法的資訊 */ private void recoverServiceInfoFromDisk() { String persistString = PersistUtils.file2String(this.registryServerConfig.getStorePathRootDir()); if (null != persistString) { List<RegistryPersistRecord> registryPersistRecords = JSON.parseArray(persistString.trim(), RegistryPersistRecord.class); if (null != registryPersistRecords) { for (RegistryPersistRecord metricsReporter : registryPersistRecords) { String serviceName = metricsReporter.getServiceName(); this.getProviderManager().getHistoryRecords().put(serviceName, metricsReporter); } } } } private void registerProcessor() { this.remotingServer.registerDefaultProcessor(new DefaultRegistryProcessor(this), this.remotingExecutor); this.remotingServer.registerChannelInactiveProcessor(new DefaultRegistryChannelInactiveProcessor(this), remotingChannelInactiveExecutor); } @Override public void start() { this.remotingServer.start(); } public RegistryConsumerManager getConsumerManager() { return consumerManager; } public RegistryProviderManager getProviderManager() { return providerManager; } public NettyRemotingServer getRemotingServer() { return remotingServer; } public void setRemotingServer(NettyRemotingServer remotingServer) { this.remotingServer = remotingServer; } public RegistryServerConfig getRegistryServerConfig() { return registryServerConfig; } public void setRegistryServerConfig(RegistryServerConfig registryServerConfig) { this.registryServerConfig = registryServerConfig; } }
https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-registry-default
其他的具體邏輯下幾個小節一起分析