zookeeper【4】master選舉
阿新 • • 發佈:2018-11-27
考慮7*24小時向外提供服務的系統,不能有單點故障,於是我們使用叢集,採用的是Master+Slave。叢集中有一臺主機和多臺備機,由主機向外提 供服務,備機監聽主機狀態,一旦主機宕機,備機必需迅速接管主機繼續向外提供服務。在這個過程中,從備機選出一臺機作為主機的過程,就是Master選 舉。
架構圖:
左邊是ZooKeeper叢集,右邊是3臺工作伺服器。工作伺服器啟動時,會去ZooKeeper的Servers節點下建立臨時節點,並把基本資訊寫入 臨時節點。這個過程叫服務註冊,系統中的其他服務可以通過獲取Servers節點的子節點列表,來了解當前系統哪些伺服器可用,這該過程叫做服務發現。接 著這些伺服器會嘗試建立Master臨時節點,誰建立成功誰就是Master,其他的兩臺就作為Slave。所有的Work Server必需關注Master節點的刪除事件。通過監聽Master節點的刪除事件,來了解Master伺服器是否宕機(建立臨時節點的伺服器一旦宕 機,它所建立的臨時節點即會自動刪除)。一旦Master伺服器宕機,必需開始新一輪的Master選舉。
實現程式碼:
/** * 排程器 */ public class LeaderSelectorZkClient { //啟動的服務個數 private static final int CLIENT_QTY = 10; //zookeeper伺服器的地址 private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181"; public static void main(String[] args) throws Exception {//儲存所有zkClient的列表 List<ZkClient> clients = new ArrayList<ZkClient>(); //儲存所有服務的列表 List<WorkServer> workServers = new ArrayList<WorkServer>(); try { for ( int i = 0; i < CLIENT_QTY; ++i ) { // 模擬建立10個伺服器並啟動 //建立zkClient ZkClient client = newZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); clients.add(client); //建立serverData RunningData runningData = new RunningData(); runningData.setCid(Long.valueOf(i)); runningData.setName("Client #" + i); //建立服務 WorkServer workServer = new WorkServer(runningData); workServer.setZkClient(client); workServers.add(workServer); workServer.start(); } System.out.println("敲回車鍵退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
import java.io.Serializable; /** * 工作伺服器資訊 */ public class RunningData implements Serializable { private static final long serialVersionUID = 4260577459043203630L; private Long cid; private String name; public Long getCid() { return cid; } public void setCid(Long cid) { this.cid = cid; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
/** * 工作伺服器 */ public class WorkServer { // 記錄伺服器狀態 private volatile boolean running = false; private ZkClient zkClient; // Master節點對應zookeeper中的節點路徑 private static final String MASTER_PATH = "/master"; // 監聽Master節點刪除事件 private IZkDataListener dataListener; // 記錄當前節點的基本資訊 private RunningData serverData; // 記錄叢集中Master節點的基本資訊 private RunningData masterData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5; public WorkServer(RunningData rd) { this.serverData = rd; // 記錄伺服器基本資訊 this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { //takeMaster(); if (masterData != null && masterData.getName().equals(serverData.getName())){ // 自己就是上一輪的Master伺服器,則直接搶 takeMaster(); } else { // 否則,延遲5秒後再搶。主要是應對網路抖動,給上一輪的Master伺服器優先搶佔master的權利,避免不必要的資料遷移開銷 delayExector.schedule(new Runnable(){ public void run(){ takeMaster(); } }, delayTime, TimeUnit.SECONDS); } } public void handleDataChange(String dataPath, Object data) throws Exception { } }; } public ZkClient getZkClient() { return zkClient; } public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; } // 啟動伺服器 public void start() throws Exception { if (running) { throw new Exception("server has startup..."); } running = true; // 訂閱Master節點刪除事件 zkClient.subscribeDataChanges(MASTER_PATH, dataListener); // 爭搶Master權利 takeMaster(); } // 停止伺服器 public void stop() throws Exception { if (!running) { throw new Exception("server has stoped"); } running = false; delayExector.shutdown(); // 取消Master節點事件訂閱 zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener); // 釋放Master權利 releaseMaster(); } // 爭搶Master private void takeMaster() { if (!running) return; try { // 嘗試建立Master臨時節點 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master"); // 作為演示,我們讓伺服器每隔5秒釋放一次Master權利 delayExector.schedule(new Runnable() { public void run() { // TODO Auto-generated method stub if (checkMaster()){ releaseMaster(); } } }, 5, TimeUnit.SECONDS); } catch (ZkNodeExistsException e) { // 已被其他伺服器建立了 // 讀取Master節點資訊 RunningData runningData = zkClient.readData(MASTER_PATH, true); if (runningData == null) { takeMaster(); // 沒讀到,讀取瞬間Master節點宕機了,有機會再次爭搶 } else { masterData = runningData; } } catch (Exception e) { // ignore; } } // 釋放Master權利 private void releaseMaster() { if (checkMaster()) { zkClient.delete(MASTER_PATH); } } // 檢測自己是否為Master private boolean checkMaster() { try { RunningData eventData = zkClient.readData(MASTER_PATH); masterData = eventData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; } catch (ZkNoNodeException e) { return false; // 節點不存在,自己肯定不是Master了 } catch (ZkInterruptedException e) { return checkMaster(); } catch (ZkException e) { return false; } } }