
ZooKeeper: Applications and Implementation

Leader Election

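  1. Leader election designates one process as the organizer that distributes tasks to the nodes. Before the task starts, no node knows which one will be the leader (or coordinator). Once the election algorithm has run, every node recognizes one unique node as the task leader. Elections also commonly happen when the leader crashes unexpectedly and a new leader has to be chosen.
  2. Curator offers two leader-election recipes:
    • LeaderSelector: all live clients take turns being leader, continuously
    • LeaderLatch: once a leader is elected, it does not hand over leadership unless a client goes down and a new election is triggered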

LeaderLatch

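  1. LeaderLatch methods

    • start(): starts the latch. Once started, this LeaderLatch negotiates with the other LeaderLatch instances that use the same path, and one of them is eventually chosen as leader
    • hasLeadership(): returns true if this LeaderLatch is currently the leader
    • close(): must be called once the LeaderLatch is no longer needed. If this instance is the leader, leadership is released and the remaining participants elect a new leader
  2. Code. A LeaderLatch election essentially connects to ZooKeeper and creates an ephemeral sequential node under the path /jannal-leader/leader for each LeaderLatch: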

        @Test
        public void testLeader() {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-leader";
            List<CuratorFramework> clients = Lists.newArrayList();
            List<LeaderLatch> leaderLatches = Lists.newArrayList();
            int clientCount = 5;
            try {
                for (int i = 0; i < clientCount; i++) {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
                    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                            .connectString(connectString)
                            .sessionTimeoutMs(sessionTimeoutMs)
                            .connectionTimeoutMs(connectionTimeoutMs)
                            .retryPolicy(retryPolicy)
                            .namespace(rootPath)
                            .build();
                    clients.add(curatorFramework);
                    curatorFramework.start();
                    LeaderLatch leaderLatch = new LeaderLatch(curatorFramework, "/leader", "C" + i);
                    leaderLatch.addListener(new LeaderLatchListener() {
                        @Override
                        public void isLeader() {
                            logger.info("{} is the leader", leaderLatch.getId());
                        }

                        @Override
                        public void notLeader() {
                            logger.info("{} is not the leader", leaderLatch.getId());
                        }
                    });
                    leaderLatches.add(leaderLatch);
                    leaderLatch.start();
                }
                // Wait for the election to finish
                TimeUnit.SECONDS.sleep(10);
                LeaderLatch currentLeader = null;
                int currentIndex = 0;
                for (int i = 0; i < clientCount; ++i) {
                    LeaderLatch leaderLatch = leaderLatches.get(i);
                    // Check whether this latch is the leader
                    if (leaderLatch.hasLeadership()) {
                        currentLeader = leaderLatch;
                        currentIndex = i;
                    }
                }
                // Guard against the election not having finished within the wait above
                if (currentLeader == null) {
                    logger.warn("No leader elected yet");
                    return;
                }
                logger.info("Current leader: {}", currentLeader.getId());
                TimeUnit.SECONDS.sleep(30);
                // Release leadership
                currentLeader.close();
                CloseableUtils.closeQuietly(clients.get(currentIndex));
                // The latch and its client are already closed, so remove them from the lists
                leaderLatches.remove(currentIndex);
                clients.remove(currentIndex);
                LockSupport.park();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                for (LeaderLatch leaderLatch : leaderLatches) {
                    CloseableUtils.closeQuietly(leaderLatch);
                }
                for (CuratorFramework client : clients) {
                    CloseableUtils.closeQuietly(client);
                }
            }
        }
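  3. In the code above, five nodes appear in ZooKeeper after the LeaderLatch instances start (listed below). The first election picks C1 as leader. This is because, when the ephemeral nodes are created, the checkLeadership(List<String> children) method in LeaderLatch sorts all nodes under the election path (/jannal-leader/leader/) by sequence number, and the instance whose node has the smallest sequence number becomes leader. After C1 releases leadership, C4 holds the smallest sequence number and is elected leader. If we then use the IDEA ZooKeeper plugin to manually delete C4's node, /jannal-leader/leader/_c_21fb5cfb-e92c-4a48-ba40-87b877e9100f-latch-0000000031, C0 (_c_e28110cd-902e-40dc-9de0-4559e70ba1ca-latch-0000000032) is elected leader.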

    leaderLatch.getId()   Node path
    C0                    /jannal-leader/leader/_c_e28110cd-902e-40dc-9de0-4559e70ba1ca-latch-0000000032
    C1                    /jannal-leader/leader/_c_ac6553fd-dbc7-4fab-834c-cbd5ea54ddba-latch-0000000030
    C2                    /jannal-leader/leader/_c_82d88ba2-5130-4840-8571-8eb80c16a664-latch-0000000033
    C3                    /jannal-leader/leader/_c_c7300cbd-fd34-40ff-9bc0-0795fa208ffa-latch-0000000034
    C4                    /jannal-leader/leader/_c_21fb5cfb-e92c-4a48-ba40-87b877e9100f-latch-0000000031

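    The ordering rule described in item 3 can be tried out locally. The following is a minimal sketch, not Curator's actual checkLeadership source: it needs no ZooKeeper connection, and the class name LatchOrderSketch and the helpers sequenceOf and pickLeader are hypothetical names introduced for illustration. It assumes that the sequence number is the digits after the last "-" in the node name, as in the table above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Local sketch only: no ZooKeeper connection, and not Curator's real implementation.
class LatchOrderSketch {

    // Parses the numeric suffix of a sequential node name,
    // e.g. "...-latch-0000000030" yields 30.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.lastIndexOf('-') + 1));
    }

    // Returns the node with the smallest sequence number, or null if the list is empty.
    static String pickLeader(List<String> children) {
        return children.stream()
                .min(Comparator.comparingLong(LatchOrderSketch::sequenceOf))
                .orElse(null);
    }

    public static void main(String[] args) {
        // Node names taken from the table above.
        List<String> children = new ArrayList<>();
        children.add("_c_e28110cd-902e-40dc-9de0-4559e70ba1ca-latch-0000000032"); // C0
        children.add("_c_ac6553fd-dbc7-4fab-834c-cbd5ea54ddba-latch-0000000030"); // C1
        children.add("_c_82d88ba2-5130-4840-8571-8eb80c16a664-latch-0000000033"); // C2
        children.add("_c_c7300cbd-fd34-40ff-9bc0-0795fa208ffa-latch-0000000034"); // C3
        children.add("_c_21fb5cfb-e92c-4a48-ba40-87b877e9100f-latch-0000000031"); // C4

        // Remove the current leader's node three times to mimic release or deletion.
        for (int round = 0; round < 3; round++) {
            String leader = pickLeader(children);
            System.out.println("leader node: " + leader);
            children.remove(leader);
        }
    }
}
```

    Run against the five node names from the table, the sketch selects the nodes ending in 30, 31 and 32 in turn, which matches the C1, C4, C0 order observed above.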

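  4. Following the analysis above, if the LeaderLatch instances are started one after another in time order, the first one started will be the leader, because it receives the smallest sequence number. Only a small change to the code above is needed. Because LeaderLatch starts asynchronously on a thread pool, the sleep(5) inside the loop only roughly simulates the startup order; the actual thread execution order is hard to determine (we simply assume that 5 seconds is enough for one start to complete).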

                for (int i = 0; i < clientCount; i++) {
                    // omitted ...
                    leaderLatch.start();
                }
                TimeUnit.SECONDS.sleep(10);

    Change it to the following:

                for (int i = 0; i < clientCount; i++) {
                    // omitted ...
                    leaderLatch.start();
                    // Start the latches one at a time, in order
                    TimeUnit.SECONDS.sleep(5);
                }
     
    
    

LeaderSelector

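  1. This election strategy differs from LeaderLatch in that every instance gets a fair chance at leadership, and an instance that has released leadership can acquire it again later. In addition, the elected leader does not hold leadership indefinitely: when an instance gains leadership, your listener's takeLeadership(CuratorFramework client) method is called, and leadership is released automatically when that method returns.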

  2. Code

     class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    
            private String name;
            private LeaderSelector leaderSelector;
            public AtomicInteger leaderCount = new AtomicInteger();
    
            public CustomLeaderSelectorListenerAdapter(CuratorFramework client, String path, String name) {
                this.name = name;
                this.leaderSelector = new LeaderSelector(client, path, this);
    
                /**
                 * Requeue automatically.
                 * Calling this method ensures that the instance can acquire leadership again after releasing it.
                 */
                leaderSelector.autoRequeue();
            }
    
            public void start() throws IOException {
                leaderSelector.start();
            }
    
            @Override
            public void close() throws IOException {
                leaderSelector.close();
            }
    
            /**
             * Called when this instance acquires leadership
             */
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                final int waitSeconds = (int) (5 * Math.random()) + 1;
                logger.info("{} is the leader; number of previous terms as leader: {}", name, leaderCount.getAndIncrement());
                try {
                    // Give up leadership after waitSeconds seconds (simulates business work)
                    Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
                    // To stay leader indefinitely, block here instead
                    //LockSupport.park();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                    Thread.currentThread().interrupt();
                } finally {
                    logger.info("{} gives up leadership", name);
                }
                }
    
            }
    
        }
    
        @Test
        public void testLeaderSelector() {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-leader";
    
            List<CuratorFramework> clients = Lists.newArrayList();
            List<CustomLeaderSelectorListenerAdapter> leaderSelectorListenerList =
                    new ArrayList<CustomLeaderSelectorListenerAdapter>();
            int clientCount = 5;
            try {
                for (int i = 0; i < clientCount; i++) {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
                    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                            .connectString(connectString)
                            .sessionTimeoutMs(sessionTimeoutMs)
                            .connectionTimeoutMs(connectionTimeoutMs)
                            .retryPolicy(retryPolicy)
                            .namespace(rootPath)
                            .build();
                    clients.add(curatorFramework);
                    curatorFramework.start();
    
                    // Create the CustomLeaderSelectorListenerAdapter instance
                    CustomLeaderSelectorListenerAdapter leaderSelectorListener =
                            new CustomLeaderSelectorListenerAdapter(curatorFramework, "/leaderSelector", "C" + i);
    
                    leaderSelectorListener.start();
                    leaderSelectorListenerList.add(leaderSelectorListener);
                }
    
                LockSupport.park();
    
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                for (CustomLeaderSelectorListenerAdapter customLeaderSelectorListenerAdapter : leaderSelectorListenerList) {
                    CloseableUtils.closeQuietly(customLeaderSelectorListenerAdapter);
                }
                for (CuratorFramework client : clients) {
                    CloseableUtils.closeQuietly(client);
                }
            }
        }
    
  3. Result of running the code above
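
The rotation behaviour described in this section can be approximated without ZooKeeper. The sketch below is a local simulation under stated assumptions, not Curator code: a plain queue stands in for the election, RotationSketch and simulate are hypothetical names, and the strict round-robin order is a simplification (in a real cluster the order depends on when each client's node is created).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Local simulation only: a queue stands in for the ZooKeeper-backed election.
class RotationSketch {

    // Simulates up to `rounds` leadership terms and returns who led each term.
    // The head of the queue leads; when its term ends it rejoins at the tail
    // if autoRequeue is true, otherwise it drops out of the election.
    static List<String> simulate(List<String> participants, int rounds, boolean autoRequeue) {
        Deque<String> queue = new ArrayDeque<>(participants);
        List<String> leaders = new ArrayList<>();
        for (int i = 0; i < rounds && !queue.isEmpty(); i++) {
            String leader = queue.pollFirst(); // a takeLeadership() call would run here
            leaders.add(leader);
            if (autoRequeue) {
                queue.addLast(leader);         // term over: wait for another turn
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        List<String> clients = Arrays.asList("C0", "C1", "C2");
        System.out.println("with requeue:    " + simulate(clients, 5, true));
        System.out.println("without requeue: " + simulate(clients, 5, false));
    }
}
```

With requeueing, each participant leads again after the others have had a turn. Without it, each participant leads at most once, which is why the listener above calls autoRequeue() in its constructor.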

Distributed Locks

Reentrant Lock

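  1. Shared Reentrant Lock: a globally shared reentrant lock. "Reentrant" has the same meaning as in the JDK's ReentrantLock: while the same client (more precisely, the thread that holds the lock) owns the lock, it can acquire it again any number of times without blocking.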

  2. Code example

        @Test
        public void testInterProcessMutex() {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-leader";
            String lockPath = "/shared/reentrantlock";
            final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>();
            int size = 10;
            ExecutorService service = Executors.newFixedThreadPool(20);
            try {
                for (int i = 0; i < size; i++) {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
                    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                            .connectString(connectString)
                            .sessionTimeoutMs(sessionTimeoutMs)
                            .connectionTimeoutMs(connectionTimeoutMs)
                            .retryPolicy(retryPolicy)
                            .namespace(rootPath)
                            .build();
                    clientList.add(curatorFramework);
                    curatorFramework.start();
                }
    
                for (int j = 0; j < size; j++) {
                    final CuratorFramework client = clientList.get(j);
                    final int index = j;
                    service.submit(new Callable<Void>() {
    
                        @Override
                        public Void call() throws Exception {
                            String clientName = "C" + index;
                            // Each thread uses its own InterProcessMutex
                            final InterProcessMutex lock = new InterProcessMutex(client, lockPath);
                            // Try to acquire the lock 10 times
                            for (int k = 0; k < 10; k++) {
                                if (lock.acquire(1, TimeUnit.SECONDS)) {
                                    try {
                                        logger.info("{} acquired the lock, running business code", clientName);
                                        // Simulate the time spent on business work
                                        TimeUnit.SECONDS.sleep(2);
                                    } finally {
                                        lock.release();
                                    }
                                } else {
                                    logger.warn("{} did not acquire the lock", clientName);
                                }
                            }
                            return null;
                        }
                    });
                }
                LockSupport.park();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                service.shutdown();
                for (CuratorFramework client : clientList) {
                    CloseableUtils.closeQuietly(client);
                }
            }
        }
    
    
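  3. Inspecting the nodes: a client that acquires the lock creates a node in ZooKeeper whose value is the machine's IP address. The node is deleted as soon as the lock is released.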

  4. Run result
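
The difference between a reentrant lock and a non-reentrant one can be seen locally using only JDK classes. The sketch below is an analogy rather than Curator code: ReentrantLock stands in for the reentrant behaviour of InterProcessMutex and a one-permit Semaphore stands in for a non-reentrant mutex such as InterProcessSemaphoreMutex. The class name ReentrancySketch and its two methods are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

// JDK-only analogy: nothing here talks to ZooKeeper.
class ReentrancySketch {

    // Acquires a reentrant lock twice on the current thread and
    // returns the hold count observed after the second acquisition.
    static int acquireTwiceReentrant() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            boolean again = lock.tryLock();  // succeeds: this thread already owns the lock
            int holds = lock.getHoldCount();
            if (again) {
                lock.unlock();               // every acquisition needs a matching release
            }
            return holds;
        } finally {
            lock.unlock();
        }
    }

    // Takes the only permit of a semaphore, then reports whether the same
    // thread could take it a second time.
    static boolean secondAcquireNonReentrant() {
        Semaphore mutex = new Semaphore(1);
        mutex.acquireUninterruptibly();
        try {
            return mutex.tryAcquire();       // fails: the single permit is already taken
        } finally {
            mutex.release();
        }
    }

    public static void main(String[] args) {
        System.out.println("reentrant hold count: " + acquireTwiceReentrant());
        System.out.println("non-reentrant second acquire: " + secondAcquireNonReentrant());
    }
}
```

The practical consequence is the same for the distributed versions: with a reentrant lock, each acquire must be paired with its own release, while a thread that tries to reacquire a non-reentrant lock it already holds will wait until its timeout expires.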

Non-Reentrant Lock

  1. InterProcessSemaphoreMutex

        /**
         * Non-reentrant lock: InterProcessSemaphoreMutex
         */
        @Test
        public void testInterProcessSemaphoreMutex() {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-leader";
            String lockPath = "/shared/reentrantlock";
            final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>();
            int size = 10;
            ExecutorService service = Executors.newFixedThreadPool(20);
            try {
                for (int i = 0; i < size; i++) {
                    RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
                    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                            .connectString(connectString)
                            .sessionTimeoutMs(sessionTimeoutMs)
                            .connectionTimeoutMs(connectionTimeoutMs)
                            .retryPolicy(retryPolicy)
                            .namespace(rootPath)
                            .build();
                    clientList.add(curatorFramework);
                    curatorFramework.start();
                }
    
                for (int j = 0; j < size; j++) {
                    final CuratorFramework client = clientList.get(j);
                    final int index = j;
                    service.submit(new Callable<Void>() {
    
                        @Override
                        public Void call() throws Exception {
                            String clientName = "C" + index;
                            // Each thread uses its own InterProcessSemaphoreMutex
                            final InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, lockPath);
                            // Try to acquire the lock 10 times
                            for (int k = 0; k < 10; k++) {
                                if (lock.acquire(1, TimeUnit.SECONDS)) {
                                    try {
                                        logger.info("{} acquired the lock, running business code", clientName);
                                        // Simulate the time spent on business work
                                        TimeUnit.SECONDS.sleep(2);
                                    } finally {
                                        lock.release();
                                    }
                                } else {
                                    logger.warn("{} did not acquire the lock", clientName);
                                }
                            }
                            return null;
                        }
                    });
                }
                LockSupport.park();
            } catch (