1. 程式人生 > >zookeeper二之Leader選舉

zookeeper二之Leader選舉

轉載:https://blog.csdn.net/u010670689/article/details/78054945

竊以為,對於zookeeper這種東西,僅僅知道怎麼安裝是遠遠不夠的(廢話麼這不是,,,),至少要對其幾個典型的應用場景進行了解,才能比較全面的知道zk究竟能幹啥,怎麼玩兒,以後的日子裡才能知道這貨如何能為我所用。於是,有了如下的學習:

我們知道zookeeper可以用於搭建高可用服務框架,主要先看以下幾個應用場景:
1、 master的選舉基本思路和編碼實現
2、 資料的釋出和訂閱
3、 軟負載均衡
4、 分散式佇列
5、 分散式鎖
6、 命名服務

目前zookeeper常用的開發包有zkclient跟curator,後者更為方便,日常開發使用較多。

----------------正文分割線-----------------------------------------------------------

master選舉

1、使用場景及結構

  現在很多時候我們的服務需要7*24小時工作,假如一臺機器掛了,我們希望能有其它機器頂替它繼續工作。此類問題現在多采用master-salve模式,也就是常說的主從模式,正常情況下主機提供服務,備機負責監聽主機狀態,當主機異常時,可以自動切換到備機繼續提供服務(這裡有點兒類似於資料庫主庫跟備庫,備機正常情況下只監聽,不工作),這個切換過程中選出下一個主機的過程就是master選舉。

  對於以上提到的場景,傳統的解決方式是採用一個備用節點,這個備用節點定期給當前主節點發送ping包,主節點收到ping包後會向備用節點發送應答ack,當備用節點收到應答,就認為主節點還活著,讓它繼續提供服務,否則就認為主節點掛掉了,自己將開始行使主節點職責。如圖1所示:

                                                            

                  圖1                                                                                                                                

   但這種方式會存在一個隱患,就是網路故障問題。看一下圖2:

    

          圖2

   也就是說,我們的主節點並沒有掛掉,只是在備用節點ping主節點,請求應答的時候發生網路故障,這樣我們的備用節點同樣收不到應答,就會認為主節點掛掉,然後備機會啟動自己的master例項。這樣就會導致系統中有兩個主節點,也就是雙master。出現雙master以後,我們的從節點會將它做的事情一部分彙報給主節點,一部分彙報給備用節點,這樣服務就亂套了。為了防止這種情況出現,我們可以考慮採用zookeeper,雖然它不能阻止網路故障的出現,但它能保證同一時刻系統中只存在一個主節點。我們來看zookeeper是怎麼實現的:

  在此處,搶主程式是包含在服務程式中,需要程式設計師來手動寫搶主邏輯的,比如噹噹開源框架elastic-job中,就有關於選主的部分,參見:elastic-job-core/main/java/com/dangdang/ddframe/job/internal/election資料夾下的選主程式碼。

  一點額外的話:zookeeper自己在叢集環境下的搶主演算法有三種,可以通過配置檔案來設定,預設採用FastLeaderElection,不作贅述;此處主要討論叢集環境中,應用程式利用master的特點,自己選主的過程。程式自己選主,每個人都有自己的一套演算法,有采用“最小編號”的,有采用類似“多數投票”的,各有優劣,本文的演算法僅作演示理解使用:

  結構圖:

       

  結構圖解釋:左側樹狀結構為zookeeper叢集,右側為程式伺服器。所有的伺服器在啟動的時候,都會訂閱zookeeper中master節點的刪除事件,以便在主伺服器掛掉的時候進行搶主操作;所有伺服器同時會在servers節點下注冊一個臨時節點(儲存自己的基本資訊),以便於應用程式讀取當前可用的伺服器列表。

  選主原理介紹:zookeeper的節點有兩種型別,持久節點跟臨時節點。臨時節點有個特性,就是如果註冊這個節點的機器失去連線(通常是宕機),那麼這個節點會被zookeeper刪除。選主過程就是利用這個特性,在伺服器啟動的時候,去zookeeper特定的一個目錄下注冊一個臨時節點(這個節點作為master,誰註冊了這個節點誰就是master),註冊的時候,如果發現該節點已經存在,則說明已經有別的伺服器註冊了(也就是有別的伺服器已經搶主成功),那麼當前伺服器只能放棄搶主,作為從機存在。同時,搶主失敗的當前伺服器需要訂閱該臨時節點的刪除事件,以便該節點刪除時(也就是註冊該節點的伺服器宕機了或者網路斷了之類的)進行再次搶主操作。從機具體需要去哪裡註冊伺服器列表的臨時節點,節點儲存什麼資訊,根據具體的業務不同自行約定。選主的過程,其實就是簡單的爭搶在zookeeper註冊臨時節點的操作,誰註冊了約定的臨時節點,誰就是master。

  ps:本文的例子中,並未用到結構圖server節點下的資料。但換一種演算法或者業務場景就會用到,演算法比如提到的最小編號,主要邏輯是主節點掛掉後,從節點裡邊編號最小的成為主節點,此時會用到該節點內容。換一種業務場景:叢集環境中,有很多工要處理, 主節點負責接收任務,並根據一定演算法將任務分配到不同的機器上執行;這種情況下,主節點跟從節點的職責也是不同的,主節點掛掉也會涉及到從節點進行master選舉的問題。這種情況下,很顯然,作為主節點需要知道當前有多少個從節點還活著,那麼此時也會需要用到servers節點下的資料了。

2、編碼實現

   主要有兩個類,WorkServer為主服務類,RunningData用於記錄執行資料。因為是簡單的demo,我們只做搶master節點的編碼,對於從節點應該去哪裡註冊服務列表資訊,不作編碼。

  採用zkClient實現,程式碼如下:

  WorkServer類:

package mastersalve;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Created by nevermore on 16/6/22.
 */
public class WorkServer {

	// 客戶端狀態
	private volatile boolean running = false;

	private ZkClient zkClient;

	// zk主節點路徑
	public static final String MASTER_PATH = "/master";

	// 監聽(用於監聽主節點刪除事件)
	private IZkDataListener dataListener;

	// 伺服器基本資訊
	private RunningData serverData;
	// 主節點基本資訊
	private RunningData masterData;

	// 排程器
	private ScheduledExecutorService delayExector = Executors
			.newScheduledThreadPool(1);
	// 延遲時間5s
	private int delayTime = 5;

	public WorkServer(RunningData runningData) {
		this.serverData = runningData;
		this.dataListener = new IZkDataListener() {
			@Override
			public void handleDataChange(String s, Object o) throws Exception {

			}

			@Override
			public void handleDataDeleted(String s) throws Exception {
				// takeMaster();

				if (masterData != null
						&& masterData.getName().equals(serverData.getName())) {// 若之前master為本機,則立即搶主,否則延遲5秒搶主(防止小故障引起的搶主可能導致的網路資料風暴)
					takeMaster();
				} else {
					delayExector.schedule(new Runnable() {
						@Override
						public void run() {
							takeMaster();
						}
					}, delayTime, TimeUnit.SECONDS);
				}

			}
		};
	}

	// 啟動
	public void start() throws Exception {
		if (running) {
			throw new Exception("server has startup....");
		}
		running = true;
		zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
		takeMaster();
	}

	// 停止
	public void stop() throws Exception {
		if (!running) {
			throw new Exception("server has stopped.....");
		}
		running = false;
		delayExector.shutdown();
		zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
		releaseMaster();
	}

	// 搶注主節點
	private void takeMaster() {
		if (!running)
			return;

		try {
			zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL);
			masterData = serverData;
			System.out.println(serverData.getName() + " is master");

			delayExector.schedule(new Runnable() {// 測試搶主用,每5s釋放一次主節點
						@Override
						public void run() {
							if (checkMaster()) {
								releaseMaster();
							}
						}
					}, 5, TimeUnit.SECONDS);

		} catch (ZkNodeExistsException e) {// 節點已存在
			RunningData runningData = zkClient.readData(MASTER_PATH, true);
			if (runningData == null) {// 讀取主節點時,主節點被釋放
				takeMaster();
			} else {
				masterData = runningData;
			}
		} catch (Exception e) {
			// ignore;
		}

	}

	// 釋放主節點
	private void releaseMaster() {
		if (checkMaster()) {
			zkClient.delete(MASTER_PATH);
		}
	}

	// 檢驗自己是否是主節點
	private boolean checkMaster() {
		try {
			RunningData runningData = zkClient.readData(MASTER_PATH);
			masterData = runningData;
			if (masterData.getName().equals(serverData.getName())) {
				return true;
			}
			return false;

		} catch (ZkNoNodeException e) {// 節點不存在
			return false;
		} catch (ZkInterruptedException e) {// 網路中斷
			return checkMaster();
		} catch (Exception e) {// 其它
			return false;
		}
	}

	public void setZkClient(ZkClient zkClient) {
		this.zkClient = zkClient;
	}

	public ZkClient getZkClient() {
		return zkClient;
	}
}

 

 

  RunningData類:

package mastersalve;

import java.io.Serializable;

/**
 * Created by nevermore on 16/6/22.
 */
public class RunningData implements Serializable {

	private static final long serialVersionUID = 4260577459043203630L;

	// 伺服器id
	private long cid;
	// 伺服器名稱
	private String name;

	public long getCid() {
		return cid;
	}

	public void setCid(long cid) {
		this.cid = cid;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}

 

   說明:在實際生產環境中,可能會由於插拔網線等導致網路短時的不穩定,也就是網路抖動。由於正式生產環境中可能server在zk上註冊的資訊是比較多的,而且server的數量也是比較多的,那麼每一次切換主機,每臺server要同步的資料量(比如要獲取誰是master,當前有哪些salve等資訊,具體視業務不同而定)也是比較大的。那麼我們希望,這種短時間的網路抖動最好不要影響我們的系統穩定,也就是最好選出來的master還是原來的機器,那麼就可以避免發現master更換後,各個salve因為要同步資料等導致的zk資料網路風暴。所以在WorkServer中,54-63行,我們搶主的時候,如果之前主機是本機,則立即搶主,否則延遲5s搶主。這樣就給原來主機預留出一定時間讓其在新一輪選主中佔據優勢,從而利於環境穩定。

  測試程式碼:

package mastersalve;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by nevermore on 16/6/23.
 */
public class LeaderSelectorZkClient {

	// 啟動的服務個數
	private static final int CLIENT_QTY = 10;
	// zookeeper伺服器的地址
	private static final String ZOOKEEPER_SERVER = "localhost:2181";

	public static void main(String[] args) throws Exception {
		// 儲存所有zkClient的列表
		List<ZkClient> clients = new ArrayList<ZkClient>();
		// 儲存所有服務的列表
		List<WorkServer> workServers = new ArrayList<WorkServer>();

		try {
			for (int i = 0; i < CLIENT_QTY; ++i) {
				// 建立zkClient
				ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000,
						new SerializableSerializer());
				clients.add(client);
				// 建立serverData
				RunningData runningData = new RunningData();
				runningData.setCid(Long.valueOf(i));
				runningData.setName("Client #" + i);
				// 建立服務
				WorkServer workServer = new WorkServer(runningData);
				workServer.setZkClient(client);

				workServers.add(workServer);
				workServer.start();
			}

			System.out.println("敲回車鍵退出!\n");
			new BufferedReader(new InputStreamReader(System.in)).readLine();
		} finally {
			System.out.println("Shutting down...");

			for (WorkServer workServer : workServers) {
				try {
					workServer.stop();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			for (ZkClient client : clients) {
				try {
					client.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
}

 

  兩次測試,本地模擬10臺server,分別不啟用防止網路抖動跟啟動防抖動兩次測試結果如下:

  未啟動防抖動:

  

  啟用防抖動:

  

  可以看到,未啟用的時候,斷線後重新選出的主機是隨機的,沒規律;啟用防抖動後,每次選出的master都是id為0的機器。

-----------------------------------------------------------------------------------------------------------------------------

  至此,我們已經通過編碼實現了簡單的master選舉。但是,不知你有沒有發現,,,,這個選主過程的程式碼還真是麻煩啊!

  我們只是做一個demo,其中並未考慮複雜的業務場景,但其中的  監聽,異常  等程式碼的處理還是讓我覺得有些頭大,怎麼辦?Curator應運而生!

  為了熟悉Apache Curator,接下來,將用curator來實現master選舉的demo。