1. 程式人生 > >zookeeper Apache Curator

zookeeper Apache Curator

1 簡介

    Curator是Netflix公司開源的一套Zookeeper客戶端框架。瞭解過Zookeeper原生API都會清楚其複雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反覆註冊Watcher和NodeExistsException等。目前已經作為Apache的頂級專案出現,是最流行的Zookeeper客戶端之一。從編碼風格上來講,它提供了基於Fluent的程式設計風格支援。除此之外,Curator還提供了Zookeeper的各種應用場景:Recipe、共享鎖服務、Master選舉機制和分散式計數器等。 

2 依賴

    專案在GitHub上的開源地址隨著從Netflix轉移到Apache也發生了變化。地址為:https://github.com/apache/curator。我在實際操作中選取如下的版本:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>

3 建立客戶端

        在curator中,CuratorFramework表示zk的客戶端。curator提供瞭如下的兩種方式去建立:

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
        return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS,     
               DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
    }

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int 
                                         connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder()
               .connectString(connectString)
               .sessionTimeoutMs(sessionTimeoutMs)
               .connectionTimeoutMs(connectionTimeoutMs)
               .retryPolicy(retryPolicy).build();
}

    該方法newClient()是CuratorFrameworkFactory的方法,通過工場的方式去建立。connectString表示連線zookeeper的地址,retryPolicy表示重連策略。ExponentialBackoffRetry實現了該介面。

RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);
    第一個引數表示休眠多長時間再次重連。第二個引數表示重連的次數。如下的程式碼就是新建了一個會話:
 public CuratorFramework getZKClient(){
        /**
         *  baseSleepTimeMs 休眠1s後重連
         *  maxRetries 重連的最大次數
         */
        RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);
        return CuratorFrameworkFactory.newClient("127.0.0.1:2182", retryPolicy);
}

4 建立節點測試

    使用create建立節點,預設建立的是永久節點,value為機器的ip

public void createNode(){
        CuratorFramework client = getZKClient();
        client.start();
        try {
            client.create().forPath("/curator");
        } catch (Exception e) {
            e.printStackTrace();
        }
}

    大家可以看到我的驗證結果。

5 獲取資料和更新資料

public void getData(){
        CuratorFramework client = getZKClient();
        client.start();
        // 包含狀態查詢
        Stat stat = new Stat();
        try {
            /**
             * 查詢資料
             * 普通查詢
             * 狀態查詢
             */
            byte[] bytes = client.getData().forPath("/curator");
            byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/curator");
            /**
             * 更新資料
             * 普通更新
             * 指定版本更新
             */
            Stat stat1 = client.setData().forPath("/curator", "新內容".getBytes());
            Stat stat2 = client.setData().withVersion(1).forPath("/curator");
            System.out.println(new String(bytes));
            System.out.println(new String(bytes1));
            System.out.println(stat1);
            System.out.println(stat2);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

6 Curator分散式鎖之生成流水號

    在分散式系統中,為了保證資料的一致性,往往需要進行同步控制,比如減庫存、唯一流水號生成等。Curator對Zookeeper進行了封裝,實現了分散式鎖的功能,提供了執行緒的同步控制。同時,Curator也提供了多種鎖機制。下面對通過時間戳生成流水號的場景進行逐步分析。

    程式碼通過一個迴圈連續打印出10個時間戳。這裡沒有使用多執行緒,但分析下面的列印結果就會發現,其實在同一時刻會生成多個相同的流水號,執行時間在毫秒級別。

public static void main(String[] args) {
        for(int i=0; i< 10; i++){
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyDDmm HH:mm:ss|SSS");
            String orderNo = sdf.format(new Date());
            System.out.println(orderNo);
        }
}

    結果如下:

201831413 16:13:10|794
201831413 16:13:10|796
201831413 16:13:10|796
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|798
201831413 16:13:10|798

    觀察上述生成的時間流水號,10個流水號,但是隻有794,796,797,798。這四個,其餘的都是重複的。上面生成的流水號重複的可能性不大,一旦出現高併發,那麼重複的訂單號就會大量出現,當然也有其他方案進行解決,本篇文章就不再進行討論。下說說如何通過分散式鎖來解決此問題。

分散式鎖示例

下面的程式碼利用Curator的分散式鎖來實現在同一時刻只會生成一個唯一的流水號。

public class ZKLock {
    /** 節點名稱*/
    private static final String path = "/lock_path";
    public static void main(String[] args) {
        /** 獲取客戶端*/
        CuratorFramework client = getClient();
        /** 獲取分散式鎖*/
        InterProcessMutex lock = new InterProcessMutex(client,path);
        /** 單個執行緒開始執行程式*/
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final long startTime = new Date().getTime();
        for(int i=0;i<10;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        countDownLatch.await();
                        /** 獲取鎖*/
                        lock.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss|SSS");
                    System.out.println(sdf.format(new Date()));
                    /** 釋放鎖*/
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("顯示此執行緒大概花費時間(等待+執行):" + (new Date().getTime() - startTime) + "ms");
                }
            }).start();
        }
        System.out.println("建立執行緒花費時間:" + (new Date().getTime() - startTime) + "ms");
        countDownLatch.countDown();
    }

    private static CuratorFramework getClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2182")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

列印結果為:

建立執行緒花費時間:2ms
20181110 16:33:31|776
顯示此執行緒大概花費時間(等待+執行):374ms
20181110 16:33:31|796
顯示此執行緒大概花費時間(等待+執行):394ms
20181110 16:33:31|814
顯示此執行緒大概花費時間(等待+執行):404ms
20181110 16:33:31|829
顯示此執行緒大概花費時間(等待+執行):419ms
20181110 16:33:31|841
顯示此執行緒大概花費時間(等待+執行):434ms
20181110 16:33:31|858
顯示此執行緒大概花費時間(等待+執行):449ms
20181110 16:33:31|874
顯示此執行緒大概花費時間(等待+執行):472ms
20181110 16:33:31|895
顯示此執行緒大概花費時間(等待+執行):485ms
20181110 16:33:31|905
顯示此執行緒大概花費時間(等待+執行):493ms
20181110 16:33:31|912
顯示此執行緒大概花費時間(等待+執行):502ms

    仔細觀察可發現,通過多執行緒的訪問,列印的時間戳卻是唯一的。這裡使用InterProcessMutex類來進行處理分散式鎖,實現了一個生產唯一流水號的功能。

注意事項


    在上面的程式碼中,列印了每步操作的時間,其中訪問的zookeeper伺服器是遠端伺服器。從列印的時間我們可以看出,通過這種方式生成唯一流水號並不能支撐很大的併發量。每次操作都需要通過網路訪問,zookeeper的節點操作等,會花費大量的時間。另外,由於精確到毫秒,因此一秒鐘最多也只能處理999個請求。

同時,在分散式環境中上面的示例還是會出現重複的可能性的,比如兩個伺服器的時間不一致,即兩個伺服器相差10ms,恰好第一個執行完,第二個執行的間隙也是10ms,那麼第二個生成的訂單號還是有可能跟第一個重複的,雖然這種概率及其小。

以上通過示例演示了Curator的分散式鎖功能,根據具體的業務需求可選擇不同的業務場景來使用。

參考:https://blog.csdn.net/wo541075754/article/details/71173552