1. 程式人生 > >zookeeper master 選舉

zookeeper master 選舉

eve 讀取 hid ast ktr 代碼 new del nod

原文地址:

http://www.cnblogs.com/nevermorewang/p/5611807.html

選主原理介紹:zookeeper的節點有兩種類型,持久節點跟臨時節點。臨時節點有個特性,就是如果註冊這個節點的機器失去連接(通常是宕機),那麽這個節點會被zookeeper刪除。選主過程就是利用這個特性,在服務器啟動的時候,去zookeeper特定的一個目錄下註冊一個臨時節點(這個節點作為master,誰註冊了這個節點誰就是master),註冊的時候,如果發現該節點已經存在,則說明已經有別的服務器註冊了(也就是有別的服務器已經搶主成功),那麽當前服務器只能放棄搶主,作為從機存在。同時,搶主失敗的當前服務器需要訂閱該臨時節點的刪除事件,以便該節點刪除時(也就是註冊該節點的服務器宕機了或者網絡斷了之類的)進行再次搶主操作。從機具體需要去哪裏註冊服務器列表的臨時節點,節點保存什麽信息,根據具體的業務不同自行約定。選主的過程,其實就是簡單的爭搶在zookeeper註冊臨時節點的操作,誰註冊了約定的臨時節點,誰就是master。

主要有兩個類,WorkServer為主服務類,RunningData用於記錄運行數據。因為是簡單的demo,我們只做搶master節點的編碼,對於從節點應該去哪裏註冊服務列表信息,不作編碼。

  采用zkClient實現,代碼如下:

  WorkServer類

技術分享圖片
package com.zookeeper.master;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.zookeeper.CreateMode; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by nevermore on 16/6/22.
*/ public class WorkServer { //客戶端狀態 private volatile boolean running = false; private ZkClient zkClient; //zk主節點路徑 public static final String MASTER_PATH = "/master"; //監聽(用於監聽主節點刪除事件) private IZkDataListener dataListener; //服務器基本信息 private RunningData serverData; //主節點基本信息 private RunningData masterData; //調度器 private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); //延遲時間5s private int delayTime = 5; public WorkServer(RunningData runningData){ this.serverData = runningData; this.dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { } @Override public void handleDataDeleted(String s) throws Exception { //takeMaster(); if(masterData != null && masterData.getName().equals(serverData.getName())){ //若之前master為本機,則立即搶主,否則延遲5秒搶主(防止小故障引起的搶主可能導致的網絡數據風暴) takeMaster(); }else{ delayExector.schedule(new Runnable() { @Override public void run() { takeMaster(); } },delayTime, TimeUnit.SECONDS); } } }; } //啟動 public void start() throws Exception{ if(running){ throw new Exception("server has startup...."); } running = true; zkClient.subscribeDataChanges(MASTER_PATH,dataListener); takeMaster(); } //停止 public void stop() throws Exception{ if(!running){ throw new Exception("server has stopped....."); } running = false; delayExector.shutdown(); zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener); releaseMaster(); } //搶註主節點 private void takeMaster(){ if(!running) return ; try { zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master"); delayExector.schedule(new Runnable() {//測試搶主用,每5s釋放一次主節點 @Override public void run() { if(checkMaster()){ releaseMaster(); } } },5,TimeUnit.SECONDS); }catch (ZkNodeExistsException e){//節點已存在 RunningData runningData = zkClient.readData(MASTER_PATH,true); if(runningData == null){//讀取主節點時,主節點被釋放 takeMaster(); }else{ masterData = runningData; } } catch (Exception e) { // ignore; } } //釋放主節點 private void releaseMaster(){ if(checkMaster()){ zkClient.delete(MASTER_PATH); } } //檢驗自己是否是主節點 private boolean checkMaster(){ try { RunningData runningData = zkClient.readData(MASTER_PATH); masterData = runningData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; }catch (ZkNoNodeException e){//節點不存在 return false; }catch (ZkInterruptedException e){//網絡中斷 return checkMaster(); }catch (Exception e){//其它 return false; } } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } public ZkClient getZkClient() { return zkClient; } }
View Code

RunningData類:

技術分享圖片
package com.zookeeper.master;

import java.io.Serializable;

/**
 * Created by nevermore on 16/6/22.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;


    //服務器id
    private long cid;
    //服務器名稱
    private String name;


    public long getCid() {
        return cid;
    }

    public void setCid(long cid) {
        this.cid = cid;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
View Code

說明:在實際生產環境中,可能會由於插拔網線等導致網絡短時的不穩定,也就是網絡抖動。由於正式生產環境中可能server在zk上註冊的信息是比較多的,而且server的數量也是比較多的,那麽每一次切換主機,每臺server要同步的數據量(比如要獲取誰是master,當前有哪些salve等信息,具體視業務不同而定)也是比較大的。那麽我們希望,這種短時間的網絡抖動最好不要影響我們的系統穩定,也就是最好選出來的master還是原來的機器,那麽就可以避免發現master更換後,各個salve因為要同步數據等導致的zk數據網絡風暴。所以在WorkServer中,54-63行,我們搶主的時候,如果之前主機是本機,則立即搶主,否則延遲5s搶主。這樣就給原來主機預留出一定時間讓其在新一輪選主中占據優勢,從而利於環境穩定。

測試代碼:

技術分享圖片
package com.zookeeper.master;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by nevermore on 16/6/23.
 */
public class LeaderSelectorZkClient {

    //啟動的服務個數
    private static final int        CLIENT_QTY = 10;
    //zookeeper服務器的地址
    private static final String     ZOOKEEPER_SERVER = "localhost:2181";


    public static void main(String[] args) throws Exception{
        //保存所有zkClient的列表
        List<ZkClient> clients = new ArrayList<ZkClient>();
        //保存所有服務的列表
        List<WorkServer>  workServers = new ArrayList<WorkServer>();

        try{
            for ( int i = 0; i < CLIENT_QTY; ++i ){
                //創建zkClient
                ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
                clients.add(client);
                //創建serverData
                RunningData runningData = new RunningData();
                runningData.setCid(Long.valueOf(i));
                runningData.setName("Client #" + i);
                //創建服務
                WorkServer  workServer = new WorkServer(runningData);
                workServer.setZkClient(client);

                workServers.add(workServer);
                workServer.start();
            }

            System.out.println("敲回車鍵退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        }finally{
            System.out.println("Shutting down...");

            for ( WorkServer workServer : workServers ){
                try {
                    workServer.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            for ( ZkClient client : clients ){
                try {
                    client.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
View Code

zookeeper master 選舉