Dubbo原始碼解析(七)註冊中心——zookeeper
註冊中心——zookeeper
目標:解釋以為zookeeper實現的註冊中心原理,解讀duubo-registry-zookeeper的原始碼
這篇文章是講解註冊中心的最後一篇文章。這篇文章講的是dubbo的註冊中心用zookeeper來實現。這種實現註冊中心的方法也是dubbo推薦的方法。為了能更加理解zookeeper在dubbo中的應用,接下來我先簡單的介紹一下zookeeper。
因為dubbo是一個分散式的RPC開源框架,各個服務之間單獨部署,就會出現資源之間不一致的問題。而zookeeper就有保證分散式一致性的特性。ZooKeeper是一種為分散式應用所設計的高可用、高效能且一致的開源協調服務。關於dubbo為什麼會推薦使用zookeeper作為它的註冊中心實現,有很多書籍以及部落格講解了zookeeper的特性以及優勢,這不是本章的重點,我要講的是zookeeper的資料結構,dubbo服務是如何被zookeeper的資料結構儲存管理的,因為這影響到下面原始碼的解讀。zookeeper採用的是樹形結構來組織資料節點,它類似於一個標準的檔案系統。先來看看下面這張圖:
該圖是官方檔案裡面的一張圖,展示了dubbo在zookeeper中儲存的形式以及節點層級,
- dubbo的Root層是根目錄,通過<dubbo:registry group="dubbo" />的“group”來設定zookeeper的根節點,預設值是“dubbo”。
- Service層是服務介面的全名。
- Type層是分類,一共有四種分類,分別是providers(服務提供者列表)、consumers(服務消費者列表)、routes(路由規則列表)、configurations(配置規則列表)。
- URL層:根據不同的Type目錄:可以有服務提供者 URL 、服務消費者 URL 、路由規則 URL 、配置規則 URL 。不同的Type關注的URL不同。
zookeeper以每個斜槓來分割每一層的znode,比如第一層根節點dubbo就是“/dubbo”,而第二層的Service層就是/com.foo.Barservice,zookeeper的每個節點通過路徑來表示以及訪問,例如服務提供者啟動時,向/dubbo/com.foo.Barservice/providers目錄下寫入自己的URL地址。關於流程呼叫說明,見官方檔案:
瞭解了dubbo在zookeeper中的節點層級,就可以看相關的原始碼了,下圖是包的結構:
跟前面三種實現方式一樣的目錄,也就兩個類,看起來非常的舒服,接下來就來解析這兩個類。
(一)ZookeeperRegistry
該類繼承了FailbackRegistry類,該類就是針對註冊中心核心的功能註冊、訂閱、取消註冊、取消訂閱,查詢註冊列表進行展開,基於zookeeper來實現。
1.屬性
// 日誌記錄
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);
// 預設的zookeeper埠
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
// 預設zookeeper根節點
private final static String DEFAULT_ROOT = "dubbo";
// zookeeper根節點
private final String root;
// 服務介面集合
private final Set<String> anyServices = new ConcurrentHashSet<String>();
// 監聽器集合
private final ConcurrentMap<URL,ConcurrentMap<NotifyListener,ChildListener>> zkListeners = new ConcurrentHashMap<URL,ChildListener>>();
// zookeeper客戶端例項
private final ZookeeperClient zkClient;
複製程式碼
其實你會發現zookeeper雖然是最被推薦的,反而它的實現邏輯相對簡單,因為呼叫了zookeeper服務元件,很多的邏輯不需要在dubbo中自己去實現。上面的屬性介紹也很簡單,不需要多說,更多的是呼叫zookeeper客戶端。
2.構造方法
public ZookeeperRegistry(URL url,ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 獲得url攜帶的分組配置,並且作為zookeeper的根節點
String group = url.getParameter(Constants.GROUP_KEY,DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 建立zookeeper client
zkClient = zookeeperTransporter.connect(url);
// 新增狀態監聽器,當狀態為重連的時候呼叫恢復方法
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
// 恢復
recover();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
});
}
複製程式碼
這裡有以下幾個關注點:
- 引數中ZookeeperTransporter是一個介面,並且在dubbo中有ZkclientZookeeperTransporter和CuratorZookeeperTransporter兩個實現類,ZookeeperTransporter還是一個可擴充套件的介面,基於 Dubbo SPI Adaptive 機制,會根據url中攜帶的引數去選擇用哪個實現類。
- 上面我說明瞭dubbo在zookeeper節點層級有一層是root層,該層是通過group屬性來設定的。
- 給客戶端新增一個監聽器,當狀態為重連的時候呼叫FailbackRegistry的恢復方法
3.appendDefaultPort
static String appendDefaultPort(String address) {
if (address != null && address.length() > 0) {
int i = address.indexOf(':');
// 如果地址本身沒有埠,則使用預設埠2181
if (i < 0) {
return address + ":" + DEFAULT_ZOOKEEPER_PORT;
} else if (Integer.parseInt(address.substring(i + 1)) == 0) {
return address.substring(0,i + 1) + DEFAULT_ZOOKEEPER_PORT;
}
}
return address;
}
複製程式碼
該方法是拼接使用預設的zookeeper埠,就是方地址本身沒有埠的時候才使用預設埠。
4.isAvailable && destroy
@Override
public boolean isAvailable() {
return zkClient.isConnected();
}
@Override
public void destroy() {
super.destroy();
try {
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ",cause: " + e.getMessage(),e);
}
}
複製程式碼
這裡兩個方法分別是檢測zookeeper是否連線以及銷燬連線,很簡單,都是呼叫了zookeeper客戶端封裝好的方法。
5.doRegister && doUnregister
@Override
protected void doRegister(URL url) {
try {
// 建立URL節點,也就是URL層的節點
zkClient.create(toUrlPath(url),url.getParameter(Constants.DYNAMIC_KEY,true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ",e);
}
}
@Override
protected void doUnregister(URL url) {
try {
// 刪除節點
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ",e);
}
}
複製程式碼
這兩個方法分別是註冊和取消註冊,也很簡單,呼叫都是客戶端create和delete方法,一個是建立一個節點,另一個是刪除節點,該操作都在URL層。
6.doSubscribe
@Override
protected void doSubscribe(final URL url,final NotifyListener listener) {
try {
// 處理所有Service層發起的訂閱,例如監控中心的訂閱
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 獲得根目錄
String root = toRootPath();
// 獲得url對應的監聽器集合
ConcurrentMap<NotifyListener,ChildListener> listeners = zkListeners.get(url);
// 不存在就建立監聽器集合
if (listeners == null) {
zkListeners.putIfAbsent(url,new ConcurrentHashMap<NotifyListener,ChildListener>());
listeners = zkListeners.get(url);
}
// 獲得節點監聽器
ChildListener zkListener = listeners.get(listener);
// 如果該節點監聽器為空,則建立
if (zkListener == null) {
listeners.putIfAbsent(listener,new ChildListener() {
@Override
public void childChanged(String parentPath,List<String> currentChilds) {
// 遍歷現有的節點,如果現有的服務集合中沒有該節點,則加入該節點,然後訂閱該節點
for (String child : currentChilds) {
// 解碼
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY,child,Constants.CHECK_KEY,String.valueOf(false)),listener);
}
}
}
});
// 重新獲取,為了保證一致性
zkListener = listeners.get(listener);
}
// 建立service節點,該節點為持久節點
zkClient.create(root,false);
// 向zookeeper的service節點發起訂閱,獲得Service介面全名陣列
List<String> services = zkClient.addChildListener(root,zkListener);
if (services != null && !services.isEmpty()) {
// 遍歷Service介面全名陣列
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
// 發起該service層的訂閱
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY,service,listener);
}
}
} else {
// 處理指定 Service 層的發起訂閱,例如服務消費者的訂閱
List<URL> urls = new ArrayList<URL>();
// 遍歷分類陣列
for (String path : toCategoriesPath(url)) {
// 獲得監聽器集合
ConcurrentMap<NotifyListener,ChildListener> listeners = zkListeners.get(url);
// 如果沒有則建立
if (listeners == null) {
zkListeners.putIfAbsent(url,ChildListener>());
listeners = zkListeners.get(url);
}
// 獲得節點監聽器
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener,new ChildListener() {
@Override
public void childChanged(String parentPath,List<String> currentChilds) {
// 通知服務變化 回撥NotifyListener
ZookeeperRegistry.this.notify(url,listener,toUrlsWithEmpty(url,parentPath,currentChilds));
}
});
// 重新獲取節點監聽器,保證一致性
zkListener = listeners.get(listener);
}
// 建立type節點,該節點為持久節點
zkClient.create(path,false);
// 向zookeeper的type節點發起訂閱
List<String> children = zkClient.addChildListener(path,zkListener);
if (children != null) {
// 加入到自子節點資料陣列
urls.addAll(toUrlsWithEmpty(url,path,children));
}
}
// 通知資料變化
notify(url,urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ",e);
}
}
複製程式碼
這個方法是訂閱,邏輯實現比較多,可以分兩段來看,這裡的實現把所有Service層發起的訂閱以及指定的Service層發起的訂閱分開處理。所有Service層類似於監控中心發起的訂閱。指定的Service層發起的訂閱可以看作是服務消費者的訂閱。訂閱的大致邏輯類似,不過還是有幾個區別:
- 所有Service層發起的訂閱中的ChildListener是在在 Service 層發生變更時,才會做出解碼,用anyServices屬性判斷是否是新增的服務,最後呼叫父類的subscribe訂閱。而指定的Service層發起的訂閱是在URL層發生變更的時候,呼叫notify,回撥回撥NotifyListener的邏輯,做到通知服務變更。
- 所有Service層發起的訂閱中客戶端建立的節點是Service節點,該節點為持久節點,而指定的Service層發起的訂閱中建立的節點是Type節點,該節點也是持久節點。這裡補充一下zookeeper的持久節點是節點建立後,就一直存在,直到有刪除操作來主動清除這個節點,不會因為建立該節點的客戶端會話失效而消失。而臨時節點的生命週期和客戶端會話繫結。也就是說,如果客戶端會話失效,那麼這個節點就會自動被清除掉。注意,這裡提到的是會話失效,而非連線斷開。另外,在臨時節點下面不能建立子節點。
- 指定的Service層發起的訂閱中呼叫了兩次notify,第一次是增量的通知,也就是隻是通知這次增加的服務節點,而第二個是全量的通知。
7.doUnsubscribe
@Override
protected void doUnsubscribe(URL url,NotifyListener listener) {
// 獲得監聽器集合
ConcurrentMap<NotifyListener,ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
// 獲得子節點的監聽器
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
// 如果為全部的服務介面,例如監控中心
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
// 獲得根目錄
String root = toRootPath();
// 移除監聽器
zkClient.removeChildListener(root,zkListener);
} else {
// 遍歷分類陣列進行移除監聽器
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path,zkListener);
}
}
}
}
}
複製程式碼
該方法是取消訂閱,也是分為兩種情況,所有的Service發起的取消訂閱還是指定的Service發起的取消訂閱。可以看到所有的Service發起的取消訂閱就直接移除了根目錄下所有的監聽器,而指定的Service發起的取消訂閱是移除了該Service層下面的所有Type節點監聽器。如果不太明白再回去看看前面的那個節點層級圖。
8.lookup
@Override
public List<URL> lookup(URL url) {
if (url == null) {
throw new IllegalArgumentException("lookup url == null");
}
try {
List<String> providers = new ArrayList<String>();
// 遍歷分組類別
for (String path : toCategoriesPath(url)) {
// 獲得子節點
List<String> children = zkClient.getChildren(path);
if (children != null) {
providers.addAll(children);
}
}
// 獲得 providers 中,和 consumer 匹配的 URL 陣列
return toUrlsWithoutEmpty(url,providers);
} catch (Throwable e) {
throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ",e);
}
}
複製程式碼
該方法就是查詢符合條件的已經註冊的服務。呼叫了toUrlsWithoutEmpty方法,在後面會講到。
9.toServicePath
private String toServicePath(URL url) {
String name = url.getServiceInterface();
// 如果是包括所有服務,則返回根節點
if (Constants.ANY_VALUE.equals(name)) {
return toRootPath();
}
return toRootDir() + URL.encode(name);
}
複製程式碼
該方法是獲得服務路徑,拼接規則:Root + Type。
10.toCategoriesPath
private String[] toCategoriesPath(URL url) {
String[] categories;
// 如果url攜帶的分類配置為*,則建立包括所有分類的陣列
if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
categories = new String[]{Constants.PROVIDERS_CATEGORY,Constants.CONSUMERS_CATEGORY,Constants.ROUTERS_CATEGORY,Constants.CONFIGURATORS_CATEGORY};
} else {
// 返回url攜帶的分類配置
categories = url.getParameter(Constants.CATEGORY_KEY,new String[]{Constants.DEFAULT_CATEGORY});
}
String[] paths = new String[categories.length];
for (int i = 0; i < categories.length; i++) {
// 加上服務路徑
paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
}
return paths;
}
private String toCategoryPath(URL url) {
return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY,Constants.DEFAULT_CATEGORY);
}
複製程式碼
第一個方法是獲得分類陣列,也就是url攜帶的服務下的所有Type節點陣列。第二個是獲得分類路徑,分類路徑拼接規則:Root + Service + Type
11.toUrlPath
private String toUrlPath(URL url) {
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
複製程式碼
該方法是獲得URL路徑,拼接規則是Root + Service + Type + URL
12.toUrlsWithoutEmpty && toUrlsWithEmpty
private List<URL> toUrlsWithoutEmpty(URL consumer,List<String> providers) {
List<URL> urls = new ArrayList<URL>();
if (providers != null && !providers.isEmpty()) {
// 遍歷服務提供者
for (String provider : providers) {
// 解碼
provider = URL.decode(provider);
if (provider.contains("://")) {
// 把服務轉化成url的形式
URL url = URL.valueOf(provider);
// 判斷是否匹配,如果匹配, 則加入到集合中
if (UrlUtils.isMatch(consumer,url)) {
urls.add(url);
}
}
}
}
return urls;
}
private List<URL> toUrlsWithEmpty(URL consumer,String path,List<String> providers) {
// 返回和服務消費者匹配的服務提供者url
List<URL> urls = toUrlsWithoutEmpty(consumer,providers);
// 如果不存在,則建立`empty://` 的 URL返回
if (urls == null || urls.isEmpty()) {
int i = path.lastIndexOf('/');
String category = i < 0 ? path : path.substring(i + 1);
URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY,category);
urls.add(empty);
}
return urls;
}
複製程式碼
第一個toUrlsWithoutEmpty方法是獲得 providers 中,和 consumer 匹配的 URL 陣列,第二個toUrlsWithEmpty方法是呼叫了第一個方法後增加了若不存在匹配,則建立 empty://
的 URL返回。通過這樣的方式,可以處理類似服務提供者為空的情況。
(二)ZookeeperRegistryFactory
該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原始碼:
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url,zookeeperTransporter);
}
}
複製程式碼
可以看到就是例項化了ZookeeperRegistry而已,所有這裡就不解釋了。
後記
該部分相關的原始碼解析地址:github.com/CrazyHZM/in…
該文章講解了dubbo利用zookeeper來實現註冊中心,其中關鍵的是需要弄明白dubbo在zookeeper中儲存的節點層級意義,也就是root層、service層、type層以及url層分別代表什麼,其他的邏輯並不複雜大多數呼叫了zookeeper客戶端的能力,有興趣的同學也可以深入的去了解zookeeper。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見。