1. 程式人生 > >大資料 ZooKeeper

大資料 ZooKeeper

前言

ZooKeeper是一個為分散式應用所設計的開源協調服務。

介紹

ZooKeeper是一個為分散式應用所設計的開源協調服務。它主要為使用者提供同步、配置管理、分組和命名等服務,減輕分散式應用程式所承擔的協調任務。ZooKeeper

的檔案系統使用了我們所熟悉的目錄樹結構。ZooKeeper是使用Java編寫的,但是它支援JavaC兩種程式語言。

設計目標

目標 說明
簡單化 ZooKeeper允許分散式的程序通過共享體系的名稱空間來進行協調,ZooKeeper採用Znode搭建與標準檔案系統類似的名稱空間【可以參見下一章“ ZooKeeper 名稱空間”】,且該名稱空間是存放在記憶體中的,這就意味著ZooKeeper具備高吞吐量、低延遲的能力。ZooKeeper具備高效能、高可靠性以及嚴格的有序訪問。
健壯性 使用心跳來檢測伺服器的狀態。如果有伺服器失聯,就連線到其他備用伺服器上。
有序性 ZooKeeper可以為每一次更新操作賦予一個版本號,並且此版本號是全域性有序的,不存在重複的情況。
速度優勢 讀效能優於寫效能。
原子性 在名稱空間中,每一個Znode的資料將被原子地讀寫。讀操作將讀取與Znode相關的所有資料,寫操作將替換掉所有的資料。每一個結點都有一個訪問控制表,規定了使用者操作的許可權。
可靠性
時效性

ZooKeeper 名稱空間

ZooKeeper擁有一個層次的名稱空間。在名稱空間中,每個結點稱為Znode,每個Znode包含了它自身和它的子節點相關聯的資料。指向結點的路徑必須使用規範的絕對路徑表示,並且以“/”來分隔。【在ZooKeeper中不允許使用相對路徑來表示】

在這裡插入圖片描述

每個Znode維護著一個屬性結構,包含了資料的版本號dataVersion,時間戳ctime、mtime等狀態資訊。

[zk: localhost:2181(CONNECTED) 14] get /app2
aaabbb
cZxid = 0x600000004 # 建立事務編號
ctime = Thu Sep 13 21:47:07 EDT 2018 
mZxid = 0x600000004 # 修改事務編號
mtime = Thu Sep 13 21:47:07 EDT 2018
pZxid = 0x600000004 # 持久化事務編號
cversion = 0 # 建立版本
dataVersion = 0
aclVersion = 0 # 許可權版本
ephemeralOwner = 0x65d5b998e80000
dataLength = 6 # 資料長度
numChildren = 0 # 子節點個數

看不懂這些屬性,可以參見【Znode 屬性

Znode 主要特徵

特徵 說明
Watch 客戶端可以在節點上設定Watch,當節點的資料發生變化(增刪改等操作)將會出發Watch的對應的操作。【Watch有且僅會被觸發一次併發送一個通知。】
資料訪問 ZooKeeper中的每個節點上儲存的資料需要被原子性的操作。
臨時節點 ZooKeeper中的節點有兩種,分別為臨時節點和永久節點。節點的型別在建立後不能改變。ZooKeeper臨時節點的生命週期依賴於建立它們的Session。一旦Session結束,臨時節點將被自動刪除,當然也可以手動刪除。ZooKeeper的臨時節點不允許擁有子節點。相反,永久節點的生命週期不依賴於Session,並且只有在客戶端顯示執行刪除操作的時候,它們才被刪除。
順序結點 【唯一性】當建立Znode的時候,使用者可以請求在ZooKeeper的路徑結尾新增一個遞增的計數。這個計數對於此節點的父節點來說是唯一的,它的格式為“%010d”(10位數字,沒有數值的資料位用0填充,例如0000000001)。當計數值大於232-1時,計數器將會溢位。

Znode 屬性

屬性 說明
czxid 節點被建立的Zxid值。
mzxid 節點被修改的Zxid值。
ctime 節點被建立的時間。
mtime 節點被修改的時間。
version 節點被修改的版本號。
cversion 節點所擁有的子節點被修改的版本號。
aversion 節點的ACL被修改的版本號。
ephemeralOwner 如果此節點是臨時節點,那麼它的值就是這個節點擁有者的Session ID。否則,它的值為0。
numChildren 節點擁有的子節點的個數。

ZooKeeper ACL

ZooKeeper使用ACL對Znode進行訪問控制。ACL擁有三種模式:

  • user:檔案的擁有者
  • group:使用者組
  • world:

一個ACL和一個ZooKeeper節點是對應的,並且不存在繼承關係,相互獨立。

訪問控制權限所規定的許可權

許可權 許可權描述
CREATE 建立子節點。
READ 從節點獲取資料或者列出節點的所有子節點。
WRITE 設定節點的資料。
DELETE 刪除子節點。
ADMIN 可以設定許可權。

驗證模式

模式 說明
world 代表某一特定的使用者(Client)。
auth 代表任何已經通過驗證的使用者(Client)。
digest 通過使用者密碼進行驗證。
ip 通過Client IP地址進行驗證。

ZooKeeper一致性保證

一致性特點 說明
順序一致性 Client的更新順序與它們傳送的順序相一致的。
原子性 更新操作只有失敗和成功兩種結果。
單系統映象 ZooKeeper檢視在不同伺服器上都一致。
可靠性 一旦一個更新操作被應用,那麼在更新之前,其值都不會改變。
實時性 在特定的一段時間內,客戶端看到的系統需要被保證是實時的(在十幾秒的時間裡)。在此時間段內,任何系統的改變將被客戶端看到,或者被客戶端偵測到。

ZooKeeper Leader選舉

搭建環境

  • 環境:VMWare下的四個Centos7虛擬機器。

首先,當然是先下載環境啦~點選跳轉到下載頁面

  • 修改主機名:(也可以不修改)
    • vim /etc/sysconfig/network
    • 增加:HOSTNAME=hadoop
  • 下載:wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
  • 解壓:tar -zxvf zookeeper-3.4.10.tar.gz
  • 移動到/opt目錄下:
    • sudo mv zookeeper-3.4.10 /opt
    • cd /opt
    • sudo mv zookeeper-3.4.10 zookeeper
  • 修改配置檔案:
    • cd conf
    • cp zoo_sample.cfg zoo.cfg
    • vim zoo.cfg
    • 修改dataDir目錄:
      • mkdir /opt/zookeeper/zooData
      • 修改dataDir=/tmp/zookeeper/opt/zookeeper/zooData
    • 增加Log輸出目錄:
      • mkdir /opt/zookeeper/zooLog
      • dataLogDir=/opt/zookeeper/zooLog
    • 增加服務端:
      • 增加一個IP地址別名,後面配置使用。
        • vim /ect/hosts
        • 說明:IP地址 別名
        • 192.168.80.8 node0
        • 192.168.80.9 node1
        • 192.168.80.10 node2
        • 192.168.80.11 node3
      • 說明:等號後引數分別為:主機名、心跳埠、資料埠
        • server.1=node1:2888:3888
        • server.2=node2:2888:3888
        • server.3=node3:2888:3888
    • 配置myid
      • 在每個服務端都需要這樣子配,但是myid不允許相同
      • myid就是上述的server.數字的數字,即server.1中的1就是myid的值。
      • echo 0 > /opt/zookeeper/zooData/myid

免密登入訪問

  • 配置SSH免密登入訪問。
    • 生成公鑰祕鑰:ssh-keygen,一路回車即可。
    • 進入~/.ssh資料夾,建立一個touch authorized_keys檔案,將需要SSH免登入的服務端的公鑰id_rsa.pub中的內容複製到authorized_keys中即可。

分發Zookeeper

  • 將配置好的zookeeper資料夾分發到其他的服務端上,記得修改myid,和按照上修改配置檔案部分重新修改一遍。

關閉防火牆

  • 關閉防火牆(其實也可以單獨開某幾個埠吧,為了方便,自行選擇哈~
    • Centos7
      • systemctl stop firewalld && systemctl disable firewalld
    • Centos6
      • service iptables stop

啟動zookeeper

全部配置完畢之後,就是啟動zookeeper了。
如果啟動不成功的話,可以看看zookeeper/bin/zookeeper.out下的日誌輸出。

  • 啟動
    • ./bin/zkServer.sh start
  • 檢視狀態
    • ./bin/zkServer.sh status
  • 啟動輸出如下:
[[email protected] bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
  • 啟動成功如下:
    • 下面leaderMaster伺服器,這個是選舉出來的,如果Master掛掉,會重新隨機選舉出來。
    • followerSlave伺服器。
[[email protected] bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: leader 

[[email protected] bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: follower

ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Mode: follower
  • 如果kill -9leader,那麼其他節點就會重新投票產生leader
  • 半數機制:叢集中半數以上機器存活,叢集可用。如果低於半數的機器存活,那麼就會出現Error contacting service. It is probably not running.

zkCli Command

使用./bin/zkCli.sh啟動命令列,使用help可以檢視幫助文件。

  • ls path [watch]
    • ls 路徑 監聽
[zk: localhost:2181(CONNECTED) 6] ls /
[zookeeper]
  • create [-s] [-e] path data acl
    • create 序號 結點型別 路徑 資料 許可權
    • 結點型別有兩種:
      • 短暫ephemeral:斷開連線就會刪除自身結點。
      • 永久persistent:斷開連線不刪除自身結點。
      • Znode有四種形式的結點(預設是persistent
        • PERSISTENT
        • PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
        • EPHEMERAL
        • EPHEMERAL_SEQUENTIAL
[zk: localhost:2181(CONNECTED) 6] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 7] create /app1 aaa
Created /app1
[zk: localhost:2181(CONNECTED) 8] ls /
[zookeeper, app1]
[zk: localhost:2181(CONNECTED) 9] create -s /app2 aaa
Created /app20000000001
[zk: localhost:2181(CONNECTED) 10] ls /
[app20000000001, zookeeper, app1]
[zk: localhost:2181(CONNECTED) 11] create -e /app2 aaabbb
Created /app2
[zk: localhost:2181(CONNECTED) 12] ls /                  
[app20000000001, zookeeper, app2, app1]
  • get path [watch],獲取資料
    • get 路徑 監聽
[zk: localhost:2181(CONNECTED) 14] get /app2
aaabbb
cZxid = 0x600000004 # 建立事務編號
ctime = Thu Sep 13 21:47:07 EDT 2018 
mZxid = 0x600000004 # 修改事務編號
mtime = Thu Sep 13 21:47:07 EDT 2018
pZxid = 0x600000004 # 持久化事務編號
cversion = 0 # 建立版本
dataVersion = 0
aclVersion = 0 # 許可權版本
ephemeralOwner = 0x65d5b998e80000
dataLength = 6 # 資料長度
numChildren = 0 # 子節點個數
  • set path data [version],設定結點資料
    • set 路徑 資料 [版本號]
# 設定資料為Hello_world
[zk: localhost:2181(CONNECTED) 15] set /app2 Hello_world 
cZxid = 0x600000004
ctime = Thu Sep 13 21:47:07 EDT 2018
mZxid = 0x600000005
mtime = Thu Sep 13 22:17:11 EDT 2018
pZxid = 0x600000004
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x65d5b998e80000
dataLength = 11
numChildren = 0
[zk: localhost:2181(CONNECTED) 16] get /app2
Hello_world # 重新設定的資料
cZxid = 0x600000004
ctime = Thu Sep 13 21:47:07 EDT 2018
mZxid = 0x600000005
mtime = Thu Sep 13 22:17:11 EDT 2018
pZxid = 0x600000004
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x65d5b998e80000
dataLength = 11
numChildren = 0
  • delete path [version]
    • delete 路徑 版本號
[zk: localhost:2181(CONNECTED) 18] delete /app2
[zk: localhost:2181(CONNECTED) 19] ls /app2
Node does not exist: /app2

監聽資料的變化

  • get path [watch]
    • get 路徑 監聽器
    • 監聽效果有且僅有一次,換句話時候,就是隻能監聽一次,一次之後就失效了,需要重新建立監聽器。

Java API使用

依賴庫

  • log4j-1.2.17.jar
  • zookeeper-3.4.6.jar

程式碼示例

import org.apache.log4j.BasicConfigurator;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.List;

public class Test1 {
    private static ZooKeeper zk = null;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        // 配置Log4J
        BasicConfigurator.configure();
        // 建立連線
        int sessionTimeout = 2000;
        // 注意:node1、node2、node3需要在/etc/hosts中配置以下內容
        // 192.168.80.9 node1
        // 192.168.80.10 node2
        // 192.168.80.11 node3
        String connectString = "node1:2181,node2:2181,node3:2181";
        zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> System.out.println("時間觸發……"));
    }

    /**
     * create方法引數
     * String path, byte[] data, List<ACL> acl, CreateMode createMode
     * path:路徑
     * data:值bytes
     * acl:對節點的訪問控制
     * createMode:節點的型別——短暫,永久,序號
     */
    @Test
    public void create() throws KeeperException, InterruptedException {
        String path = zk.create("/app3", "hello_world".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }

    /**
     * 判斷節點是否存在
     */
    @Test
    public void exist() throws KeeperException, InterruptedException {
        String nodeName = "/app1";
        // 設定為true會呼叫zk中的監聽器
        Stat stat = zk.exists(nodeName, true);
        if (stat != null) {
            System.out.println("節點“" + nodeName + "”存在,長度為:" + stat.getDataLength());
        } else {
            System.out.println("節點" + nodeName + "不存在的。");
        }
    }

    /**
     * 獲取子節點
     */
    @Test
    public void getChildren() throws KeeperException, InterruptedException {
        List<String> list = zk.getChildren("/", true);
        for (String str : list) {
            System.out.println(str);
        }
    }

    /**
     * 獲取子節點
     * getChildren(path, watch) 監聽的事件是:節點下的子節點增減變化事件。
     */
    @Test
    public void getChildren1() throws KeeperException, InterruptedException {
        List<String> list = zk.getChildren("/", watchedEvent -> System.out.println("監控節點下的子節點被改變->" + watchedEvent.getPath()));
        for (String str : list) {
            System.out.println(str);
        }
        Thread.sleep(Long.MAX_VALUE);
    }

    /**
     * 獲取節點的內容
     * getData(path,watch) 監聽的事件是:節點資料變化事件。
     */
    @Test
    public void getData1() throws KeeperException, InterruptedException {
        byte[] bytes = zk.getData("/app1", watchedEvent -> System.out.println("資料改變了……"), null);
        System.out.println(new String(bytes));
    }

    /**
     * 獲取節點的內容
     */
    @Test
    public void getData() throws KeeperException, InterruptedException {
        byte[] bytes = zk.getData("/app1", false, null);
        System.out.println(new String(bytes));
    }

    /**
     * 修改節點的內容
     * version為-1時表示自動維護
     */
    @Test
    public void setData() throws KeeperException, InterruptedException {
        Stat stat = zk.setData("/app1", "test".getBytes(), -1);
        System.out.println(stat.toString());
    }

    /**
     * 刪除節點,非空節點刪除不掉。
     * version為-1時表示自動維護
     */
    @Test
    public void delete() throws KeeperException, InterruptedException {
        zk.delete("/app1", -1);
    }
}

附錄