1. 程式人生 > 其它 >Zookeeper 原理與實踐

Zookeeper 原理與實踐

1、Zookeeper 的由來

在Hadoop生態系統中,許多專案的Logo都採用了動物,比如 Hadoop 和 Hive 採用了大象的形象,HBase 採用了海豚的形象,而從字面上來看 ZooKeeper 表示動物園管理員,所以大家可以理解為 ZooKeeper就是對這些動物(專案元件)進行一些管理工作的。

對於單機環境多執行緒的競態資源協調方法,我們一般通過執行緒鎖來協調對共享資料的訪問以保證狀態的一致性。 但是分散式環境如何進行協調呢?於是,Google創造了Chubby,而ZooKeeper則是對於Chubby的一個開源實現。  ZooKeeper是一種為分散式應用所設計的高可用、高效能且一致的開源協調服務,它提供了一項基本服務:分散式鎖服務。由於ZooKeeper的開源特性,後來我們的開發者在分散式鎖的基礎上,摸索了出了其他的使用方法:配置維護、組服務、分散式訊息佇列、分散式通知/協調等。它被設計為易於程式設計,使用檔案系統目錄樹作為資料模型。

2、ZooKeeper叢集模式典型架構

2.1 角色

Zookeeper服務自身組成一個叢集(2n+1個服務允許n>=1個失效)。Zookeeper叢集是一個基於主從複製的高可用叢集,每個伺服器承擔如下三種角色中的一種

  • Leader 一個Zookeeper叢集同一時間只會有一個實際工作的Leader,它會發起並維護與各Follwer及Observer間的心跳。所有的寫操作必須要通過Leader完成再由Leader將寫操作廣播給其它伺服器。
  • Follower 一個Zookeeper叢集可能同時存在多個Follower,它會響應Leader的心跳。Follower可直接處理並返回客戶端的讀請求,同時會將寫請求轉發給Leader處理,並且負責在Leader處理寫請求時對請求進行投票。
  • Observer 角色與Follower類似,但是無投票權。

2.2 資料特性

  • 順序一致性:按照客戶端傳送請求的順序更新資料。
  • 原子性:更新要麼成功,要麼失敗,不會出現部分更新。
  • 單一性 :無論客戶端連線哪個server,都會看到同一個檢視。
  • 可靠性:一旦資料更新成功,將一直保持,直到新的更新。
  • 及時性:客戶端會在一個確定的時間內得到最新的資料。

2.3 資料模型 Znode 

Zookeeper表現為一個分層的檔案系統目錄樹結構(不同於檔案系統的是,節點可以有自己的資料,而檔案系統中的目錄節點只有子節點)。

  • znode是被它所在的路徑唯一標識
  • znode可以有子節點目錄,並且每個znode可以儲存資料
  • 每個znode中儲存的資料可以有多個版本
  • znode可以被監控
  • 臨時+編號
  • 每次對Zookeeper的狀態的改變都會產生一個zxid,全域性有序

3、核心原理

3.1 原子廣播(ZAB協議)

為了保證寫操作的一致性與可用性,Zookeeper專門設計了一種名為原子廣播(ZAB)的支援崩潰恢復的一致性協議。基於該協議,Zookeeper實現了一種主從模式的系統架構來保持叢集中各個副本之間的資料一致性。根據ZAB協議,所有的寫操作都必須通過Leader完成,leader收到寫請求,會將請求轉為proposal並廣播給所有其它節點,其他節點根據協議進行批准或通過。broadcast階段事實上就是一個兩階段提交的簡化版。其所有過程都跟兩階段提交一致,唯一不一致的是不能做事務的回滾。如果實現完整的兩階段提交,那就解決了一致性問題,沒必要發明新協議了,所以zab實際上拋棄了兩階段提交的事務回滾,於是一臺follower只能回覆ACK或者乾脆就不回覆了,leader只要收到過半的機器回覆即通過proposal。

一旦Leader節點無法工作,ZAB協議能夠自動從Follower節點中重新選出一個合適的替代者,即新的Leader,該過程即為領導選舉。該領導選舉過程,是ZAB協議中最為重要和複雜的過程。

3.2 Watch機制

  • (一次性觸發)One-time trigger 
  • (傳送至客戶端)Sent to the client 
  • (被設定 watch 的資料)The data for which the watch was set

3.3 出現腦裂怎麼辦(split-brain)

  • GC、網路假死
  • Fencing

3.4 併發效能問題

  • 讀寫分離+ZAB

4、應用場景

4.1 統一命名服務

  • 全域性唯一的ID
  • 叢集管理:動態的服務註冊和發現
def test_create():
    nodename = '/zk_test/service'
    index = 1
    while index < 10:
        zk.create(nodename,
                  b'192.168.187.215_{index}'.
                  format(index=index),
                        ephemeral=True,
                        sequence=True)
        time.sleep(3)
        index += 1



@zk.ChildrenWatch('/zk_test')
def my_func(children):
    print children
children = zk.get_children('/zk_test',watch=my_func)

while True:
    time.sleep(3)

4.2 分散式鎖

my_id = uuid.uuid4()
lock = zk.Lock("/lockpath", str(my_id))
print "I am {}".format(str(my_id))
def work():
    time.sleep(3)
    print "{} is working! ".format(str(my_id))
while True:
    with lock:
        work()
zk.stop()

4.3 配置管理(資料釋出與訂閱)

  • 推拉結合
  • 分散式協調/通知
    • 只通知一次
@zk.DataWatch('/zk_test/service')
def my_func(data, stat):
    print("Data is %s, Version is %s" %(data, stat.version))

while True:
    time.sleep(2)
    print '------'

4.4 叢集管理之 Master/Leader Election

  • 傳統的方案
    • 缺點
  • 分散式方案
    • 爭搶
my_id = uuid.uuid4()
def leader_func():
    print "I am the leader {}".format(str(my_id))
    while True:
        print "{} is working! ".format(str(my_id))
        time.sleep(3)

election = zk.Election("/electionpath")

# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)

4.5 佇列管理

  • 同步分散式佇列
    • 當一個佇列的成員都聚齊時,這個佇列才可用,否則一直等待所有成員到達。
  • 分散式FIFO佇列
    • 入佇列有編號,出佇列時通過 getChildren( ) 返回所有,消費最小

4.6 心跳檢測/故障檢測(Failure Detection)

#coding=utf-8
from kazoo.client import KazooClient
import time
import logging

logging.basicConfig()
zk = KazooClient(hosts='bjdhj-187-215.58os.org:2181')
zk.start()
 
# Determine if a node exists
while True:
    if zk.exists("/zk_test/service0000000054"):
        print "the worker is alive!"
    else:
        print "the worker is dead!"
        break
    time.sleep(3)
 
zk.stop()

4.7 其它場景

  • 資料儲存(進度彙報)
    • offset → Zookeeper 
    • Zookeeper Write 影響 kafka 叢集吞吐量
    • __consumer_offsets Topic( 0.8.2.2  → 0.10.1.1)
      • Group,Topic,Partition 組合 Key
      • acking 級別設定為了 -1
      • 記憶體三元組
  • 負載均衡
    • 服務端
    • 客戶端

5、ZooKeeper在大型分散式系統中的應用

5.1 ZooKeeper在Hadoop中的應用

  • 主備切換

建立鎖節點 在ZooKeeper上會有一個/yarn-leader-election/appcluster-yarn的鎖節點,所有的ResourceManager在啟動的時候,都會去競爭寫一個Lock子節點:/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb,該節點是臨時節點。ZooKeepr能夠為我們保證最終只有一個ResourceManager能夠建立成功建立成功的那個ResourceManager就切換為Active狀態沒有成功的那些ResourceManager則切換為Standby狀態

  • 註冊Watcher監聽

所有Standby狀態的ResourceManager都會向/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb節點註冊一個節點變更的Watcher監聽,利用臨時節點的特性,能夠快速感知到Active狀態的ResourceManager的執行情況。

  • 主備切換

當Active狀態的ResourceManager出現諸如宕機或重啟的異常情況時,其在ZooKeeper上連線的客戶端會話就會失效,因此/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb節點就會被刪除。此時其餘各個Standby狀態的ResourceManager就都會接收到來自ZooKeeper服務端的Watcher事件通知,然後會重複進行步驟1的操作

以上就是利用ZooKeeper來實現ResourceManager的主備切換的過程,實現了ResourceManager的HA。

HDFS中NameNode的HA的實現原理跟YARN中ResourceManager的HA的實現原理相同。其鎖節點為/hadoop-ha/mycluster/ActiveBreadCrumb

  • RM狀態儲存

在 ResourceManager 中,RMStateStore 能夠儲存一些 RM 的內部狀態資訊,包括 Application 以及它們的 Attempts 資訊、Delegation Token 及 Version Information 等。需要注意的是,RMStateStore 中的絕大多數狀態資訊都是不需要持久化儲存的,因為很容易從上下文資訊中將其重構出來,如資源的使用情況。在儲存的設計方案中,提供了三種可能的實現,分別如下。

  • 基於記憶體實現,一般是用於日常開發測試。
  • 基於檔案系統的實現,如HDFS。
  • 基於ZooKeeper實現。

由於這些狀態資訊的資料量都不是很大,因此Hadoop官方建議基於ZooKeeper來實現狀態資訊的儲存。在ZooKeepr上,ResourceManager 的狀態資訊都被儲存在/rmstore這個根節點下面。

RMAppRoot 節點下儲存的是與各個 Application 相關的資訊,RMDTSecretManagerRoot 儲存的是與安全相關的 Token 等資訊。每個 Active 狀態的 ResourceManager 在初始化階段都會從 ZooKeeper 上讀取到這些狀態資訊,並根據這些狀態資訊繼續進行相應的處理。

小結:

ZooKeepr在Hadoop中的應用主要有:

  1. HDFS中NameNode的HA和YARN中ResourceManager的HA。
  2. 儲存RMStateStore狀態資訊

5.2 ZooKeeper在HBase中的應用

HBase主要用ZooKeeper來實現HMaster選舉與主備切換、系統容錯、RootRegion管理、Region狀態管理和分散式SplitWAL任務管理等。

  • HMaster選舉與主備切換

HMaster選舉與主備切換的原理和HDFS中NameNode及YARN中ResourceManager的HA原理相同。

  • 系統容錯

當HBase啟動時,每個RegionServer都會到ZooKeeper的/hbase/rs節點下建立一個資訊節點(下文中,我們稱該節點為”rs狀態節點”),例如/hbase/rs/[Hostname],同時,HMaster會對這個節點註冊監聽。當某個 RegionServer 掛掉的時候,ZooKeeper會因為在一段時間內無法接受其心跳(即 Session 失效),而刪除掉該 RegionServer 伺服器對應的 rs 狀態節點。與此同時,HMaster 則會接收到 ZooKeeper 的 NodeDelete 通知,從而感知到某個節點斷開,並立即開始容錯工作。

HBase為什麼不直接讓HMaster來負責RegionServer的監控呢?如果HMaster直接通過心跳機制等來管理RegionServer的狀態,隨著叢集越來越大,HMaster的管理負擔會越來越重,另外它自身也有掛掉的可能,因此資料還需要持久化。在這種情況下,ZooKeeper就成了理想的選擇。

  • RootRegion管理

對應HBase叢集來說,資料儲存的位置資訊是記錄在元資料region,也就是RootRegion上的。每次客戶端發起新的請求,需要知道資料的位置,就會去查詢RootRegion,而RootRegion自身位置則是記錄在ZooKeeper上的(預設情況下,是記錄在ZooKeeper的/hbase/meta-region-server節點中)。當RootRegion發生變化,比如Region的手工移動、重新負載均衡或RootRegion所在伺服器發生了故障等是,就能夠通過ZooKeeper來感知到這一變化並做出一系列相應的容災措施,從而保證客戶端總是能夠拿到正確的RootRegion資訊。

  • Region管理

HBase裡的Region會經常發生變更,這些變更的原因來自於系統故障、負載均衡、配置修改、Region分裂與合併等。一旦Region發生移動,它就會經歷下線(offline)和重新上線(online)的過程。

下線期間資料是不能被訪問的,並且Region的這個狀態變化必須讓全域性知曉,否則可能會出現事務性的異常。對於大的HBase叢集來說,Region的數量可能會多達十萬級別,甚至更多,這樣規模的Region狀態管理交給ZooKeeper來做也是一個很好的選擇。

  • 分散式SplitWAL任務管理

當某臺RegionServer伺服器掛掉時,由於總有一部分新寫入的資料還沒有持久化到HFile中,因此在遷移該RegionServer的服務時,一個重要的工作就是從WAL中恢復這部分還在記憶體中的資料,而這部分工作最關鍵的一步就是SplitWAL,即HMaster需要遍歷該RegionServer伺服器的WAL,並按Region切分成小塊移動到新的地址下,並進行日誌的回放(replay)

由於單個RegionServer的日誌量相對龐大(可能有上千個Region,上GB的日誌),而使用者又往往希望系統能夠快速完成日誌的恢復工作。因此一個可行的方案是將這個處理WAL的任務分給多臺RegionServer伺服器來共同處理,而這就又需要一個持久化元件來輔助HMaster完成任務的分配。當前的做法是,HMaster會在ZooKeeper上建立一個SplitWAL節點(預設情況下,是/hbase/SplitWAL節點),將“哪個RegionServer處理哪個Region”這樣的資訊以列表的形式存放到該節點上,然後由各個RegionServer伺服器自行到該節點上去領取任務並在任務執行成功或失敗後再更新該節點的資訊,以通知HMaster繼續進行後面的步驟。ZooKeeper在這裡擔負起了分散式叢集中相互通知和資訊持久化的角色。

小結:

以上就是一些HBase中依賴ZooKeeper完成分散式協調功能的典型場景。但事實上,HBase對ZooKeepr的依賴還不止這些,比如HMaster還依賴ZooKeeper來完成Table的enable/disable狀態記錄,以及HBase中幾乎所有的元資料儲存都是放在ZooKeeper上的。

由於ZooKeeper出色的分散式協調能力及良好的通知機制,HBase在各版本的演進過程中越來越多地增加了ZooKeeper的應用場景,從趨勢上來看兩者的交集越來越多。HBase中所有對ZooKeeper的操作都封裝在了org.apache.hadoop.hbase.zookeeper這個包中,感興趣的同學可以自行研究。

5.3 ZooKeeper在 canal 中的應用

  • Canal Client
    • 保證get/ack/rollback有序
  • canal server
    • 減少mysql dump的請求
  • 原理
    • zk 分散式鎖

6、無鎖實現

CAS

  • 原理
  • 缺陷
    • ABA
    • 衝突
AtomicInteger atomicInteger = new AtomicInteger();
for(int b = 0; b < numThreads; b++) {
  new Thread(() -> {
    for(int a = 0; a < iteration; a++) {
      atomicInteger.incrementAndGet();
    }
  }).start();
}

 public final int getAndIncrement() {
       for( ; ; ) {
           int current = get();
           int next = current + 1;
           if ( compareAndSet(current, next) )
                return current;
       }
}
private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
//介面呼叫次數+1
public long increase(String url) {
    Long oldValue = urlCounter.get(url);
    Long newValue = (oldValue == null) ? 1L : oldValue + 1;
    urlCounter.put(url, newValue);
    return newValue;
}
//獲取呼叫次數
public Long getCount(String url){
    return urlCounter.get(url);
}
//模擬併發情況下的介面呼叫統計
for(int i=0;i<callTime;i++){
    executor.execute(new Runnable() {
        @Override
        public void run() {
            counterDemo.increase(url);
            countDownLatch.countDown();
        }
    });
}



// CAS:
while (true) {
    private final Map<String, Long> urlCounter = new ConcurrentHashMap<>();
    oldValue = urlCounter.get(url);
    if (oldValue == null) {
        newValue = 1l;
        //初始化成功,退出迴圈
        if (urlCounter.putIfAbsent(url, 1l) == null) break;
        //如果初始化失敗,說明其他執行緒已經初始化過了
    } else {
        newValue = oldValue + 1;
        //+1成功,退出迴圈,原子操作
        if (urlCounter.replace(url, oldValue, newValue)) break;
        //如果+1失敗,說明其他執行緒已經修改過了舊值
    }
}
return newValue;

Refer:

[1] ZooKeeper 學習筆記之掃盲

https://my.oschina.net/leejun2005/blog/67250

[2] ZooKeeper 原理及其在 Hadoop 和 HBase 中的應用

http://blog.jobbole.com/110388/

[3] Java 併發實踐 — ConcurrentHashMap 與 CAS

https://my.oschina.net/leejun2005/blog/81835