Zookeeper 3.5.7(鏟屎官)學習教程
一、Zookeeper介紹
- 是一個觀察者模式設計的分散式框架,負責協調客戶端與服務端。
- 儲存和管理服務端和客戶端的註冊資訊。
- 當服務端註冊資訊發生變化,通知客戶端(上線、下線)。
- 相當於檔案系統 + 通知機制。
zookeeper官網下載:https://archive.apache.org/dist/zookeeper/
二、Zookeeper的選舉機制
- SID(伺服器ID):伺服器的唯一標識。
- ZXID(事務ID):用來標識一次伺服器狀態變更;所有的節點ZXID不一定一致,與Zookeeper的處理邏輯有關。
- Epoch:Leader任期的代號。
1.伺服器啟動初始化
每臺伺服器有自己的唯一標識SID,優先選舉SID最大的為Leader。
- 5臺伺服器,伺服器1啟動,發起選舉,投自己1票,當票數到達一半以上(3票),選舉結束。
- 伺服器2啟動,發起選舉,SID比伺服器1大,投票給伺服器2,伺服器2有2票。
- 伺服器3啟動,發起選舉,伺服器3的SID最大,獲得3票(大於一半),成為Leader。
2.非第一次啟動
Zookeeper叢集有兩種情況會進入選舉:
- 伺服器啟動初始化。
- 執行期間與Leader失去連線。
- 伺服器與Leader斷開連線時,想嘗試選舉,會被告知存在Leader資訊,此時伺服器需要建立聯絡,並更新狀態即可。
- Leader掛了,5臺伺服器,SID為1、2、3、4、5,ZXID為6、6、6、5、5,Epoch都為1,節點3位Leader,3和5都掛了,進行選舉:
- 選舉比較的優先順序:Epoch > ZXID > SID;
- Epoch大,直接勝出。
- Epoch相同,ZXID大勝出。
- ZXID相同,SID大勝出。
- 1、2、4的投票情況:
- 伺服器1:1 6 1
- 伺服器2:1 6 2
- 伺服器4:1 5 4
- 第一輪:Epoch相同。
- 第二輪:伺服器4淘汰(ZXID最小淘汰)。
- 第三輪:伺服器2成為Leader(SID最大勝出)。
- 選舉比較的優先順序:Epoch > ZXID > SID;
總結:
- 比較Epoch,大的直接獲勝。
- Epoch相同,ZXID大的勝出。
- ZXID相同,SID大的勝出。
三、指令碼案例
1.啟動指令碼
啟動hadoop102、hadoop103、hadoop104指令碼
#!/bin/bash # 判斷沒有引數 if [$# -lt 1] then echo "No Args Input..." exit; fi case $1 in "start") for i in hadoop102 hadoop103 hadoop104 do echo "----------------- zookeeper $i 啟動 -----------------" ssh $i "/opt/install/zookeeper-3.5.7/bin/zkServer.sh start" done ;; "stop") for i in hadoop102 hadoop103 hadoop104 do echo "----------------- zookeeper $i 停止 -----------------" ssh $i "/opt/install/zookeeper-3.5.7/bin/zkServer.sh stop" done ;; "status") for i in hadoop102 hadoop103 hadoop104 do echo "----------------- zookeeper $i 狀態 -----------------" ssh $i "/opt/install/zookeeper-3.5.7/bin/zkServer.sh status" done ;; *) echo "Input Args Error..." ;; esac
檢視啟動報錯日誌:
# 檢視啟動報錯日誌
bin/zkServer.sh start-foreground
2.客戶端命令操作
命令基本語法 | 功能描述 |
---|---|
help | 顯示所有操作命令 |
ls /path | 1.使用 ls 命令來檢視當前 znode 的子節點 [可監聽];2.-w 監聽子節點變化;3.-s 附加次級資訊 |
create | 1.普通建立;2.-s 含有序列;2.-e 臨時(重啟或者超時消失) |
get /path | 1.獲得節點的值(可監聽);2.-w 監聽節點內容變化;3.-s 附加次級資訊 |
set | 設定節點的具體值 |
stat | 檢視節點狀態 |
delete | 刪除節點 |
deleteall | 遞迴刪除節點 |
四、動態監聽伺服器上下線
在分散式系統中,有多個節點,可以動態上下線,客戶端能夠感知節點的上下線情況。
1.具體實現
叢集上建立/servers節點
# 啟動客戶端
bin/zkCli.sh
# 建立節點
create /servers "servers"
DistributeServer.java
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 建立到 zk 的客戶端連線
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, event -> {
});
}
// 註冊伺服器
public void registerServer(String hostname) throws Exception {
String create = zk.create(parentNode + "/server",
hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online " + create);
}
// 業務功能
public void business(String hostname) throws Exception {
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 獲取 zk 連線
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 連線註冊伺服器資訊
server.registerServer(args[0]);
// 3 啟動業務功能
server.business(args[0]);
}
}
DistributeClient.java
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 建立到 zk 的客戶端連線
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, event -> {
// 再次啟動監聽
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 獲取伺服器列表資訊
public void getServerList() throws Exception {
// 1 獲取伺服器子節點資訊,並且對父節點進行監聽
List<String> children = zk.getChildren(parentNode, true);
// 2 儲存伺服器資訊列表
ArrayList<String> servers = new ArrayList<>();
// 3 遍歷所有節點,獲取節點中的主機名稱資訊
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child,
false, null);
servers.add(new String(data));
}
// 4 列印伺服器列表資訊
System.out.println(servers);
}
// 業務功能
public void business() throws Exception {
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 獲取 zk 連線
DistributeClient client = new DistributeClient();
client.getConnect();
// 2 獲取 servers 的子節點資訊,從中獲取伺服器資訊列表
client.getServerList();
// 3 業務程序啟動
client.business();
}
}
五、Zookeeper分散式鎖
分散式鎖和分散式事務的區別:
- 分散式鎖:解決併發資源搶佔問題。
- 採用redis(redission)、zookeeper(curator)解決。
- 分散式事務:解決順序化提交問題,保證事務遵循ACID原則。
- 採用rocketMQ解決。
- 2PC(Two-phase commit protocol)二段提交,分別是準備階段、提交階段
- 3PC三段提交,分別是準備階段、預提交階段和提交階段,把2PC的提交階段拆分為兩個階段。
- TCC(Try - Confirm - Cancel),2PC和3PC是強一致事務性,都是資料庫層面的,TCC是業務層面的。
參考:https://zhuanlan.zhihu.com/p/183753774
1.實現Zookeeper分散式鎖
pom.xml
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
DistributedLock.java
package com.cnwanj.distributed;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author: cnwanj
* @date: 2022-02-19 15:55:41
* @version: 1.0
* @desc: Zookeeper分散式鎖實現
*/
public class DistributedLock {
private final String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private final int sessionTimeout = 2000;
private final ZooKeeper zk;
// 等待連線完畢
private CountDownLatch connectLatch = new CountDownLatch(1);
// 等待監聽上一個節點完畢
private CountDownLatch waitLatch = new CountDownLatch(1);
// 前一個節點路徑
private String waitPath;
private String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 建立連線
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 釋放連線等待(若已建立連線)
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
// 釋放監聽等待(存在釋放鎖 && 路徑是前一個節點)
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
}
});
// 執行緒等待,建立連線再執行後面
connectLatch.await();
// 判斷根目錄是否存在
Stat stat = zk.exists("/locks", false);
if (stat == null) {
// 建立根節點(引數1:目錄名稱,引數2:目錄下內容,引數3:對外開放,引數4:永久建立)
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加鎖
public void zkLock() throws KeeperException, InterruptedException {
// 建立臨時帶序號節點
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 校驗節點是否第一個,是的話直接加鎖,若不是,需要監聽前一個是否解鎖
List<String> childrenList = zk.getChildren("/locks", false);
// 如果只有一個節點,直接獲取鎖
if (childrenList.size() == 1) {
return;
} else {
// 從小到大排序
Collections.sort(childrenList);
// 當前節點名稱
String thisNode = currentMode.substring("/locks/".length());
// 獲取節點位置
int index = childrenList.indexOf(thisNode);
if (index == -1) {
System.out.println("資料異常");
} else if (index == 0) {
// thisNode為最小,直接獲取鎖
return;
} else {
// 獲取前一個節點路徑
waitPath = "/locks/" + childrenList.get(index - 1);
// 獲取前一個節點,初始化監聽
zk.getData(waitPath, true, null);
// 等待監聽
waitLatch.await();
return;
}
}
}
// 解鎖
public void unZkLock() throws KeeperException, InterruptedException {
// 刪除節點
zk.delete(currentMode, -1);
}
}
DistributedLockTest.java
package com.cnwanj.distributed;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
* @author: cnwanj
* @date: 2022-02-19 21:50:39
* @version: 1.0
* @desc: 測試Zookeeper分散式鎖
*/
public class DistributedLockTest {
public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
// 建立分散式鎖1
final DistributedLock lock1 = new DistributedLock();
// 建立分散式鎖2
final DistributedLock lock2 = new DistributedLock();
// 建立執行緒1
new Thread(() -> {
// 獲取鎖物件
try {
lock1.zkLock();
System.out.println("執行緒1獲取鎖");
Thread.sleep(3 * 1000);
lock1.unZkLock();
System.out.println("執行緒1釋放鎖");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}).start();
// 建立執行緒2
new Thread(() -> {
try {
lock2.zkLock();
System.out.println("執行緒2獲取鎖");
Thread.sleep(3 * 1000);
lock2.unZkLock();
System.out.println("執行緒2釋放鎖");
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
2.Curator實現分散式鎖
2.1官網解釋:
Apache Curator is a Java/JVM client library for Apache ZooKeeper, a distributed coordination service. It includes a highlevel API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
意思是:Curator是Zookeeper的一個Java/Jvm客戶端庫,也是一個分散式協調服務,Curator成為Zookeeper更簡單可靠的一個高可用框架和工具,它也包含了一些常用的用例和擴充套件方法,如Java8和非同步DSL。
2.2原生API缺點:
- 會話連線是非同步的,需要自己去處理;比如使用 CountDownLatch。
- Watch 需要重複註冊,不然就不能生效。
- 開發的複雜性還是比較高的。
- 不支援多節點刪除和建立,需要自己去遞迴。
2.3curator實現案例
maven依賴引入:
<!-- zookeeper依賴 -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
<!-- curator依賴 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
package com.cnwanj.lock.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* @author: cnwanj
* @date: 2022-02-20 11:57:56
* @version: 1.0
* @desc: curator實現分散式鎖
*/
public class CuratorTest {
private String rootNode = "/locks";
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int connectionTimeout = 2000;
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorTest().test();
}
private void test() {
// 分散式鎖1
InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 分散式鎖2
InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 執行緒1
new Thread(() -> {
try {
lock1.acquire();
System.out.println("執行緒1獲取鎖");
lock1.acquire();
System.out.println("執行緒1再次獲取鎖");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("執行緒1釋放鎖");
lock1.release();
System.out.println("執行緒1再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 執行緒1
new Thread(() -> {
try {
lock2.acquire();
System.out.println("執行緒2獲取鎖");
lock2.acquire();
System.out.println("執行緒2再次獲取鎖");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("執行緒2釋放鎖");
lock2.release();
System.out.println("執行緒2再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
private CuratorFramework getCuratorFramework() {
// 重試策略,嘗試時間3秒,重試3次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
// 建立Curator
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy)
.build();
// 開啟連線
client.start();
System.out.println("zk初始化完成...");
return client;
}
}
六、演算法基礎
1.拜占庭將軍問題
拜占庭將軍問題是一個協議問題,拜占庭帝國軍隊的將軍們必須全體一致的決定是否攻擊某一支敵軍。問題是這些將軍在地理上是分隔開來的,並且將軍中存在叛徒。叛徒可以任意行動以達到以下目標:欺騙某些將軍採取進攻行動;促成一個不是所有將軍都同意的決定,如當將軍們不希望進攻時促成進攻行動;或者迷惑某些將軍,使他們無法做出決定。如果叛徒達到了這些目的之一,則任何攻擊行動的結果都是註定要失敗的,只有完全達成一致的努力才能獲得勝利。
2.Paxos演算法
Paxos演算法:一種基於訊息傳遞且具有高度容錯特性的一致性演算法。
Paxos演算法解決的問題:就是如何快速正確的在一個分散式系統中對某個資料值達成一致,並且保證不論發生任何異常,都不會破壞整個系統的一致性。
3.ZAB協議
3.1ZAB介紹
Zab 借鑑了 Paxos 演算法,是特別為 Zookeeper 設計的支援崩潰恢復的原子廣播協議。基於該協議,Zookeeper 設計為只有一臺客戶端(Leader)負責處理外部的寫事務請求,然後Leader 客戶端將資料同步到其他 Follower 節點。即 Zookeeper 只有一個 Leader 可以發起提案。
4.CAP理論
CAP理論告訴我們,一個分散式系統不可能同時滿足以下三種:
- 一致性(Consistency)
- 可用性(Available)
- 分割槽容錯性(Partition Tolerance)
這三個基本需求,最多隻能同時滿足其中的兩項,因為P是必須的,因此往往選擇就在CP或者AP中。
1)一致性(Consistency)
在分散式環境中,一致性是指資料在多個副本之間是否能夠保持資料一致的特性。在一致性的需求下,當一個系統在數
據一致的狀態下執行更新操作後,應該保證系統的資料仍然處於一致的狀態。
2)可用性(Available)
可用性是指系統提供的服務必須一直處於可用的狀態,對於使用者的每一個操作請求總是能夠在有限的時間內返回結果。
3)分割槽容錯性(Partition Tolerance)
分散式系統在遇到任何網路分割槽故障的時候,仍然需要能夠保證對外提供滿足一致性和可用性的服務,除非是整個網路
環境都發生了故障。
ZooKeeper保證的是CP
- Zookeeper不能保真每次服務請求都可用(在極端環境下,ZooKeeper可能會丟棄一些請求,消費者程式需要重新請求才能獲得結果)。所以說,ZooKeeper不能保證服務可用性。
- 進行Leader選舉時叢集不可用。
七、詳細流程
1.Zookeeper服務端初始化流程
zkServer.sh start啟動 > new QuorumPeerMain();
- 解析引數:包括解析zoo.cfg、myid等配置檔案。
- 刪除過期快照:預設關閉,最少保留3份,開啟後會清理過期資料。
- 建立通訊:預設NIO通訊,初始化服務端socket,繫結2181埠。
- 啟動zookeeper(quorumPeer.start())。
NIO:
- 非堵塞IO通訊方式,由一個執行緒處理所有的IO事件,並負責分發。
- 執行緒之前通過wait、notify通訊,減少執行緒切換。
- 事件來到時觸發操作,不需要堵塞監視事件,存在驅動機制。
2.Zookeeper選舉流程
選舉主要分為兩步:傳送投票和處理投票
通過選舉演算法(FastLeaderElection)生成選票。
傳送選票:
-
FastLeaderElection類:
- 生成選票。
- 將票放入佇列(sendqueue)。
- 推(poll)到緩衝區(WorkerSender)中,併發送到管理佇列(queueSendMap)中。
-
QuorumCnManager類:
- 從管理佇列中poll到傳送者中。
- 傳送者傳送(send)給其他節點。
處理選票:
- QuorumCnManager類:
- recvWorker(接收者)讀取其他節點的投票。
- 將投票新增到recvQueue(接收佇列)中。
- FastLeaderElection類:
- 通過recvQueue(接收佇列)中的投票poll到WorkerReceiver(工作接收者)中。
- WorkerReceiver(工作接收者)將投票放入recequeue佇列中。
- 最後生成選票
轉載:https://blog.csdn.net/Yh_yh_new_Yh/article/details/123057846