Dubbo服務暴露與註冊
前面的文章中,我們講解了Dubbo是如何進行配置的屬性的初始化的,並且講到,Dubbo最終會將所有的屬性引數都封裝為一個URL物件,從而以這個URL物件為基準傳遞引數。本文則主要講解Dubbo是如何基於URL物件進行服務的暴露與註冊的。
首先需要說明的一點是,服務的暴露與註冊是兩個不同的概念。在Dubbo中,微服務之間的互動預設是通過Netty進行的,而服務之間的通訊是基於TCP以全雙工的方式進行的。那麼也就是說,每個服務都會存在一個ip和port。所謂的服務暴露就是指根據配置將當前服務使用Netty繫結一個本地的埠號(對於消費者而言,則是嘗試連線目標服務的ip和埠)。至於註冊,由於微服務架構中對於新新增的服務,需要一定的機制來通知消費者,有新的服務可用,或者對於某些下線的服務,也需要通知消費者,將這個已經下線的服務給移除。Dubbo中服務的註冊與發現預設是委託給zookeeper來進行的。本文主要講解服務的暴露與註冊的整體實現結構,至於服務暴露和註冊時所需要注意的詳細細節,則在後面的文章中進行講解。
1. 服務的暴露
服務的暴露的入口位置主要在RegistryProtocol.export()
方法中,該方法首先會進行服務的暴露,然後會進行服務的註冊。如下是該方法的原始碼:
@Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 獲取服務註冊相關的配置資料 URL registryUrl = getRegistryUrl(originInvoker); // 獲取provider相關的配置資料 URL providerUrl = getProviderUrl(originInvoker); // 對provider的部分配置資訊進行覆蓋,重寫的工作主要是委託給Configurator進行, // 這裡OverrideListener的作用主要是在當前服務的配置資訊發生更改時,對原有的配置進行重寫, // 並且會判斷是否需要對當前的服務進行重新暴露 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); // 進行服務的本地暴露,本質上就是根據配置使用Netty繫結本地的某個埠,從而完成服務暴露工作 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // 根據配置獲取對應的Registry物件,常見的有ZookeeperRegistry和RedisRegistry,預設使用的是 // ZookeeperRegistry,本文則以Zookeeper為例進行講解 final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); // 將當前的Invoker物件註冊到一個全域性的providerInvokers中進行快取, // 該Map物件儲存了所有的已經暴露了的服務 ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 除非主動配置不進行註冊,那麼這裡將會返回true boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 進行服務註冊的程式碼,主要是通過Zookeeper的客戶端CuratorFramework進行服務的註冊 register(registryUrl, registeredProviderUrl); // 將當前Invoker標識為已經註冊完成 providerInvokerWrapper.setReg(true); } // 註冊配置被更改的監聽事件,將配置被更改時將會觸發相應的listener registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 設定相關的URL物件,並且使用DestroyableExporter對exporter進行封裝返回 exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); return new DestroyableExporter<>(exporter); }
上面的程式碼中,主要完成了三部分的工作:
- 將服務與本地的某個埠號進行繫結,從而實現服務暴露的功能;
- 根據配置得到一個服務註冊物件
Registry
,然後對其進行註冊; - 建立一個配置被重寫的監聽器,並且註冊該監聽器,從而實現配置被重寫時能夠動態的使用新的配置進行服務的配置。
對於服務的暴露,主要是在doLocalExport()
方法中,我們繼續閱讀其原始碼:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { // 獲取當前Invoker對應的key,預設為group/interface/version的格式 String key = getCacheKey(originInvoker); // 這一段程式碼看起來比較複雜,其實本質上還是protocol.export()方法的呼叫,該方法就是進行服務暴露的程式碼, // 而ExporterChangeableWrapper的主要作用則是進行unexport()時的一些清理工作 return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker); }); }
doLocalExport()
方法的實現比較簡單,主要的匯出工作還是委託給了protocol.export()
方法進行,這裡的protocol
的型別為DubboProtocol
,這裡我們直接看其export()
方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 這裡主要是構建Stub的事件分發器,該分發器用於在消費者端進行Stub事件的分發
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,
Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer ["
+ url.getParameter(Constants.INTERFACE_KEY)
+ "], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 開啟服務
openServer(url);
// 該方法的主要作用是對序列化進行優化,其會獲取配置的實現了SerializationOptimizer介面的配置類,
// 然後通過其getSerializableClasses()方法獲取序列化類,通過這些類來進行序列化的優化
optimizeSerialization(url);
return exporter;
}
export()
方法主要做了三件事:a. 註冊stub事件分發器;b. 開啟服務;c. 註冊序列化優化器類。這裡openServer()
方法是用於開啟服務的,我們繼續閱讀其原始碼:
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
// 這裡採用雙檢查法來判斷對應於當前服務的server是否已經建立,如果沒有建立,
// 則建立一個新的,並且快取起來
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// 建立並快取新服務
serverMap.put(key, createServer(url));
}
}
} else {
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
.addParameterIfAbsent(Constants.HEARTBEAT_KEY,
String.valueOf(Constants.DEFAULT_HEARTBEAT))
.addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
.build();
// 獲取所使用的server型別,預設為netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0
&& !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 通過Exchangers.bind()方法進行服務的繫結
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 獲取client引數所指定的值,該值指定了當前client所使用的傳輸層服務,比如netty或mina。
// 然後判斷當前SPI所提供的傳輸層服務是否包含所指定的服務型別,如果不包含,則丟擲異常
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class)
.getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
上面的程式碼主要是建立ExchangeServer
的,使用雙檢查來檢測是否已經存在了對應的服務,如果不存在,則通過Exchangers.bind()
方法進行建立。這裡最終會將bind()
方法的呼叫委託給HeaderExchanger.bind()
方法進行。需要注意的是,上面的程式碼中傳入了一個requestHandler
的引數,這是一個ExchangeHandler
型別的物件,其主要作用是獲取並且呼叫Invoker
,以得到最終的呼叫結果,這些Handler
的作用,我們將在後面的文章中進行講解,本文主要講解服務的暴露與註冊的過程。下面我們繼續閱讀HeaderExchanger.bind()
方法的原始碼:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(
new HeaderExchangeHandler(handler))));
}
這裡的bind()
方法主要是建立了三個Handler
,並且最後一個Handler
將傳入的ExchangeHandler
包裹起來了。相信讀者朋友應該很快就能認識到,這裡使用的是責任鏈模式,這幾個handler通過統一的建構函式將下一個handler的例項注入到當前handler中。其實我們也就能夠理解,最終通過netty進行的呼叫過程就是基於這些責任鏈的。這裡我們主要看Transporters.bind()
方法的實現原理:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
// 判斷傳入的Handler是否只有一個,如果只有一個,則直接使用該handler,如果存在多個,
// 則使用ChannelHandlerDispatcher將這些handler包裹起來進行分發
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
// 通過配置指定的Transporter進行服務的繫結,這裡預設使用的是NettyTransporter
return getTransporter().bind(url, handler);
}
// NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 在NettyTransporter中進行服務繫結時,其只是建立了一個NettyServer以返回,但實際上在建立該物件的
// 過程中,就完成了Netty服務的繫結。需要注意的是,這裡的NettyServer並不是Netty所提供的類,而是
// Dubbo自己封裝的一個服務類,其對Netty的服務進行了封裝
return new NettyServer(url, listener);
}
Transporters.bind()
方法主要是將服務的繫結過程交由NettyTransporter
進行,而其則是建立了一個NettyServer
物件,真正的繫結過程就在建立該物件的過程中。下面我們來看其建立的原始碼:
// AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 獲取繫結的ip和埠號等資訊
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp))
{
bindIp = Constants.ANYHOST_VALUE;
}
// 在本地繫結指定的ip和埠
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY,
Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 通過建立的InetSocketAddress物件,將真正的繫結過程交由子類進行
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
+ ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind "
+ getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: "
+ t.getMessage(), t);
}
// 這裡的DataStore只是一個本地快取的資料倉庫,主要是對一些大物件進行快取
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY,
Integer.toString(url.getPort()));
}
// NettyServer
@Override
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// 這裡就進入了建立netty服務的過程,bossGroup指定的執行緒數為1,因為只有一個channel用於接收客戶端請求,
// 而workerGroup執行緒數則指定為配置檔案所設定的執行緒數,這些執行緒主要用於進行請求的處理
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(
Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// 建立NettyServerHandler,這個handler就是用於處理請求用的handler,但是前面我們也講到了,
// Dubbo使用了一個handler的責任鏈來進行訊息的處理,第二個引數this就是這個鏈的鏈頭。需要注意的是,
// Netty本身提供的責任鏈與Dubbo這裡使用的責任鏈是不同的,Dubbo只是使用了Netty的鏈的一個節點來
// 處理Dubbo所建立的鏈,這樣Dubbo的鏈其實是可以在多種服務複用的,比如Mina
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
// 這裡是標準的建立Netty的BootstrapServer的過程
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(),
NettyServer.this);
ch.pipeline()
// 新增用於解碼的handler
.addLast("decoder", adapter.getDecoder())
// 新增用於編碼的handler
.addLast("encoder", adapter.getEncoder())
// 新增用於進行心跳監測的handler
.addLast("server-idle-handler", new IdleStateHandler(0, 0,
idleTimeout, MILLISECONDS))
// 將處理請求的handler新增到pipeline中
.addLast("handler", nettyServerHandler);
}
});
// 進行服務的繫結
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
上面的程式碼就是一個標準的使用Netty進行服務繫結的程式碼,關於Netty的使用,讀者朋友可以閱讀Netty Reactor模式實現原理詳解。
2. 服務的註冊
對於服務的註冊,前面我們已經講到,入口主要在RegistryProtocol.export()
方法中,而呼叫入口則是通過其register()
方法進行的,這裡我們來看一下該方法的呼叫過程:
public void register(URL registryUrl, URL registeredProviderUrl) {
// 通過RegistryFactory獲取一個Registry物件,該物件的主要作用是進行服務的註冊,
// 這裡預設返回的是ZookeeperRegistry
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
這裡主要是根據配置獲取一個Registry
物件,我們繼續閱讀其register()
方法的原始碼:
// FailbackRegistry
@Override
public void register(URL url) {
// 將當前URL物件儲存到已註冊的URL物件列表中
super.register(url);
// 移除之前註冊失敗的記錄
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 將真正的註冊過程委託給ZookeeperRegistry進行
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 下面的過程主要是在註冊失敗的情況下,將當前URL新增到註冊失敗的URL列表中
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry "
+ getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: "
+ t.getMessage(), t);
}
// 將當前URL新增到註冊失敗的URL列表中
addFailedRegistered(url);
}
}
// ZookeeperRegistry
@Override
public void doRegister(URL url) {
try {
// 這裡是真正的註冊過程。需要注意的是這裡的zkClient型別為ZookeeperClient,其是Dubbo對
// 真正使用的CuratorFramework的一個封裝
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl()
+ ", cause: " + e.getMessage(), e);
}
}
上面的程式碼中首先會對一些快取資料進行清理,並且將當前URL新增到註冊的URL列表中,然後將註冊過程委託給ZookeeperClient進行。下面我們來看其是如何進行註冊的:
@Override
public void create(String path, boolean ephemeral) {
// 判斷建立的是否為臨時節點,如果不是臨時節點,則判斷是否已經存在該節點,如果存在,則直接返回
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
// 對path進行擷取,因為最後一個"/"後面是被編碼的URL物件,前面則是serviceKey + category
// 這裡的category指定的是provider還是consumer
int i = path.lastIndexOf('/');
if (i > 0) {
// 建立節點,需要注意的是,這裡的create()方法進行的是遞迴呼叫,這是因為zookeeper建立節點時
// 只能一級一級的建立,因而其每次都是取"/"前面的一部分來建立,只有當前節點已經存在的情況下,
// 上面的checkExists()才會為true,而且這裡,由於zookeeper規定,除了葉節點以外,其餘所有的
// 節點都必須為非臨時節點,因而這裡第二個引數傳入的是false,這也是前面的if判斷能通過的原因
create(path.substring(0, i), false);
}
if (ephemeral) {
// 建立臨時節點,具體的建立工作交由子類進行,也就是下面的程式碼
createEphemeral(path);
} else {
// 建立持久節點,具體的建立工作交由子類進行,也就是下面的程式碼
createPersistent(path);
}
}
@Override
public void createEphemeral(String path) {
try {
// 將臨時節點的建立工作交由CuratorFramework進行
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void createPersistent(String path) {
try {
// 將持久節點的建立工作交由CuratorFramework進行
client.create().forPath(path);
} catch (NodeExistsException e) {
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
3. 小結
本文主要講解了Dubbo在匯出服務時是如何進行服務暴露與註冊的,並且具體講解了如何基於netty進行服務的暴露,和如何基於zookeeper進