分散式系統筆記:利用zookeeper實現分散式leader節點選舉
阿新 • • 發佈:2019-02-05
利用zookeeper實現分散式leader節點選舉
依賴原理
- 在ZK中新增基本節點,路徑程式定義,節點型別為持久節點(PERSISTENT)。
- 對需要競選leader的每個程序,在ZK中分別新增基本節點的子節點,型別為臨時自編號節點(EPHEMERAL_SEQUENTIAL),並儲存建立返回的實際節點路徑。
- 通過delete方式刪除本程序建立的子節點,可以作為退出leader狀態的方式。
- 基本節點的子節點型別為臨時自編號節點(EPHEMERAL_SEQUENTIAL),當程序與ZK連線中斷後,ZK會自動將該節點刪除,確保了斷連之後其他程序對leader的選舉。
- 由於ZK自編號產生的路徑是遞增的,因此可以通過判斷基本節點的子節點中最小路徑數字編號的節點是否是本程序新建的節點來判斷是否獲得leader地位。
原理圖示
利用zk實現的分散式leader節點選舉實現原理如下:
若干程序分別嘗試競選leader,情況如下:
- (1)8個程序分別在ZK基本節點下建立臨時自編號節點,獲取建立成功後的實際路徑
- (2)在基本節點子節點列表中,判斷本程序建立節點編號是否為最小
- (3)最小編號程序獲得leader地位
leader程式異常退出或者伺服器異常導致leader程序無法執行leader功能:
- (1)程序將ZK中對應的臨時節點刪除,此時基本節點下路徑最小的子節點將獲得leader地位
- (2)程序由於網路或其他原因與ZK斷開了連線,ZK自動將其對應的臨時節點刪除
- (3)新出現的程序加入leader競選,在ZK下建立臨時節點,排隊等待
方案一 :父節點監聽方式
實現原理
程式流程圖如下:
實現程式碼
package xuyihao.zktest.server.zk.leader;
import org.apache.zookeeper.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* 基於zk的分散式leader節點選舉
* <pre>
* 方案一:父節點監聽方式
*
* 實現思路:監聽父節點狀態
* 1.在父節點(持久化)下建立臨時節點,實際建立的節點路徑會根據數量進行自增(ZK自編號方式建立節點)。
* 2.建立節點成功後,獲取父節點下的子節點列表,判斷本執行緒的路徑字尾編號是否是所有子節點中最小的,若是則成為leader,反之監聽父節點變動狀態(通過getChildren()方法註冊watcher)
* 3.當父節點狀態變動(主要是子節點列表變動)後watcher會接收到通知,這時判斷父節點下的子節點的排序狀態,若滿足本執行緒的路徑字尾編號最小則成為leader,反之繼續註冊watcher監聽父節點狀態
* </pre>
* <p>
* Created by xuyh at 2017/11/24 9:19.
*/
public class ZKLeader {
private static ZKLeader zkLeader;
private Logger logger = LoggerFactory.getLogger(ZKLeader.class);
private final static String BASE_NODE_PATH = "/ZKLeader_Leader";
private final static String NODE_PATH = "host_process_no_";
private String finalNodePath;
//是否是主節點標誌位
private boolean leader = false;
private String host = "127.0.0.1";
private String port = "2181";
private ZooKeeper zooKeeper;
private FatherWatcher fatherWatcher;
//是否連線成功標誌位
private boolean connected = false;
public static ZKLeader create(String host, String port) {
ZKLeader zkLeader = new ZKLeader(host, port);
zkLeader.connectZookeeper();
return zkLeader;
}
public boolean leader() {
return leader;
}
public void close() {
disconnectZooKeeper();
}
private ZKLeader(String host, String port) {
this.host = host;
this.port = port;
this.fatherWatcher = new FatherWatcher(this);
}
private boolean connectZookeeper() {
try {
zooKeeper = new ZooKeeper(host + ":" + port, 60000, event -> {
if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
leader = false;
} else {
if (event.getType() == Watcher.Event.EventType.None) {//說明連線成功了
connected = true;
}
}
});
int i = 1;
while (!connected) {//等待非同步連線成功,超過時間30s則退出等待
if (i == 100)
break;
Thread.sleep(300);
i++;
}
if (connected) {
if (zooKeeper.exists(BASE_NODE_PATH, false) == null) {//建立父節點
zooKeeper.create(BASE_NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//建立子節點
finalNodePath = zooKeeper.create(BASE_NODE_PATH + "/" + NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//檢查一次是否是主節點
checkLeader();
} else {
logger.warn("Connect zookeeper failed. Time consumes 30 s");
return false;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return false;
}
return true;
}
private boolean disconnectZooKeeper() {
if (zooKeeper == null)
return false;
try {
connected = false;
leader = false;
zooKeeper.close();
} catch (Exception e) {
logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
}
return true;
}
private void checkLeader() {
if (!connected)
return;
try {
//獲取子節點列表同時再次註冊監聽
List<String> childrenList = zooKeeper.getChildren(BASE_NODE_PATH, fatherWatcher);
if (judgePathNumMin(childrenList)) {
leader = true;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
private boolean judgePathNumMin(List<String> paths) {
if (paths.isEmpty())
return true;
if (paths.size() >= 2) {
//對無序狀態的path列表按照編號升序排序
paths.sort((str1, str2) -> {
int num1;
int num2;
String string1 = str1.substring(NODE_PATH.length(), str1.length());
String string2 = str2.substring(NODE_PATH.length(), str2.length());
num1 = Integer.parseInt(string1);
num2 = Integer.parseInt(string2);
if (num1 > num2) {
return 1;
} else if (num1 < num2) {
return -1;
} else {
return 0;
}
});
}
String minId = paths.get(0);
return finalNodePath.equals(BASE_NODE_PATH + "/" + minId);
}
private class FatherWatcher implements Watcher {
private ZKLeader context;
FatherWatcher(ZKLeader context) {
this.context = context;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {//子節點有變化
context.checkLeader();
}
}
}
}
測試
測試程式
private void zkLeaderOneTestWithMultiThread() throws Exception {
List<LeaderOneThread> leaderOneThreads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
leaderOneThreads.add(new LeaderOneThread(ZKLeader.create("127.0.0.1", "2181"), i));
}
leaderOneThreads.forEach(LeaderOneThread::start);
//執行緒0斷連
Thread.sleep(20000);
leaderOneThreads.get(0).getZkLeader().close();
Thread.sleep(2000);
System.out.println(String.format("執行緒: [%s] 斷開連線", 0));
//執行緒1斷連
Thread.sleep(20000);
leaderOneThreads.get(1).getZkLeader().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 1));
//執行緒3斷連
Thread.sleep(20000);
leaderOneThreads.get(3).getZkLeader().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 3));
//執行緒4斷連
Thread.sleep(20000);
leaderOneThreads.get(4).getZkLeader().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 4));
//執行緒2斷連
Thread.sleep(20000);
leaderOneThreads.get(2).getZkLeader().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 2));
Thread.sleep(60000);
}
private class LeaderOneThread extends Thread {
private ZKLeader zkLeader;
private int threadNum;
public ZKLeader getZkLeader() {
return zkLeader;
}
LeaderOneThread(ZKLeader zkLeader, int threadNum) {
this.zkLeader = zkLeader;
this.threadNum = threadNum;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
Date dt = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = sdf.format(dt);
if (zkLeader.leader()) {
System.out.println(String.format("[%s] 執行緒: [%s] 是主節點", currentTime, threadNum));
}
}
}
}
結果:
[2017-11-30 17:05:02] 執行緒: [0] 是主節點
[2017-11-30 17:05:07] 執行緒: [0] 是主節點
[2017-11-30 17:05:12] 執行緒: [0] 是主節點
執行緒: [0] 斷開連線
[2017-11-30 17:05:22] 執行緒: [1] 是主節點
[2017-11-30 17:05:27] 執行緒: [1] 是主節點
[2017-11-30 17:05:32] 執行緒: [1] 是主節點
[2017-11-30 17:05:37] 執行緒: [1] 是主節點
執行緒: [1] 斷開連線
[2017-11-30 17:05:42] 執行緒: [2] 是主節點
[2017-11-30 17:05:47] 執行緒: [2] 是主節點
[2017-11-30 17:05:52] 執行緒: [2] 是主節點
[2017-11-30 17:05:57] 執行緒: [2] 是主節點
執行緒: [3] 斷開連線
[2017-11-30 17:06:02] 執行緒: [2] 是主節點
[2017-11-30 17:06:07] 執行緒: [2] 是主節點
[2017-11-30 17:06:12] 執行緒: [2] 是主節點
[2017-11-30 17:06:17] 執行緒: [2] 是主節點
執行緒: [4] 斷開連線
[2017-11-30 17:06:22] 執行緒: [2] 是主節點
[2017-11-30 17:06:27] 執行緒: [2] 是主節點
[2017-11-30 17:06:32] 執行緒: [2] 是主節點
[2017-11-30 17:06:37] 執行緒: [2] 是主節點
執行緒: [2] 斷開連線
[2017-11-30 17:06:42] 執行緒: [5] 是主節點
[2017-11-30 17:06:47] 執行緒: [5] 是主節點
[2017-11-30 17:06:52] 執行緒: [5] 是主節點
[2017-11-30 17:06:57] 執行緒: [5] 是主節點
[2017-11-30 17:07:02] 執行緒: [5] 是主節點
[2017-11-30 17:07:07] 執行緒: [5] 是主節點
[2017-11-30 17:07:12] 執行緒: [5] 是主節點
[2017-11-30 17:07:17] 執行緒: [5] 是主節點
[2017-11-30 17:07:22] 執行緒: [5] 是主節點
[2017-11-30 17:07:27] 執行緒: [5] 是主節點
[2017-11-30 17:07:32] 執行緒: [5] 是主節點
[2017-11-30 17:07:37] 執行緒: [5] 是主節點
方案一優劣
優點
- 實現對父節點變動狀態(主要是子節點列表變化)的監聽
- 當子節點列表出現變化後,ZK通知監聽的各個程序,各個程序查詢子節點狀態
- 對父節點進行監聽,實現起來相對簡單
劣勢
- 每個程序都監聽父節點狀態,即父節點出現變動(主要是子節點列表變化)後,ZK伺服器需要通知到所有註冊監聽的程序,網路消耗和資源浪費比較大
方案三 :子節點監聽方式
實現原理
程式流程圖如下:
實現程式碼
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by xuyh at 2017/11/30 14:40.
* <p>
* **最優方案**
* <pre>
* 方案三:子節點監聽方式
*
* 實現思路:監聽子節點狀態
* 1.在父節點(持久化)下建立臨時節點,實際建立的節點路徑會根據數量進行自增(ZK自編號方式建立節點)。
* 2.建立節點成功後,首先獲取父節點下的子節點列表,判斷本執行緒的路徑字尾編號是否是所有子節點中最小的,若是則成為leader,反之監聽本節點前一個節點(路徑排序為本節點路徑數字減一的節點)變動狀態(通過getData()方法註冊watcher)
* 3.當監聽物件狀態變動(節點刪除狀態)後watcher會接收到通知,這時再次判斷父節點下的子節點的排序狀態,若滿足本執行緒的路徑字尾編號最小則成為leader,反之繼續註冊watcher監聽前一個節點狀態
*/
public class ZKLeaderTwo {
private static ZKLeaderTwo zkLeaderTwo;
private Logger logger = LoggerFactory.getLogger(ZKLeader.class);
private final static String BASE_NODE_PATH = "/ZKLeader_Leader";
private final static String NODE_PATH = "host_process_no_";
private String finalNodePath;
//是否是主節點標誌位
private boolean leader = false;
private String host = "127.0.0.1";
private String port = "2181";
private ZooKeeper zooKeeper;
private PreviousNodeWatcher previousNodeWatcher;
//是否連線成功標誌位
private boolean connected = false;
public static ZKLeaderTwo create(String host, String port) {
ZKLeaderTwo zkLeaderTwo = new ZKLeaderTwo(host, port);
zkLeaderTwo.connectZookeeper();
return zkLeaderTwo;
}
public boolean leader() {
return leader;
}
public void close() {
disconnectZooKeeper();
}
private ZKLeaderTwo(String host, String port) {
this.host = host;
this.port = port;
this.previousNodeWatcher = new PreviousNodeWatcher(this);
}
private boolean connectZookeeper() {
try {
zooKeeper = new ZooKeeper(host + ":" + port, 60000, event -> {
if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
leader = false;
} else if (event.getState() == Watcher.Event.KeeperState.Expired) {
leader = false;
} else {
if (event.getType() == Watcher.Event.EventType.None) {//說明連線成功了
connected = true;
}
}
});
int i = 1;
while (!connected) {//等待非同步連線成功,超過時間30s則退出等待
if (i == 100)
break;
Thread.sleep(300);
i++;
}
if (connected) {
if (zooKeeper.exists(BASE_NODE_PATH, false) == null) {//建立父節點
zooKeeper.create(BASE_NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//建立子節點
finalNodePath = zooKeeper.create(BASE_NODE_PATH + "/" + NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//檢查一次是否是主節點
checkLeader();
} else {
logger.warn("Connect zookeeper failed. Time consumes 30 s");
return false;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return false;
}
return true;
}
private boolean disconnectZooKeeper() {
if (zooKeeper == null)
return false;
try {
zooKeeper.close();
connected = false;
leader = false;
} catch (Exception e) {
logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
}
return true;
}
private void checkLeader() {
if (!connected)
return;
try {
//獲取子節點列表,若沒有成為leader,註冊監聽,監聽物件應當是比本節點路徑編號小一(或者排在前面一位)的節點
List<String> childrenList = zooKeeper.getChildren(BASE_NODE_PATH, false);
if (judgePathNumMin(childrenList)) {
leader = true;//成為leader
} else {
watchPreviousNode(childrenList);
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
private boolean judgePathNumMin(List<String> paths) {
if (paths.isEmpty())
return true;
if (paths.size() >= 2) {
//對無序狀態的path列表按照編號升序排序
paths.sort((str1, str2) -> {
int num1;
int num2;
String string1 = str1.substring(NODE_PATH.length(), str1.length());
String string2 = str2.substring(NODE_PATH.length(), str2.length());
num1 = Integer.parseInt(string1);
num2 = Integer.parseInt(string2);
if (num1 > num2) {
return 1;
} else if (num1 < num2) {
return -1;
} else {
return 0;
}
});
}
String minId = paths.get(0);
return finalNodePath.equals(BASE_NODE_PATH + "/" + minId);
}
private void watchPreviousNode(List<String> paths) {
if (paths.isEmpty() || paths.size() == 1) {
return;
}
int currentNodeIndex = paths.indexOf(finalNodePath.substring((BASE_NODE_PATH + "/").length(), finalNodePath.length()));
String previousNodePath = BASE_NODE_PATH + "/" + paths.get(currentNodeIndex - 1);
//通過getData方法再次註冊watcher
try {
zooKeeper.getData(previousNodePath, previousNodeWatcher, new Stat());
} catch (Exception e) {
logger.warn(String.format("Previous node watcher register failed! message: [%s]", e.getMessage()), e);
}
}
private class PreviousNodeWatcher implements Watcher {
private ZKLeaderTwo context;
PreviousNodeWatcher(ZKLeaderTwo context) {
this.context = context;
}
@Override
public void process(WatchedEvent event) {
//節點被刪除了,說明這個節點放棄了leader
if (event.getType() == Event.EventType.NodeDeleted) {
context.checkLeader();
}
}
}
}
測試
測試程式
private void zkLeaderTwoTestWithMultiThread() throws Exception {
List<LeaderTwoThread> leaderTwoThreads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
leaderTwoThreads.add(new LeaderTwoThread(ZKLeaderTwo.create("127.0.0.1", "2181"), i));
}
leaderTwoThreads.forEach(LeaderTwoThread::start);
//執行緒0斷連
Thread.sleep(20000);
leaderTwoThreads.get(0).getZkLeaderTwo().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 0));
//執行緒1斷連
Thread.sleep(20000);
leaderTwoThreads.get(1).getZkLeaderTwo().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 1));
//執行緒3斷連
Thread.sleep(20000);
leaderTwoThreads.get(3).getZkLeaderTwo().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 3));
//執行緒4斷連
Thread.sleep(20000);
leaderTwoThreads.get(4).getZkLeaderTwo().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 4));
//執行緒2斷連
Thread.sleep(20000);
leaderTwoThreads.get(2).getZkLeaderTwo().close();
System.out.println(String.format("執行緒: [%s] 斷開連線", 2));
Thread.sleep(60000);
}
private class LeaderTwoThread extends Thread {
private ZKLeaderTwo zkLeaderTwo;
private int threadNum;
public ZKLeaderTwo getZkLeaderTwo() {
return zkLeaderTwo;
}
LeaderTwoThread(ZKLeaderTwo zkLeaderTwo, int threadNum) {
this.zkLeaderTwo = zkLeaderTwo;
this.threadNum = threadNum;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
Date dt = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String currentTime = sdf.format(dt);
if (zkLeaderTwo.leader()) {
System.out.println(String.format("[%s] 執行緒: [%s] 是主節點", currentTime, threadNum));
}
}
}
}
結果:
[2017-11-30 16:47:41] 執行緒: [0] 是主節點
[2017-11-30 16:47:46] 執行緒: [0] 是主節點
[2017-11-30 16:47:51] 執行緒: [0] 是主節點
[2017-11-30 16:47:56] 執行緒: [0] 是主節點
執行緒: [0] 斷開連線
[2017-11-30 16:48:01] 執行緒: [1] 是主節點
[2017-11-30 16:48:06] 執行緒: [1] 是主節點
[2017-11-30 16:48:11] 執行緒: [1] 是主節點
[2017-11-30 16:48:16] 執行緒: [1] 是主節點
執行緒: [1] 斷開連線
[2017-11-30 16:48:21] 執行緒: [2] 是主節點
[2017-11-30 16:48:26] 執行緒: [2] 是主節點
[2017-11-30 16:48:31] 執行緒: [2] 是主節點
[2017-11-30 16:48:36] 執行緒: [2] 是主節點
執行緒: [3] 斷開連線
[2017-11-30 16:48:41] 執行緒: [2] 是主節點
[2017-11-30 16:48:46] 執行緒: [2] 是主節點
[2017-11-30 16:48:51] 執行緒: [2] 是主節點
[2017-11-30 16:48:56] 執行緒: [2] 是主節點
執行緒: [4] 斷開連線
[2017-11-30 16:49:01] 執行緒: [2] 是主節點
[2017-11-30 16:49:06] 執行緒: [2] 是主節點
[2017-11-30 16:49:11] 執行緒: [2] 是主節點
[2017-11-30 16:49:16] 執行緒: [2] 是主節點
執行緒: [2] 斷開連線
[2017-11-30 16:49:21] 執行緒: [5] 是主節點
[2017-11-30 16:49:26] 執行緒: [5] 是主節點
[2017-11-30 16:49:31] 執行緒: [5] 是主節點
[2017-11-30 16:49:36] 執行緒: [5] 是主節點
[2017-11-30 16:49:41] 執行緒: [5] 是主節點
[2017-11-30 16:49:46] 執行緒: [5] 是主節點
[2017-11-30 16:49:51] 執行緒: [5] 是主節點
[2017-11-30 16:49:56] 執行緒: [5] 是主節點
[2017-11-30 16:50:01] 執行緒: [5] 是主節點
[2017-11-30 16:50:06] 執行緒: [5] 是主節點
[2017-11-30 16:50:11] 執行緒: [5] 是主節點
[2017-11-30 16:50:16] 執行緒: [5] 是主節點
方案二優劣
優點
- 實現對子節點變動狀態(排序在本程序對應節點之前的一個節點)的監聽
- 被監聽子節點變動(刪除)之後,ZK通知本程序執行相應操作,判斷是否成為leader
- 相對於父節點監聽方式,子節點監聽方式在每一次鎖釋放(或者節點變動)時,ZK僅通知到一個程序的watcher,節省了大量的網路消耗和資源佔用
劣勢
- 實現方式與程式邏輯較父節點監聽來說比較繁瑣
總結比較
程式複雜度:
父節點監聽方式 < 子節點監聽方式網路資源消耗:
父節點監聽方式 >> 子節點監聽方式程式可靠性
父節點監聽方式 < 子節點監聽方式