1. 程式人生 > >rpc系列-ZooKeeper

rpc系列-ZooKeeper

一.簡介

Zookeeper是一個分散式協調服務,就是為使用者的分散式應用程式提供協調服務。

Zookeeper本身就是一個分散式程式(只要有半數以上節點存活,zk就能正常服務)。

Zookeeper所提供的服務涵蓋:主從協調、伺服器節點動態上下線、統一配置管理、分散式共享鎖、統一名稱服務……

雖然說可以提供各種服務,但是zookeeper在底層其實只提供了兩個功能:

管理(儲存,讀取)使用者程式提交的資料;

併為使用者程式提供資料節點監聽服務;

Zookeeper叢集的角色:  Leader 和  follower  (Observer)

只要叢集中有半數以上節點存活,叢集就能提供服務

二.結構

特性

1.Zookeeper:一個leader,多個follower組成的叢集

2.全域性資料一致:每個server儲存一份相同的資料副本,client無論連線到哪個server,資料都是一致的

3.分散式讀寫,更新請求轉發,由leader實施

4.更新請求順序進行,來自同一個client的更新請求按其傳送順序依次執行

5.資料更新原子性,一次資料更新要麼成功,要麼失敗

6.實時性,在一定時間範圍內,client能讀到最新資料

結構

1.層次化的目錄結構,命名符合常規檔案系統規範(見下圖)

2.每個節點在zookeeper中叫做znode,並且其有一個唯一的路徑標識

3.節點Znode可以包含資料和子節點(但是EPHEMERAL型別的節點不能有子節點)

4.客戶端應用可以在節點上設定監視器

節點型別

1.Znode有兩種型別:

短暫(ephemeral)(斷開連線自己刪除)

持久(persistent)(斷開連線不刪除)

2.Znode有四種形式的目錄節點(預設是persistent )

PERSISTENT

PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )

EPHEMERAL

EPHEMERAL_SEQUENTIAL

3.建立znode時設定順序標識,znode名稱後會附加一個值,順序號是一個單調遞增的計數器,由父節點維護。

4.在分散式系統中,順序號可以被用於為所有的事件進行全域性排序,這樣客戶端可以通過順序號推斷事件的順序。

三.原理

zookeeper的選舉機制(叢集paxos)

以一個簡單的例子來說明整個選舉的過程. 假設有五臺伺服器組成的zookeeper叢集,它們的id從1-5,同時它們都是最新啟動的,也就是沒有歷史資料,在存放資料量這一點上,都是一樣的.假設這些伺服器依序啟動,來看看會發生什麼. 1. 伺服器1啟動,此時只有它一臺伺服器啟動了,它發出去的報沒有任何響應,所以它的選舉狀態一直是LOOKING狀態。

2.伺服器2啟動,它與最開始啟動的伺服器1進行通訊,互相交換自己的選舉結果,由於兩者都沒有歷史資料,所以id值較大的伺服器2勝出,但是由於沒有達到超過半數以上的伺服器都同意選舉它(這個例子中的半數以上是3),所以伺服器1,2還是繼續保持LOOKING狀態。

3.伺服器3啟動,根據前面的理論分析,伺服器3成為伺服器1,2,3中的老大,而與上面不同的是,此時有三臺伺服器選舉了它,所以它成為了這次選舉的leader。

4. 伺服器4啟動,根據前面的分析,理論上伺服器4應該是伺服器1,2,3,4中最大的,但是由於前面已經有半數以上的伺服器選舉了伺服器3,所以它只能接收當小弟的命了。 5.伺服器5啟動,同4一樣,當小弟.

非全新叢集的選舉機制(資料恢復)

那麼,初始化的時候,是按照上述的說明進行選舉的,但是當zookeeper運行了一段時間之後,有機器down掉,重新選舉時,選舉過程就相對複雜了。

需要加入資料id、leader id和邏輯時鐘。

資料id:資料新的id就大,資料每次更新都會更新id。

Leader id:就是我們配置的myid中的值,每個機器一個。

邏輯時鐘:這個值從0開始遞增,每次選舉對應一個值,也就是說:  如果在同一次選舉中,那麼這個值應該是一致的 ;  邏輯時鐘值越大,說明這一次選舉leader的程序更新。

選舉的標準就變成:

1.邏輯時鐘小的選舉結果被忽略,重新投票

2.統一邏輯時鐘後,資料id大的勝出

3.資料id相同的情況下,leader id大的勝出

根據這個規則選出leader。

四.示例

Zookeeper的監聽器工作機制

監聽器是一個介面,我們的程式碼中可以實現Wather這個介面,實現其中的process方法,方法中即我們自己的業務邏輯

監聽器的註冊是在獲取資料的操作中實現:

getData(path,watch)監聽的事件是:節點資料變化事件

getChildren(path,watch)監聽的事件是:節點下的子節點增減變化事件

服務端

public class DistributedServer {
    private static final String host = "localhost:2181";
    private static final int sessionTimeout = 2000;
    private static final String parentNode = "/servers/";

    private ZooKeeper zk = null;

    /**
     * 建立到zk的客戶端連線
     *
     * @throws Exception
     */
    public void getConnect() throws Exception {

        zk = new ZooKeeper(host, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 收到事件通知後的回撥函式
                System.out.println(event.getType() + "__" + event.getPath());
                try {
                    zk.getChildren("/", true);
                } catch (Exception e) {
                }
            }
        });

    }

    /**
     * 向zk叢集註冊伺服器資訊
     * ZooDefs.Ids.OPEN_ACL_UNSAFE   預設匿名許可權,許可權scheme id:'world,'anyone,許可權位:31(adcwr)
     * ZooDefs.Ids.READ_ACL_UNSAFE   只讀許可權,許可權scheme id:'world,'anyone,許可權位:1(r)
     *
     * CreateMode
     * 節點型別,型別定義在列舉CreateMode中:
     * (1)PERSISTENT:持久;
     * (2)PERSISTENT_SEQUENTIAL:持久順序;
     * (3)EPHEMERAL:臨時;
     * (4)EPHEMERAL_SEQUENTIAL:臨時順序。
     * @param data  建立節點初始化內容
     * @throws Exception
     */
    public void registerServer(String data) throws Exception {
        String create = zk.create(parentNode + "test", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(data + " 註冊節點 " + create);
    }

    /**
     * 業務功能
     *
     * @throws InterruptedException
     */
    public void handleBussiness(String data) throws InterruptedException {
        System.out.println(data + "開始handleBussiness");
        Thread.sleep(Long.MAX_VALUE);
    }

    public static void main(String[] args) throws Exception {

        // 獲取zk連線
        DistributedServer server = new DistributedServer();
        server.getConnect();

        // 利用zk連線註冊伺服器資訊
        server.registerServer("test01");

        // 啟動業務功能
        server.handleBussiness("test01");

    }

}

分散式鎖

/**
 *
 *分散式鎖:幾種實現方式,示例用的是用臨時順序節點實現共享鎖的一般做法
 *
 * 邏輯
 * 1.zk上註冊一個"臨時+序號"的znode,並監聽父節點
 * 2.獲取父節點下所有程式子節點,比較序號大小
 * 3.序號最小的獲取到"鎖",去訪問資源,訪問完後,刪除自己的節點,釋放鎖,重新註冊一個新的子節點
 * 4.其他程式節點會收到事件通知,可以去zk上獲取鎖
 */
public class DistributedClientLock {

    // 會話超時
    private static final int SESSION_TIMEOUT = 2000;
    // zookeeper叢集地址
    private String hosts = "localhost:2181";
    private String groupNode = "servers";
    private String subNode = "test";
    private boolean haveLock = false;

    private ZooKeeper zk;
    /**
     * 記錄自己建立的子節點路徑
     * volatile 不是執行緒安全的,具有可見性,在一個子記憶體操作完後,立即重新整理回到主記憶體。
     * 如果不加Volatile,每次呼叫thisPath,會有副本,修改會有延遲,比如其它執行緒搶到沒有修改完的資料,就在新的執行緒繼續執行,造成最後資料值有誤
     * 比如:一個執行緒寫,其它執行緒去讀的時候,用的Volatile,比如監聽新節點插入。
     */
    private volatile String thisPath;

    /**
     * 連線zookeeper
     */
    public void connectZookeeper() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                try {
                    System.out.println(event.getType()+"____"+event.getPath());

                    /**
                     * 判斷事件型別,此處只處理子節點變化事件
                     *                          event For “/path”                           event For “/path/child”
                     * create(“/path”)          EventType.NodeCreated                       無
                     * delete(“/path”)          EventType.NodeDeleted       	            無
                     * setData(“/path”)         EventType.NodeDataChanged                   無
                     * create(“/path/child”)    EventType.NodeChildrenChanged(getChild)     EventType.NodeCreated
                     * delete(“/path/child”)    EventType.NodeChildrenChanged(getChild)     EventType.NodeDeleted
                     * setData(“/path/child”)   無                                           EventType.NodeDataChanged
                     */
                    if (event.getType() == Event.EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
                        //獲取子節點,並對父節點進行監聽
                        List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
                        String thisNode = thisPath.substring(("/" + groupNode + "/").length());
                        // 去比較是否自己是最小id
                        Collections.sort(childrenNodes);
                        if (childrenNodes.indexOf(thisNode) == 0) {
                            //訪問共享資源處理業務,並且在處理完成之後刪除鎖
                            doSomething();

                            //重新註冊一把新的鎖
                            thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                    CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 程式一進來就先註冊一把鎖到zk上
        thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        // wait一小會,便於觀察
        Thread.sleep(new Random().nextInt(1000));

        // 從zk的鎖父目錄下,獲取所有子節點,並且註冊對父節點的監聽
        List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

        //如果爭搶資源的程式就只有自己,則可以直接去訪問共享資源
        if (childrenNodes.size() == 1) {
            doSomething();
            thisPath = zk.create("/" + groupNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
        }
    }

    /**
     * 處理業務邏輯,並且在最後釋放鎖
     */
    private void doSomething() throws Exception {
        try {
            System.out.println("鎖: " + thisPath);
            Thread.sleep(2000);

        } finally {
            System.out.println("完成: " + thisPath);
            //刪除當前節點
            zk.delete(this.thisPath, -1);
        }
    }

    public static void main(String[] args) throws Exception {
        DistributedClientLock dl = new DistributedClientLock();
        dl.connectZookeeper();
        Thread.sleep(Long.MAX_VALUE);
    }
}