1. 程式人生 > 實用技巧 >Zookeeper是什麼&怎麼用

Zookeeper是什麼&怎麼用

1.Zookeeper概述

Zookeeper 是一個開源的分散式協調服務框架 ,主要用來解決分散式叢集中應用系統的一致性問題和資料管理問題

2:Zookeeper的特點

  • Zookeeper 本質上是一個分散式檔案系統, 適合存放小檔案,也可以理解為一個資料庫

  • 在上圖左側, Zookeeper 中儲存的其實是一個又一個 Znode, Znode 是 Zookeeper 中的節點

  • Znode 是有路徑的, 例如 /data/host1 , /data/host2 , 這個路徑也可以理解為是Znode 的 Name

  • Znode 也可以攜帶資料, 例如說某個 Znode 的路徑是 /data/host1 , 其值是一個字串 "192.168.0.1"

  • 正因為 Znode 的特性, 所以 Zookeeper 可以對外提供出一個類似於檔案系統的試圖, 可以通過操作檔案系統的方式操作 Zookeeper

    • 使用路徑獲取 Znode
    • 獲取 Znode 攜帶的資料
    • 修改 Znode 攜帶的資料
    • 刪除 Znode
    • 新增 Znode

3.Zookeeper的應用場景

3.1 資料釋出/訂閱

資料釋出/訂閱系統,需要釋出者將資料釋出到Zookeeper的節點上,供訂閱者進行資料訂閱,進而達到動態獲取資料的目的,實現配置資訊的集中式管理和資料的動態更新。

釋出/訂閱一般有兩種設計模式:推模式和拉模式,服務端主動將資料更新傳送給所有訂閱的客戶端稱為推模式;客戶端主動請求獲取最新資料稱為拉模式。

Zookeeper採用了推拉相結合的模式,客戶端向服務端註冊自己需要關注的節點,一旦該節點資料發生變更,那麼服務端就會向相應的客戶端推送Watcher事件通知,客戶端接收到此通知後,主動到服務端獲取最新的資料。

3.2 命名服務

命名服務是分步實現系統中較為常見的一類場景,分散式系統中,被命名的實體通常可以是叢集中的機器、提供的服務地址或遠端物件等,通過命名服務,客戶端可以根據指定名字來獲取資源的實體,在分散式環境中,上層應用僅僅需要一個全域性唯一的名字。Zookeeper

可以實現一套分散式全域性唯一ID的分配機制。

通過呼叫Zookeeper節點建立的API介面就可以建立一個順序節點,並且在API返回值中會返回這個節點的完整名字,利用此特性,可以生成全域性ID,其步驟如下

  1. 客戶端根據任務型別,在指定型別的任務下通過呼叫介面建立一個順序節點,如"job-"。
  2. 建立完成後,會返回一個完整的節點名,如"job-00000001"。
  3. 客戶端拼接type型別和返回值後,就可以作為全域性唯一ID了,如"type2-job-00000001"。

3.3 分散式協調/通知

Zookeeper中特有的Watcher註冊於非同步通知機制,能夠很好地實現分散式環境下不同機器,甚至不同系統之間的協調與通知,從而實現對資料變更的實時處理。通常的做法是不同的客戶端都對Zookeeper上的同一個資料節點進行Watcher註冊,監聽資料節點的變化(包括

節點本身和子節點),若資料節點發生變化,那麼所有訂閱的客戶端都能夠接收到相應的Watcher通知,並作出相應處理。

在絕大多數分散式系統中,系統機器間的通訊無外乎**心跳檢測、工作進度彙報和系統排程 **。

(1)心跳檢測:不同機器間需要檢測到彼此是否在正常執行,可以使用Zookeeper實現機器間的心跳檢測,基於其臨時節點特性(臨時節點的生存週期是客戶端會話,客戶端若當即後,其臨時節點自然不再存在),可以讓不同機器都在Zookeeper的一個指定節點下建立臨時子節點,不同的機器之間可以根據這個臨時子節點來判斷對應的客戶端機器是否存活。通過Zookeeper可以大大減少系統耦合。

(2)工作進度彙報,通常任務被分發到不同機器後,需要實時地將自己的任務執行進度彙報給分發系統,可以在Zookeeper上選擇一個節點,每個任務客戶端都在這個節點下面建立臨時子節點,這樣不僅可以判斷機器是否存活,同時各個機器可以將自己的任務執行進度寫到該臨時節點中去,以便中心繫統能夠實時獲取任務的執行進度

(3)系統排程,Zookeeper能夠實現如下系統排程模式:分散式系統由控制檯和一些客戶端系統兩部分構成,控制檯的職責就是需要將一些指令資訊傳送給所有的客戶端,以控制他們進行相應的業務邏輯,後臺管理人員在控制檯上做一些操作,實際上就是修改Zookeeper上某些節點的資料,Zookeeper可以把資料變更以時間通知的形式傳送給訂閱客戶端

3.4分散式鎖

分散式鎖用於控制分散式系統之間同步訪問共享資源的一種方式,可以保證不同系統訪問一個或一組資源時的一致性,主要分為排它鎖和共享鎖。

排它鎖又稱為寫鎖或獨佔鎖 ,若事務T1對資料物件O1加上了排它鎖,那麼在整個加鎖期間,只允許事務T1對O1進行讀取和更新操作,其他任何事務都不能再對這個資料物件進行任何型別的操作,直到T1釋放了排它鎖

①獲取鎖,在需要獲取排它鎖時,所有客戶端通過呼叫介面,在/exclusive_lock節點下建立臨時子節點/exclusive_lock/lock。Zookeeper可以保證只有一個客戶端能夠建立成功,沒有成功的客戶端需要註冊/exclusive_lock節點監聽。

② 釋放鎖,當獲取鎖的客戶端宕機或者正常完成業務邏輯都會導致臨時節點的刪除,此時,所有在/exclusive_lock節點上註冊監聽的客戶端都會收到通知,可以重新發起分散式鎖獲取。

共享鎖又稱為讀鎖 ,若事務T1對資料物件O1加上共享鎖,那麼當前事務只能對O1進行讀取操作,其他事務也只能對這個資料物件加共享鎖,直到該資料物件上的所有共享鎖都被釋放。在需要獲取共享鎖時,所有客戶端都會到/shared_lock下面建立一個臨時順序節點。

3.5 分散式佇列

有一些時候,多個團隊需要共同完成一個任務,比如,A團隊將Hadoop叢集計算的結果交給B團隊繼續計算,B完成了自己任務再交給C團隊繼續做。這就有點像業務系統的工作流一樣,一環一環地傳下去。

分散式環境下,我們同樣需要一個類似單程式佇列的元件,用來實現跨程式、跨主機、跨網路的資料共享和資料傳遞,這就是我們的分散式佇列。

4.Zookeeper的架構

Zookeeper叢集是一個基於主從架構的高可用叢集

每個伺服器承擔如下三種角色中的一種

Leader 一個Zookeeper叢集同一時間只會有一個實際工作的Leader,它會發起並維護與各Follwer及Observer間的心跳。所有的寫操作必須要通過Leader完成再由Leader將寫操作廣播給其它伺服器。

Follower 一個Zookeeper叢集可能同時存在多個Follower,它會響應Leader的心跳。Follower可直接處理並返回客戶端的讀請求,同時會將寫請求轉發給Leader處理,並且負責在Leader處理寫請求時對請求進行投票。

Observer 角色與Follower類似,但是無投票權。

5.Zookeeper的選舉機制

Leader選舉是保證分散式資料一致性的關鍵所在。當Zookeeper叢集中的一臺伺服器出現以下兩種情況之一時,需要進入Leader選舉。

5.1. 伺服器啟動時期的Leader選舉

若進行Leader選舉,則至少需要兩臺機器,這裡選取3臺機器組成的伺服器叢集為例。在叢集初始化階段,當有一臺伺服器Server1啟動時,其單獨無法進行和完成Leader選舉,當第二臺伺服器Server2啟動時,此時兩臺機器可以相互通訊,每臺機器都試圖找到Leader,於是進入Leader選舉過程。選舉過程如下

(1) 每個Server發出一個投票。由於是初始情況,Server1和Server2都會將自己作為Leader伺服器來進行投票,每次投票會包含所推舉的伺服器的myid和ZXID,使用(myid, ZXID)來表示,此時Server1的投票為(1, 0),Server2的投票為(2, 0),然後各自將這個投票發給叢集中其他機器

(2) 接受來自各個伺服器的投票。叢集的每個伺服器收到投票後,首先判斷該投票的有效性,如檢查是否是本輪投票、是否來自LOOKING狀態的伺服器。

(3) 處理投票。針對每一個投票,伺服器都需要將別人的投票和自己的投票進行PK,PK規則如下

  • 優先檢查ZXID。ZXID比較大的伺服器優先作為Leader。
  • 如果ZXID相同,那麼就比較myid。myid較大的伺服器作為Leader伺服器。

對於Server1而言,它的投票是(1, 0),接收Server2的投票為(2, 0),首先會比較兩者的ZXID,均為0,再比較myid,此時Server2的myid最大,於是更新自己的投票為(2, 0),然後重新投票,對於Server2而言,其無須更新自己的投票,只是再次向叢集中所有機器發出上一次

投票資訊即可

(4) 統計投票。每次投票後,伺服器都會統計投票資訊,判斷是否已經有過半機器接受到相同的投票資訊,對於Server1、Server2而言,都統計出叢集中已經有兩臺機器接受了(2, 0)的投票資訊,此時便認為已經選出了Leader。

(5) 改變伺服器狀態。一旦確定了Leader,每個伺服器就會更新自己的狀態,如果是Follower,那麼就變更為FOLLOWING,如果是Leader,就變更為LEADING。

5.2.伺服器執行時期的Leader選舉

在Zookeeper執行期間,Leader與非Leader伺服器各司其職,即便當有非Leader伺服器宕機或新加入,此時也不會影響Leader,但是一旦Leader伺服器掛了,那麼整個叢集將暫停對外服務,進入新一輪Leader選舉,其過程和啟動時期的Leader選舉過程基本一致過程相同。

6.Zookeeper安裝

叢集規劃

伺服器IP 主機名 myid的值
192.168.174.100 node01 1
192.168.174.110 node02 2
192.168.174.120 node03 3

伺服器IP前面的網段(192.168.174)要使用自己的網段

第一步:下載zookeeeper的壓縮包,下載網址如下

http://archive.apache.org/dist/zookeeper/

我們在這個網址下載我們使用的zk版本為3.4.9

下載完成之後,上傳到我們的linux的/export/soxwares路徑下準備進行安裝

第二步:解壓

解壓zookeeper的壓縮包到/export/servers路徑下去,然後準備進行安裝(路徑可以自己設定,我這裡用的是/export/servers)

cd /export/software  --到上傳檔案的目錄
tar -zxvf zookeeper-3.4.9.tar.gz -C ../servers/ --解壓到servers目錄

第三步:修改配置檔案

第一臺機器修改配置檔案

cd /export/servers/zookeeper-3.4.9/conf/
cp zoo_sample.cfg zoo.cfg
mkdir -p /export/servers/zookeeper-3.4.9/zkdatas/
vim zoo.cfg

在zoo.cfg檔案的末尾加上

dataDir=/export/servers/zookeeper-3.4.9/zkdatas  -- 自己的路徑
# 保留多少個快照
autopurge.snapRetainCount=3
# 日誌多少小時清理一次
autopurge.purgeInterval=1
# 叢集中伺服器地址
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

第四步:新增myid配置

在第一臺機器的

/export/servers/zookeeper-3.4.9/zkdatas /這個路徑下建立一個檔案,檔名為myid ,檔案內容為1

echo 1 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第五步:安裝包分發並修改myid的值

安裝包分發到其他機器

第一臺機器上面執行以下兩個命令

scp -r /export/servers/zookeeper-3.4.9/ node02:/export/servers/
scp -r /export/servers/zookeeper-3.4.9/ node03:/export/servers/

第二臺機器上修改myid的值為2

echo 2 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第三臺機器上修改myid的值為3

echo 3 > /export/servers/zookeeper-3.4.9/zkdatas/myid

第六步:三臺機器啟動zookeeper服務

三臺機器啟動zookeeper服務

這個命令三臺機器都要執行

/export/servers/zookeeper-3.4.9/bin/zkServer.sh start

檢視啟動狀態

/export/servers/zookeeper-3.4.9/bin/zkServer.sh status

7.Zookeeper的Shell 客戶端操作

啟動:來到zookeeper資料夾下面執行下面一段話,其中node01是的伺服器名,你也可以用IP地址

 bin/zkCli.sh -server node01:2181

1:建立普通節點

create /app1 hello

2: 建立順序節點

create -s /app3 world

3:建立臨時節點

create -e /tempnode world

4:建立順序的臨時節點

create -s -e /tempnode2 aaa

5:獲取節點資料

get /app1

6:修改節點資料

set /app1 xxx

7:刪除節點

delete /app1 刪除的節點不能有子節點
rmr /app1 遞迴刪除

Znode 的特點

  • 檔案系統的核心是 Znode
  • 如果想要選取一個 Znode , 需要使用路徑的形式, 例如 /test1/test11
  • Znode 本身並不是檔案, 也不是資料夾, Znode 因為具有一個類似於 Name 的路徑, 所以可以從邏輯上實現一個樹狀檔案系統
  • ZK 保證 Znode 訪問的原子性, 不會出現部分 ZK 節點更新成功, 部分 ZK 節點更新失敗的問題
  • Znode 中資料是有大小限制的, 最大隻能為 1M
  • Znode 是由三個部分構成
    • stat : 狀態, Znode的許可權資訊, 版本等
    • data : 資料, 每個Znode都是可以攜帶資料的, 無論是否有子節點
    • children : 子節點列表

Znode 的型別

每個 Znode 有兩大特性, 可以構成四種不同型別的 Znode

  • 永續性

    • 持久 客戶端斷開時, 不會刪除持有的Znode
    • 臨時 客戶端斷開時, 刪除所有持有的Znode, 臨時Znode不允許有子Znode
  • 順序性

    • 有序 建立的Znode有先後順序, 順序就是在後面追加一個序列號, 序列號是由父節點管理的自增
    • 無序 建立的Znode沒有先後順序

Znode 的屬性

  • dataVersion 資料版本, 每次當 Znode 中的資料發生變化的時候, dataVersion都會自增一下
  • cversion 節點版本, 每次當 Znode 的節點發生變化的時候, cversion 都會自增
  • aclVersion ACL(Access Control List) 的版本號, 當 Znode 的許可權資訊發生變化的時候aclVersion會自增
  • zxid 事務ID
  • ctime 建立時間
  • mtime 最近一次更新的時間
  • ephemeralOwner 如果 Znode 為臨時節點, ephemeralOwner 表示與該節點關聯的 SessionId

通知機制

  • 通知類似於資料庫中的觸發器, 對某個Znode設定 Watcher , 當Znode發生變化的時候,WatchManager 會呼叫對應的 Watcher
  • 當Znode發生刪除, 修改, 建立, 子節點修改的時候, 對應的 Watcher 會得到通知
  • Watcher 的特點
    • 一次性觸發 一個 Watcher 只會被觸發一次, 如果需要繼續監聽, 則需要再次新增Watcher
    • 事件封裝: Watcher 得到的事件是被封裝過的, 包括三個內容 keeperStae, eventType, path

8.Zookeeper的JavaAPI操作

這裡操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架 Curator ,解決了很多Zookeeper客戶端非常底層的細節開發工作 。

Curator包含了幾個包:

  • curator-framework:對zookeeper的底層api的一些封裝
  • curator-recipes:封裝了一些高階特性,如:Cache事件監聽、選舉、分散式鎖、分散式計數器等

Maven依賴(使用curator的版本:2.12.0,對應Zookeeper的版本為:3.4.x,如果跨版本會有相容性問題,很有可能導致節點操作失敗):

8.1.建立java工程,匯入jar包

<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>

8.2 節點的操作

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test; /**
* @Description
* @Author wugongzi
* @Date 2020/7/10 21:12
*/
public class ZKTest { /*
節點的watch機制
*/ @Test
public void watchZnode() throws Exception {
//1:定製一個重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1); //2:獲取客戶端
String conectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr, 8000, 8000, retryPolicy); //3:啟動客戶端
client.start(); //4:建立一個TreeCache物件,指定要監控的節點路徑
TreeCache treeCache = new TreeCache(client, "/hello3"); //5:自定義一個監聽器
treeCache.getListenable().addListener(new TreeCacheListener() {
//@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { ChildData data = treeCacheEvent.getData();
if(data != null){
switch (treeCacheEvent.getType()){
case NODE_ADDED:
System.out.println("監控到有新增節點!");
break;
case NODE_REMOVED:
System.out.println("監控到有節點被移除!");
break;
case NODE_UPDATED:
System.out.println("監控到節點被更新!");
break;
default:
break;
}
}
}
}); //開始監聽
treeCache.start(); Thread.sleep(1000000);
}
/*
獲取節點資料
*/
@Test
public void getZnodeData() throws Exception {
//1:定製一個重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
//2:獲取客戶端
String conectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr, 8000, 8000, retryPolicy); //3:啟動客戶端
client.start();
//4:獲取節點資料
byte[] bytes = client.getData().forPath("/hello");
System.out.println(new String(bytes)); //5:關閉客戶端
client.close(); }
/*
設定節點資料
*/
@Test
public void setZnodeData() throws Exception {
//1:定製一個重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
//2:獲取客戶端
String conectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr, 8000, 8000, retryPolicy);
//3:啟動客戶端
client.start();
//4:修改節點資料
client.setData().forPath("/hello", "zookeeper".getBytes());
//5:關閉客戶端
client.close(); }
/*
建立臨時節點
*/
@Test
public void createTmpZnode() throws Exception {
//1:定製一個重試策略
/*
param1: 重試的間隔時間
param2:重試的最大次數
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2:獲取一個客戶端物件
/*
param1:要連線的Zookeeper伺服器列表
param2:會話的超時時間
param3:連結超時時間
param4:重試策略
*/
String connectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy); //3:開啟客戶端
client.start();
//4:建立節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello4","world".getBytes()); Thread.sleep(5000);
//5:關閉客戶端
client.close(); }
/*
建立永久節點 */
@Test
public void createZnode() throws Exception {
//1:定製一個重試策略
/*
param1: 重試的間隔時間
param2:重試的最大次數
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2:獲取一個客戶端物件
/*
param1:要連線的Zookeeper伺服器列表
param2:會話的超時時間
param3:連結超時時間
param4:重試策略
*/
String connectionStr = "192.168.79.100:2181,192.168.79.110:2181,192.168.79.120:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy); //3:開啟客戶端
client.start();
//4:建立節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
//5:關閉客戶端
client.close();
} }