1. 程式人生 > >Dubbo服務暴露與註冊

Dubbo服務暴露與註冊

        前面的文章中,我們講解了Dubbo是如何進行配置的屬性的初始化的,並且講到,Dubbo最終會將所有的屬性引數都封裝為一個URL物件,從而以這個URL物件為基準傳遞引數。本文則主要講解Dubbo是如何基於URL物件進行服務的暴露與註冊的。

        首先需要說明的一點是,服務的暴露與註冊是兩個不同的概念。在Dubbo中,微服務之間的互動預設是通過Netty進行的,而服務之間的通訊是基於TCP以全雙工的方式進行的。那麼也就是說,每個服務都會存在一個ip和port。所謂的服務暴露就是指根據配置將當前服務使用Netty繫結一個本地的埠號(對於消費者而言,則是嘗試連線目標服務的ip和埠)。至於註冊,由於微服務架構中對於新新增的服務,需要一定的機制來通知消費者,有新的服務可用,或者對於某些下線的服務,也需要通知消費者,將這個已經下線的服務給移除。Dubbo中服務的註冊與發現預設是委託給zookeeper來進行的。本文主要講解服務的暴露與註冊的整體實現結構,至於服務暴露和註冊時所需要注意的詳細細節,則在後面的文章中進行講解。

1. 服務的暴露

        服務的暴露的入口位置主要在RegistryProtocol.export()方法中,該方法首先會進行服務的暴露,然後會進行服務的註冊。如下是該方法的原始碼:

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
  // 獲取服務註冊相關的配置資料
  URL registryUrl = getRegistryUrl(originInvoker);
  // 獲取provider相關的配置資料
  URL providerUrl = getProviderUrl(originInvoker);

  // 對provider的部分配置資訊進行覆蓋,重寫的工作主要是委託給Configurator進行,
  // 這裡OverrideListener的作用主要是在當前服務的配置資訊發生更改時,對原有的配置進行重寫,
  // 並且會判斷是否需要對當前的服務進行重新暴露
  final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
  final OverrideListener overrideSubscribeListener = 
    new OverrideListener(overrideSubscribeUrl, originInvoker);
  overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
  providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
  
  // 進行服務的本地暴露,本質上就是根據配置使用Netty繫結本地的某個埠,從而完成服務暴露工作
  final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

  // 根據配置獲取對應的Registry物件,常見的有ZookeeperRegistry和RedisRegistry,預設使用的是
  // ZookeeperRegistry,本文則以Zookeeper為例進行講解
  final Registry registry = getRegistry(originInvoker);
  final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
  // 將當前的Invoker物件註冊到一個全域性的providerInvokers中進行快取,
  // 該Map物件儲存了所有的已經暴露了的服務
  ProviderInvokerWrapper<T> providerInvokerWrapper = 
    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, 
      registeredProviderUrl);

  // 除非主動配置不進行註冊,那麼這裡將會返回true
  boolean register = registeredProviderUrl.getParameter("register", true);
  if (register) {
    // 進行服務註冊的程式碼,主要是通過Zookeeper的客戶端CuratorFramework進行服務的註冊
    register(registryUrl, registeredProviderUrl);
    // 將當前Invoker標識為已經註冊完成
    providerInvokerWrapper.setReg(true);
  }

  // 註冊配置被更改的監聽事件,將配置被更改時將會觸發相應的listener
  registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

  // 設定相關的URL物件,並且使用DestroyableExporter對exporter進行封裝返回
  exporter.setRegisterUrl(registeredProviderUrl);
  exporter.setSubscribeUrl(overrideSubscribeUrl);
  return new DestroyableExporter<>(exporter);
}

        上面的程式碼中,主要完成了三部分的工作:

  • 將服務與本地的某個埠號進行繫結,從而實現服務暴露的功能;
  • 根據配置得到一個服務註冊物件Registry,然後對其進行註冊;
  • 建立一個配置被重寫的監聽器,並且註冊該監聽器,從而實現配置被重寫時能夠動態的使用新的配置進行服務的配置。

        對於服務的暴露,主要是在doLocalExport()方法中,我們繼續閱讀其原始碼:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, 
      URL providerUrl) {
  // 獲取當前Invoker對應的key,預設為group/interface/version的格式
  String key = getCacheKey(originInvoker);

  // 這一段程式碼看起來比較複雜,其實本質上還是protocol.export()方法的呼叫,該方法就是進行服務暴露的程式碼,
  // 而ExporterChangeableWrapper的主要作用則是進行unexport()時的一些清理工作
  return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
    Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
    return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), 
        originInvoker);
  });
}

        doLocalExport()方法的實現比較簡單,主要的匯出工作還是委託給了protocol.export()方法進行,這裡的protocol的型別為DubboProtocol,這裡我們直接看其export()方法:

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  URL url = invoker.getUrl();
  
  String key = serviceKey(url);
  DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
  exporterMap.put(key, exporter);

  // 這裡主要是構建Stub的事件分發器,該分發器用於在消費者端進行Stub事件的分發
  Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, 
      Constants.DEFAULT_STUB_EVENT);
  Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
  if (isStubSupportEvent && !isCallbackservice) {
    String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
    if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
      if (logger.isWarnEnabled()) {
        logger.warn(new IllegalStateException("consumer [" 
            + url.getParameter(Constants.INTERFACE_KEY) 
            + "], has set stubproxy support event ,but no stub methods founded."));
      }
    } else {
      stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
    }
  }

  // 開啟服務
  openServer(url);
  // 該方法的主要作用是對序列化進行優化,其會獲取配置的實現了SerializationOptimizer介面的配置類,
  // 然後通過其getSerializableClasses()方法獲取序列化類,通過這些類來進行序列化的優化
  optimizeSerialization(url);

  return exporter;
}

        export()方法主要做了三件事:a. 註冊stub事件分發器;b. 開啟服務;c. 註冊序列化優化器類。這裡openServer()方法是用於開啟服務的,我們繼續閱讀其原始碼:

private void openServer(URL url) {
  String key = url.getAddress();
  boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
  // 這裡採用雙檢查法來判斷對應於當前服務的server是否已經建立,如果沒有建立,
  // 則建立一個新的,並且快取起來
  if (isServer) {
    ExchangeServer server = serverMap.get(key);
    if (server == null) {
      synchronized (this) {
        server = serverMap.get(key);
        if (server == null) {
          // 建立並快取新服務
          serverMap.put(key, createServer(url));
        }
      }
    } else {
      server.reset(url);
    }
  }
}

private ExchangeServer createServer(URL url) {
  url = URLBuilder.from(url)
    .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
    .addParameterIfAbsent(Constants.HEARTBEAT_KEY, 
         String.valueOf(Constants.DEFAULT_HEARTBEAT))
    .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
    .build();
  
  // 獲取所使用的server型別,預設為netty
  String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
  if (str != null && str.length() > 0 
      && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
    throw new RpcException("Unsupported server type: " + str + ", url: " + url);
  }

  // 通過Exchangers.bind()方法進行服務的繫結
  ExchangeServer server;
  try {
    server = Exchangers.bind(url, requestHandler);
  } catch (RemotingException e) {
    throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
  }

  // 獲取client引數所指定的值,該值指定了當前client所使用的傳輸層服務,比如netty或mina。
  // 然後判斷當前SPI所提供的傳輸層服務是否包含所指定的服務型別,如果不包含,則丟擲異常
  str = url.getParameter(Constants.CLIENT_KEY);
  if (str != null && str.length() > 0) {
    Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class)
      .getSupportedExtensions();
    if (!supportedTypes.contains(str)) {
      throw new RpcException("Unsupported client type: " + str);
    }
  }

  return server;
}

        上面的程式碼主要是建立ExchangeServer的,使用雙檢查來檢測是否已經存在了對應的服務,如果不存在,則通過Exchangers.bind()方法進行建立。這裡最終會將bind()方法的呼叫委託給HeaderExchanger.bind()方法進行。需要注意的是,上面的程式碼中傳入了一個requestHandler的引數,這是一個ExchangeHandler型別的物件,其主要作用是獲取並且呼叫Invoker,以得到最終的呼叫結果,這些Handler的作用,我們將在後面的文章中進行講解,本文主要講解服務的暴露與註冊的過程。下面我們繼續閱讀HeaderExchanger.bind()方法的原始碼:

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
  return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(
    new HeaderExchangeHandler(handler))));
}

        這裡的bind()方法主要是建立了三個Handler,並且最後一個Handler將傳入的ExchangeHandler包裹起來了。相信讀者朋友應該很快就能認識到,這裡使用的是責任鏈模式,這幾個handler通過統一的建構函式將下一個handler的例項注入到當前handler中。其實我們也就能夠理解,最終通過netty進行的呼叫過程就是基於這些責任鏈的。這裡我們主要看Transporters.bind()方法的實現原理:

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
  if (url == null) {
    throw new IllegalArgumentException("url == null");
  }
  if (handlers == null || handlers.length == 0) {
    throw new IllegalArgumentException("handlers == null");
  }
  
  // 判斷傳入的Handler是否只有一個,如果只有一個,則直接使用該handler,如果存在多個,
  // 則使用ChannelHandlerDispatcher將這些handler包裹起來進行分發
  ChannelHandler handler;
  if (handlers.length == 1) {
    handler = handlers[0];
  } else {
    handler = new ChannelHandlerDispatcher(handlers);
  }
  
  // 通過配置指定的Transporter進行服務的繫結,這裡預設使用的是NettyTransporter
  return getTransporter().bind(url, handler);
}

// NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
  // 在NettyTransporter中進行服務繫結時,其只是建立了一個NettyServer以返回,但實際上在建立該物件的
  // 過程中,就完成了Netty服務的繫結。需要注意的是,這裡的NettyServer並不是Netty所提供的類,而是
  // Dubbo自己封裝的一個服務類,其對Netty的服務進行了封裝
  return new NettyServer(url, listener);
}

        Transporters.bind()方法主要是將服務的繫結過程交由NettyTransporter進行,而其則是建立了一個NettyServer物件,真正的繫結過程就在建立該物件的過程中。下面我們來看其建立的原始碼:

// AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
  super(url, handler);
  localAddress = getUrl().toInetSocketAddress();

  // 獲取繫結的ip和埠號等資訊
  String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
  int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
  if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) 
  {
    bindIp = Constants.ANYHOST_VALUE;
  }
  
  // 在本地繫結指定的ip和埠
  bindAddress = new InetSocketAddress(bindIp, bindPort);
  this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
  this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 
      Constants.DEFAULT_IDLE_TIMEOUT);
  try {
    // 通過建立的InetSocketAddress物件,將真正的繫結過程交由子類進行
    doOpen();
    if (logger.isInfoEnabled()) {
      logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() 
          + ", export " + getLocalAddress());
    }
  } catch (Throwable t) {
    throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " 
        + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " 
        + t.getMessage(), t);
  }

  // 這裡的DataStore只是一個本地快取的資料倉庫,主要是對一些大物件進行快取
  DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class)
      .getDefaultExtension();
  executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, 
      Integer.toString(url.getPort()));
}

// NettyServer
@Override
protected void doOpen() throws Throwable {
  bootstrap = new ServerBootstrap();

  // 這裡就進入了建立netty服務的過程,bossGroup指定的執行緒數為1,因為只有一個channel用於接收客戶端請求,
  // 而workerGroup執行緒數則指定為配置檔案所設定的執行緒數,這些執行緒主要用於進行請求的處理
  bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
  workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(
    Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
    new DefaultThreadFactory("NettyServerWorker", true));

  // 建立NettyServerHandler,這個handler就是用於處理請求用的handler,但是前面我們也講到了,
  // Dubbo使用了一個handler的責任鏈來進行訊息的處理,第二個引數this就是這個鏈的鏈頭。需要注意的是,
  // Netty本身提供的責任鏈與Dubbo這裡使用的責任鏈是不同的,Dubbo只是使用了Netty的鏈的一個節點來
  // 處理Dubbo所建立的鏈,這樣Dubbo的鏈其實是可以在多種服務複用的,比如Mina
  final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
  channels = nettyServerHandler.getChannels();

  // 這裡是標準的建立Netty的BootstrapServer的過程
  bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
      @Override
      protected void initChannel(NioSocketChannel ch) throws Exception {
        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), 
            NettyServer.this);
        ch.pipeline()
          // 新增用於解碼的handler
          .addLast("decoder", adapter.getDecoder())
          // 新增用於編碼的handler
          .addLast("encoder", adapter.getEncoder())
          // 新增用於進行心跳監測的handler
          .addLast("server-idle-handler", new IdleStateHandler(0, 0, 
              idleTimeout, MILLISECONDS))
          // 將處理請求的handler新增到pipeline中
          .addLast("handler", nettyServerHandler);
      }
    });

  // 進行服務的繫結
  ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
  channelFuture.syncUninterruptibly();
  channel = channelFuture.channel();

}

        上面的程式碼就是一個標準的使用Netty進行服務繫結的程式碼,關於Netty的使用,讀者朋友可以閱讀Netty Reactor模式實現原理詳解

2. 服務的註冊

        對於服務的註冊,前面我們已經講到,入口主要在RegistryProtocol.export()方法中,而呼叫入口則是通過其register()方法進行的,這裡我們來看一下該方法的呼叫過程:

public void register(URL registryUrl, URL registeredProviderUrl) {
  // 通過RegistryFactory獲取一個Registry物件,該物件的主要作用是進行服務的註冊,
  // 這裡預設返回的是ZookeeperRegistry
  Registry registry = registryFactory.getRegistry(registryUrl);
  registry.register(registeredProviderUrl);
}

        這裡主要是根據配置獲取一個Registry物件,我們繼續閱讀其register()方法的原始碼:

// FailbackRegistry
@Override
public void register(URL url) {
  // 將當前URL物件儲存到已註冊的URL物件列表中
  super.register(url);
  // 移除之前註冊失敗的記錄
  removeFailedRegistered(url);
  removeFailedUnregistered(url);
  try {
    // 將真正的註冊過程委託給ZookeeperRegistry進行
    doRegister(url);
  } catch (Exception e) {
    Throwable t = e;

    // 下面的過程主要是在註冊失敗的情況下,將當前URL新增到註冊失敗的URL列表中
    boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
      && url.getParameter(Constants.CHECK_KEY, true)
      && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
    boolean skipFailback = t instanceof SkipFailbackWrapperException;
    if (check || skipFailback) {
      if (skipFailback) {
        t = t.getCause();
      }
      throw new IllegalStateException("Failed to register " + url + " to registry " 
          + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
    } else {
      logger.error("Failed to register " + url + ", waiting for retry, cause: " 
          + t.getMessage(), t);
    }

    // 將當前URL新增到註冊失敗的URL列表中
    addFailedRegistered(url);
  }
}

// ZookeeperRegistry
@Override
public void doRegister(URL url) {
  try {
    // 這裡是真正的註冊過程。需要注意的是這裡的zkClient型別為ZookeeperClient,其是Dubbo對
    // 真正使用的CuratorFramework的一個封裝
    zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
  } catch (Throwable e) {
    throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() 
        + ", cause: " + e.getMessage(), e);
  }
}

        上面的程式碼中首先會對一些快取資料進行清理,並且將當前URL新增到註冊的URL列表中,然後將註冊過程委託給ZookeeperClient進行。下面我們來看其是如何進行註冊的:

@Override
public void create(String path, boolean ephemeral) {
  // 判斷建立的是否為臨時節點,如果不是臨時節點,則判斷是否已經存在該節點,如果存在,則直接返回
  if (!ephemeral) {
    if (checkExists(path)) {
      return;
    }
  }
  
  // 對path進行擷取,因為最後一個"/"後面是被編碼的URL物件,前面則是serviceKey + category
  // 這裡的category指定的是provider還是consumer
  int i = path.lastIndexOf('/');
  if (i > 0) {
    // 建立節點,需要注意的是,這裡的create()方法進行的是遞迴呼叫,這是因為zookeeper建立節點時
    // 只能一級一級的建立,因而其每次都是取"/"前面的一部分來建立,只有當前節點已經存在的情況下,
    // 上面的checkExists()才會為true,而且這裡,由於zookeeper規定,除了葉節點以外,其餘所有的
    // 節點都必須為非臨時節點,因而這裡第二個引數傳入的是false,這也是前面的if判斷能通過的原因
    create(path.substring(0, i), false);
  }
  
  if (ephemeral) {
    // 建立臨時節點,具體的建立工作交由子類進行,也就是下面的程式碼
    createEphemeral(path);
  } else {
    // 建立持久節點,具體的建立工作交由子類進行,也就是下面的程式碼
    createPersistent(path);
  }
}
@Override
public void createEphemeral(String path) {
  try {
    // 將臨時節點的建立工作交由CuratorFramework進行
    client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
  } catch (NodeExistsException e) {
  } catch (Exception e) {
    throw new IllegalStateException(e.getMessage(), e);
  }
}
@Override
public void createPersistent(String path) {
  try {
    // 將持久節點的建立工作交由CuratorFramework進行
    client.create().forPath(path);
  } catch (NodeExistsException e) {
  } catch (Exception e) {
    throw new IllegalStateException(e.getMessage(), e);
  }
}

3. 小結

        本文主要講解了Dubbo在匯出服務時是如何進行服務暴露與註冊的,並且具體講解了如何基於netty進行服務的暴露,和如何基於zookeeper進