1. 程式人生 > >分散式系統中如何較好地做服務發現

分散式系統中如何較好地做服務發現

前言


在分散式系統中的中心管理服務模式下,往往採用的模式是1個manager服務節點,多個worker節點,然後由manager來管控這些worker節點。但是本篇文章不是來講manager如何管理的問題,而是woker識別發現manager服務的問題。目前一種比較簡單的做法,通過worker節點本地配置的方式,來指定manager服務地址。這種方式實現較為容易,但是可維護性並不高。比如一個簡單的場景,如果manager節點地址發生改變,其下worker節點內所標明的manager地址就得被動地一個個更新了。我們可以用一個專業的術語表示這個現象:Service Discovery(服務發現)。

服務發現的基本原則


服務發現說到底就是讓客戶端如何快速,高效地“找到”服務端。前面提到的通過本地配置直接指明地址的方式是一種,但是確切地來說,它並不高效。

一種更為高效的方式應該是下面這種:

客戶端始終聯絡(通訊)的是一個固定(共享)的地址,而不是實際的地址,通過這個共享的地址,我們能夠找到實際的地址。

可能有人會說了,這不就是代理地址的意思嘛。但其實這並不完全等同於代理地址的意思。在後面的篇幅內,後具體介紹這裡面的差異。

服務發現的現有解決方案


針對上節提到的大原則的前提下,我們有哪些可行的方案呢?從最近Hadoop社群討論中,筆者歸納出了以下幾種:

  • 第一種,通過本地xml檔案的方式,就是和現有HDFS指定hdfs-site.xml的配置方式型別。但是這個解析得到配置結果的途徑改為統一走底層通用框架的模式,而不是原有直接在執行程式碼中解析配置。只是說,我們保留了一種與原先本地配置化讀取一樣效果的方式。
  • 第二種,通過外部儲存的方式。具體地來說,是引入一個外部Store來儲存實際的地址,而客戶端只需要看到的地址是這個Store地址。也就是說,客戶端需要從這個Store中先查詢實際的地址。這個Store可以是我們常見的比如ZK,HDFS或HBase等等。至於在查詢過程潛在的效能問題,可以通過引入快取機制來優化這個問題。
  • 第三種,通過Router的方式。Router與上面提到的方式的一個不同點在於,Router幫我們免去了實際查詢的動作,也就是說,客戶端只需要知道Router地址,這就足夠了。這種方式就可以理解為是完全一個代理地址的概念了。現有Router模式的例子,大家可以學習HDFS RBF特性。
  • 第四種,DNS解析的方式。這是指我們引入一個共享地址host,這個host表明的是我們具體服務的地址。由於DNS伺服器來做這個地址的解析。這種方案主要考慮的問題在於服務host地址的及時更新問題。

依賴外部儲存Store的服務發現解決方案實現


下面筆者給出社群上被提出過的第二種方案的具體程式碼實現,大家可以仔細理解其中的解決過程(這裡依賴的外部儲存是ZK,分散式系統服務為HDFS)。

/**
 * 服務發現抽象類.
 */
public abstract class NameserviceDiscovery implements Configurable, Closeable {

  private static final Logger LOG =
      LoggerFactory.getLogger(NameserviceDiscovery.class);


  /** 針對每個服務發現例項,進行快取構造. */
  protected static final LoadingCache<Id, NameserviceDiscovery> CACHE =
      CacheBuilder.newBuilder()
          .expireAfterAccess(1, TimeUnit.MINUTES)
          .removalListener(getRemover())
          .build(getLoader());

  /** Local configuration. */
  private Configuration conf;


  static class Id {
    // 服務發現類
    Class<? extends NameserviceDiscovery> clazz;
    // 節點當前配置資訊
    Configuration conf;

    Id(
        Class<? extends NameserviceDiscovery> className,
        Configuration config) {
      this.clazz = className;
      this.conf = config;
    }

  }

  /**
   * 從快取中獲得服務發現例項
   */
  public static NameserviceDiscovery get(Configuration conf) {
    Class<? extends NameserviceDiscovery> clazz = conf.getClass(
        DFS_DISCOVERY_CLASS_KEY,
        DFS_DISCOVERY_CLASS_DEFAULT,
        NameserviceDiscovery.class);
    try {
      Id key = new Id(clazz, conf);
      return CACHE.get(key);
    } catch (ExecutionException e) {
      LOG.error("Cannot get a nameservice discovery from the cache", e);
    }
    return ReflectionUtils.newInstance(clazz, conf);
  }

  private static CacheLoader<Id, NameserviceDiscovery> getLoader() {
    return new CacheLoader<Id, NameserviceDiscovery>() {
      @Override
      public NameserviceDiscovery load(Id id) throws Exception {
        return ReflectionUtils.newInstance(id.clazz, id.conf);
      }
    };
  }

  private static RemovalListener<Id, NameserviceDiscovery> getRemover() {
    return new RemovalListener<Id, NameserviceDiscovery>() {
      @Override
      public void onRemoval(
          RemovalNotification<Id, NameserviceDiscovery> notification) {
        NameserviceDiscovery discovery = notification.getValue();
        try {
          discovery.close();
        } catch (IOException e) {
          LOG.error("Cannot close nameservice discovery");
        }
      }
    };
  }

  @Override
  public void setConf(Configuration config) {
    this.conf = config;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }


  /**
   * 獲取服務地址方法
   */
  public abstract Collection<String> getNameServiceIds();
  public abstract Map<String, Map<String, InetSocketAddress>>
  public abstract Map<String, Map<String, InetSocketAddress>> getHttpAddresses();
  public abstract Map<String, Map<String, InetSocketAddress>> getHttpsAddresses();
}

下面是基於ZK的服務發現實現類,

/**
 * 基於ZK的服務發現實現類.
 */
public class ZookeeperBasedNameserviceDiscovery extends NameserviceDiscovery
    implements DynamicNameserviceDiscovery {

  private static final Logger LOG =
      LoggerFactory.getLogger(ZookeeperBasedNameserviceDiscovery.class);

  /** ZK管理器介面. */
  private ZKCuratorManager zkManager;

  /** 實際地址資訊的ZK儲存目錄. */
  private String baseZNode;


  /**
   * 初始化ZK連線操作
   */
  public void init() {
    if (zkManager == null) {
      Configuration conf = getConf();
      baseZNode = conf.get(
          DFS_DISCOVERY_ZK_PARENT_PATH_KEY,
          DFS_DISCOVERY_ZK_PARENT_PATH_DEFAULT);
      try {
        zkManager = new ZKCuratorManager(conf);
        zkManager.start();
      } catch (IOException e) {
        LOG.error("Cannot initialize the ZK connection", e);
      }
    }
  }

  /**
   * 關閉ZK連線
   */
  public void close() throws IOException {
    if (zkManager != null) {
      zkManager.close();
      zkManager = null;
    }
  }

  /**
   * 從ZK中獲取地址的操作方法
   */
  Map<String, Map<String, InetSocketAddress>> getAddresses(
      final String attr) throws IOException {
    Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();
    try {
      List<String> nsIds = zkManager.getChildren(baseZNode);
      for (String nsId : nsIds) {
        Map<String, InetSocketAddress> nsMap = Maps.newLinkedHashMap();
        String pathNs = baseZNode + "/" + nsId;
        List<String> nnIds = zkManager.getChildren(pathNs);
        for (String nnId : nnIds) {
          String pathNn = pathNs + "/" + nnId;
          String pathAddress = pathNn + "/" + attr;
          String addr = zkManager.getStringData(pathAddress);
          InetSocketAddress sockAddr = NetUtils.createSocketAddr(addr);
          nsMap.put(nnId, sockAddr);
        }
        ret.put(nsId, nsMap);
      }
    } catch (Exception e) {
      LOG.error("Cannot get the addresses", e);
      throw new IOException(e.getMessage());
    }
    return ret;
  }

  /**
   * 其它型別方法
   */
  @Override
  public Collection<String> getNameServiceIds() {
    init();
    try {
      return getAddresses("rpcAddress").keySet();
    } catch (IOException e) {
      // Fallback to the configuration based
      return getConf().getTrimmedStringCollection(DFS_NAMESERVICES);
    }
  }

  @Override
  public Map<String, Map<String, InetSocketAddress>> getRpcAddresses() {
    init();
    try {
      return getAddresses("rpcAddress");
    } catch (IOException e) {
      // Fallback to the configuration based
      Configuration conf = getConf();
      return DFSUtilClient.getAddresses(conf, null,
        HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
    }
  }

  @Override
  public Map<String, Map<String, InetSocketAddress>> getHttpAddresses() {
    init();
    try {
      return getAddresses("httpAddress");
    } catch (IOException e) {
      // Fallback to the configuration based
      Configuration conf = getConf();
      return DFSUtilClient.getAddresses(conf, null,
          HdfsClientConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
    }
  }

  @Override
  public Map<String, Map<String, InetSocketAddress>> getHttpsAddresses() {
    init();
    try {
      return getAddresses("httpsAddress");
    } catch (IOException e) {
      // Fallback to the configuration based
      Configuration conf = getConf();
      return DFSUtilClient.getAddresses(conf, null,
          HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY);
    }
  }
}

使用的方式很簡單,呼叫底層NameserviceDiscovery的介面即可。在系統中將配置解析操作方法替換為上述介面方式的話,服務發現的方式就優化成了第二種方案了,可維護性也增強了許多。以上就是一個簡單的依賴外部Store的服務發現的實現方案。

引用


[1].https://issues.apache.org/jira/browse/HADOOP-15774. Discovery of HA servers
[2].https://issues.apache.org/jira/browse/HDFS-13312. NameNode High Availability ZooKeeper based discovery rather than explicit nn1,nn2 configs