1. 程式人生 > >ZooKeeper原理和應用

ZooKeeper原理和應用

目錄

1、ZooKeeper原理解析

1.1、叢集角色描述

1.2、Paxos演算法概述(ZAB協議)

1.2.1、ZooKeeper的全新叢集選主

1.2.2、ZooKeeper的非全新叢集選主

1.3、資料同步

1.4、ZooKeeper工作流程

1.4.1、Leader工作流程

1.4.2、Follower工作流程

1.4.3、Observer工作流程

2、ZooKeeper應用案例

2.1、伺服器上下線動態感知

2.2、分散式共享鎖

2.3、Hadoop HA 高可用叢集搭建


1、ZooKeeper原理解析

1.1、叢集角色描述

 

1.2、Paxos演算法概述(ZAB協議)

Paxos 演算法是萊斯利•蘭伯特(英語:Leslie Lamport)於 1990 年提出的一種基於訊息傳遞且具有高度容錯特性的一致性演算法。

分散式系統中的節點通訊存在兩種模型:共享記憶體(Shared memory)訊息傳遞(Messages passing)。基於訊息傳遞通訊模型的分散式系統,不可避免的會發生以下錯誤:程序可能會慢、被殺死或者重啟,訊息可能會延遲、丟失、重複,在基礎 Paxos 場景中,先不考慮可能出現訊息篡改即拜占庭錯誤(Byzantine failure,即雖然有可能一個訊息被傳遞了兩次,但是絕對不會出現錯誤的訊息

)的情況。Paxos 演算法解決的問題是在一個可能發生上述異常的分散式系統中如何就某個值達成一致,保證不論發生以上任何異常,都不會破壞決議一致性

Paxos 演算法使用一個希臘故事來描述,在 Paxos 中,存在三種角色,分別為Proposer(提議者,用來發出提案 proposal),
Acceptor(接受者,可以接受或拒絕提案),Learner(學習者,學習被選定的提案,當提案被超過半數的 Acceptor 接受後為被批准)。下面更精確的定義 Paxos 要解決的問題:
1、決議(value)只有在被 proposer 提出後才能被批准
2、在一次 Paxos 演算法的執行例項中,只批准(chose)一個 value 
3、learner 只能獲得被批准(chosen)的 value

ZooKeeper 的選舉演算法有兩種:一種是基於 Basic Paxos(Google Chubby 採用)實現的,另外一種是基於 Fast Paxos(ZooKeeper 採用)演算法實現的。系統預設的選舉演算法為 Fast Paxos。並且 ZooKeeper 在 3.4.0 版本後只保留了 FastLeaderElection 演算法。

ZooKeeper 的核心是原子廣播,這個機制保證了各個 Server 之間的同步。實現這個機制的協議叫做 ZAB 協議(Zookeeper Atomic BrodCast)

ZAB 協議有兩種模式,它們分別是崩潰恢復模式(選主)原子廣播模式(同步)
1、當服務啟動或者在領導者崩潰後,ZAB 就進入了恢復模式,當領導者被選舉出來,且大多數 Server 完成了和 leader 的狀態同步以後,恢復模式就結束了。狀態同步保證了 leader和 follower 之間具有相同的系統狀態。
2、當 ZooKeeper 叢集選舉出 leader 同步完狀態退出恢復模式之後,便進入了原子廣播模式。所有的寫請求都被轉發給 leader,再由 leader 將更新 proposal 廣播給 follower

為了保證事務的順序一致性,zookeeper 採用了遞增的事務 id 號(zxid)來標識事務。所有的提議(proposal)都在被提出的時候加上了 zxid。實現中 zxid 是一個 64 位的數字,它高32 位是 epoch 用來標識 leader 關係是否改變,每次一個 leader 被選出來,它都會有一個新的 epoch,標識當前屬於那個 leader 的統治時期。低 32 位用於遞增計數。

這裡給大家介紹以下 Basic Paxos 流程:
1、選舉執行緒由當前 Server 發起選舉的執行緒擔任,其主要功能是對投票結果進行統計,並選出推薦的 Server
2、選舉執行緒首先向所有 Server 發起一次詢問(包括自己)
3、選舉執行緒收到回覆後,驗證是否是自己發起的詢問(驗證 zxid 是否一致),然後獲取對方的 serverid(myid),並存儲到當前詢問物件列表中,最後獲取對方提議的 leader 相關資訊(serverid,zxid),並將這些資訊儲存到當次選舉的投票記錄表中
4、收到所有 Server 回覆以後,就計算出 id 最大的那個 Server,並將這個 Server 相關資訊設定成下一次要投票的 Server
5、執行緒將當前 id 最大的 Server 設定為當前 Server 要推薦的 Leader,如果此時獲勝的 Server獲得 n/2 + 1 的 Server 票數, 設定當前推薦的 leader 為獲勝的 Server,將根據獲勝的 Server相關資訊設定自己的狀態,否則,繼續這個過程,直到 leader 被選舉出來。

通過流程分析我們可以得出:要使 Leader 獲得多數 Server 的支援,則 Server 總數必須是奇數 2n+1,且存活的 Server 的數目不得少於 n+1。

每個 Server 啟動後都會重複以上流程。在恢復模式下,如果是剛從崩潰狀態恢復的或者剛啟動的 server 還會從磁碟快照中恢復資料和會話資訊,zk 會記錄事務日誌並定期進行快照,方便在恢復時進行狀態恢復。

Fast Paxos 流程是在選舉過程中,某 Server 首先向所有 Server 提議自己要成為 leader,當其它 Server 收到提議以後,解決 epoch 和 zxid 的衝突,並接受對方的提議,然後向對方傳送接受提議完成的訊息,重複這個流程,最後一定能選舉出 Leader

1.2.1、ZooKeeper的全新叢集選主

以一個簡單的例子來說明整個選舉的過程:假設有五臺伺服器組成的 zookeeper 叢集,它們的 serverid 從 1-5,同時它們都是最新啟動的,也就是沒有歷史資料,在存放資料量這一點上,都是一樣的。假設這些伺服器依序啟動,來看看會發生什麼
1、伺服器 1 啟動,此時只有它一臺伺服器啟動了,它發出去的報沒有任何響應,所以它的選舉狀態一直是 LOOKING 狀態
2、伺服器 2 啟動,它與最開始啟動的伺服器 1 進行通訊,互相交換自己的選舉結果,由於兩者都沒有歷史資料,所以 id 值較大的伺服器 2 勝出,但是由於沒有達到超過半數以上的伺服器都同意選舉它(這個例子中的半數以上是 3),所以伺服器 1、2 還是繼續保持 LOOKING狀態
3、伺服器 3 啟動,根據前面的理論分析,伺服器 3 成為伺服器 1,2,3 中的老大,而與上面不同的是,此時有三臺伺服器(超過半數)選舉了它,所以它成為了這次選舉的 leader
4、伺服器 4 啟動,根據前面的分析,理論上伺服器 4 應該是伺服器 1,2,3,4 中最大的,但是由於前面已經有半數以上的伺服器選舉了伺服器 3,所以它只能接收當小弟的命了
5、伺服器 5 啟動,同 4 一樣,當小弟
總結:zookeeper server 的三種工作狀態
LOOKING:當前 Server 不知道 leader 是誰,正在搜尋,正在選舉
LEADING:當前 Server 即為選舉出來的 leader,負責協調事務
FOLLOWING:leader 已經選舉出來,當前 Server 與之同步,服從 leader 的命令

1.2.2、ZooKeeper的非全新叢集選主

那麼,初始化的時候,是按照上述的說明進行選舉的,但是當 zookeeper 運行了一段時間之後,有機器 down 掉,重新選舉時,選舉過程就相對複雜了。
需要加入資料 version、serverid 和邏輯時鐘。
資料 version:資料新的 version 就大,資料每次更新都會更新 version
server id:就是我們配置的 myid 中的值,每個機器一個
邏輯時鐘:這個值從 0 開始遞增,每次選舉對應一個值,也就是說:如果在同一次選舉中,那麼這個值應該是一致的;邏輯時鐘值越大,說明這一次選舉 leader 的程序更新,也就是每次選舉擁有一個 zxid,投票結果只取 zxid 最新的
選舉的標準就變成:
1、邏輯時鐘小的選舉結果被忽略,重新投票
2、統一邏輯時鐘後,資料 version 大的勝出
3、資料 version 相同的情況下,server id 大的勝出

根據這個規則選出 leader。

1.3、資料同步

選完 leader 以後,zk 就進入狀態同步過程。
1、leader 等待 server 連線;
2、follower 連線 leader,將最大的 zxid 傳送給 leader;
3、leader 根據 follower 的 zxid 確定同步點;
4、完成同步後通知 follower 已經成為 uptodate 狀態;
5、follower 收到 uptodate 訊息後,又可以重新接受 client 的請求進行服務了。
以下是流程圖:

1.4、ZooKeeper工作流程

1.4.1、Leader工作流程

Leader 主要有三個功能:
1、恢復資料
2、維持與 Learner 的心跳,接收 Learner 請求並判斷 Learner 的請求訊息型別
Learner 的訊息型別主要:

  • PING 訊息:Learner 的心跳資訊
  • REQUEST 訊息:Follower 傳送的提議資訊,包括讀寫請求
  • ACK 訊息:Follower 對提議的回覆,超過半數的 Follower 通過,則 commit 該提議
  • REVALIDATE 訊息:用來延長 SESSION 有效時間

3、根據不同的訊息型別,進行不同的處理。

1.4.2、Follower工作流程

Follower 主要有四個功能:
1、向 Leader 傳送請求(PING 訊息、REQUEST 訊息、ACK 訊息、REVALIDATE 訊息);
2、接收 Leader 訊息並進行處理;
3、接收 Client 的請求,如果為寫請求,則轉發給 Leader;
4、返回 Client 結果。

Follower 的訊息迴圈處理如下幾種來自 Leader 的訊息:
1、PING 訊息: 心跳訊息;
2、PROPOSAL 訊息:Leader 發起的提案,要求 Follower 投票;
3、COMMIT 訊息:伺服器端最新一次提案的資訊;
4、UPTODATE 訊息:表明同步完成;
5、REVALIDATE 訊息:根據 Leader 的 REVALIDATE 結果,關閉待 revalidate 的 session 還是允許其接受訊息;
6、SYNC 訊息:返回 SYNC 結果到客戶端,這個訊息最初由客戶端發起,用來強制得到最新的更新。

1.4.3、Observer工作流程

Observer 流程和 Follower 的唯一不同的地方就是 Observer 不會參加 Leader 發起的投票,也不會被選舉為 Leader,所以不重複描述了。

2、ZooKeeper應用案例

2.1、伺服器上下線動態感知

1、需求描述
某分散式系統中,主節點可以有多臺,可以動態上下線。任意一臺客戶端都能實時感知到主節點伺服器的上下線

2、設計思路
1)設計伺服器端存入伺服器上線,下線的資訊,比如都寫入到 servers 節點下
2)設計客戶端監聽該 servers 節點,獲取該伺服器叢集的線上伺服器列表
3)伺服器一上線,就往 zookeeper 檔案系統中的一個統一的節點比如 servers 下寫入一個臨時節點,記錄下伺服器的資訊(思考,該節點最好採用什麼型別的節點?臨時節點)
4)伺服器一下線,則刪除 servers 節點下的該伺服器的資訊,則客戶端因為監聽了該節點的資料變化,所以將第一時間得知伺服器的線上狀態。

3、程式碼開發
伺服器端處理:

package com.ghgj.zookeeper.mydemo;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 用來模擬伺服器的動態上線下線
* 總體思路就是伺服器上線就上 zookeeper 叢集建立一個臨時節點,然後監聽了該資料節
點的個數變化的客戶端都收到通知
* 下線,則該臨時節點自動刪除,監聽了該資料節點的個數變化的客戶端也都收到通知
*/
public class DistributeServer {
    private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
    private static final int sessionTimeout = 4000;
    private static final String PARENT_NODE = "/server";
    static ZooKeeper zk = null;
    public static void main(String[] args) throws Exception {
        DistributeServer distributeServer = new DistributeServer();
        distributeServer.getZookeeperConnect();
        distributeServer.registeServer("hadoop03");
        Thread.sleep(Long.MAX_VALUE);
    }
    /**
    * 拿到 zookeeper 進群的連結
    */
    public void getZookeeperConnect() throws Exception {
        zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
            }
        });
    }
    /**
    * 伺服器上線就註冊,掉線就自動刪除,所以建立的是臨時順序節點
    */
    public void registeServer(String hostname) throws Exception{
        Stat exists = zk.exists(PARENT_NODE, false);
        if(exists == null){                           
            zk.create(PARENT_NODE,"server_parent_node".getBytes(),Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
        }
        zk.create(PARENT_NODE+"/"+hostname, hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, 
        CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(hostname+" is online, start working......");
    }
}

客戶端處理:

package com.ghgj.zookeeper.mydemo;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* 用來模擬使用者端的操作:連上 zookeeper 進群,實時獲取伺服器動態上下線的節點資訊
* 總體思路就是每次該 server 節點下有增加或者減少節點數,我就打印出來該 server 節點
下的所有節點
*/
public class DistributeClient {
    private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
    private static final int sessionTimeout = 4000;
    private static final String PARENT_NODE = "/server";
    static ZooKeeper zk = null;
    public static void main(String[] args) throws Exception {
        DistributeClient dc = new DistributeClient();
        dc.getZookeeperConnect();
        Thread.sleep(Long.MAX_VALUE);
    }
    /**
    * 拿到 zookeeper 進群的連結
    */
    public void getZookeeperConnect() throws Exception {
        zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    // 獲取父節點 server 節點下所有子節點,即是所有正上線服務的伺服器節點
                    List<String> children = zk.getChildren(PARENT_NODE, true);
                    List<String> servers = new ArrayList<String>();
                    for(String child: children){
                        // 取出每個節點的資料,放入到 list 裡
                        String server = new String(zk.getData(PARENT_NODE+"/"+child, 
                        false, null), "UTF-8");
                        servers.add(server);
                    }
                    // 列印 list 裡面的元素
                    System.out.println(servers);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("Client is online, start Working......");
    }
}

2.2、分散式共享鎖

1、需求描述
在我們自己的分散式業務系統中,可能會存在某種資源,需要被整個系統的各臺伺服器共享訪問,但是隻允許一臺伺服器同時訪問

2、設計思路
1、設計多個客戶端同時訪問同一個資料
2、為了同一時間只能允許一個客戶端上去訪問,所以各個客戶端去 zookeeper 叢集的一個znode 節點去註冊一個臨時節點,定下規則,每次都是編號最小的客戶端才能去訪問
3、多個客戶端同時監聽該節點,每次當有子節點被刪除時,就都收到通知,然後判斷自己的編號是不是最小的,最小的就去執行訪問,不是最小的就繼續監聽。

3、程式碼開發
伺服器端:

package com.ghgj.zookeeper.mydemo;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 需求:多個客戶端,需要同時訪問同一個資源,但同時只允許一個客戶端進行訪問。
Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513
* 設計思路:多個客戶端都去父 znode 下寫入一個子 znode,能寫入成功的去執行訪問,
寫入不成功的等待
*/
public class MyDistributeLock {
	private static final String connectStr = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
	private static final int sessionTimeout = 4000;
	private static final String PARENT_NODE = "/parent_locks";
	private static final String SUB_NODE = "/sub_client";
	static ZooKeeper zk = null;
	private static String currentPath = "";
	public static void main(String[] args) throws Exception {
		MyDistributeLock mdc = new MyDistributeLock();
		// 1、拿到 zookeeper 連結
		mdc.getZookeeperConnect();
		// 2、檢視父節點是否存在,不存在則建立
		Stat exists = zk.exists(PARENT_NODE, false);
		if(exists == null){
			zk.create(PARENT_NODE, PARENT_NODE.getBytes(), Ids.OPEN_ACL_UNSAFE, 
			CreateMode.PERSISTENT);
		}
		// 3、監聽父節點
		zk.getChildren(PARENT_NODE, true);
		// 4、往父節點下注冊節點,註冊臨時節點,好處就是,當宕機或者斷開連結時該
		節點自動刪除
		currentPath = zk.create(PARENT_NODE+SUB_NODE, SUB_NODE.getBytes(), 
		Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		// 5、關閉 zk 連結
		Thread.sleep(Long.MAX_VALUE);
		zk.close();
	}
	/**
	* 拿到 zookeeper 叢集的連結
	*/
	public void getZookeeperConnect() throws Exception {
		Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513
		zk = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent event) {
				// 匹配看是不是子節點變化,並且監聽的路徑也要對
				if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals(PARENT_NODE)){
					try {
						// 獲取父節點的所有子節點, 並繼續監聽
						List<String> childrenNodes = zk.getChildren(PARENT_NODE, true);
						// 匹配當前建立的 znode 是不是最小的 znode
						Collections.sort(childrenNodes);
						if((PARENT_NODE+"/"+childrenNodes.get(0)).equals(currentPath)){
							// 處理業務
							handleBusiness(currentPath);
						}
					} catch (Exception e) {
						e.printStackTrace();
					} 
				}
			}
		});
	}
	public void handleBusiness(String create) throws Exception{
		System.out.println(create+" is working......");
		Thread.sleep(new Random().nextInt(4000));
		zk.delete(currentPath, -1);
		System.out.println(create+" is done ......");
		currentPath = zk.create(PARENT_NODE+SUB_NODE, SUB_NODE.getBytes(), 
		Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
	}
}

2.3、Hadoop HA 高可用叢集搭建

見搭建文件!