zookeeper二之Leader選舉
轉載:https://blog.csdn.net/u010670689/article/details/78054945
竊以為,對於zookeeper這種東西,僅僅知道怎麼安裝是遠遠不夠的(廢話麼這不是,,,),至少要對其幾個典型的應用場景進行了解,才能比較全面的知道zk究竟能幹啥,怎麼玩兒,以後的日子裡才能知道這貨如何能為我所用。於是,有了如下的學習:
我們知道zookeeper可以用於搭建高可用服務框架,主要先看以下幾個應用場景:
1、 master的選舉基本思路和編碼實現
2、 資料的釋出和訂閱
3、 軟負載均衡
4、 分散式佇列
5、 分散式鎖
6、 命名服務
目前zookeeper常用的開發包有zkclient跟curator,後者更為方便,日常開發使用較多。
----------------正文分割線-----------------------------------------------------------
master選舉
1、使用場景及結構
現在很多時候我們的服務需要7*24小時工作,假如一臺機器掛了,我們希望能有其它機器頂替它繼續工作。此類問題現在多采用master-salve模式,也就是常說的主從模式,正常情況下主機提供服務,備機負責監聽主機狀態,當主機異常時,可以自動切換到備機繼續提供服務(這裡有點兒類似於資料庫主庫跟備庫,備機正常情況下只監聽,不工作),這個切換過程中選出下一個主機的過程就是master選舉。
對於以上提到的場景,傳統的解決方式是採用一個備用節點,這個備用節點定期給當前主節點發送ping包,主節點收到ping包後會向備用節點發送應答ack,當備用節點收到應答,就認為主節點還活著,讓它繼續提供服務,否則就認為主節點掛掉了,自己將開始行使主節點職責。如圖1所示:
圖1
但這種方式會存在一個隱患,就是網路故障問題。看一下圖2:
圖2
也就是說,我們的主節點並沒有掛掉,只是在備用節點ping主節點,請求應答的時候發生網路故障,這樣我們的備用節點同樣收不到應答,就會認為主節點掛掉,然後備機會啟動自己的master例項。這樣就會導致系統中有兩個主節點,也就是雙master。出現雙master以後,我們的從節點會將它做的事情一部分彙報給主節點,一部分彙報給備用節點,這樣服務就亂套了。為了防止這種情況出現,我們可以考慮採用zookeeper,雖然它不能阻止網路故障的出現,但它能保證同一時刻系統中只存在一個主節點。我們來看zookeeper是怎麼實現的:
在此處,搶主程式是包含在服務程式中,需要程式設計師來手動寫搶主邏輯的,比如噹噹開源框架elastic-job中,就有關於選主的部分,參見:elastic-job-core/main/java/com/dangdang/ddframe/job/internal/election資料夾下的選主程式碼。
一點額外的話:zookeeper自己在叢集環境下的搶主演算法有三種,可以通過配置檔案來設定,預設採用FastLeaderElection,不作贅述;此處主要討論叢集環境中,應用程式利用master的特點,自己選主的過程。程式自己選主,每個人都有自己的一套演算法,有采用“最小編號”的,有采用類似“多數投票”的,各有優劣,本文的演算法僅作演示理解使用:
結構圖:
結構圖解釋:左側樹狀結構為zookeeper叢集,右側為程式伺服器。所有的伺服器在啟動的時候,都會訂閱zookeeper中master節點的刪除事件,以便在主伺服器掛掉的時候進行搶主操作;所有伺服器同時會在servers節點下注冊一個臨時節點(儲存自己的基本資訊),以便於應用程式讀取當前可用的伺服器列表。
選主原理介紹:zookeeper的節點有兩種型別,持久節點跟臨時節點。臨時節點有個特性,就是如果註冊這個節點的機器失去連線(通常是宕機),那麼這個節點會被zookeeper刪除。選主過程就是利用這個特性,在伺服器啟動的時候,去zookeeper特定的一個目錄下注冊一個臨時節點(這個節點作為master,誰註冊了這個節點誰就是master),註冊的時候,如果發現該節點已經存在,則說明已經有別的伺服器註冊了(也就是有別的伺服器已經搶主成功),那麼當前伺服器只能放棄搶主,作為從機存在。同時,搶主失敗的當前伺服器需要訂閱該臨時節點的刪除事件,以便該節點刪除時(也就是註冊該節點的伺服器宕機了或者網路斷了之類的)進行再次搶主操作。從機具體需要去哪裡註冊伺服器列表的臨時節點,節點儲存什麼資訊,根據具體的業務不同自行約定。選主的過程,其實就是簡單的爭搶在zookeeper註冊臨時節點的操作,誰註冊了約定的臨時節點,誰就是master。
ps:本文的例子中,並未用到結構圖server節點下的資料。但換一種演算法或者業務場景就會用到,演算法比如提到的最小編號,主要邏輯是主節點掛掉後,從節點裡邊編號最小的成為主節點,此時會用到該節點內容。換一種業務場景:叢集環境中,有很多工要處理, 主節點負責接收任務,並根據一定演算法將任務分配到不同的機器上執行;這種情況下,主節點跟從節點的職責也是不同的,主節點掛掉也會涉及到從節點進行master選舉的問題。這種情況下,很顯然,作為主節點需要知道當前有多少個從節點還活著,那麼此時也會需要用到servers節點下的資料了。
2、編碼實現
主要有兩個類,WorkServer為主服務類,RunningData用於記錄執行資料。因為是簡單的demo,我們只做搶master節點的編碼,對於從節點應該去哪裡註冊服務列表資訊,不作編碼。
採用zkClient實現,程式碼如下:
WorkServer類:
package mastersalve;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Created by nevermore on 16/6/22.
*/
public class WorkServer {
// 客戶端狀態
private volatile boolean running = false;
private ZkClient zkClient;
// zk主節點路徑
public static final String MASTER_PATH = "/master";
// 監聽(用於監聽主節點刪除事件)
private IZkDataListener dataListener;
// 伺服器基本資訊
private RunningData serverData;
// 主節點基本資訊
private RunningData masterData;
// 排程器
private ScheduledExecutorService delayExector = Executors
.newScheduledThreadPool(1);
// 延遲時間5s
private int delayTime = 5;
public WorkServer(RunningData runningData) {
this.serverData = runningData;
this.dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
// takeMaster();
if (masterData != null
&& masterData.getName().equals(serverData.getName())) {// 若之前master為本機,則立即搶主,否則延遲5秒搶主(防止小故障引起的搶主可能導致的網路資料風暴)
takeMaster();
} else {
delayExector.schedule(new Runnable() {
@Override
public void run() {
takeMaster();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}
// 啟動
public void start() throws Exception {
if (running) {
throw new Exception("server has startup....");
}
running = true;
zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
takeMaster();
}
// 停止
public void stop() throws Exception {
if (!running) {
throw new Exception("server has stopped.....");
}
running = false;
delayExector.shutdown();
zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
releaseMaster();
}
// 搶注主節點
private void takeMaster() {
if (!running)
return;
try {
zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
masterData = serverData;
System.out.println(serverData.getName() + " is master");
delayExector.schedule(new Runnable() {// 測試搶主用,每5s釋放一次主節點
@Override
public void run() {
if (checkMaster()) {
releaseMaster();
}
}
}, 5, TimeUnit.SECONDS);
} catch (ZkNodeExistsException e) {// 節點已存在
RunningData runningData = zkClient.readData(MASTER_PATH, true);
if (runningData == null) {// 讀取主節點時,主節點被釋放
takeMaster();
} else {
masterData = runningData;
}
} catch (Exception e) {
// ignore;
}
}
// 釋放主節點
private void releaseMaster() {
if (checkMaster()) {
zkClient.delete(MASTER_PATH);
}
}
// 檢驗自己是否是主節點
private boolean checkMaster() {
try {
RunningData runningData = zkClient.readData(MASTER_PATH);
masterData = runningData;
if (masterData.getName().equals(serverData.getName())) {
return true;
}
return false;
} catch (ZkNoNodeException e) {// 節點不存在
return false;
} catch (ZkInterruptedException e) {// 網路中斷
return checkMaster();
} catch (Exception e) {// 其它
return false;
}
}
public void setZkClient(ZkClient zkClient) {
this.zkClient = zkClient;
}
public ZkClient getZkClient() {
return zkClient;
}
}
RunningData類:
package mastersalve;
import java.io.Serializable;
/**
* Created by nevermore on 16/6/22.
*/
public class RunningData implements Serializable {
private static final long serialVersionUID = 4260577459043203630L;
// 伺服器id
private long cid;
// 伺服器名稱
private String name;
public long getCid() {
return cid;
}
public void setCid(long cid) {
this.cid = cid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
說明:在實際生產環境中,可能會由於插拔網線等導致網路短時的不穩定,也就是網路抖動。由於正式生產環境中可能server在zk上註冊的資訊是比較多的,而且server的數量也是比較多的,那麼每一次切換主機,每臺server要同步的資料量(比如要獲取誰是master,當前有哪些salve等資訊,具體視業務不同而定)也是比較大的。那麼我們希望,這種短時間的網路抖動最好不要影響我們的系統穩定,也就是最好選出來的master還是原來的機器,那麼就可以避免發現master更換後,各個salve因為要同步資料等導致的zk資料網路風暴。所以在WorkServer中,54-63行,我們搶主的時候,如果之前主機是本機,則立即搶主,否則延遲5s搶主。這樣就給原來主機預留出一定時間讓其在新一輪選主中佔據優勢,從而利於環境穩定。
測試程式碼:
package mastersalve;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
/**
* Created by nevermore on 16/6/23.
*/
public class LeaderSelectorZkClient {
// 啟動的服務個數
private static final int CLIENT_QTY = 10;
// zookeeper伺服器的地址
private static final String ZOOKEEPER_SERVER = "localhost:2181";
public static void main(String[] args) throws Exception {
// 儲存所有zkClient的列表
List<ZkClient> clients = new ArrayList<ZkClient>();
// 儲存所有服務的列表
List<WorkServer> workServers = new ArrayList<WorkServer>();
try {
for (int i = 0; i < CLIENT_QTY; ++i) {
// 建立zkClient
ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000,
new SerializableSerializer());
clients.add(client);
// 建立serverData
RunningData runningData = new RunningData();
runningData.setCid(Long.valueOf(i));
runningData.setName("Client #" + i);
// 建立服務
WorkServer workServer = new WorkServer(runningData);
workServer.setZkClient(client);
workServers.add(workServer);
workServer.start();
}
System.out.println("敲回車鍵退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for (WorkServer workServer : workServers) {
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
for (ZkClient client : clients) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
兩次測試,本地模擬10臺server,分別不啟用防止網路抖動跟啟動防抖動兩次測試結果如下:
未啟動防抖動:
啟用防抖動:
可以看到,未啟用的時候,斷線後重新選出的主機是隨機的,沒規律;啟用防抖動後,每次選出的master都是id為0的機器。
-----------------------------------------------------------------------------------------------------------------------------
至此,我們已經通過編碼實現了簡單的master選舉。但是,不知你有沒有發現,,,,這個選主過程的程式碼還真是麻煩啊!
我們只是做一個demo,其中並未考慮複雜的業務場景,但其中的 監聽,異常 等程式碼的處理還是讓我覺得有些頭大,怎麼辦?Curator應運而生!
為了熟悉Apache Curator,接下來,將用curator來實現master選舉的demo。