zookeeper實現主從選舉
竊以為,對於zookeeper這種東西,僅僅知道怎麼安裝是遠遠不夠的(廢話麼這不是,,,),至少要對其幾個典型的應用場景進行了解,才能比較全面的知道zk究竟能幹啥,怎麼玩兒,以後的日子裡才能知道這貨如何能為我所用。於是,有了如下的學習:
我們知道zookeeper可以用於搭建高可用服務框架,主要先看以下幾個應用場景:
1、 master的選舉基本思路和編碼實現
2、 資料的釋出和訂閱
3、 軟負載均衡
4、 分散式佇列
5、 分散式鎖
6、 命名服務
目前zookeeper常用的開發包有zkclient跟curator,後者更為方便,日常開發使用較多。
----------------正文分割線-----------------------------------------------------------
master選舉
1、使用場景及結構
現在很多時候我們的服務需要7*24小時工作,假如一臺機器掛了,我們希望能有其它機器頂替它繼續工作。此類問題現在多采用master-salve模式,也就是常說的主從模式,正常情況下主機提供服務,備機負責監聽主機狀態,當主機異常時,可以自動切換到備機繼續提供服務(這裡有點兒類似於資料庫主庫跟備庫,備機正常情況下只監聽,不工作),這個切換過程中選出下一個主機的過程就是master選舉。
對於以上提到的場景,傳統的解決方式是採用一個備用節點,這個備用節點定期給當前主節點發送ping包,主節點收到ping包後會向備用節點發送應答ack,當備用節點收到應答,就認為主節點還活著,讓它繼續提供服務,否則就認為主節點掛掉了,自己將開始行使主節點職責。如圖1所示:
圖1
但這種方式會存在一個隱患,就是網路故障問題。看一下圖2:
圖2
也就是說,我們的主節點並沒有掛掉,只是在備用節點ping主節點,請求應答的時候發生網路故障,這樣我們的備用節點同樣收不到應答,就會認為主節點掛掉,然後備機會啟動自己的master例項。這樣就會導致系統中有兩個主節點,也就是雙master。出現雙master以後,我們的從節點會將它做的事情一部分彙報給主節點,一部分彙報給備用節點,這樣服務就亂套了。為了防止這種情況出現,我們可以考慮採用zookeeper,雖然它不能阻止網路故障的出現,但它能保證同一時刻系統中只存在一個主節點。我們來看zookeeper是怎麼實現的:
在此處,搶主程式是包含在服務程式中,需要程式設計師來手動寫搶主邏輯的,比如噹噹開源框架elastic-job中,就有關於選主的部分,參見:elastic-job-core/main/java/com/dangdang/ddframe/job/internal/election資料夾下的選主程式碼。
一點額外的話:zookeeper自己在叢集環境下的搶主演算法有三種,可以通過配置檔案來設定,預設採用FastLeaderElection,不作贅述;此處主要討論叢集環境中,應用程式利用master的特點,自己選主的過程。程式自己選主,每個人都有自己的一套演算法,有采用“最小編號”的,有采用類似“多數投票”的,各有優劣,本文的演算法僅作演示理解使用:
結構圖:
結構圖解釋:左側樹狀結構為zookeeper叢集,右側為程式伺服器。所有的伺服器在啟動的時候,都會訂閱zookeeper中master節點的刪除事件,以便在主伺服器掛掉的時候進行搶主操作;所有伺服器同時會在servers節點下注冊一個臨時節點(儲存自己的基本資訊),以便於應用程式讀取當前可用的伺服器列表。
選主原理介紹:zookeeper的節點有兩種型別,持久節點跟臨時節點。臨時節點有個特性,就是如果註冊這個節點的機器失去連線(通常是宕機),那麼這個節點會被zookeeper刪除。選主過程就是利用這個特性,在伺服器啟動的時候,去zookeeper特定的一個目錄下注冊一個臨時節點(這個節點作為master,誰註冊了這個節點誰就是master),註冊的時候,如果發現該節點已經存在,則說明已經有別的伺服器註冊了(也就是有別的伺服器已經搶主成功),那麼當前伺服器只能放棄搶主,作為從機存在。同時,搶主失敗的當前伺服器需要訂閱該臨時節點的刪除事件,以便該節點刪除時(也就是註冊該節點的伺服器宕機了或者網路斷了之類的)進行再次搶主操作。從機具體需要去哪裡註冊伺服器列表的臨時節點,節點儲存什麼資訊,根據具體的業務不同自行約定。選主的過程,其實就是簡單的爭搶在zookeeper註冊臨時節點的操作,誰註冊了約定的臨時節點,誰就是master。
ps:本文的例子中,並未用到結構圖server節點下的資料。但換一種演算法或者業務場景就會用到,演算法比如提到的最小編號,主要邏輯是主節點掛掉後,從節點裡邊編號最小的成為主節點,此時會用到該節點內容。換一種業務場景:叢集環境中,有很多工要處理, 主節點負責接收任務,並根據一定演算法將任務分配到不同的機器上執行;這種情況下,主節點跟從節點的職責也是不同的,主節點掛掉也會涉及到從節點進行master選舉的問題。這種情況下,很顯然,作為主節點需要知道當前有多少個從節點還活著,那麼此時也會需要用到servers節點下的資料了。
2、編碼實現
主要有兩個類,WorkServer為主服務類,RunningData用於記錄執行資料。因為是簡單的demo,我們只做搶master節點的編碼,對於從節點應該去哪裡註冊服務列表資訊,不作編碼。
採用zkClient實現,程式碼如下:
WorkServer類:
1 package mastersalve; 2 3 import org.I0Itec.zkclient.IZkDataListener; 4 import org.I0Itec.zkclient.ZkClient; 5 import org.I0Itec.zkclient.exception.ZkInterruptedException; 6 import org.I0Itec.zkclient.exception.ZkNoNodeException; 7 import org.I0Itec.zkclient.exception.ZkNodeExistsException; 8 import org.apache.zookeeper.CreateMode; 9 10 import java.util.concurrent.Executors; 11 import java.util.concurrent.ScheduledExecutorService; 12 import java.util.concurrent.TimeUnit; 13 14 /** 15 * Created by nevermore on 16/6/22. 16 */ 17 public class WorkServer { 18 19 //客戶端狀態 20 private volatile boolean running = false; 21 22 private ZkClient zkClient; 23 24 //zk主節點路徑 25 public static final String MASTER_PATH = "/master"; 26 27 //監聽(用於監聽主節點刪除事件) 28 private IZkDataListener dataListener; 29 30 //伺服器基本資訊 31 private RunningData serverData; 32 //主節點基本資訊 33 private RunningData masterData; 34 35 //排程器 36 private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); 37 //延遲時間5s 38 private int delayTime = 5; 39 40 41 42 public WorkServer(RunningData runningData){ 43 this.serverData = runningData; 44 this.dataListener = new IZkDataListener() { 45 @Override 46 public void handleDataChange(String s, Object o) throws Exception { 47 48 } 49 50 @Override 51 public void handleDataDeleted(String s) throws Exception { 52 //takeMaster(); 53 54 if(masterData != null && masterData.getName().equals(serverData.getName())){//若之前master為本機,則立即搶主,否則延遲5秒搶主(防止小故障引起的搶主可能導致的網路資料風暴) 55 takeMaster(); 56 }else{ 57 delayExector.schedule(new Runnable() { 58 @Override 59 public void run() { 60 takeMaster(); 61 } 62 },delayTime, TimeUnit.SECONDS); 63 } 64 65 } 66 }; 67 } 68 69 //啟動 70 public void start() throws Exception{ 71 if(running){ 72 throw new Exception("server has startup...."); 73 } 74 running = true; 75 zkClient.subscribeDataChanges(MASTER_PATH,dataListener); 76 takeMaster(); 77 } 78 79 //停止 80 public void stop() throws Exception{ 81 if(!running){ 82 throw new Exception("server has stopped....."); 83 } 84 running = false; 85 delayExector.shutdown(); 86 zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener); 87 releaseMaster(); 88 } 89 90 //搶注主節點 91 private void takeMaster(){ 92 if(!running) return ; 93 94 try { 95 zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); 96 masterData = serverData; 97 System.out.println(serverData.getName()+" is master"); 98 99 delayExector.schedule(new Runnable() {//測試搶主用,每5s釋放一次主節點 100 @Override 101 public void run() { 102 if(checkMaster()){ 103 releaseMaster(); 104 } 105 } 106 },5,TimeUnit.SECONDS); 107 108 109 }catch (ZkNodeExistsException e){//節點已存在 110 RunningData runningData = zkClient.readData(MASTER_PATH,true); 111 if(runningData == null){//讀取主節點時,主節點被釋放 112 takeMaster(); 113 }else{ 114 masterData = runningData; 115 } 116 } catch (Exception e) { 117 // ignore; 118 } 119 120 } 121 //釋放主節點 122 private void releaseMaster(){ 123 if(checkMaster()){ 124 zkClient.delete(MASTER_PATH); 125 } 126 } 127 //檢驗自己是否是主節點 128 private boolean checkMaster(){ 129 try { 130 RunningData runningData = zkClient.readData(MASTER_PATH); 131 masterData = runningData; 132 if (masterData.getName().equals(serverData.getName())) { 133 return true; 134 } 135 return false; 136 137 }catch (ZkNoNodeException e){//節點不存在 138 return false; 139 }catch (ZkInterruptedException e){//網路中斷 140 return checkMaster(); 141 }catch (Exception e){//其它 142 return false; 143 } 144 } 145 146 public void setZkClient(ZkClient zkClient) { 147 this.zkClient = zkClient; 148 } 149 150 public ZkClient getZkClient() { 151 return zkClient; 152 } 153 }
RunningData類:
1 package mastersalve; 2 3 import java.io.Serializable; 4 5 /** 6 * Created by nevermore on 16/6/22. 7 */ 8 public class RunningData implements Serializable { 9 10 private static final long serialVersionUID = 4260577459043203630L; 11 12 13 //伺服器id 14 private long cid; 15 //伺服器名稱 16 private String name; 17 18 19 public long getCid() { 20 return cid; 21 } 22 23 public void setCid(long cid) { 24 this.cid = cid; 25 } 26 27 public String getName() { 28 return name; 29 } 30 31 public void setName(String name) { 32 this.name = name; 33 } 34 }
說明:在實際生產環境中,可能會由於插拔網線等導致網路短時的不穩定,也就是網路抖動。由於正式生產環境中可能server在zk上註冊的資訊是比較多的,而且server的數量也是比較多的,那麼每一次切換主機,每臺server要同步的資料量(比如要獲取誰是master,當前有哪些salve等資訊,具體視業務不同而定)也是比較大的。那麼我們希望,這種短時間的網路抖動最好不要影響我們的系統穩定,也就是最好選出來的master還是原來的機器,那麼就可以避免發現master更換後,各個salve因為要同步資料等導致的zk資料網路風暴。所以在WorkServer中,54-63行,我們搶主的時候,如果之前主機是本機,則立即搶主,否則延遲5s搶主。這樣就給原來主機預留出一定時間讓其在新一輪選主中佔據優勢,從而利於環境穩定。
測試程式碼:
1 package mastersalve; 2 3 import org.I0Itec.zkclient.ZkClient; 4 import org.I0Itec.zkclient.serialize.SerializableSerializer; 5 6 import java.io.BufferedReader; 7 import java.io.InputStreamReader; 8 import java.util.ArrayList; 9 import java.util.List; 10 11 /** 12 * Created by nevermore on 16/6/23. 13 */ 14 public class LeaderSelectorZkClient { 15 16 //啟動的服務個數 17 private static final int CLIENT_QTY = 10; 18 //zookeeper伺服器的地址 19 private static final String ZOOKEEPER_SERVER = "localhost:2181"; 20 21 22 public static void main(String[] args) throws Exception{ 23 //儲存所有zkClient的列表 24 List<ZkClient> clients = new ArrayList<ZkClient>(); 25 //儲存所有服務的列表 26 List<WorkServer> workServers = new ArrayList<WorkServer>(); 27 28 try{ 29 for ( int i = 0; i < CLIENT_QTY; ++i ){ 30 //建立zkClient 31 ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer()); 32 clients.add(client); 33 //建立serverData 34 RunningData runningData = new RunningData(); 35 runningData.setCid(Long.valueOf(i)); 36 runningData.setName("Client #" + i); 37 //建立服務 38 WorkServer workServer = new WorkServer(runningData); 39 workServer.setZkClient(client); 40 41 workServers.add(workServer); 42 workServer.start(); 43 } 44 45 System.out.println("敲回車鍵退出!\n"); 46 new BufferedReader(new InputStreamReader(System.in)).readLine(); 47 }finally{ 48 System.out.println("Shutting down..."); 49 50 for ( WorkServer workServer : workServers ){ 51 try { 52 workServer.stop(); 53 } catch (Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 for ( ZkClient client : clients ){ 58 try { 59 client.close(); 60 } catch (Exception e) { 61 e.printStackTrace(); 62 } 63 } 64 } 65 } 66 }
兩次測試,本地模擬10臺server,分別不啟用防止網路抖動跟啟動防抖動兩次測試結果如下:
未啟動防抖動:
啟用防抖動:
可以看到,未啟用的時候,斷線後重新選出的主機是隨機的,沒規律;啟用防抖動後,每次選出的master都是id為0的機器。
-----------------------------------------------------------------------------------------------------------------------------
至此,我們已經通過編碼實現了簡單的master選舉。但是,不知你有沒有發現,,,,這個選主過程的程式碼還真是麻煩啊!
我們只是做一個demo,其中並未考慮複雜的業務場景,但其中的
監聽,異常 等程式碼的處理還是讓我覺得有些頭大,怎麼辦?Curator應運而生!
為了熟悉Apache Curator,接下來,將用curator來實現master選舉的demo。