1. 程式人生 > 程式設計 >Dubbo原始碼解析(十八)遠端通訊——Zookeeper

Dubbo原始碼解析(十八)遠端通訊——Zookeeper

遠端通訊——Zookeeper

目標:介紹基於zookeeper的來實現的遠端通訊、介紹dubbo-remoting-zookeeper內的原始碼解析。

前言

對於zookeeper我相信肯定不陌生,在之前的文章裡面也有講到zookeeper來作為註冊中心。在這裡,基於zookeeper來實現遠端通訊,duubo封裝了zookeeper client,來和zookeeper server通訊。

下面是類圖:

zookeeper-類圖

原始碼分析

(一)ZookeeperClient

public interface ZookeeperClient {

    /**
     * 建立client
     * @param
path * @param ephemeral */
void create(String path,boolean ephemeral); /** * 刪除client * @param path */ void delete(String path); /** * 獲得子節點集合 * @param path * @return */ List<String> getChildren(String path); /** * 向zookeeper的該節點發起訂閱,獲得該節點所有 * @param
path * @param listener * @return */
List<String> addChildListener(String path,ChildListener listener); /** * 移除該節點的子節點監聽器 * @param path * @param listener */ void removeChildListener(String path,ChildListener listener); /** * 新增狀態監聽器 * @param
listener */
void addStateListener(StateListener listener); /** * 移除狀態監聽 * @param listener */ void removeStateListener(StateListener listener); /** * 判斷是否連線 * @return */ boolean isConnected(); /** * 關閉客戶端 */ void close(); /** * 獲得url * @return */ URL getUrl(); } 複製程式碼

該介面是基於zookeeper的客戶端介面,其中封裝了客戶端的一些方法。

(二)AbstractZookeeperClient

該類實現了ZookeeperClient介面,是客戶端的抽象類,它實現了一些公共邏輯,把具體的doClose、createPersistent等方法抽象出來,留給子類來實現。

1.屬性

/**
 * url物件
 */
private final URL url;

/**
 * 狀態監聽器集合
 */
private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();

/**
 * 客戶端監聽器集合
 */
private final ConcurrentMap<String,ConcurrentMap<ChildListener,TargetChildListener>> childListeners = new ConcurrentHashMap<String,TargetChildListener>>();

/**
 * 是否關閉
 */
private volatile boolean closed = false;
複製程式碼

2.create

@Override
public void create(String path,boolean ephemeral) {
    // 如果不是臨時節點
    if (!ephemeral) {
        // 判斷該客戶端是否存在
        if (checkExists(path)) {
            return;
        }
    }
    // 獲得/的位置
    int i = path.lastIndexOf('/');
    if (i > 0) {
        // 建立客戶端
        create(path.substring(0,i),false);
    }
    // 如果是臨時節點
    if (ephemeral) {
        // 建立臨時節點
        createEphemeral(path);
    } else {
        // 遞迴建立節點
        createPersistent(path);
    }
}
複製程式碼

該方法是建立客戶端的方法,其中createEphemeral和createPersistent方法都被抽象出來。具體看下面的類的介紹。

3.addStateListener

@Override
public void addStateListener(StateListener listener) {
    // 狀態監聽器加入集合
    stateListeners.add(listener);
}
複製程式碼

該方法就是增加狀態監聽器。

4.close

@Override
public void close() {
    if (closed) {
        return;
    }
    closed = true;
    try {
        // 關閉
        doClose();
    } catch (Throwable t) {
        logger.warn(t.getMessage(),t);
    }
}
複製程式碼

該方法是關閉客戶端,其中doClose方法也被抽象出。

/**
 * 關閉客戶端
 */
protected abstract void doClose();

/**
 * 遞迴建立節點
 * @param path
 */
protected abstract void createPersistent(String path);

/**
 * 建立臨時節點
 * @param path
 */
protected abstract void createEphemeral(String path);

/**
 * 檢測該節點是否存在
 * @param path
 * @return
 */
protected abstract boolean checkExists(String path);

/**
 * 建立子節點監聽器
 * @param path
 * @param listener
 * @return
 */
protected abstract TargetChildListener createTargetChildListener(String path,ChildListener listener);

/**
 * 為子節點新增監聽器
 * @param path
 * @param listener
 * @return
 */
protected abstract List<String> addTargetChildListener(String path,TargetChildListener listener);

/**
 * 移除子節點監聽器
 * @param path
 * @param listener
 */
protected abstract void removeTargetChildListener(String path,TargetChildListener listener);
複製程式碼

上述的方法都是被抽象的,又它的兩個子類來實現。

(三)ZkclientZookeeperClient

該類繼承了AbstractZookeeperClient,是zk客戶端的實現類。

1.屬性

/**
 * zk客戶端包裝類
 */
private final ZkClientWrapper client;

/**
 * 連線狀態
 */
private volatile KeeperState state = KeeperState.SyncConnected;
複製程式碼

該類有兩個屬性,其中client就是核心所在,幾乎所有方法都呼叫了client的方法。

2.建構函式

public ZkclientZookeeperClient(URL url) {
    super(url);
    // 新建一個zkclient包裝類
    client = new ZkClientWrapper(url.getBackupAddress(),30000);
    // 增加狀態監聽
    client.addListener(new IZkStateListener() {
        /**
         * 如果狀態改變
         * @param state
         * @throws Exception
         */
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {
            ZkclientZookeeperClient.this.state = state;
            // 如果狀態變為了斷開連線
            if (state == KeeperState.Disconnected) {
                // 則修改狀態
                stateChanged(StateListener.DISCONNECTED);
            } else if (state == KeeperState.SyncConnected) {
                stateChanged(StateListener.CONNECTED);
            }
        }

        @Override
        public void handleNewSession() throws Exception {
            // 狀態變為重連
            stateChanged(StateListener.RECONNECTED);
        }
    });
    // 啟動客戶端
    client.start();
}
複製程式碼

該方法是構造方法,同時在裡面也做了建立客戶端和啟動客戶端的操作。其他方法都是實現了父類抽象的方法,並且呼叫的是client方法,為舉個例子:

@Override
public void createPersistent(String path) {
    try {
        // 遞迴建立節點
        client.createPersistent(path);
    } catch (ZkNodeExistsException e) {
    }
}
複製程式碼

該方法是遞迴場景節點,呼叫的就是client.createPersistent(path)。

(四)CuratorZookeeperClient

該類是Curator框架提供的一套高階API,簡化了ZooKeeper的操作,從而對客戶端的實現。

1.屬性

/**
 * 框架式客戶端
 */
private final CuratorFramework client;
複製程式碼

2.構造方法

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        // 工廠建立者
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1,1000))
                .connectionTimeoutMs(5000);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest",authority.getBytes());
        }
        // 建立客戶端
        client = builder.build();
        // 新增監聽器
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client,ConnectionState state) {
                // 如果為狀態為lost,則改變為未連線
                if (state == ConnectionState.LOST) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                } else if (state == ConnectionState.CONNECTED) {
                    // 改變狀態為連線
                    CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                } else if (state == ConnectionState.RECONNECTED) {
                    // 改變狀態為未連線
                    CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                }
            }
        });
        // 啟動客戶端
        client.start();
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(),e);
    }
}
複製程式碼

該方法是構造方法,同樣裡面也包含了客戶端建立和啟動的邏輯。

其他的方法也一樣是實現了父類的抽象方法,舉個列子:

@Override
public void createPersistent(String path) {
    try {
        client.create().forPath(path);
    } catch (NodeExistsException e) {
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(),e);
    }
}
複製程式碼

(五)ZookeeperTransporter

@SPI("curator")
public interface ZookeeperTransporter {

    /**
     * 連線伺服器
     * @param url
     * @return
     */
    @Adaptive({Constants.CLIENT_KEY,Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}
複製程式碼

該方法是zookeeper的資訊交換介面。同樣也是一個可擴充套件介面,預設實現CuratorZookeeperTransporter類。

(六)ZkclientZookeeperTransporter

public class ZkclientZookeeperTransporter implements ZookeeperTransporter {

    @Override
    public ZookeeperClient connect(URL url) {
        // 新建ZkclientZookeeperClient例項
        return new ZkclientZookeeperClient(url);
    }

}
複製程式碼

該類實現了ZookeeperTransporter,其中就是建立了ZkclientZookeeperClient例項。

(七)CuratorZookeeperTransporter

public class CuratorZookeeperTransporter implements ZookeeperTransporter {

    @Override
    public ZookeeperClient connect(URL url) {
        // 建立CuratorZookeeperClient例項
        return new CuratorZookeeperClient(url);
    }

}
複製程式碼

該介面實現了ZookeeperTransporter,是ZookeeperTransporter預設的實現類,同樣也是建立了;對應的CuratorZookeeperClient例項。

(八)ZkClientWrapper

該類是zk客戶端的包裝類。

1.屬性

/**
 * 超時事件
 */
private long timeout;
/**
 * zk客戶端
 */
private ZkClient client;
/**
 * 客戶端狀態
 */
private volatile KeeperState state;
/**
 * 客戶端執行緒
 */
private ListenableFutureTask<ZkClient> listenableFutureTask;
/**
 * 是否開始
 */
private volatile boolean started = false;
複製程式碼

2.構造方法

public ZkClientWrapper(final String serverAddr,long timeout) {
    this.timeout = timeout;
    listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
        @Override
        public ZkClient call() throws Exception {
            // 建立zk客戶端
            return new ZkClient(serverAddr,Integer.MAX_VALUE);
        }
    });
}

複製程式碼

設定了超時時間和客戶端執行緒。

3.start

public void start() {
    // 如果客戶端沒有開啟
    if (!started) {
        // 建立連線執行緒
        Thread connectThread = new Thread(listenableFutureTask);
        connectThread.setName("DubboZkclientConnector");
        connectThread.setDaemon(true);
        // 開啟執行緒
        connectThread.start();
        try {
            // 獲得zk客戶端
            client = listenableFutureTask.get(timeout,TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!",t);
        }
        started = true;
    } else {
        logger.warn("Zkclient has already been started!");
    }
}

複製程式碼

該方法是客戶端啟動方法。

4.addListener

public void addListener(final IZkStateListener listener) {
    // 增加監聽器
    listenableFutureTask.addListener(new Runnable() {
        @Override
        public void run() {
            try {
                client = listenableFutureTask.get();
                // 增加監聽器
                client.subscribeStateChanges(listener);
            } catch (InterruptedException e) {
                logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly,which may cause unpredictable exception!");
            } catch (ExecutionException e) {
                logger.error("Got an exception when trying to create zkclient instance,can not connect to zookeeper server,please check!",e);
            }
        }
    });
}

複製程式碼

該方法是為客戶端新增監聽器。

其他方法都是對於 客戶端是否還連線的檢測,可自行檢視程式碼。

(九)ChildListener

public interface ChildListener {

    /**
     * 子節點修改
     * @param path
     * @param children
     */
    void childChanged(String path,List<String> children);

}

複製程式碼

該介面是子節點的監聽器,當子節點變化的時候會用到。

(十)StateListener

public interface StateListener {

    int DISCONNECTED = 0;

    int CONNECTED = 1;

    int RECONNECTED = 2;

    /**
     * 狀態修改
     * @param connected
     */
    void stateChanged(int connected);

}

複製程式碼

該介面是狀態監聽器,其中定義了一個狀態更改的方法以及三種狀態。

後記

該部分相關的原始碼解析地址:github.com/CrazyHZM/in…

該文章講解了基於zookeeper的來實現的遠端通訊、介紹dubbo-remoting-zookeeper內的原始碼解析,關鍵需要對zookeeper有所瞭解。該篇之後,遠端通訊的原始碼解析就先到這裡了,其實大家會發現,如果能夠對講解api系列的文章瞭解透了,那麼後面的文章九很簡單,就好像軌道鋪好,可以直接順著軌道往後,根本沒有阻礙。接下來我將開始對rpc模組進行講解。