1. 程式人生 > >Dubbo分析之Registry層

Dubbo分析之Registry層

前言

本文分析dubbo的register層;此層封裝服務地址的註冊與發現,以服務URL為中心,擴充套件介面為RegistryFactory, Registry, RegistryService;

Registry介面

介面定義如下:

public interface Registry extends Node, RegistryService {
}

public interface RegistryService {

void register(URL url);

void unregister(URL url);

void subscribe(URL url, NotifyListener listener);

void unsubscribe(URL url, NotifyListener listener);

List<URL> lookup(URL url);

}
主要提供了註冊(register),登出(unregister),訂閱(subscribe),退訂(unsubscribe)等功能;dubbo提供了多種註冊方式分別是:Multicast ,Zookeeper,Redis以及Simple方式;

Multicast:Multicast註冊中心不需要啟動任何中心節點,只要廣播地址一樣,就可以互相發現;

Zookeeper:Zookeeper是Apacahe Hadoop的子專案,是一個樹型的目錄服務,支援變更推送,適合作為Dubbo服務的註冊中心,工業強度較高,可用於生產環境,並推薦使用;

Redis:基於Redis實現的註冊中心,使用 Redis的Publish/Subscribe事件通知資料變更;

Simple:Simple註冊中心本身就是一個普通的Dubbo服務,可以減少第三方依賴,使整體通訊方式一致;

後面重點介紹官方推薦的Zookeeper註冊方式;具體的Register是在RegistryFactory中生成的,具體看一下介面定義;

RegistryFactory介面

介面定義如下:

@SPI("dubbo")
public interface RegistryFactory {

@Adaptive({"protocol"})
Registry getRegistry(URL url);

}
RegistryFactory提供了SPI擴充套件,預設使用dubbo,具體有哪些擴充套件可以檢視META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory:

dubbo=com.alibaba.dubbo.registry.dubbo.DubboRegistryFactory
multicast=com.alibaba.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=com.alibaba.dubbo.registry.redis.RedisRegistryFactory
已推薦的Zookeeper為例項,檢視ZookeeperRegistryFactory,提供了createRegistry方法:

private ZookeeperTransporter zookeeperTransporter;

public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
例項化ZookeeperRegistry,兩個引數分別是url和zookeeperTransporter,zookeeperTransporter是操作Zookeeper的客戶端元件包括:zkclient和curator兩種方式

@SPI("curator")
public interface ZookeeperTransporter {

@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
ZookeeperClient connect(URL url);

}
ZookeeperTransporter同樣提供了SPI擴充套件,預設使用curator方式;接下來重點看一下Zookeeper註冊中心。

Zookeeper註冊中心

1.整體設計流程

在dubbo的整體設計中,可以大致檢視Registry層的大致流程,首先通過RegistryFactory例項化Registry,Registry可以接收RegistryProtocol傳過來的註冊(register)和訂閱(subscribe)訊息,然後Registry通過ZKClient來向Zookeeper指定的目錄下寫入url資訊,如果是訂閱訊息Registry會通過NotifyListener來通知RegitryDirctory進行更新url,最後就是Cluster層通過路由,負載均衡選擇具體的提供方;

2.Zookeeper註冊中心流程

官方提供了dubbo在Zookeeper中心的流程圖:
Dubbo分析之Registry層

流程說明:

服務提供者啟動時: 向/dubbo/com.foo.BarService/providers目錄下寫入自己的URL地址;

服務消費者啟動時: 訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址;並向/dubbo/com.foo.BarService/consumers目錄下寫入自己的URL地址;

監控中心啟動時: 訂閱/dubbo/com.foo.BarService 目錄下的所有提供者和消費者URL地址。

下面分別從註冊(register),登出(unregister),訂閱(subscribe),退訂(unsubscribe)四個方面來分析

3.註冊(register)

ZookeeperRegistry的父類FailbackRegistry中實現了register方法,FailbackRegistry從名字可以看出來具有:失敗自動恢復,後臺記錄失敗請求,定時重發功能;下面具體看一下register方法:

public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        failedRegistered.add(url);
    }
}

後臺記錄了失敗的請求,包括failedRegistered和failedUnregistered,註冊的時候將裡面存放的url刪除,然後執行doRegister方法,此方式在ZookeeperRegistry中實現,主要是在Zookeeper指定的目錄下寫入url資訊,如果失敗會記錄註冊失敗的url,等待自動恢復;doRegister相關程式碼如下:

protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
呼叫zkClient的create方法在Zookeeper上建立節點,預設建立臨時節點,create方法在AbstractZookeeperClient中實現,具體原始碼如下:

public void create(String path, boolean ephemeral) {
if (!ephemeral) {
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
}
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
path指定需要建立的目錄,ephemeral指定是否是建立臨時節點,並且提供了遞迴建立目錄,除了葉子目錄其他目錄都是持久化的;可以發現不管是建立臨時目錄還是持久化目錄,都沒有指定目錄的Data,所有使用的是預設值,也就是本地ip地址;例項中建立的目錄如下:

/dubbo/com.dubboApi.DemoService/providers/dubbo%3A%2F%2F10.13.83.7%3A20880%2Fcom.dubboApi.DemoService%3Fanyhost%3Dtrue%26application%3Dhello-world-app%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.dubboApi.DemoService%26methods%3DsyncSayHello%2CsayHello%2CasyncSayHello%26pid%3D13252%26serialization%3Dprotobuf%26side%3Dprovider%26timestamp%3D1545297239027
dubbo是一個根節點,然後是service名稱,providers是固定的一個型別,如果是消費端這裡就是consumers,最後就是一個臨時節點;使用臨時節點的目的就是提供者出現斷電等異常停機時,註冊中心能自動刪除提供者資訊;可以通過如下方法查詢當前的目錄節點資訊:

public class CuratorTest {

static String path = "/dubbo";
static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
        .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

public static void main(String[] args) throws Exception {
    client.start();
    List<String> paths = listChildren(path);
    for (String path : paths) {
        Stat stat = new Stat();
        System.err.println(
                "path:" + path + ",value:" + new String(client.getData().storingStatIn(stat).forPath(path)));
    }
}

private static List<String> listChildren(String path) throws Exception {
    List<String> pathList = new ArrayList<String>();
    pathList.add(path);
    List<String> list = client.getChildren().forPath(path);
    if (list != null && list.size() > 0) {
        for (String cPath : list) {
            String temp = "";
            if ("/".equals(path)) {
                temp = path + cPath;
            } else {
                temp = path + "/" + cPath;
            }
            pathList.addAll(listChildren(temp));
        }
    }
    return pathList;
}

}
遞迴遍歷/dubbo目錄下的所有子目錄,同時將節點儲存的資料都查詢出來,結果如下:

path:/dubbo,value:10.13.83.7
path:/dubbo/com.dubboApi.DemoService,value:10.13.83.7
path:/dubbo/com.dubboApi.DemoService/configurators,value:10.13.83.7
path:/dubbo/com.dubboApi.DemoService/providers,value:10.13.83.7
path:/dubbo/com.dubboApi.DemoService/providers/dubbo%3A%2F%2F10.13.83.7%3A20880%2Fcom.dubboApi.DemoService%3Fanyhost%3Dtrue%26application%3Dhello-world-app%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.dubboApi.DemoService%26methods%3DsyncSayHello%2CsayHello%2CasyncSayHello%26pid%3D4712%26serialization%3Dprotobuf%26side%3Dprovider%26timestamp%3D1545358401966,value:10.13.83.7
除了最後一個節點是臨時節點,其他都是持久化的;

4.登出(unregister)

同樣在父類FailbackRegistry中實現了unregister方法,程式碼如下:

public void unregister(URL url) {
super.unregister(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a cancellation request to the server side
doUnregister(url);
} catch (Exception e) {
Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to uregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        failedUnregistered.add(url);
    }
}

登出時同樣刪除了failedRegistered和failedUnregistered存放的url,然後呼叫doUnregister,刪除Zookeeper中的目錄節點,失敗的情況下會儲存在failedUnregistered中,等待重試;

protected void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

//CuratorZookeeperClient刪除操作
public void delete(String path) {
    try {
        client.delete().forPath(path);
    } catch (NoNodeException e) {
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

直接使用CuratorZookeeperClient中的delete方法刪除臨時節點;

5.訂閱(subscribe)

服務消費者啟動時,會先向Zookeeper註冊消費者節點資訊,然後訂閱…/providers目錄下提供者的URL地址;消費端也同樣需要註冊節點資訊,是因為監控中心需要對服務端和消費端都進行監控;下面重點看一下訂閱的相關程式碼,在父類FailbackRegistry中實現了subscribe方法:

public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedSubscribed(url, listener);
    }
}

類似的格式,同樣儲存了失敗了訂閱url資訊,重點看ZookeeperRegistry中的doSubscribe方法:

private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {br/>@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
br/>@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
在ZookeeperRegistry中定義了一個zkListeners變數,每個URL對應了一個map;map裡面分別是NotifyListener和ChildListener的對應關係,消費端訂閱時這裡的NotifyListener其實就是RegistryDirectory,ChildListener是一個內部類,用來在監聽的節點發生變更時,通知對應的消費端,具體的監聽處理是在zkClient.addChildListener中實現的:

public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
if (listeners == null) {
childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
listeners = childListeners.get(path);
}
TargetChildListener targetListener = listeners.get(listener);
if (targetListener == null) {
listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
targetListener = listeners.get(listener);
}
return addTargetChildListener(path, targetListener);
}

public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
    return new CuratorWatcherImpl(listener);
}

public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
    try {
        return client.getChildren().usingWatcher(listener).forPath(path);
    } catch (NoNodeException e) {
        return null;
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

private class CuratorWatcherImpl implements CuratorWatcher {

    private volatile ChildListener listener;

    public CuratorWatcherImpl(ChildListener listener) {
        this.listener = listener;
    }

    public void unwatch() {
        this.listener = null;
    }

    @Override
    public void process(WatchedEvent event) throws Exception {
        if (listener != null) {
            String path = event.getPath() == null ? "" : event.getPath();
            listener.childChanged(path,
                    StringUtils.isNotEmpty(path)
                            ? client.getChildren().usingWatcher(this).forPath(path)
                            : Collections.<String>emptyList());
        }
    }
}

CuratorWatcherImpl實現了Zookeeper的監聽介面CuratorWatcher,用來在節點有變更時通知對應的ChildListener,這樣ChildListener就可以通知RegistryDirectory進行更新資料;

6.退訂(unsubscribe)

在父類FailbackRegistry中實現了unsubscribe方法

public void unsubscribe(URL url, NotifyListener listener) {
super.unsubscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a canceling subscription request to the server side
doUnsubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to unsubscribe " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to unsubscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        Set<NotifyListener> listeners = failedUnsubscribed.get(url);
        if (listeners == null) {
            failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = failedUnsubscribed.get(url);
        }
        listeners.add(listener);
    }
}

同樣使用failedUnsubscribed用來儲存失敗退訂的url,具體看ZookeeperRegistry中的doUnsubscribe方法

protected void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
zkClient.removeChildListener(root, zkListener);
} else {
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
退訂就比較簡單了,只需要移除監聽器就可以了;

7.失敗重試

FailbackRegistry從名字可以看出來具有:失敗自動恢復,後臺記錄失敗請求,定時重發功能;在FailbackRegistry的構造器中啟動了一個定時器:

this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {br/>@Override
public void run() {
// Check and connect to the registry
try {
retry();
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
例項化了一個間隔5秒執行一次重試的定時器,retry部分程式碼如下:

protected void retry() {
if (!failedRegistered.isEmpty()) {
Set<URL> failed = new HashSet<URL>(failedRegistered);
if (failed.size() > 0) {
if (logger.isInfoEnabled()) {
logger.info("Retry register " + failed);
}
try {
for (URL url : failed) {
try {
doRegister(url);
failedRegistered.remove(url);
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
} catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
}
}
}
...省略...
}
定期檢查是否存在失敗的註冊(register),登出(unregister),訂閱(subscribe),退訂(unsubscribe)URL,如果存在則重試;

總結

本文首先介紹了RegistryFactory, Registry, RegistryService幾個核心介面,然後以Zookeeper為註冊中心重點介紹了註冊(register),登出(unregister),訂閱(subscribe),退訂(unsubscribe)方式。