1. 程式人生 > >zookeeper實現主從選舉

zookeeper實現主從選舉

竊以為,對於zookeeper這種東西,僅僅知道怎麼安裝是遠遠不夠的(廢話麼這不是,,,),至少要對其幾個典型的應用場景進行了解,才能比較全面的知道zk究竟能幹啥,怎麼玩兒,以後的日子裡才能知道這貨如何能為我所用。於是,有了如下的學習:

我們知道zookeeper可以用於搭建高可用服務框架,主要先看以下幾個應用場景:
1、 master的選舉基本思路和編碼實現
2、 資料的釋出和訂閱
3、 軟負載均衡
4、 分散式佇列
5、 分散式鎖
6、 命名服務

目前zookeeper常用的開發包有zkclient跟curator,後者更為方便,日常開發使用較多。

----------------正文分割線-----------------------------------------------------------

master選舉

1、使用場景及結構

  現在很多時候我們的服務需要7*24小時工作,假如一臺機器掛了,我們希望能有其它機器頂替它繼續工作。此類問題現在多采用master-salve模式,也就是常說的主從模式,正常情況下主機提供服務,備機負責監聽主機狀態,當主機異常時,可以自動切換到備機繼續提供服務(這裡有點兒類似於資料庫主庫跟備庫,備機正常情況下只監聽,不工作),這個切換過程中選出下一個主機的過程就是master選舉。

  對於以上提到的場景,傳統的解決方式是採用一個備用節點,這個備用節點定期給當前主節點發送ping包,主節點收到ping包後會向備用節點發送應答ack,當備用節點收到應答,就認為主節點還活著,讓它繼續提供服務,否則就認為主節點掛掉了,自己將開始行使主節點職責。如圖1所示:

                                                            

                  圖1                                                                                                                                

   但這種方式會存在一個隱患,就是網路故障問題。看一下圖2:

    

          圖2

   也就是說,我們的主節點並沒有掛掉,只是在備用節點ping主節點,請求應答的時候發生網路故障,這樣我們的備用節點同樣收不到應答,就會認為主節點掛掉,然後備機會啟動自己的master例項。這樣就會導致系統中有兩個主節點,也就是雙master。出現雙master以後,我們的從節點會將它做的事情一部分彙報給主節點,一部分彙報給備用節點,這樣服務就亂套了。為了防止這種情況出現,我們可以考慮採用zookeeper,雖然它不能阻止網路故障的出現,但它能保證同一時刻系統中只存在一個主節點。我們來看zookeeper是怎麼實現的:

  在此處,搶主程式是包含在服務程式中,需要程式設計師來手動寫搶主邏輯的,比如噹噹開源框架elastic-job中,就有關於選主的部分,參見:elastic-job-core/main/java/com/dangdang/ddframe/job/internal/election資料夾下的選主程式碼。

  一點額外的話:zookeeper自己在叢集環境下的搶主演算法有三種,可以通過配置檔案來設定,預設採用FastLeaderElection,不作贅述;此處主要討論叢集環境中,應用程式利用master的特點,自己選主的過程。程式自己選主,每個人都有自己的一套演算法,有采用“最小編號”的,有采用類似“多數投票”的,各有優劣,本文的演算法僅作演示理解使用:

  結構圖:

       

  結構圖解釋:左側樹狀結構為zookeeper叢集,右側為程式伺服器。所有的伺服器在啟動的時候,都會訂閱zookeeper中master節點的刪除事件,以便在主伺服器掛掉的時候進行搶主操作;所有伺服器同時會在servers節點下注冊一個臨時節點(儲存自己的基本資訊),以便於應用程式讀取當前可用的伺服器列表。

  選主原理介紹:zookeeper的節點有兩種型別,持久節點跟臨時節點。臨時節點有個特性,就是如果註冊這個節點的機器失去連線(通常是宕機),那麼這個節點會被zookeeper刪除。選主過程就是利用這個特性,在伺服器啟動的時候,去zookeeper特定的一個目錄下注冊一個臨時節點(這個節點作為master,誰註冊了這個節點誰就是master),註冊的時候,如果發現該節點已經存在,則說明已經有別的伺服器註冊了(也就是有別的伺服器已經搶主成功),那麼當前伺服器只能放棄搶主,作為從機存在。同時,搶主失敗的當前伺服器需要訂閱該臨時節點的刪除事件,以便該節點刪除時(也就是註冊該節點的伺服器宕機了或者網路斷了之類的)進行再次搶主操作。從機具體需要去哪裡註冊伺服器列表的臨時節點,節點儲存什麼資訊,根據具體的業務不同自行約定。選主的過程,其實就是簡單的爭搶在zookeeper註冊臨時節點的操作,誰註冊了約定的臨時節點,誰就是master。

  ps:本文的例子中,並未用到結構圖server節點下的資料。但換一種演算法或者業務場景就會用到,演算法比如提到的最小編號,主要邏輯是主節點掛掉後,從節點裡邊編號最小的成為主節點,此時會用到該節點內容。換一種業務場景:叢集環境中,有很多工要處理, 主節點負責接收任務,並根據一定演算法將任務分配到不同的機器上執行;這種情況下,主節點跟從節點的職責也是不同的,主節點掛掉也會涉及到從節點進行master選舉的問題。這種情況下,很顯然,作為主節點需要知道當前有多少個從節點還活著,那麼此時也會需要用到servers節點下的資料了。

2、編碼實現

   主要有兩個類,WorkServer為主服務類,RunningData用於記錄執行資料。因為是簡單的demo,我們只做搶master節點的編碼,對於從節點應該去哪裡註冊服務列表資訊,不作編碼。

  採用zkClient實現,程式碼如下:

  WorkServer類:

   複製程式碼
  1 package mastersalve;
  2 
  3 import org.I0Itec.zkclient.IZkDataListener;
  4 import org.I0Itec.zkclient.ZkClient;
  5 import org.I0Itec.zkclient.exception.ZkInterruptedException;
  6 import org.I0Itec.zkclient.exception.ZkNoNodeException;
  7 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
  8 import org.apache.zookeeper.CreateMode;
  9 
 10 import java.util.concurrent.Executors;
 11 import java.util.concurrent.ScheduledExecutorService;
 12 import java.util.concurrent.TimeUnit;
 13 
 14 /**
 15  * Created by nevermore on 16/6/22.
 16  */
 17 public class WorkServer {
 18 
 19     //客戶端狀態
 20     private volatile boolean running = false;
 21 
 22     private ZkClient zkClient;
 23 
 24     //zk主節點路徑
 25     public static final String MASTER_PATH = "/master";
 26 
 27     //監聽(用於監聽主節點刪除事件)
 28     private IZkDataListener dataListener;
 29 
 30     //伺服器基本資訊
 31     private RunningData serverData;
 32     //主節點基本資訊
 33     private RunningData masterData;
 34 
 35     //排程器
 36     private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
 37     //延遲時間5s
 38     private int delayTime = 5;
 39 
 40 
 41 
 42     public WorkServer(RunningData runningData){
 43         this.serverData = runningData;
 44         this.dataListener = new IZkDataListener() {
 45             @Override
 46             public void handleDataChange(String s, Object o) throws Exception {
 47 
 48             }
 49 
 50             @Override
 51             public void handleDataDeleted(String s) throws Exception {
 52                 //takeMaster();
 53 
 54                 if(masterData != null && masterData.getName().equals(serverData.getName())){//若之前master為本機,則立即搶主,否則延遲5秒搶主(防止小故障引起的搶主可能導致的網路資料風暴)
 55                     takeMaster();
 56                 }else{
 57                     delayExector.schedule(new Runnable() {
 58                         @Override
 59                         public void run() {
 60                             takeMaster();
 61                         }
 62                     },delayTime, TimeUnit.SECONDS);
 63                 }
 64 
 65             }
 66         };
 67     }
 68 
 69     //啟動
 70     public void start() throws Exception{
 71         if(running){
 72             throw new Exception("server has startup....");
 73         }
 74         running = true;
 75         zkClient.subscribeDataChanges(MASTER_PATH,dataListener);
 76         takeMaster();
 77     }
 78 
 79     //停止
 80     public void stop() throws Exception{
 81         if(!running){
 82             throw new Exception("server has stopped.....");
 83         }
 84         running = false;
 85         delayExector.shutdown();
 86         zkClient.unsubscribeDataChanges(MASTER_PATH,dataListener);
 87         releaseMaster();
 88     }
 89 
 90     //搶注主節點
 91     private void takeMaster(){
 92         if(!running) return ;
 93 
 94         try {
 95             zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
 96             masterData = serverData;
 97             System.out.println(serverData.getName()+" is master");
 98 
 99             delayExector.schedule(new Runnable() {//測試搶主用,每5s釋放一次主節點
100                 @Override
101                 public void run() {
102                     if(checkMaster()){
103                         releaseMaster();
104                     }
105                 }
106             },5,TimeUnit.SECONDS);
107 
108 
109         }catch (ZkNodeExistsException e){//節點已存在
110             RunningData runningData = zkClient.readData(MASTER_PATH,true);
111             if(runningData == null){//讀取主節點時,主節點被釋放
112                 takeMaster();
113             }else{
114                 masterData = runningData;
115             }
116         } catch (Exception e) {
117             // ignore;
118         }
119 
120     }
121     //釋放主節點
122     private void releaseMaster(){
123         if(checkMaster()){
124             zkClient.delete(MASTER_PATH);
125         }
126     }
127     //檢驗自己是否是主節點
128     private boolean checkMaster(){
129         try {
130             RunningData runningData = zkClient.readData(MASTER_PATH);
131             masterData = runningData;
132             if (masterData.getName().equals(serverData.getName())) {
133                 return true;
134             }
135             return false;
136 
137         }catch (ZkNoNodeException e){//節點不存在
138             return  false;
139         }catch (ZkInterruptedException e){//網路中斷
140             return checkMaster();
141         }catch (Exception e){//其它
142             return false;
143         }
144     }
145 
146     public void setZkClient(ZkClient zkClient) {
147         this.zkClient = zkClient;
148     }
149 
150     public ZkClient getZkClient() {
151         return zkClient;
152     }
153 }
複製程式碼

  RunningData類:

    複製程式碼
 1 package mastersalve;
 2 
 3 import java.io.Serializable;
 4 
 5 /**
 6  * Created by nevermore on 16/6/22.
 7  */
 8 public class RunningData implements Serializable {
 9 
10     private static final long serialVersionUID = 4260577459043203630L;
11 
12 
13     //伺服器id
14     private long cid;
15     //伺服器名稱
16     private String name;
17 
18 
19     public long getCid() {
20         return cid;
21     }
22 
23     public void setCid(long cid) {
24         this.cid = cid;
25     }
26 
27     public String getName() {
28         return name;
29     }
30 
31     public void setName(String name) {
32         this.name = name;
33     }
34 }
複製程式碼

   說明:在實際生產環境中,可能會由於插拔網線等導致網路短時的不穩定,也就是網路抖動。由於正式生產環境中可能server在zk上註冊的資訊是比較多的,而且server的數量也是比較多的,那麼每一次切換主機,每臺server要同步的資料量(比如要獲取誰是master,當前有哪些salve等資訊,具體視業務不同而定)也是比較大的。那麼我們希望,這種短時間的網路抖動最好不要影響我們的系統穩定,也就是最好選出來的master還是原來的機器,那麼就可以避免發現master更換後,各個salve因為要同步資料等導致的zk資料網路風暴。所以在WorkServer中,54-63行,我們搶主的時候,如果之前主機是本機,則立即搶主,否則延遲5s搶主。這樣就給原來主機預留出一定時間讓其在新一輪選主中佔據優勢,從而利於環境穩定。

  測試程式碼:

    複製程式碼
 1 package mastersalve;
 2 
 3 import org.I0Itec.zkclient.ZkClient;
 4 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 5 
 6 import java.io.BufferedReader;
 7 import java.io.InputStreamReader;
 8 import java.util.ArrayList;
 9 import java.util.List;
10 
11 /**
12  * Created by nevermore on 16/6/23.
13  */
14 public class LeaderSelectorZkClient {
15 
16     //啟動的服務個數
17     private static final int        CLIENT_QTY = 10;
18     //zookeeper伺服器的地址
19     private static final String     ZOOKEEPER_SERVER = "localhost:2181";
20 
21 
22     public static void main(String[] args) throws Exception{
23         //儲存所有zkClient的列表
24         List<ZkClient> clients = new ArrayList<ZkClient>();
25         //儲存所有服務的列表
26         List<WorkServer>  workServers = new ArrayList<WorkServer>();
27 
28         try{
29             for ( int i = 0; i < CLIENT_QTY; ++i ){
30                 //建立zkClient
31                 ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
32                 clients.add(client);
33                 //建立serverData
34                 RunningData runningData = new RunningData();
35                 runningData.setCid(Long.valueOf(i));
36                 runningData.setName("Client #" + i);
37                 //建立服務
38                 WorkServer  workServer = new WorkServer(runningData);
39                 workServer.setZkClient(client);
40 
41                 workServers.add(workServer);
42                 workServer.start();
43             }
44 
45             System.out.println("敲回車鍵退出!\n");
46             new BufferedReader(new InputStreamReader(System.in)).readLine();
47         }finally{
48             System.out.println("Shutting down...");
49 
50             for ( WorkServer workServer : workServers ){
51                 try {
52                     workServer.stop();
53                 } catch (Exception e) {
54                     e.printStackTrace();
55                 }
56             }
57             for ( ZkClient client : clients ){
58                 try {
59                     client.close();
60                 } catch (Exception e) {
61                     e.printStackTrace();
62                 }
63             }
64         }
65     }
66 }
複製程式碼

  兩次測試,本地模擬10臺server,分別不啟用防止網路抖動跟啟動防抖動兩次測試結果如下:

  未啟動防抖動:

  

  啟用防抖動:

  

  可以看到,未啟用的時候,斷線後重新選出的主機是隨機的,沒規律;啟用防抖動後,每次選出的master都是id為0的機器。

-----------------------------------------------------------------------------------------------------------------------------

  至此,我們已經通過編碼實現了簡單的master選舉。但是不知你有沒有發現,,,,這個選主過程的程式碼還真是麻煩啊!

  我們只是做一個demo,其中並未考慮複雜的業務場景,但其中的  監聽,異常  等程式碼的處理還是讓我覺得有些頭大,怎麼辦?Curator應運而生!

  為了熟悉Apache Curator,接下來,將用curator來實現master選舉的demo。