zookeeper之應用與實現
Leader Elections(leader選舉)
- 指派一個程序作為組織者,將任務分發給各節點。在任務開始前,哪個節點都不知道誰是
leader(領導者)
或者coordinator(協調者)
。當選舉演算法開始執行後,每個節點最終會得到一個唯一的節點作為任務leader
。除此之外,選舉還經常會發生在leader
意外宕機的情況下,新的leader
要被選舉出來。 Curator
有兩種leader
選舉方式LeaderSelector
:前者是所有存活的客戶端不間斷的輪流做Leader
LeaderLatch
:一旦選舉出Leader
,除非有客戶端掛掉重新觸發選舉,否則不會交出領導權
LeaderLatch
-
LeaderLatch
方法start()
:啟動,一旦啟動,當前的LeaderLatch
會與具有相同的path的LeaderLatch
進行交涉,最終選擇出一個leader
hasLeadership()
:返回true,表示當前的LeaderLatch
是leader
close()
:一旦不使用LeaderLatch
,必須呼叫此方法,如果是leader
則會釋放領導權,其他的引數者會再次選舉一個leader
-
程式碼,
Leader Latch
選舉的本質是連線ZooKeeper
,然後在/jannal-leader/leader
路徑為每個LeaderLatch
建立臨時有序節點:@Test
-
上面程式碼,
LeaderLatch
啟動之後Zookeeper
出現5個節點(如下圖),第一次選舉出的是C1
為leader,這是因為在建立臨時節點時,LeaderLatch
中的checkLeadership(List<String> children)
方法會將選舉路徑(/jannal-leader/leader/
)下面的所有節點按照序列號排序,如果當前節點的序列號最小,則將該節點設定為leader
。當C1
釋放領導權之後,C4
的序列號最小被選為Leader
,我們再通過Idea Zookeeper
外掛手動刪除C4
對應的節點jannal-leader/leader/_c_21fb5cfb-e92c-4a48-ba40-87b877e9100f-latch-0000000031
,會發現此時C0
(_c_e28110cd-902e-40dc-9de0-4559e70ba1ca-latch-0000000032
)被選為leader
leaderLatch.getId() 節點路徑 C0 /jannal-leader/leader/_c_e28110cd-902e-40dc-9de0-4559e70ba1ca-latch-0000000032 C1 /jannal-leader/leader/_c_ac6553fd-dbc7-4fab-834c-cbd5ea54ddba-latch-0000000030 C2 /jannal-leader/leader/_c_82d88ba2-5130-4840-8571-8eb80c16a664-latch-0000000033 C3 /jannal-leader/leader/_c_c7300cbd-fd34-40ff-9bc0-0795fa208ffa-latch-0000000034 C4 /jannal-leader/leader/_c_21fb5cfb-e92c-4a48-ba40-87b877e9100f-latch-0000000031 -
根據以上的分析,如果按照時間順序,依次啟動
LeaderLatch
,則第一個啟動的一定會是leader
,因為第一個序列號一定是最小的,只需要將上面的程式碼調整一下。因為LeaderLatch
啟動是非同步執行緒池執行的,所以這裡的sleep(5)
只是大概的模擬了一下啟動順序,實際執行緒的執行順序很難確定(我們只是假設5秒可以啟動完成)for (int i = 0; i < clientCount; i++) { 省略.... leaderLatch.start(); } TimeUnit.SECONDS.sleep(5); 調整為以下 for (int i = 0; i < clientCount; i++) { 省略.... leaderLatch.start(); //按照順序依次啟動 TimeUnit.SECONDS.sleep(5); }
LeaderSelector
-
這種選舉策略跟
LeaderLatch
選舉策略不同之處在於每個例項都能公平獲取領導權,而且當獲取領導權的例項在釋放領導權之後,該例項還有機會再次獲取領導權。另外,選舉出來的leader
不會一直佔有領導權,當takeLeadership(CuratorFramework client)
方法執行結束之後會自動釋放領導權,當例項取得領導權時你的listener的takeLeadership()
方法被呼叫 -
程式碼
class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable { private String name; private LeaderSelector leaderSelector; public AtomicInteger leaderCount = new AtomicInteger(); public CustomLeaderSelectorListenerAdapter(CuratorFramework client, String path, String name) { this.name = name; this.leaderSelector = new LeaderSelector(client, path, this); /** * 自動重新排隊 * 該方法的呼叫可以確保此例項在釋放領導權後還可能獲得領導權 */ leaderSelector.autoRequeue(); } public void start() throws IOException { leaderSelector.start(); } @Override public void close() throws IOException { leaderSelector.close(); } /** * 獲取領導權 */ @Override public void takeLeadership(CuratorFramework client) throws Exception { final int waitSeconds = (int) (5 * Math.random()) + 1; logger.info("{}是leader,之前成為leader的次數:{}", name, leaderCount.getAndIncrement()); try { //等待waitSeconds秒後放棄領導權(模擬業務執行過程) Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); //如果想讓它一直是leader,這裡可以阻塞 //LockSupport.park(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); Thread.currentThread().interrupt(); } finally { logger.info("{}放棄領導權", name); } } } @Test public void testLeaderSelector() { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-leader"; List<CuratorFramework> clients = Lists.newArrayList(); List<CustomLeaderSelectorListenerAdapter> leaderSelectorListenerList = new ArrayList<CustomLeaderSelectorListenerAdapter>(); int clientCount = 5; try { for (int i = 0; i < clientCount; i++) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); clients.add(curatorFramework); curatorFramework.start(); //建立LeaderSelectorListenerAdapter例項 CustomLeaderSelectorListenerAdapter leaderSelectorListener = new CustomLeaderSelectorListenerAdapter(curatorFramework, "/leaderSelector", "C" + i); leaderSelectorListener.start(); leaderSelectorListenerList.add(leaderSelectorListener); } LockSupport.park(); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { for (CustomLeaderSelectorListenerAdapter customLeaderSelectorListenerAdapter : leaderSelectorListenerList) { CloseableUtils.closeQuietly(customLeaderSelectorListenerAdapter); } for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } } }
-
上面程式碼的執行結果
分散式鎖
可重入鎖
-
全域性可重入鎖
(Shared ReentrantLock),。Reentrant和JDK的ReentrantLock
類似,意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 -
程式碼示例
@Test public void testInterProcessMutex() { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-leader"; String lockPath = "/shared/reentrantlock"; final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>(); int size = 10; ExecutorService service = Executors.newFixedThreadPool(20); try { for (int i = 0; i < size; i++) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); clientList.add(curatorFramework); curatorFramework.start(); } for (int j = 0; j < size; j++) { final CuratorFramework client = clientList.get(j); final int index = j; service.submit(new Callable<Void>() { @Override public Void call() throws Exception { String clientName = "C" + index; //每個執行緒都使用一個InterProcessMutex final InterProcessMutex lock = new InterProcessMutex(client, lockPath); //嘗試10次獲取鎖 for (int k = 0; k < 10; k++) { if (lock.acquire(1, TimeUnit.SECONDS)) { try { logger.info("{}已經獲取到鎖,開始執行業務程式碼", clientName); //模擬業務執行時間 TimeUnit.SECONDS.sleep(2); } finally { lock.release(); } } else { logger.warn("{}沒有獲取到鎖", clientName); } } return null; } }); } LockSupport.park(); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { service.shutdown(); for (CuratorFramework client : clientList) { CloseableUtils.closeQuietly(client); } } }
-
檢視節點,獲取鎖的客戶端節點會在ZK上建立一個節點,值為機器IP地址。鎖一旦釋放節點就刪除。
-
執行結果
不可重入鎖
-
InterProcessSemaphoreMutex
/** * 不可重入鎖InterProcessSemaphoreMutex */ @Test public void testInterProcessSemaphoreMutex() { String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183"; int sessionTimeoutMs = 25000; int connectionTimeoutMs = 5000; String rootPath = "jannal-leader"; String lockPath = "/shared/reentrantlock"; final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>(); int size = 10; ExecutorService service = Executors.newFixedThreadPool(20); try { for (int i = 0; i < size; i++) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3); CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy) .namespace(rootPath) .build(); clientList.add(curatorFramework); curatorFramework.start(); } for (int j = 0; j < size; j++) { final CuratorFramework client = clientList.get(j); final int index = j; service.submit(new Callable<Void>() { @Override public Void call() throws Exception { String clientName = "C" + index; //每個執行緒都使用一個InterProcessMutex final InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath); //嘗試10次獲取鎖 for (int k = 0; k < 10; k++) { if (lock.acquire(1, TimeUnit.SECONDS)) { try { logger.info("{}已經獲取到鎖,開始執行業務程式碼", clientName); //模擬業務執行時間 TimeUnit.SECONDS.sleep(2); } finally { lock.release(); } } else { logger.warn("{}沒有獲取到鎖", clientName); } } return null; } }); } LockSupport.park(); } catch (