ZooKeeper master election
Original article:
http://www.cnblogs.com/nevermorewang/p/5611807.html
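How the election works: ZooKeeper nodes come in two types, persistent and ephemeral. An ephemeral node has a special property: when the session of the client that created it ends (typically because the machine crashed or stayed disconnected longer than the session timeout), ZooKeeper deletes the node. Master election builds on this. When a server starts, it tries to create an ephemeral node under an agreed-upon path in ZooKeeper. This node represents the master: whoever creates it is the master. If the node already exists, some other server has already created it (that is, it has already won the election), so the current server gives up and runs as a slave. A server that lost the election also subscribes to the node's deletion event, so that when the node is deleted (because the server that created it crashed, lost its network connection, and so on) it can compete again. Where slaves should register themselves in a server list, and what information those nodes hold, is up to each application. In short, the election is simply a race to create an agreed-upon ephemeral node in ZooKeeper, and whoever creates it is the master.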
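The race described above can be sketched without a running ZooKeeper. The code below is a hypothetical in-memory model, not the zkClient or ZooKeeper API: `FakeZk`, `createEphemeral`, `watchDeleted`, `expireSession`, `Candidate` and `ElectionSketch` are names invented for illustration. It models only ephemeral nodes tied to a session, a create call that fails when the node already exists, and deletion listeners, which is just enough to show the first creator winning and a loser taking over once the master's session expires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Entry point listed first so `java ElectionSketch.java` picks it as the main class.
class ElectionSketch {
    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        List<Candidate> cs = new ArrayList<>();
        for (int i = 0; i < 3; i++) cs.add(new Candidate("Client #" + i, zk));
        cs.forEach(Candidate::tryTakeMaster);           // everyone races for /master
        System.out.println("master: " + zk.owner(Candidate.MASTER_PATH));
        cs.get(0).alive = false;                         // simulate Client #0 crashing
        zk.expireSession("Client #0");                   // its ephemeral node disappears
        System.out.println("master: " + zk.owner(Candidate.MASTER_PATH));
    }
}

// Hypothetical stand-in for ZooKeeper (NOT a real client API): it only models
// ephemeral nodes owned by a session and listeners fired on node deletion.
class FakeZk {
    private final Map<String, String> owners = new HashMap<>();   // path -> owning session
    private final Map<String, List<Consumer<String>>> onDelete = new HashMap<>();

    // Like an EPHEMERAL create: fails (returns false) if the node already exists.
    boolean createEphemeral(String path, String session) {
        return owners.putIfAbsent(path, session) == null;
    }

    String owner(String path) {
        return owners.get(path);
    }

    // Persistent listener, similar in spirit to zkClient's subscribeDataChanges.
    void watchDeleted(String path, Consumer<String> listener) {
        onDelete.computeIfAbsent(path, p -> new ArrayList<>()).add(listener);
    }

    // Session end: remove every ephemeral node the session owns, then notify listeners.
    void expireSession(String session) {
        List<String> removed = new ArrayList<>();
        for (Iterator<Map.Entry<String, String>> it = owners.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, String> e = it.next();
            if (e.getValue().equals(session)) {
                removed.add(e.getKey());
                it.remove();
            }
        }
        for (String path : removed) {
            for (Consumer<String> l : new ArrayList<>(onDelete.getOrDefault(path, new ArrayList<>()))) {
                l.accept(path);
            }
        }
    }
}

class Candidate {
    static final String MASTER_PATH = "/master";
    final String name;      // doubles as the session id in this sketch
    final FakeZk zk;
    boolean alive = true;

    Candidate(String name, FakeZk zk) {
        this.name = name;
        this.zk = zk;
        // Every live candidate competes again when the master node is deleted.
        zk.watchDeleted(MASTER_PATH, path -> { if (alive) tryTakeMaster(); });
    }

    // Whoever creates the ephemeral node first is the master.
    boolean tryTakeMaster() {
        return zk.createEphemeral(MASTER_PATH, name);
    }

    boolean isMaster() {
        return name.equals(zk.owner(MASTER_PATH));
    }
}
```

Running `ElectionSketch` prints `master: Client #0` followed by `master: Client #1`: Client #0 wins the first race, and after its session expires Client #1 is the first listener to retry, so Client #2's create fails. Real ZooKeeper delivers notifications asynchronously, so which loser wins is not fixed by registration order as it is in this model.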
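There are two main classes: WorkServer, the main server class, and RunningData, which holds runtime data. Since this is a simple demo, we only implement the race for the master node; where slaves should register their information in a server list is left out.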
The implementation uses zkClient. The code is as follows:
WorkServer class:
```java
package com.zookeeper.master;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by nevermore on 16/6/22.
 */
public class WorkServer {
    // Client state
    private volatile boolean running = false;
    private ZkClient zkClient;
    // ZooKeeper path of the master node
    public static final String MASTER_PATH = "/master";
    // Listener for the deletion of the master node
    private IZkDataListener dataListener;
    // This server's own information
    private RunningData serverData;
    // Information about the current master
    private RunningData masterData;
    // Scheduler
    private ScheduledExecutorService delayExecutor = Executors.newScheduledThreadPool(1);
    // Delay before a non-master retries: 5 s
    private int delayTime = 5;

    public WorkServer(RunningData runningData) {
        this.serverData = runningData;
        this.dataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                // If this machine was the previous master, retake the master role immediately;
                // otherwise wait 5 s before trying (so that a brief glitch does not cause a
                // change of master and the resulting storm of network traffic)
                if (masterData != null && masterData.getName().equals(serverData.getName())) {
                    takeMaster();
                } else {
                    delayExecutor.schedule(new Runnable() {
                        @Override
                        public void run() {
                            takeMaster();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }
        };
    }

    // Start
    public void start() throws Exception {
        if (running) {
            throw new Exception("server has already started");
        }
        running = true;
        zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
        takeMaster();
    }

    // Stop
    public void stop() throws Exception {
        if (!running) {
            throw new Exception("server has already stopped");
        }
        running = false;
        delayExecutor.shutdown();
        zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
        releaseMaster();
    }

    // Try to grab the master node
    private void takeMaster() {
        if (!running) return;
        try {
            zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
            masterData = serverData;
            System.out.println(serverData.getName() + " is master");
            // For testing the election only: release the master node every 5 s
            delayExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    if (checkMaster()) {
                        releaseMaster();
                    }
                }
            }, 5, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e) { // the node already exists
            RunningData runningData = zkClient.readData(MASTER_PATH, true);
            if (runningData == null) { // the master node was released while we were reading it
                takeMaster();
            } else {
                masterData = runningData;
            }
        } catch (Exception e) {
            // ignore
        }
    }

    // Release the master node
    private void releaseMaster() {
        if (checkMaster()) {
            zkClient.delete(MASTER_PATH);
        }
    }

    // Check whether this server is the master
    private boolean checkMaster() {
        try {
            RunningData runningData = zkClient.readData(MASTER_PATH);
            masterData = runningData;
            return masterData.getName().equals(serverData.getName());
        } catch (ZkNoNodeException e) { // the node does not exist
            return false;
        } catch (ZkInterruptedException e) { // interrupted: try again
            return checkMaster();
        } catch (Exception e) { // anything else
            return false;
        }
    }

    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }

    public ZkClient getZkClient() {
        return zkClient;
    }
}
```
RunningData class:
```java
package com.zookeeper.master;

import java.io.Serializable;

/**
 * Created by nevermore on 16/6/22.
 */
public class RunningData implements Serializable {
    private static final long serialVersionUID = 4260577459043203630L;
    // Server id
    private long cid;
    // Server name
    private String name;

    public long getCid() {
        return cid;
    }

    public void setCid(long cid) {
        this.cid = cid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```
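WorkServer stores a RunningData object as the data of the master node, and the test code creates each ZkClient with `SerializableSerializer`. Assuming that serializer uses standard Java object serialization (my understanding of zkClient, worth checking against its source), whatever `readData` returns is a freshly deserialized copy, never the original object. The sketch below round-trips a local copy of RunningData through `ObjectOutputStream`/`ObjectInputStream` to show this; `SerializationSketch`, `toBytes` and `fromBytes` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Entry point listed first so `java SerializationSketch.java` picks it as the main class.
class SerializationSketch {
    // Hypothetical helper: plain Java serialization, assumed to be roughly what
    // zkClient's SerializableSerializer does with node data.
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RunningData original = new RunningData();
        original.setCid(3L);
        original.setName("Client #3");

        RunningData copy = (RunningData) fromBytes(toBytes(original));
        System.out.println(copy == original);                           // false: a new object
        System.out.println(copy.getName().equals(original.getName()));  // true: same content
    }
}

// Local copy of the article's RunningData so that this sketch compiles on its own.
class RunningData implements Serializable {
    private static final long serialVersionUID = 4260577459043203630L;
    private long cid;
    private String name;

    public long getCid() { return cid; }
    public void setCid(long cid) { this.cid = cid; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

This is why `checkMaster` and the deletion listener compare servers by `getName()` rather than with `==`: a reference comparison against an object read back from ZooKeeper would never match.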
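Note: in a real production environment the network may be briefly unstable, for example when a network cable is unplugged and plugged back in; this is known as network jitter. In production, each server may register a fair amount of information in ZooKeeper, and there may be many servers, so every change of master forces every server to resynchronize a large amount of data (for example, who the master is and which slaves currently exist, depending on the application). We would therefore like short bursts of jitter not to affect the stability of the system: ideally the newly elected master is the same machine as before, which avoids the storm of ZooKeeper traffic caused by all slaves resynchronizing after discovering a new master. That is why, in the handleDataDeleted listener of WorkServer, a server that was previously the master retries immediately, while every other server waits 5 s before trying. This gives the previous master a head start in the new round of election and helps keep the environment stable.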
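The effect of the 5 s head start can be checked with a small deterministic model. This is a hypothetical sketch, not code from the article: `ReelectionSketch`, `delayFor` and `winner` are illustrative names, and it assumes each server retries as soon as both its reconnection time and its delay have passed, with the earliest attempt winning. Keep in mind that ZooKeeper deletes an ephemeral node only once the owner's session expires (the test code uses a 5000 ms session timeout), so a blip shorter than that does not trigger a new election at all.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ReelectionSketch {
    static final long DELAY_MS = 5_000; // the article's 5 s delay for non-masters

    // Delay before a server retries after the master node is deleted:
    // the previous master goes at once, everyone else waits DELAY_MS.
    static long delayFor(String server, String previousMaster) {
        return server.equals(previousMaster) ? 0 : DELAY_MS;
    }

    // reconnectMs: when each server can reach ZooKeeper again, in ms measured from
    // the deletion of the master node (0 = never lost the connection).
    // Assumption of this model: the earliest attempt wins; ties go to the first entry.
    static String winner(Map<String, Long> reconnectMs, String previousMaster) {
        String best = null;
        long bestTime = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : reconnectMs.entrySet()) {
            long attempt = Math.max(e.getValue(), delayFor(e.getKey(), previousMaster));
            if (attempt < bestTime) {
                bestTime = attempt;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, Long> reconnect = new LinkedHashMap<>();
        reconnect.put("A", 1_200L); // former master, back 1.2 s after the deletion
        reconnect.put("B", 0L);
        reconnect.put("C", 0L);
        System.out.println(winner(reconnect, "A")); // A: 1200 ms < 5000 ms

        reconnect.put("A", 8_000L); // former master unreachable for 8 s
        System.out.println(winner(reconnect, "A")); // B: B and C tie at 5000 ms
    }
}
```

With a 1.2 s outage the former master A reclaims the role; with an 8 s outage B wins. In the second case B and C tie at 5 s and the model simply picks the first entry, whereas in reality the order in which ZooKeeper processes the two create calls decides.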
Test code:
```java
package com.zookeeper.master;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by nevermore on 16/6/23.
 */
public class LeaderSelectorZkClient {
    // Number of servers to start
    private static final int CLIENT_QTY = 10;
    // Address of the ZooKeeper server
    private static final String ZOOKEEPER_SERVER = "localhost:2181";

    public static void main(String[] args) throws Exception {
        // All zkClient instances
        List<ZkClient> clients = new ArrayList<ZkClient>();
        // All servers
        List<WorkServer> workServers = new ArrayList<WorkServer>();
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                // Create a zkClient (session timeout 5000 ms, connection timeout 5000 ms)
                ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
                clients.add(client);
                // Create the server data
                RunningData runningData = new RunningData();
                runningData.setCid(i);
                runningData.setName("Client #" + i);
                // Create and start the server
                WorkServer workServer = new WorkServer(runningData);
                workServer.setZkClient(client);
                workServers.add(workServer);
                workServer.start();
            }
            System.out.println("Press Enter to exit!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (WorkServer workServer : workServers) {
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for (ZkClient client : clients) {
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
```