1. 程式人生 > >ZooKeeper的三種典型應用場景

ZooKeeper的三種典型應用場景

sre ddl take consumer ron ica alt needed 開發

引言

  ZooKeeper是中典型的pub/sub模式的分布式數據管理與協調框架,開發人員可以使用它進行分布式數據的發布與訂閱。另外,其豐富的數據節點類型可以交叉使用,配合Watcher事件通知機制,可以應用於分布式都會涉及的一些核心功能:數據發布/訂閱、Master選舉、命名服務、分布式協調/通知、集群管理、分布式鎖、分布式隊列等。本博文主要介紹:發布/訂閱、分布式鎖、Master選舉三種最常用的場景

  本文中的代碼示例均是由Curator客戶端編寫的,已經對ZooKeeper原生API做好很多封裝。參考資料《從Paxos到Zookeeper 分布式一致性原理與實踐》(有需要電子PDF的朋友,可以評論私信我)


一、數據發布/訂閱

1、基本概念

(1)數據發布/訂閱系統即所謂的配置中心,也就是發布者將數據發布到ZooKeeper的一個節點或者一系列節點上,提供訂閱者進行數據訂閱,從而實現動態更新數據的目的,實現配置信息的集中式管理和數據的動態更新。ZooKeeper采用的是推拉相結合的方式:客戶端向服務器註冊自己需要關註的節點,一旦該節點的數據發生改變,那麽服務端就會向相應的客戶端發送Wacher事件通知,客戶端接收到消息通知後,需要主動到服務端獲取最新的數據。

(2)實際系統開發過程中:我們可以將初始化配置信息放到節點上集中管理,應用在啟動時都會主動到ZooKeeper服務端進行一次配置讀取,同時在指定節點註冊Watcher監聽,主要配置信息一旦變更,訂閱者就可以獲取讀取最新的配置信息。

通常系統中需要使用一些通用的配置信息,比如機器列表信息、運行時的開關配置、數據庫配置信息等全局配置信息,這些都會有以下3點特性:

  1) 數據量通常比較小(通常是一些配置文件)

  2) 數據內容在運行時會經常發生動態變化(比如數據庫的臨時切換等)

  3) 集群中各機器共享,配置一致(比如數據庫配置共享)。

(3)利用的ZooKeeper特性是:ZooKeeper對任何節點(包括子節點)的變更,只要註冊Wacther事件(使用Curator等客戶端工具已經被封裝好)都可以被其它客戶端監聽

2、代碼示例

技術分享圖片
package com.lijian.zookeeper.demo;

import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import java.util.concurrent.CountDownLatch; public class ZooKeeper_Subsciption { private static final String ADDRESS = "xxx.xxx.xxx.xxx:2181"; private static final int SESSION_TIMEOUT = 5000; private static final String PATH = "/configs"; private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); private static String config = "jdbc_configuration"; private static CountDownLatch countDownLatch = new CountDownLatch(4); public static void main(String[] args) throws Exception { // 訂閱該配置信息的集群節點(客戶端):sub1-sub3 for (int i = 0; i < 3; i++) { CuratorFramework consumerClient = getClient(); subscribe(consumerClient, "sub" + String.valueOf(i)); } // 更改配置信息的集群節點(客戶端):pub CuratorFramework publisherClient = getClient(); publish(publisherClient, "pub"); } private static void init() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); client.start(); // 檢查節點是否存在,不存在則初始化創建 if (client.checkExists().forPath(PATH) == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(PATH, config.getBytes()); } } /** * 創建客戶端並且初始化建立一個存儲配置數據的節點 * * @return * @throws Exception */ private static CuratorFramework getClient() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(ADDRESS) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); client.start(); if (client.checkExists().forPath(PATH) == null) { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath(PATH, config.getBytes()); } return client; } /** * 集群中的某個節點機器更改了配置信息:即發布了更新了數據 * * @param client * @throws Exception */ private static void publish(CuratorFramework client, String znode) throws Exception { System.out.println("節點[" + znode + "]更改了配置數據..."); client.setData().forPath(PATH, "configuration".getBytes()); countDownLatch.await(); } /** * 集群中訂閱的節點客戶端(機器)獲得最新的配置數據 * * @param client * @param znode * @throws Exception */ private static void subscribe(CuratorFramework client, String znode) throws Exception { // NodeCache監聽ZooKeeper數據節點本身的變化 final NodeCache cache = new NodeCache(client, PATH); // 設置為true:NodeCache在第一次啟動的時候就立刻從ZooKeeper上讀取節點數據並保存到Cache中 cache.start(true); System.out.println("節點["+ znode +"]已訂閱當前配置數據:" + new String(cache.getCurrentData().getData())); // 節點監聽 countDownLatch.countDown(); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() { System.out.println("配置數據已發生改變, 節點[" + znode + "]讀取當前新配置數據: " + new String(cache.getCurrentData().getData())); } }); } }
View Code

運行結果:節點[pub]更改了配置數據為“configuration”,訂閱"/configs"節點的sub1-sub3觀測到配置被改變,立馬讀取當前最新的配置數據“configuration”

技術分享圖片

二、Master選舉

1、基本概念

  (1)在一些讀寫分離的應用場景中,客戶端寫請求往往是由Master處理的,而另一些場景中,Master則常常負責處理一些復雜的邏輯,並將處理結果同步給集群中其它系統單元。比如一個廣告投放系統後臺與ZooKeeper交互,廣告ID通常都是經過一系列海量數據處理中計算得到(非常消耗I/O和CPU資源的過程),那就可以只讓集群中一臺機器處理數據得到計算結果,之後就可以共享給整個集群中的其它所有客戶端機器。

  (2)利用ZooKeeper的特性:利用ZooKeeper的強一致性,即能夠很好地保證分布式高並發情況下節點的創建一定能夠保證全局唯一性,ZooKeeper將會保證客戶端無法重復創建一個已經存在的數據節點,也就是說如果多個客戶端請求創建同一個節點,那麽最終一定只有一個客戶端請求能夠創建成功,這個客戶端就是Master,而其它客戶端註在該節點上註冊子節點Wacther,用於監控當前Master是否存活,如果當前Master掛了,那麽其余客戶端立馬重新進行Master選舉。

  (3)競爭成為Master角色之後,創建的子節點都是臨時順序節點,比如:_c_862cf0ce-6712-4aef-a91d-fc4c1044d104-lock-0000000001,並且序號是遞增的。需要註意的是這裏有"lock"單詞,這說明ZooKeeper這一特性,也可以運用於分布式鎖。

   技術分享圖片

2、代碼示例

技術分享圖片
package com.lijian.zookeeper.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ZooKeeper_Master {

    private static final String ADDRESS="xxx.xxx.xxx.xxx:2181";
    private static final int SESSION_TIMEOUT=5000;
    private static final String MASTER_PATH = "/master_path";
    private static final int CLIENT_COUNT = 5;

    private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);


    public static void main(String[] args) throws InterruptedException {

        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT; i++) {
            final String index = String.valueOf(i);
            service.submit(() -> {
                masterSelect(index);
            });
        }
    }

    private static void  masterSelect(final String znode){
        // client成為master的次數統計
        AtomicInteger leaderCount = new AtomicInteger(1);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        // 一旦執行完takeLeadership,就會重新進行選舉
        LeaderSelector selector = new LeaderSelector(client, MASTER_PATH, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                System.out.println("節點["+ znode +"]成為master");
                System.out.println("節點["+ znode +"]已經成為master次數:"+ leaderCount.getAndIncrement());
                // 睡眠5s模擬成為master後完成任務
                Thread.sleep(5000);
                System.out.println("節點["+ znode +"]釋放master");
            }
        });
        // autoRequeue自動重新排隊:使得上一次選舉為master的節點還有可能再次成為master
        selector.autoRequeue();
        selector.start();
    }
}
View Code

運行結果:由於執行selector.autoRequeue()方法,被選舉為master後的節點可能會再次獲被選舉為master,所以會一直循環執行,以下只截圖部分。其中獲取成為master的次數充分表明了Master選舉的公平性。

 技術分享圖片

三、分布式鎖

1、基本概念

  (1)對於排他鎖:ZooKeeper通過數據節點表示一個鎖,例如/exclusive_lock/lock節點就可以定義一個鎖,所有客戶端都會調用create()接口,試圖在/exclusive_lock下創建lock子節點,但是ZooKeeper的強一致性會保證所有客戶端最終只有一個客戶創建成功。也就可以認為獲得了鎖,其它線程Watcher監聽子節點變化(等待釋放鎖,競爭獲取資源)。

     對於共享鎖:ZooKeeper同樣可以通過數據節點表示一個鎖,類似於/shared_lock/[Hostname]-請求類型(讀/寫)-序號的臨時節點,比如/shared_lock/192.168.0.1-R-0000000000

2、代碼示例

Curator提供的有四種鎖,分別如下:

  (1)InterProcessMutex:分布式可重入排它鎖

  (2)InterProcessSemaphoreMutex:分布式排它鎖

  (3)InterProcessReadWriteLock:分布式讀寫鎖

  (4)InterProcessMultiLock:將多個鎖作為單個實體管理的容器

主要是以InterProcessMutex為例,編寫示例:

技術分享圖片
package com.lijian.zookeeper.demo;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ZooKeeper_Lock {
    private static final String ADDRESS = "xxx.xxx.xxx.xxx:2181";
    private static final int SESSION_TIMEOUT = 5000;
    private static final String LOCK_PATH = "/lock_path";
    private static final int CLIENT_COUNT = 10;

    private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    private static int resource = 0;

    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
        for (int i = 0; i < CLIENT_COUNT; i++) {
            final String index = String.valueOf(i);
            service.submit(() -> {
                distributedLock(index);
            });
        }
    }

    private static void distributedLock(final String znode) {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(ADDRESS)
                .sessionTimeoutMs(SESSION_TIMEOUT)
                .retryPolicy(retryPolicy)
                .build();
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
        try {
//            lock.acquire();
            System.out.println("客戶端節點[" + znode + "]獲取lock");
            System.out.println("客戶端節點[" + znode + "]讀取的資源為:" + String.valueOf(resource));
            resource ++;
//            lock.release();
            System.out.println("客戶端節點[" + znode + "]釋放lock");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
View Code

運行結果:加鎖後可以從左圖看到讀取的都是最新的資源值。如果去掉鎖的話讀取的資源值不能保證是最新值看右圖

技術分享圖片    技術分享圖片

ZooKeeper的三種典型應用場景