zookeeper開源客戶端Curator典型應用場景之-Master選舉(十)
在生產環境中,一般要保證服務的高可用,有時候只需要選出一臺機器來執行,其餘機器處於備用狀態,比如,在分散式系統中很常見的一個問題就是定時任務的執行。如果多臺機器同時執行相同的定時任務,業務複雜則可能出現災難性的後果。我使用的是噹噹網的elastic-job分散式定時任務框架,分一片,部署兩臺機器,其中一臺處於備用狀態,只有一臺機器是工作的,當這臺機器宕機了,備用機器才開始工作。
分散式鎖和Master選舉相似點
上一篇部落格講了curator的分散式鎖應用,分散式鎖和 Master選舉有幾種相似點,實際上其實現機制也相近:
同一時刻只有一個獲取鎖 / 只能有一個leader
對於分散式排他鎖來說,任意時刻,只能有一個程序(對於單程序內的鎖是單執行緒)可以獲得鎖。
對於領導選舉來說,任意時刻,只能有一個成功當選為leader。否則就會出現腦裂。
鎖重入 / 確認自己是leader
對於分散式鎖,需要保證獲得鎖的程序在釋放鎖之前可再次獲得鎖,即鎖的可重入性。
對於領導選舉,Leader需要能夠確認自己已經獲得領導權,即確認自己是Leader。
釋放鎖 / 放棄領導權
鎖的獲得者應該能夠正確釋放已經獲得的鎖,並且當獲得鎖的程序宕機時,鎖應該自動釋放,從而使得其它競爭方可以獲得該鎖,從而避免出現死鎖的狀態。
領導應該可以主動放棄領導權,並且當領導所在程序宕機時,領導權應該自動釋放,從而使得其它參與者可重新競爭領導而避免進入無主狀態。
感知鎖釋放 / 感知領導權釋放
當獲得鎖的一方釋放鎖時,其它對於鎖的競爭方需要能夠感知到鎖的釋放,並再次嘗試獲取鎖。
原來的Leader放棄領導權時,其它參與方應該能夠感知該事件,並重新發起選舉流程。
Curator中選舉分為兩種:
Leader Latch和Leader Election
Leader Latch
LeaderLatch方式就是以一種搶佔方式來決定選主,是一種非公平的領導選舉,誰搶到就是誰,會隨機從候選者中選擇一臺作為leader, 選中後除非leader自己 呼叫close()釋放leadership,否則其他的候選者不能成為leader。
選主過程
假設現在又三個zookeeper的客戶端,如下圖所示,同時競爭leader。這三個客戶端同時向zookeeper叢集註冊Ephemeral且Non-sequence型別的節點,路徑都為/zkroot/leader。
如上圖所示,由於是Non-sequence節點,這三個客戶端只會有一個建立成功,其它節點均建立失敗。此時,建立成功的客戶端(即上圖中的Client 1)即成功競選為 Leader 。其它客戶端(即上圖中的Client 2和Client 3)此時勻為 Follower。
放棄領導權
如果Leader打算主動放棄領導權,直接刪除/zkroot/leader節點即可。
如果Leader程序意外宕機,其與Zookeeper間的Session也結束,該節點由於是Ephemeral型別的節點,因此也會自動被刪除。
此時/zkroot/leader節點不復存在,對於其它參與競選的客戶端而言,之前的Leader已經放棄了領導權。
感知領導權的放棄
由上圖可見,建立節點失敗的節點,除了成為 Follower 以外,還會向/zkroot/leader註冊一個 Watch ,一旦 Leader 放棄領導權,也即該節點被刪除,所有的 Follower 會收到通知。
重新選舉
感知到舊 Leader 放棄領導權後,所有的 Follower 可以再次發起新一輪的領導選舉,如下圖所示。
從上圖中可見
新一輪的領導選舉方法與最初的領導選舉方法完全一樣,都是發起節點建立請求,建立成功即為Leader,否則為Follower,且Follower會Watch該節點。
新一輪的選舉結果,無法預測,與它們在第一輪選舉中的順序無關。這也是該方案被稱為非公平模式的原因。
Leader Latch模式總結
- Leader Latch實現很簡單,每一輪的選舉演算法都一樣。
- 非公平模式,每一次選舉都是隨機,誰搶到就是誰的,假如是第二次選舉,每個 Follower 通過 Watch 感知到節點被刪除的時間不完全一樣,只要有一個 Follower 得到通知即發起競選。
- 給zookeeper造成的負載大,假如有上萬個客戶端都參與競選,意味著同時會有上萬個寫請求傳送給 Zookeper。同時一旦 Leader 放棄領導權,Zookeeper 需要同時通知上萬個 Follower,負載較大。
使用過程
相關的類
LeaderLatch
構造LeaderLatch ,構造方法如下:
public LeaderLatch(CuratorFramework client, String latchPath);
public LeaderLatch(CuratorFramework client, String latchPath, String id);
啟動
通過start()方法啟動之後,再等待幾秒鐘後,Curator會自動從中選舉出Leader。
public void start() throws Exception;
可以呼叫例項的hasLeadership()判斷該例項是否為leader。
public boolean hasLeadership();
嘗試獲取leadership
呼叫await()方法會使執行緒一直阻塞到獲得leadership為止。
public void await() throws InterruptedException, EOFException;
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
釋放leadership
只能通過close()釋放leadership, 只有leader將leadership釋放時,其他的候選者才有機會被選為leader
public void close() throws IOException;
public synchronized void close(CloseMode closeMode) throws IOException;
示例程式碼
public class TestLeaderLatch {
private static final String PATH = "/demo/leader";
/** 5個客戶端 */
private static final Integer CLIENT_COUNT = 5;
public static void main(String[] args) throws Exception {
//5個執行緒,5個客戶端
ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
for (int i = 0; i < CLIENT_COUNT ; i++) {
final int index = i;
service.submit(new Runnable() {
@Override
public void run() {
try {
new TestLeaderLatch().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//休眠50秒之後結束main方法
Thread.sleep(30 * 1000);
service.shutdownNow();
}
private void schedule(int thread) throws Exception {
//獲取一個client
CuratorFramework client = this.getClient(thread);
//獲取一個latch
LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread));
//給latch新增監聽,在
latch.addListener(new LeaderLatchListener() {
@Override
public void notLeader() {
//如果不是leader
System.out.println("Client [" + thread + "] I am the follower !");
}
@Override
public void isLeader() {
//如果是leader
System.out.println("Client [" + thread + "] I am the leader !");
}
});
//開始選取 leader
latch.start();
//每個執行緒 休眠時間不一樣,但是最大不能超過 main方法中的那個休眠時間,那個是50秒 到時候main方法結束 會中斷休眠時間
Thread.sleep(2 * (thread + 5) * 1000);
if (latch != null) {
//釋放leadership
//CloseMode.NOTIFY_LEADER 節點狀態改變時,通知LeaderLatchListener
latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
}
if (client != null) {
client.close();
}
System.out.println("Client [" + latch.getId() + "] Server closed...");
}
private CuratorFramework getClient(final int thread) {
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
// Fluent風格建立
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.58.42:2181")
.sessionTimeoutMs(1000000)
.connectionTimeoutMs(3000)
.retryPolicy(rp)
.build();
client.start();
System.out.println("Client [" + thread + "] Server connected...");
return client;
}
}
程式執行,輸出以下結果:
Client [3] Server connected…
Client [2] Server connected…
Client [4] Server connected…
Client [0] Server connected…
Client [1] Server connected…
Client [1] I am the leader !
Client [0] Server closed…
Client [1] I am the follower !
Client [1] Server closed…
Client [2] I am the leader !
Client [2] I am the follower !
Client [2] Server closed…
Client [4] I am the leader !
Client [3] Server closed…
Client [4] I am the follower !
Client [4] Server closed…
在上面的程式中,啟動了5個zookeeper客戶端,程式會隨機選中其中一個作為leader。通過註冊監聽的方式來判斷自己是否成為leader。呼叫close()方法釋放當前領導權。有可能優先close的並不是leader節點,但是當leader節點close的時候,可以繼續在已有的節點中重新選舉leader節點。
LeaderElection
上面講了怎麼使用LeaderLatch方式進行master選舉,Curator提供了兩種選舉,一種是LeaderLatch,提供的另一種Leader選舉策略是Leader Election。
跟LeaderLatch選舉策略相比,LeaderElection選舉策略不同之處在於每個例項都能公平獲取領導權,而且當獲取領導權的例項在釋放領導權之後,該例項還有機會再次獲取領導權。
另外,選舉出來的leader不會一直佔有領導權,當 takeLeadership(CuratorFramework client) 方法執行結束之後會自動釋放領導權。LeaderElection屬於公平的選舉方式,通過LeaderSelectorListener可以對領導權進行控制, 在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。 而LeaderLatch則一直持有leadership, 除非呼叫close方法,否則它不會釋放領導權。
選主過程
如下圖所示,LeaderElection選舉中,各客戶端均建立/zkroot/leader節點,且其型別為Ephemeral與Sequence。
由於是Sequence型別節點,故上圖中三個客戶端均建立成功,只是序號不一樣。此時,每個客戶端都會判斷自己建立成功的節點的序號是不是當前最小的。如果是,則該客戶端為 Leader,否則即為 Follower。
在上圖中,Client1 建立的節點序號為1 ,Client2建立的節點序號為2,Client3建立的節點序號為3。由於最小序號為 1 ,且該節點由Client1建立,故Client 1為 Leader 。
放棄領導權
Leader 如果主動放棄領導權,直接刪除其建立的節點即可。
如果 Leader 所在程序意外宕機,其與 Zookeeper 間的 Session 結束,由於其建立的節點為Ephemeral型別,故該節點自動被刪除。
感知領導權的放棄
與LeaderLatch方式不同,每個 Follower 並非都 Watch 由 Leader 創建出來的節點,而是 Watch 序號剛好比自己序號小的節點。
在上圖中,總共有 1、2、3 共三個節點,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序號應該是10位數字,而非一位數字,序號最大為int最大值)。
一旦Leader棄權或者宕機,/zkroot/leader1被刪除,Client2可得到通知。此時Client3由於 Watch 的是/zkroot/leader2,故不會得到通知。
重新選舉
Client2得到/zkroot/leader1被刪除的通知後,不會立即成為新的 Leader 。而是先判斷自己的序號2是不是當前最小的序號。在該場景下,其序號確為最小。因此Client 2成為新的 Leader 。
這裡要注意,如果在Client1放棄領導權之前,Client2就宕機了,Client3會收到通知。此時Client3不會立即成為Leader,而是要先判斷自己的序號3是否為當前最小序號。很顯然,由於Client1建立的/zkroot/leader1還在,因此Client 3不會成為新的 Leader ,並向Client2序號2 前面的序號,也即 1 建立 Watch。該過程如下圖所示。
LeaderElection模式總結
- 擴充套件性好,每個客戶端都只Watch 一個節點且每次節點被刪除只須通知一個客戶端
- 舊 Leader 放棄領導權時,其它客戶端根據競選的先後順序(也即節點序號)成為新 Leader,這也是公平模式的由來。
- 延遲相對非公平模式要高,因為它必須等待特定節點得到通知才能選出新的 Leader。
使用過程
相關的類
LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException
使用方法 建立 LeaderSelector
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);
啟動
leaderSelector.start();
一旦啟動,如果獲取了leadership的話,takeLeadership()會被呼叫,只有當leader釋放了leadership的時候,takeLeadership()才會返回。
釋放
呼叫close()釋放 leadership
leaderSelector.close();
示例程式碼
LeaderSelectorListener的實現類
實現LeaderSelectorListener 或者 繼承LeaderSelectorListenerAdapter,用於定義獲取領導權後的業務邏輯:
public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {
/** 客戶端名稱 */
private String name;
/** leaderSelector */
private LeaderSelector leaderSelector;
/** 原子性的 用來記錄獲取 leader的次數 */
public AtomicInteger leaderCount = new AtomicInteger(1);
public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){
this.name = name;
this.leaderSelector = new LeaderSelector(client, path, this);
/**
* 自動重新排隊
* 該方法的呼叫可以確保此例項在釋放領導權後還可能獲得領導權
*/
leaderSelector.autoRequeue();
}
/**
* 啟動 呼叫leaderSelector.start()
* @throws IOException
*/
public void start() throws IOException {
leaderSelector.start();
}
/**
* 獲取領導權之後執行的業務邏輯,執行完自動放棄領導權
* @param client
* @throws Exception
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = 2;
System.out.println(name + "成為當前leader" + " 共成為leader的次數:" + leaderCount.getAndIncrement() + "次");
try{
//模擬業務邏輯執行2秒
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
}catch ( InterruptedException e ){
System.err.println(name + "已被中斷");
Thread.currentThread().interrupt();
}finally{
System.out.println(name + "放棄領導權");
}
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
}
多個客戶端測試
public class TestLeaderElection {
private static final String PATH = "/demo/leader";
/** 3個客戶端 */
private static final Integer CLIENT_COUNT = 3;
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
for (int i = 0; i < CLIENT_COUNT; i++) {
final int index = i;
service.submit(new Runnable() {
@Override
public void run() {
try {
new TestLeaderElection().schedule(index);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(30 * 1000);
service.shutdownNow();
}
private void schedule(final int thread) throws Exception {
CuratorFramework client = this.getClient(thread);
CustomLeaderSelectorListenerAdapter leaderSelectorListener =
new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread);
leaderSelectorListener.start();
}
private CuratorFramework getClient(final int thread) {
RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
// Fluent風格建立
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.58.42:2181")
.sessionTimeoutMs(1000000)
.connectionTimeoutMs(3000)
.retryPolicy(rp)
.build();
client.start();
System.out.println("Client [" + thread + "] Server connected...");
return client;
}
}
執行程式,輸出以下內容:
Client [0] Server connected…
Client [1] Server connected…
Client [2] Server connected…
Client #2成為當前leader 共成為leader的次數:1次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:1次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:1次
Client #1放棄領導權
Client #2成為當前leader 共成為leader的次數:2次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:2次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:2次
Client #1放棄領導權
Client #2成為當前leader 共成為leader的次數:3次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:3次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:3次
Client #1放棄領導權
Client #2成為當前leader 共成為leader的次數:4次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:4次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:4次
Client #1放棄領導權
上面只是簡單測試程式碼,並沒有關閉client等操作,每個例項在獲取領導權後,如果 takeLeadership(CuratorFramework client) 方法執行結束,將會釋放其領導權。而且獲取領導權 也是按照 Client #2, Client #0 ,Client #1 順序來的,正好驗證了它的公平性。
LeaderSelectorListener類繼承了ConnectionStateListener。一旦LeaderSelector啟動,它會向curator客戶端新增監聽器。 使用LeaderSelector必須時刻注意連線的變化。一旦出現連線問題如SUSPENDED,curator例項必須確保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出現,curator例項不再是leader並且其takeLeadership()應該直接退出。
推薦的做法是,如果發生SUSPENDED或者LOST連線問題,最好直接拋CancelLeadershipException,此時,leaderSelector例項會嘗試中斷並且取消正在執行takeLeadership()方法的執行緒。
建議擴充套件LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已經提供了推薦的處理方式 。