1. 程式人生 > >一起寫RPC框架(十五)RPC註冊中心二--註冊中心的基本實現

一起寫RPC框架(十五)RPC註冊中心二--註冊中心的基本實現

註冊中心的實現方式有很多種,實現的方式也有難有易,可以使用zookeeper,consul這些第三方已有的包去幫助實現,這樣可以減少開發註冊中心的難度,卻可能加大運維的難度,當然最最原始的方法就是使用java去實現,不借助其他任何已有的第三方包去搞定,這樣做的好處其實並不明顯,不過對於開發者而言,使用這種最原始的方式最大的好處就是自己知道所有的實現細節,遇到問題,遇到bug,可以很快的定位到問題所在

本Demo RPC因為是練習,所以沒有實現zookeeper的方法,以後有機會會補上,cosul我沒有接觸過,現在基於java的記憶體的方式去完成該功能

與服務提供者一樣,註冊中心的基本結構也很清晰:


在maven多專案的管理的角度上來說,laopopo-registry中只是很簡單的定義了一個RegistryServer.java的類,因為我沒有仔細呼叫過每個方式的實現細節,所以這個介面定義的也是比較簡單:

package org.laopopo.registry;

/**
 * 
 * @author BazingaLyn
 * @description 註冊中心
 * @time
 * @modifytime
 */
public interface RegistryServer {
	
	void start();

}
只是簡單的定義了一個啟動方法,我們看看其基於java記憶體的實現結構DefaultRegistryServer.java,我們截圖說明一下:




這是預設的註冊中心的定義截圖,維護了一個NettyServer的客戶端,維護了一個服務消費者的管理者ConsumerManager,維護了一個服務提供者的管理者ProviderManager,還有幾個執行緒執行器,每個類各司其職,當然他們之間還有相互協作的過程。從整體上來看,註冊中心的整體結構就是這樣,下面給出具體的程式碼:

package org.laopopo.base.registry;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.laopopo.base.registry.model.RegistryPersistRecord;
import org.laopopo.common.utils.NamedThreadFactory;
import org.laopopo.common.utils.PersistUtils;
import org.laopopo.registry.RegistryServer;
import org.laopopo.remoting.netty.NettyRemotingServer;
import org.laopopo.remoting.netty.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

/**
 * #######註冊中心######
 * 
 * 可以有多個註冊中心,所有的註冊中心之前不進行通訊,都是無狀態的
 * 1)provider端與每一個註冊中心之間保持長連線,保持重連
 * 2)consumer隨機選擇一個註冊中心保持長連線,如果斷了,不去主動重連,選擇其他可用的註冊中心
 * 
 * @author BazingaLyn
 * @description 預設的註冊中心,處理註冊端的所有事宜:
 * 1)處理consumer端傳送過來的註冊資訊
 * 2)處理provider端傳送過來的訂閱資訊
 * 3)當服務下線需要通知對應的consumer變更後的註冊資訊
 * 4)所有的註冊訂閱資訊的儲存和健康檢查
 * 5)接收管理者的一些資訊請求,比如 服務統計 | 某個例項的服務降級 | 通知消費者的訪問策略  | 改變某個服務例項的比重
 * 6)將管理者對服務的一些資訊 例如稽核結果,負載演算法等資訊持久化到硬碟
 * @time 2016年8月15日
 * @modifytime
 */
public class DefaultRegistryServer implements RegistryServer {
	
	private static final Logger logger = LoggerFactory.getLogger(DefaultRegistryServer.class);
	
	
    private final NettyServerConfig nettyServerConfig;       //netty Server的一些配置檔案
    private RegistryServerConfig registryServerConfig;       //註冊中心的配置檔案
	private NettyRemotingServer remotingServer;  	         //註冊中心的netty server端
	private RegistryConsumerManager consumerManager;         //註冊中心消費側的管理邏輯控制類
	private RegistryProviderManager providerManager;         //註冊中心服務提供者的管理邏輯控制類
	private ExecutorService remotingExecutor;                //執行器
	private ExecutorService remotingChannelInactiveExecutor; //channel inactive的執行緒執行器
	
	//定時任務
    private final ScheduledExecutorService scheduledExecutorService = Executors
    		.newSingleThreadScheduledExecutor(new NamedThreadFactory("registry-timer"));
	
    /**
     * 
     * @param nettyServerConfig 註冊中心的netty的配置檔案 至少需要配置listenPort
     * @param nettyClientConfig 註冊中心連線Monitor端的netty配置檔案,至少需要配置defaultAddress值 這邊monitor是單例項,所以address一個就好
     */
    public DefaultRegistryServer(NettyServerConfig nettyServerConfig,RegistryServerConfig registryServerConfig) {
    	this.nettyServerConfig = nettyServerConfig;
    	this.registryServerConfig = registryServerConfig;
    	consumerManager = new RegistryConsumerManager(this);
    	providerManager = new RegistryProviderManager(this);
    	initialize();
	}

	private void initialize() {
		
		 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig);
		 
		 this.remotingExecutor =
	                Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory("RegistryCenterExecutorThread_"));
		 
		 this.remotingChannelInactiveExecutor =
	                Executors.newFixedThreadPool(nettyServerConfig.getChannelInactiveHandlerThreads(), new NamedThreadFactory("RegistryCenterChannelInActiveExecutorThread_"));
		 
		 //註冊處理器
		 this.registerProcessor();
		 
		 //從硬碟上恢復一些服務的資訊
		 this.recoverServiceInfoFromDisk();
		 
		 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

				@Override
				public void run() {
					// 延遲60秒,每隔60秒開始 定時向consumer傳送消費者消費失敗的資訊
					try {
						DefaultRegistryServer.this.getConsumerManager().checkSendFailedMessage();
					} catch (Exception e) {
						logger.warn("schedule publish failed [{}]",e.getMessage());
					} 
				}
		}, 60, 60, TimeUnit.SECONDS);
		 
		 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

				@Override
				public void run() {
					// 延遲60秒,每隔一段時間將一些服務資訊持久化到硬碟上
					try {
						DefaultRegistryServer.this.getProviderManager().persistServiceInfo();
					} catch (Exception e) {
						logger.warn("schedule persist failed [{}]",e.getMessage());
					} 
				}
		}, 60, this.registryServerConfig.getPersistTime(), TimeUnit.SECONDS);
	}

	/**
	 * 從硬碟上恢復一些服務的稽核負載演算法的資訊
	 */
	private void recoverServiceInfoFromDisk() {
		
		String persistString = PersistUtils.file2String(this.registryServerConfig.getStorePathRootDir());
		
		if (null != persistString) {
			List<RegistryPersistRecord> registryPersistRecords = JSON.parseArray(persistString.trim(), RegistryPersistRecord.class);
			
			if (null != registryPersistRecords) {
				for (RegistryPersistRecord metricsReporter : registryPersistRecords) {
					
				     String serviceName = metricsReporter.getServiceName();
				     this.getProviderManager().getHistoryRecords().put(serviceName, metricsReporter);
				     
				}
			}
		}
		
	}

	private void registerProcessor() {
		this.remotingServer.registerDefaultProcessor(new DefaultRegistryProcessor(this), this.remotingExecutor);
		this.remotingServer.registerChannelInactiveProcessor(new DefaultRegistryChannelInactiveProcessor(this), remotingChannelInactiveExecutor);
	}
	

	@Override
	public void start() {
		this.remotingServer.start();
	}

	public RegistryConsumerManager getConsumerManager() {
		return consumerManager;
	}

	public RegistryProviderManager getProviderManager() {
		return providerManager;
	}

	public NettyRemotingServer getRemotingServer() {
		return remotingServer;
	}

	public void setRemotingServer(NettyRemotingServer remotingServer) {
		this.remotingServer = remotingServer;
	}

	public RegistryServerConfig getRegistryServerConfig() {
		return registryServerConfig;
	}

	public void setRegistryServerConfig(RegistryServerConfig registryServerConfig) {
		this.registryServerConfig = registryServerConfig;
	}

}
註冊中心的基本結構就是如此,詳細程式碼可檢視:

https://github.com/BazingaLyn/laopopo-rpc/tree/master/laopopo-registry-default

其他的具體邏輯下幾個小節一起分析