1. 程式人生 > >Zookeeper簡介與使用

Zookeeper簡介與使用

lock 投票 host err set tab 不同 3.1.1 臨時

1. Zookeeper概念簡介:

Zookeeper是一個分布式協調服務;就是為用戶的分布式應用程序提供協調服務

A、zookeeper是為別的分布式程序服務的

B、Zookeeper本身就是一個分布式程序(只要有半數以上節點存活,zk就能正常服務)

C、Zookeeper所提供的服務涵蓋:主從協調、服務器節點動態上下線、統一配置管理、分布式共享鎖、統一名稱服務……

D、雖然說可以提供各種服務,但是zookeeper在底層其實只提供了兩個功能:

管理(存儲,讀取)用戶程序提交的數據;

並為用戶程序提供數據節點監聽服務;

Zookeeper常用應用場景:

《見圖》

技術分享圖片

Zookeeper集群的角色: Leader 和 follower (Observer)

只要集群中有半數以上節點存活,集群就能提供服務

2. zookeeper集群機制

半數機制:集群中半數以上機器存活,集群可用。

zookeeper適合裝在奇數臺機器上!!!

3. 安裝

3.1. 安裝

3.1.1. 機器部署

安裝到3臺虛擬機上

安裝好JDK

3.1.2. 上傳

上傳用工具。(alt +p)

3.1.3. 解壓

su – hadoop(切換到hadoop用戶)

tar -zxvf zookeeper-3.4.5.tar.gz(解壓)

3.1.4. 重命名

mv zookeeper-3.4.5 zookeeper(重命名文件夾zookeeper-3.4.5為zookeeper)

3.1.5. 修改環境變量

1、su – root(切換用戶到root)

2、vi /etc/profile(修改文件)

3、添加內容:

export ZOOKEEPER_HOME=/home/hadoop/zookeeper

export PATH=$PATH:$ZOOKEEPER_HOME/bin

4、重新編譯文件:

source /etc/profile

5、註意:3臺zookeeper都需要修改

6、修改完成後切換回hadoop用戶:

su - hadoop

3.1.6. 修改配置文件

1、用hadoop用戶操作

cd zookeeper/conf

cp zoo_sample.cfg zoo.cfg

2、vi zoo.cfg

3、添加內容:

dataDir=/home/hadoop/zookeeper/data

dataLogDir=/home/hadoop/zookeeper/log

server.1=slave1:2888:3888 (主機名, 心跳端口、數據端口)

server.2=slave2:2888:3888

server.3=slave3:2888:3888

4、創建文件夾:

cd /home/hadoop/zookeeper/

mkdir -m 755 data

mkdir -m 755 log

5、在data文件夾下新建myid文件,myid的文件內容為:

cd data

vi myid

添加內容:

1

3.1.7. 將集群下發到其他機器上

scp -r /home/hadoop/zookeeper hadoop@slave2:/home/hadoop/

scp -r /home/hadoop/zookeeper hadoop@slave3:/home/hadoop/

3.1.8. 修改其他機器的配置文件

到slave2上:修改myid為:2

到slave3上:修改myid為:3

3.1.9. 啟動(每臺機器)

zkServer.sh start

3.1.10. 查看集群狀態

1、 jps(查看進程)

2、 zkServer.sh status(查看集群狀態,主從信息)

4. zookeeper結構和命令

4.1. zookeeper特性

1、Zookeeper:一個leader,多個follower組成的集群

2、全局數據一致:每個server保存一份相同的數據副本,client無論連接到哪個server,數據都是一致的

3、分布式讀寫,更新請求轉發,由leader實施

4、更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行

5、數據更新原子性,一次數據更新要麽成功,要麽失敗

6、實時性,在一定時間範圍內,client能讀到最新數據

4.2. zookeeper數據結構

1、層次化的目錄結構,命名符合常規文件系統規範(見下圖)

2、每個節點在zookeeper中叫做znode,並且其有一個唯一的路徑標識

3、節點Znode可以包含數據和子節點(但是EPHEMERAL類型的節點不能有子節點,下一頁詳細講解)

4、客戶端應用可以在節點上設置監視器(後續詳細講解)

4.3. 數據結構的圖

技術分享圖片

4.4. 節點類型

1、Znode有兩種類型:

短暫(ephemeral)(斷開連接自己刪除)

持久(persistent)(斷開連接不刪除)

2、Znode有四種形式的目錄節點(默認是persistent )

PERSISTENT

PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )

EPHEMERAL

EPHEMERAL_SEQUENTIAL

3、創建znode時設置順序標識,znode名稱後會附加一個值,順序號是一個單調遞增的計數器,由父節點維護

4、在分布式系統中,順序號可以被用於為所有的事件進行全局排序,這樣客戶端可以通過順序號推斷事件的順序

4.5. zookeeper命令行操作

運行 zkCli.sh –server <ip>進入命令行工具

1、使用 ls 命令來查看當前 ZooKeeper 中所包含的內容:

[zk: 202.115.36.251:2181(CONNECTED) 1] ls /

2、創建一個新的 znode ,使用 create /zk myData 。這個命令創建了一個新的 znode 節點“ zk ”以及與它關聯的字符串:

[zk: 202.115.36.251:2181(CONNECTED) 2] create /zk "myData“

3、我們運行 get 命令來確認 znode 是否包含我們所創建的字符串:

[zk: 202.115.36.251:2181(CONNECTED) 3] get /zk

#監聽這個節點的變化,當另外一個客戶端改變/zk時,它會打出下面的

#WATCHER::

#WatchedEvent state:SyncConnected type:NodeDataChanged path:/zk

[zk: localhost:2181(CONNECTED) 4] get /zk watch

4、下面我們通過 set 命令來對 zk 所關聯的字符串進行設置:

[zk: 202.115.36.251:2181(CONNECTED) 4] set /zk "zsl“

5、下面我們將剛才創建的 znode 刪除:

[zk: 202.115.36.251:2181(CONNECTED) 5] delete /zk

6、刪除節點:rmr

[zk: 202.115.36.251:2181(CONNECTED) 5] rmr /zk

4.6. zookeeper-api應用

4.6.1. 基本使用

org.apache.zookeeper.Zookeeper是客戶端入口主類,負責建立與server的會話

它提供了表 1 所示幾類主要方法 :

功能

描述

create

在本地目錄樹中創建一個節點

delete

刪除一個節點

exists

測試本地是否存在目標節點

get/set data

從目標節點上讀取 / 寫數據

get/set ACL

獲取 / 設置目標節點訪問控制列表信息

get children

檢索一個子節點上的列表

sync

等待要被傳送的數據

表 1 : ZooKeeper API 描述

4.6.2. demo增刪改查

public class SimpleDemo {

// 會話超時時間,設置為與系統默認時間一致

private static final int SESSION_TIMEOUT = 30000;

// 創建 ZooKeeper 實例

ZooKeeper zk;

// 創建 Watcher 實例

Watcher wh = new Watcher() {

public void process(org.apache.zookeeper.WatchedEvent event)

{

System.out.println(event.toString());

}

};

// 初始化 ZooKeeper 實例

private void createZKInstance() throws IOException

{

zk = new ZooKeeper("weekend01:2181", SimpleDemo.SESSION_TIMEOUT, this.wh);

}

private void ZKOperations() throws IOException, InterruptedException, KeeperException

{

System.out.println("/n1. 創建 ZooKeeper 節點 (znode : zoo2, 數據: myData2 ,權限: OPEN_ACL_UNSAFE ,節點類型: Persistent");

zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

System.out.println("/n2. 查看是否創建成功: ");

System.out.println(new String(zk.getData("/zoo2", false, null)));

System.out.println("/n3. 修改節點數據 ");

zk.setData("/zoo2", "shenlan211314".getBytes(), -1);

System.out.println("/n4. 查看是否修改成功: ");

System.out.println(new String(zk.getData("/zoo2", false, null)));

System.out.println("/n5. 刪除節點 ");

zk.delete("/zoo2", -1);

System.out.println("/n6. 查看節點是否被刪除: ");

System.out.println(" 節點狀態: [" + zk.exists("/zoo2", false) + "]");

}

private void ZKClose() throws InterruptedException

{

zk.close();

}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

SimpleDemo dm = new SimpleDemo();

dm.createZKInstance();

dm.ZKOperations();

dm.ZKClose();

}

}

Zookeeper的監聽器工作機制

技術分享圖片

監聽器是一個接口,我們的代碼中可以實現Wather這個接口,實現其中的process方法,方法中即我們自己的業務邏輯

監聽器的註冊是在獲取數據的操作中實現:

getData(path,watch?)監聽的事件是:節點數據變化事件

getChildren(path,watch?)監聽的事件是:節點下的子節點增減變化事件

4.7. zookeeper應用案例(分布式應用HA||分布式鎖)

技術分享圖片

技術分享圖片

技術分享圖片

3.7.1 實現分布式應用的(主節點HA)及客戶端動態更新主節點狀態

某分布式系統中,主節點可以有多臺,可以動態上下線

任意一臺客戶端都能實時感知到主節點服務器的上下線

A、客戶端實現

public class AppClient {

private String groupNode = "sgroup";

private ZooKeeper zk;

private Stat stat = new Stat();

private volatile List<String> serverList;

/**

* 連接zookeeper

*/

public void connectZookeeper() throws Exception {

zk

= new ZooKeeper("localhost:4180,localhost:4181,localhost:4182", 5000, new Watcher() {

public void process(WatchedEvent event) {

// 如果發生了"/sgroup"節點下的子節點變化事件, 更新server列表, 並重新註冊監聽

if (event.getType() == EventType.NodeChildrenChanged

&& ("/" + groupNode).equals(event.getPath())) {

try {

updateServerList();

} catch (Exception e) {

e.printStackTrace();

}

}

}

});

updateServerList();

}

/**

* 更新server列表

*/

private void updateServerList() throws Exception {

List<String> newServerList = new ArrayList<String>();

// 獲取並監聽groupNode的子節點變化

// watch參數為true, 表示監聽子節點變化事件.

// 每次都需要重新註冊監聽, 因為一次註冊, 只能監聽一次事件, 如果還想繼續保持監聽, 必須重新註冊

List<String> subList = zk.getChildren("/" + groupNode, true);

for (String subNode : subList) {

// 獲取每個子節點下關聯的server地址

byte[] data = zk.getData("/" + groupNode + "/" + subNode, false, stat);

newServerList.add(new String(data, "utf-8"));

}

// 替換server列表

serverList = newServerList;

System.out.println("server list updated: " + serverList);

}

/**

* client的工作邏輯寫在這個方法中

* 此處不做任何處理, 只讓client sleep

*/

public void handle() throws InterruptedException {

Thread.sleep(Long.MAX_VALUE);

}

public static void main(String[] args) throws Exception {

AppClient ac = new AppClient();

ac.connectZookeeper();

ac.handle();

}

}

  B、服務器端實現

public class AppServer {

private String groupNode = "sgroup";

private String subNode = "sub";

/**

* 連接zookeeper

* @param address server的地址

*/

public void connectZookeeper(String address) throws Exception {

ZooKeeper zk = new ZooKeeper(

"localhost:4180,localhost:4181,localhost:4182",

5000, new Watcher() {

public void process(WatchedEvent event) {

// 不做處理

}

});

// 在"/sgroup"下創建子節點

// 子節點的類型設置為EPHEMERAL_SEQUENTIAL, 表明這是一個臨時節點, 且在子節點的名稱後面加上一串數字後綴

// 將server的地址數據關聯到新創建的子節點上

String createdPath = zk.create("/" + groupNode + "/" + subNode, address.getBytes("utf-8"),

Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println("create: " + createdPath);

}

/**

* server的工作邏輯寫在這個方法中

* 此處不做任何處理, 只讓server sleep

*/

public void handle() throws InterruptedException {

Thread.sleep(Long.MAX_VALUE);

}

public static void main(String[] args) throws Exception {

// 在參數中指定server的地址

if (args.length == 0) {

System.err.println("The first argument must be server address");

System.exit(1);

}

AppServer as = new AppServer();

as.connectZookeeper(args[0]);

as.handle();

}

}

  3.7.2分布式共享鎖的簡單實現

  ü 客戶端A

public class DistributedClient {

// 超時時間

private static final int SESSION_TIMEOUT = 5000;

// zookeeper server列表

private String hosts = "localhost:4180,localhost:4181,localhost:4182";

private String groupNode = "locks";

private String subNode = "sub";

private ZooKeeper zk;

// 當前client創建的子節點

private String thisPath;

// 當前client等待的子節點

private String waitPath;

private CountDownLatch latch = new CountDownLatch(1);

/**

* 連接zookeeper

*/

public void connectZookeeper() throws Exception {

zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {

public void process(WatchedEvent event) {

try {

// 連接建立時, 打開latch, 喚醒wait在該latch上的線程

if (event.getState() == KeeperState.SyncConnected) {

latch.countDown();

}

// 發生了waitPath的刪除事件

if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {

doSomething();

}

} catch (Exception e) {

e.printStackTrace();

}

}

});

// 等待連接建立

latch.await();

// 創建子節點

thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL_SEQUENTIAL);

// wait一小會, 讓結果更清晰一些

Thread.sleep(10);

// 註意, 沒有必要監聽"/locks"的子節點的變化情況

List<String> childrenNodes = zk.getChildren("/" + groupNode, false);

// 列表中只有一個子節點, 那肯定就是thisPath, 說明client獲得鎖

if (childrenNodes.size() == 1) {

doSomething();

} else {

String thisNode = thisPath.substring(("/" + groupNode + "/").length());

// 排序

Collections.sort(childrenNodes);

int index = childrenNodes.indexOf(thisNode);

if (index == -1) {

// never happened

} else if (index == 0) {

// inddx == 0, 說明thisNode在列表中最小, 當前client獲得鎖

doSomething();

} else {

// 獲得排名比thisPath前1位的節點

this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);

// 在waitPath上註冊監聽器, 當waitPath被刪除時, zookeeper會回調監聽器的process方法

zk.getData(waitPath, true, new Stat());

}

}

}

private void doSomething() throws Exception {

try {

System.out.println("gain lock: " + thisPath);

Thread.sleep(2000);

// do something

} finally {

System.out.println("finished: " + thisPath);

// 將thisPath刪除, 監聽thisPath的client將獲得通知

// 相當於釋放鎖

zk.delete(this.thisPath, -1);

}

}

public static void main(String[] args) throws Exception {

for (int i = 0; i < 10; i++) {

new Thread() {

public void run() {

try {

DistributedClient dl = new DistributedClient();

dl.connectZookeeper();

} catch (Exception e) {

e.printStackTrace();

}

}

}.start();

}

Thread.sleep(Long.MAX_VALUE);

}

}

  ü 分布式多進程模式實現:

public class DistributedClientMy {

// 超時時間

private static final int SESSION_TIMEOUT = 5000;

// zookeeper server列表

private String hosts = "spark01:2181,spark02:2181,spark03:2181";

private String groupNode = "locks";

private String subNode = "sub";

private boolean haveLock = false;

private ZooKeeper zk;

// 當前client創建的子節點

private volatile String thisPath;

/**

* 連接zookeeper

*/

public void connectZookeeper() throws Exception {

zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {

public void process(WatchedEvent event) {

try {

// 子節點發生變化

if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {

// thisPath是否是列表中的最小節點

List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

String thisNode = thisPath.substring(("/" + groupNode + "/").length());

// 排序

Collections.sort(childrenNodes);

if (childrenNodes.indexOf(thisNode) == 0) {

doSomething();

thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL_SEQUENTIAL);

}

}

} catch (Exception e) {

e.printStackTrace();

}

}

});

// 創建子節點

thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL_SEQUENTIAL);

// wait一小會, 讓結果更清晰一些

Thread.sleep(new Random().nextInt(1000));

// 監聽子節點的變化

List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

// 列表中只有一個子節點, 那肯定就是thisPath, 說明client獲得鎖

if (childrenNodes.size() == 1) {

doSomething();

thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,

CreateMode.EPHEMERAL_SEQUENTIAL);

}

}

/**

* 共享資源的訪問邏輯寫在這個方法中

*/

private void doSomething() throws Exception {

try {

System.out.println("gain lock: " + thisPath);

Thread.sleep(2000);

// do something

} finally {

System.out.println("finished: " + thisPath);

// 將thisPath刪除, 監聽thisPath的client將獲得通知

// 相當於釋放鎖

zk.delete(this.thisPath, -1);

}

}

public static void main(String[] args) throws Exception {

DistributedClientMy dl = new DistributedClientMy();

dl.connectZookeeper();

Thread.sleep(Long.MAX_VALUE);

}

}


5. zookeeper原理

Zookeeper雖然在配置文件中並沒有指定master和slave

但是,zookeeper工作時,是有一個節點為leader,其他則為follower

Leader是通過內部的選舉機制臨時產生的

5.1. zookeeper的選舉機制(全新集群paxos)

以一個簡單的例子來說明整個選舉的過程.
假設有五臺服務器組成的zookeeper集群,它們的id從1-5,同時它們都是最新啟動的,也就是沒有歷史數據,在存放數據量這一點上,都是一樣的.假設這些服務器依序啟動,來看看會發生什麽.
1) 服務器1啟動,此時只有它一臺服務器啟動了,它發出去的報沒有任何響應,所以它的選舉狀態一直是LOOKING狀態
2) 服務器2啟動,它與最開始啟動的服務器1進行通信,互相交換自己的選舉結果,由於兩者都沒有歷史數據,所以id值較大的服務器2勝出,但是由於沒有達到超過半數以上的服務器都同意選舉它(這個例子中的半數以上是3),所以服務器1,2還是繼續保持LOOKING狀態.
3) 服務器3啟動,根據前面的理論分析,服務器3成為服務器1,2,3中的老大,而與上面不同的是,此時有三臺服務器選舉了它,所以它成為了這次選舉的leader.
4) 服務器4啟動,根據前面的分析,理論上服務器4應該是服務器1,2,3,4中最大的,但是由於前面已經有半數以上的服務器選舉了服務器3,所以它只能接收當小弟的命了.
5) 服務器5啟動,同4一樣,當小弟.

5.2. 非全新集群的選舉機制(數據恢復)

那麽,初始化的時候,是按照上述的說明進行選舉的,但是當zookeeper運行了一段時間之後,有機器down掉,重新選舉時,選舉過程就相對復雜了。

需要加入數據id、leader id和邏輯時鐘。

數據id:數據新的id就大,數據每次更新都會更新id。

Leader id:就是我們配置的myid中的值,每個機器一個。

邏輯時鐘:這個值從0開始遞增,每次選舉對應一個值,也就是說: 如果在同一次選舉中,那麽這個值應該是一致的 ; 邏輯時鐘值越大,說明這一次選舉leader的進程更新.

選舉的標準就變成:

1、邏輯時鐘小的選舉結果被忽略,重新投票

2、統一邏輯時鐘後,數據id大的勝出

3、數據id相同的情況下,leader id大的勝出

根據這個規則選出leader。

Zookeeper簡介與使用