1. 程式人生 > 其它 >Zookeeper 3.5.7(鏟屎官)學習教程

Zookeeper 3.5.7(鏟屎官)學習教程

一、Zookeeper介紹

  • 是一個觀察者模式設計的分散式框架,負責協調客戶端與服務端。
  • 儲存和管理服務端和客戶端的註冊資訊。
  • 當服務端註冊資訊發生變化,通知客戶端(上線、下線)。
  • 相當於檔案系統 + 通知機制。

zookeeper官網下載:https://archive.apache.org/dist/zookeeper/

二、Zookeeper的選舉機制

  • SID(伺服器ID):伺服器的唯一標識。
  • ZXID(事務ID):用來標識一次伺服器狀態變更;所有的節點ZXID不一定一致,與Zookeeper的處理邏輯有關。
  • Epoch:Leader任期的代號。

1.伺服器啟動初始化

每臺伺服器有自己的唯一標識SID,優先選舉SID最大的為Leader。

  • 5臺伺服器,伺服器1啟動,發起選舉,投自己1票,當票數到達一半以上(3票),選舉結束。
  • 伺服器2啟動,發起選舉,SID比伺服器1大,投票給伺服器2,伺服器2有2票。
  • 伺服器3啟動,發起選舉,伺服器3的SID最大,獲得3票(大於一半),成為Leader。

2.非第一次啟動

Zookeeper叢集有兩種情況會進入選舉:

  • 伺服器啟動初始化。
  • 執行期間與Leader失去連線。
  • 伺服器與Leader斷開連線時,想嘗試選舉,會被告知存在Leader資訊,此時伺服器需要建立聯絡,並更新狀態即可。
  • Leader掛了,5臺伺服器,SID為1、2、3、4、5,ZXID為6、6、6、5、5,Epoch都為1,節點3位Leader,3和5都掛了,進行選舉:
    • 選舉比較的優先順序:Epoch > ZXID > SID;
      • Epoch大,直接勝出。
      • Epoch相同,ZXID大勝出。
      • ZXID相同,SID大勝出。
    • 1、2、4的投票情況:
      • 伺服器1:1 6 1
      • 伺服器2:1 6 2
      • 伺服器4:1 5 4
    • 第一輪:Epoch相同。
    • 第二輪:伺服器4淘汰(ZXID最小淘汰)。
    • 第三輪:伺服器2成為Leader(SID最大勝出)。

總結:

  • 比較Epoch,大的直接獲勝。
  • Epoch相同,ZXID大的勝出。
  • ZXID相同,SID大的勝出。

三、指令碼案例

1.啟動指令碼

啟動hadoop102、hadoop103、hadoop104指令碼

#!/bin/bash
# 判斷沒有引數
if [$# -lt 1]
then
	echo "No Args Input..."
	exit;
fi

case $1 in
"start")
	for i in hadoop102 hadoop103 hadoop104
	do
		echo "----------------- zookeeper $i 啟動 -----------------"
		ssh $i "/opt/install/zookeeper-3.5.7/bin/zkServer.sh start"
	done
;;
"stop")
	for i in hadoop102 hadoop103 hadoop104
	do
		echo "----------------- zookeeper $i 停止 -----------------"
		ssh $i "/opt/install/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
;;
"status")
	for i in hadoop102 hadoop103 hadoop104
	do
		echo "----------------- zookeeper $i 狀態 -----------------"
		ssh $i "/opt/install/zookeeper-3.5.7/bin/zkServer.sh status"
	done
;;
*)
	echo "Input Args Error..."
;;
esac

檢視啟動報錯日誌:

# 檢視啟動報錯日誌
bin/zkServer.sh start-foreground

2.客戶端命令操作

命令基本語法 功能描述
help 顯示所有操作命令
ls /path 1.使用 ls 命令來檢視當前 znode 的子節點 [可監聽];2.-w 監聽子節點變化;3.-s 附加次級資訊
create 1.普通建立;2.-s 含有序列;2.-e 臨時(重啟或者超時消失)
get /path 1.獲得節點的值(可監聽);2.-w 監聽節點內容變化;3.-s 附加次級資訊
set 設定節點的具體值
stat 檢視節點狀態
delete 刪除節點
deleteall 遞迴刪除節點

四、動態監聽伺服器上下線

在分散式系統中,有多個節點,可以動態上下線,客戶端能夠感知節點的上下線情況。

1.具體實現

叢集上建立/servers節點

# 啟動客戶端
bin/zkCli.sh
# 建立節點
create /servers "servers"

DistributeServer.java

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {
	private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private static int sessionTimeout = 2000;
	private ZooKeeper zk = null;
	private String parentNode = "/servers";

	// 建立到 zk 的客戶端連線
	public void getConnect() throws IOException {
		zk = new ZooKeeper(connectString, sessionTimeout, event -> {
		});
	}

	// 註冊伺服器
	public void registerServer(String hostname) throws Exception {
		String create = zk.create(parentNode + "/server",
			hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
			CreateMode.EPHEMERAL_SEQUENTIAL);
		System.out.println(hostname + " is online " + create);
	}

	// 業務功能
	public void business(String hostname) throws Exception {
		System.out.println(hostname + " is working ...");
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception {
		// 1 獲取 zk 連線
		DistributeServer server = new DistributeServer();
		server.getConnect();
		// 2 利用 zk 連線註冊伺服器資訊
		server.registerServer(args[0]);
		// 3 啟動業務功能
		server.business(args[0]);
	}
}

DistributeClient.java

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {
	private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private static int sessionTimeout = 2000;
	private ZooKeeper zk = null;
	private String parentNode = "/servers";

	// 建立到 zk 的客戶端連線
	public void getConnect() throws IOException {
		zk = new ZooKeeper(connectString, sessionTimeout, event -> {
			// 再次啟動監聽
			try {
				getServerList();
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
	}

	// 獲取伺服器列表資訊
	public void getServerList() throws Exception {
		// 1 獲取伺服器子節點資訊,並且對父節點進行監聽
		List<String> children = zk.getChildren(parentNode, true);
		// 2 儲存伺服器資訊列表
		ArrayList<String> servers = new ArrayList<>();
		// 3 遍歷所有節點,獲取節點中的主機名稱資訊
		for (String child : children) {
			byte[] data = zk.getData(parentNode + "/" + child,
				false, null);
			servers.add(new String(data));
		}
		// 4 列印伺服器列表資訊
		System.out.println(servers);
	}

	// 業務功能
	public void business() throws Exception {
		System.out.println("client is working ...");
		Thread.sleep(Long.MAX_VALUE);
	}

	public static void main(String[] args) throws Exception {
		// 1 獲取 zk 連線
		DistributeClient client = new DistributeClient();
		client.getConnect();
		// 2 獲取 servers 的子節點資訊,從中獲取伺服器資訊列表
		client.getServerList();
		// 3 業務程序啟動
		client.business();
	}
}

五、Zookeeper分散式鎖

分散式鎖和分散式事務的區別:

  • 分散式鎖:解決併發資源搶佔問題。
    • 採用redis(redission)、zookeeper(curator)解決。
  • 分散式事務:解決順序化提交問題,保證事務遵循ACID原則。
    • 採用rocketMQ解決。
    • 2PC(Two-phase commit protocol)二段提交,分別是準備階段、提交階段
    • 3PC三段提交,分別是準備階段、預提交階段和提交階段,把2PC的提交階段拆分為兩個階段。
    • TCC(Try - Confirm - Cancel),2PC和3PC是強一致事務性,都是資料庫層面的,TCC是業務層面的。

參考:https://zhuanlan.zhihu.com/p/183753774

1.實現Zookeeper分散式鎖

pom.xml

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
</dependency>

DistributedLock.java

package com.cnwanj.distributed;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @author: cnwanj
 * @date: 2022-02-19 15:55:41
 * @version: 1.0
 * @desc: Zookeeper分散式鎖實現
 */
public class DistributedLock {

	private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private final int sessionTimeout = 2000;
	private final ZooKeeper zk;

	// 等待連線完畢
	private CountDownLatch connectLatch = new CountDownLatch(1);
	// 等待監聽上一個節點完畢
	private CountDownLatch waitLatch = new CountDownLatch(1);
	// 前一個節點路徑
	private String waitPath;
	private String currentMode;

	public DistributedLock() throws IOException, InterruptedException, KeeperException {
		// 建立連線
		zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			@Override
			public void process(WatchedEvent watchedEvent) {
				// 釋放連線等待(若已建立連線)
				if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
					connectLatch.countDown();
				}
				// 釋放監聽等待(存在釋放鎖 && 路徑是前一個節點)
				if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
					waitLatch.countDown();
				}
			}
		});
		// 執行緒等待,建立連線再執行後面
		connectLatch.await();

		// 判斷根目錄是否存在
		Stat stat = zk.exists("/locks", false);
		if (stat == null) {
			// 建立根節點(引數1:目錄名稱,引數2:目錄下內容,引數3:對外開放,引數4:永久建立)
			zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
		}
	}

	// 加鎖
	public void zkLock() throws KeeperException, InterruptedException {
		// 建立臨時帶序號節點
		currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
		// 校驗節點是否第一個,是的話直接加鎖,若不是,需要監聽前一個是否解鎖
		List<String> childrenList = zk.getChildren("/locks", false);
		// 如果只有一個節點,直接獲取鎖
		if (childrenList.size() == 1) {
			return;
		} else {
			// 從小到大排序
			Collections.sort(childrenList);
			// 當前節點名稱
			String thisNode = currentMode.substring("/locks/".length());
			// 獲取節點位置
			int index = childrenList.indexOf(thisNode);
			if (index == -1) {
				System.out.println("資料異常");
			} else if (index == 0) {
				// thisNode為最小,直接獲取鎖
				return;
			} else {
				// 獲取前一個節點路徑
				waitPath = "/locks/" + childrenList.get(index - 1);
				// 獲取前一個節點,初始化監聽
				zk.getData(waitPath, true, null);
				// 等待監聽
				waitLatch.await();
				return;
			}
		}

	}

	// 解鎖
	public void unZkLock() throws KeeperException, InterruptedException {
		// 刪除節點
		zk.delete(currentMode, -1);
	}
}

DistributedLockTest.java

package com.cnwanj.distributed;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

/**
 * @author: cnwanj
 * @date: 2022-02-19 21:50:39
 * @version: 1.0
 * @desc: 測試Zookeeper分散式鎖
 */
public class DistributedLockTest {

	public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

		// 建立分散式鎖1
		final DistributedLock lock1 = new DistributedLock();
		// 建立分散式鎖2
		final DistributedLock lock2 = new DistributedLock();

		// 建立執行緒1
		new Thread(() -> {
			// 獲取鎖物件
			try {
				lock1.zkLock();
				System.out.println("執行緒1獲取鎖");
				Thread.sleep(3 * 1000);
				lock1.unZkLock();
				System.out.println("執行緒1釋放鎖");
			} catch (KeeperException | InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
		// 建立執行緒2
		new Thread(() -> {
			try {
				lock2.zkLock();
				System.out.println("執行緒2獲取鎖");
				Thread.sleep(3 * 1000);
				lock2.unZkLock();
				System.out.println("執行緒2釋放鎖");
			} catch (KeeperException | InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
	}
}

2.Curator實現分散式鎖

2.1官網解釋:

Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.

意思是:Curator是Zookeeper的一個Java/Jvm客戶端庫,也是一個分散式協調服務,Curator成為Zookeeper更簡單可靠的一個高可用框架和工具,它也包含了一些常用的用例和擴充套件方法,如Java8和非同步DSL。

官網:https://curator.apache.org

2.2原生API缺點:

  • 會話連線是非同步的,需要自己去處理;比如使用 CountDownLatch。
  • Watch 需要重複註冊,不然就不能生效。
  • 開發的複雜性還是比較高的。
  • 不支援多節點刪除和建立,需要自己去遞迴。

2.3curator實現案例

maven依賴引入:

<!-- zookeeper依賴 -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
</dependency>

<!-- curator依賴 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.3.0</version>
</dependency>
package com.cnwanj.lock.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author: cnwanj
 * @date: 2022-02-20 11:57:56
 * @version: 1.0
 * @desc: curator實現分散式鎖
 */
public class CuratorTest {

	private String rootNode = "/locks";
	private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
	private int connectionTimeout = 2000;
	private int sessionTimeout = 2000;

	public static void main(String[] args) {
		new CuratorTest().test();
	}

	private void test() {
		// 分散式鎖1
		InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
		// 分散式鎖2
		InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
		// 執行緒1
		new Thread(() -> {
			try {
				lock1.acquire();
				System.out.println("執行緒1獲取鎖");
				lock1.acquire();
				System.out.println("執行緒1再次獲取鎖");
				Thread.sleep(5 * 1000);
				lock1.release();
				System.out.println("執行緒1釋放鎖");
				lock1.release();
				System.out.println("執行緒1再次釋放鎖");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
		// 執行緒1
		new Thread(() -> {
			try {
				lock2.acquire();
				System.out.println("執行緒2獲取鎖");
				lock2.acquire();
				System.out.println("執行緒2再次獲取鎖");
				Thread.sleep(5 * 1000);
				lock2.release();
				System.out.println("執行緒2釋放鎖");
				lock2.release();
				System.out.println("執行緒2再次釋放鎖");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}).start();
	}

	private CuratorFramework getCuratorFramework() {
		// 重試策略,嘗試時間3秒,重試3次
		RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
		// 建立Curator
		CuratorFramework client = CuratorFrameworkFactory.builder()
			.connectString(connectString)
			.connectionTimeoutMs(connectionTimeout)
			.sessionTimeoutMs(sessionTimeout)
			.retryPolicy(policy)
			.build();
		// 開啟連線
		client.start();
		System.out.println("zk初始化完成...");
		return client;
	}
}

六、演算法基礎

1.拜占庭將軍問題

拜占庭將軍問題是一個協議問題,拜占庭帝國軍隊的將軍們必須全體一致的決定是否攻擊某一支敵軍。問題是這些將軍在地理上是分隔開來的,並且將軍中存在叛徒。叛徒可以任意行動以達到以下目標:欺騙某些將軍採取進攻行動促成一個不是所有將軍都同意的決定,如當將軍們不希望進攻時促成進攻行動;或者迷惑某些將軍,使他們無法做出決定。如果叛徒達到了這些目的之一,則任何攻擊行動的結果都是註定要失敗的,只有完全達成一致的努力才能獲得勝利。

2.Paxos演算法

Paxos演算法:一種基於訊息傳遞且具有高度容錯特性的一致性演算法。

Paxos演算法解決的問題:就是如何快速正確的在一個分散式系統中對某個資料值達成一致,並且保證不論發生任何異常,都不會破壞整個系統的一致性。

3.ZAB協議

3.1ZAB介紹

Zab 借鑑了 Paxos 演算法,是特別為 Zookeeper 設計的支援崩潰恢復的原子廣播協議。基於該協議,Zookeeper 設計為只有一臺客戶端(Leader)負責處理外部的寫事務請求,然後Leader 客戶端將資料同步到其他 Follower 節點。即 Zookeeper 只有一個 Leader 可以發起提案。

4.CAP理論

CAP理論告訴我們,一個分散式系統不可能同時滿足以下三種:

  • 一致性(Consistency)
  • 可用性(Available)
  • 分割槽容錯性(Partition Tolerance)

這三個基本需求,最多隻能同時滿足其中的兩項,因為P是必須的,因此往往選擇就在CP或者AP中。

1)一致性(Consistency)

在分散式環境中,一致性是指資料在多個副本之間是否能夠保持資料一致的特性。在一致性的需求下,當一個系統在數

據一致的狀態下執行更新操作後,應該保證系統的資料仍然處於一致的狀態。

2)可用性(Available)

可用性是指系統提供的服務必須一直處於可用的狀態,對於使用者的每一個操作請求總是能夠在有限的時間內返回結果。

3)分割槽容錯性(Partition Tolerance)

分散式系統在遇到任何網路分割槽故障的時候,仍然需要能夠保證對外提供滿足一致性和可用性的服務,除非是整個網路

環境都發生了故障。

ZooKeeper保證的是CP

  • Zookeeper不能保真每次服務請求都可用(在極端環境下,ZooKeeper可能會丟棄一些請求,消費者程式需要重新請求才能獲得結果)。所以說,ZooKeeper不能保證服務可用性。
  • 進行Leader選舉時叢集不可用。

七、詳細流程

1.Zookeeper服務端初始化流程

zkServer.sh start啟動 > new QuorumPeerMain();

  • 解析引數:包括解析zoo.cfg、myid等配置檔案。
  • 刪除過期快照:預設關閉,最少保留3份,開啟後會清理過期資料。
  • 建立通訊:預設NIO通訊,初始化服務端socket,繫結2181埠。
  • 啟動zookeeper(quorumPeer.start())。

NIO:

  • 非堵塞IO通訊方式,由一個執行緒處理所有的IO事件,並負責分發。
  • 執行緒之前通過wait、notify通訊,減少執行緒切換。
  • 事件來到時觸發操作,不需要堵塞監視事件,存在驅動機制。

2.Zookeeper選舉流程

選舉主要分為兩步:傳送投票和處理投票

通過選舉演算法(FastLeaderElection)生成選票。

傳送選票:

  • FastLeaderElection類:

    • 生成選票。
    • 將票放入佇列(sendqueue)。
    • 推(poll)到緩衝區(WorkerSender)中,併發送到管理佇列(queueSendMap)中。
  • QuorumCnManager類:

    • 從管理佇列中poll到傳送者中。
    • 傳送者傳送(send)給其他節點。

處理選票:

  • QuorumCnManager類:
    • recvWorker(接收者)讀取其他節點的投票。
    • 將投票新增到recvQueue(接收佇列)中。
  • FastLeaderElection類:
    • 通過recvQueue(接收佇列)中的投票poll到WorkerReceiver(工作接收者)中。
    • WorkerReceiver(工作接收者)將投票放入recequeue佇列中。
    • 最後生成選票

轉載:https://blog.csdn.net/Yh_yh_new_Yh/article/details/123057846