1. 程式人生 > >zookeeper使用(三)--開源客戶端

zookeeper使用(三)--開源客戶端

一、前言

  上一篇部落格已經介紹瞭如何使用Zookeeper提供的原生態Java API進行操作,本篇博文主要講解如何通過開源客戶端來進行操作。

二、ZkClient

  ZkClient是在Zookeeper原聲API介面之上進行了包裝,是一個更易用的Zookeeper客戶端,其內部還實現了諸如Session超時重連、Watcher反覆註冊等功能。

  2.1 新增依賴

  在pom.xml檔案中新增如下內容即可。  

        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.2</version>
        </dependency>

  2.2 建立會話

  使用ZkClient可以輕鬆的建立會話,連線到服務端。  

複製程式碼

package com.hust.grid.leesf.zkclient.examples;

import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;

public class Create_Session_Sample {
    public static void main(String[] args) throws IOException, InterruptedException {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        System.out.println("ZooKeeper session established.");
    }
}

複製程式碼

  執行結果:  

ZooKeeper session established.

  結果表明已經成功建立會話。

  2.3 建立節點 

  ZkClient提供了遞迴建立節點的介面,即其幫助開發者完成父節點的建立,再建立子節點。

複製程式碼

package com.hust.grid.leesf.zkclient.examples;
import org.I0Itec.zkclient.ZkClient;

public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        String path = "/zk-book/c1";
        zkClient.createPersistent(path, true);
        System.out.println("success create znode.");
    }
}

複製程式碼

  執行結果: 

success create znode.

  結果表明已經成功建立了節點,值得注意的是,在原生態介面中是無法建立成功的(父節點不存在),但是通過ZkClient可以遞迴的先建立父節點,再建立子節點。

  

  可以看到確實成功建立了/zk-book和/zk-book/c1兩個節點。

  2.4 刪除節點

  ZkClient提供了遞迴刪除節點的介面,即其幫助開發者先刪除所有子節點(存在),再刪除父節點。  

複製程式碼

package com.hust.grid.leesf.zkclient.examples;

import org.I0Itec.zkclient.ZkClient;

public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.createPersistent(path, "");
        zkClient.createPersistent(path+"/c1", "");
        zkClient.deleteRecursive(path);
        System.out.println("success delete znode.");
    }
}

複製程式碼

  執行結果:  

success delete znode.

  結果表明ZkClient可直接刪除帶子節點的父節點,因為其底層先刪除其所有子節點,然後再刪除父節點。

  2.5 獲取子節點  

複製程式碼

package com.hust.grid.leesf.zkclient.examples;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;

public class Get_Children_Sample {

    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
            }
        });

        zkClient.createPersistent(path);
        Thread.sleep(1000);
        zkClient.createPersistent(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path + "/c1");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

複製程式碼

  執行結果:  

/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null

  結果表明:

  客戶端可以對一個不存在的節點進行子節點變更的監聽。

  一旦客戶端對一個節點註冊了子節點列表變更監聽之後,那麼當該節點的子節點列表發生變更時,服務端都會通知客戶端,並將最新的子節點列表傳送給客戶端

  該節點本身的建立或刪除也會通知到客戶端。

  2.6 獲取資料

複製程式碼

package com.hust.grid.leesf.zkclient.examples;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
        zkClient.createEphemeral(path, "123");

        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("Node " + dataPath + " deleted.");
            }

            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("Node " + dataPath + " changed, new data: " + data);
            }
        });

        System.out.println(zkClient.readData(path));
        zkClient.writeData(path, "456");
        Thread.sleep(1000);
        zkClient.delete(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

複製程式碼

  執行結果: 

123
Node /zk-book changed, new data: 456
Node /zk-book deleted.

  結果表明可以成功監聽節點資料變化或刪除事件。

  2.7 檢測節點是否存在  

複製程式碼

package com.hust.grid.leesf.zkclient.examples;

import org.I0Itec.zkclient.ZkClient;

public class Exist_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000);
        System.out.println("Node " + path + " exists " + zkClient.exists(path));
    }
}

複製程式碼

  執行結果:

Node /zk-book exists false

  結果表明,可以通過ZkClient輕易檢測節點是否存在,其相比於原生態的介面更易於理解。

三、Curator客戶端

  Curator解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連線重連,反覆註冊Watcher和NodeExistsException異常等,現已成為Apache的頂級專案。

  3.1 新增依賴

  在pom.xml檔案中新增如下內容即可。  

複製程式碼

        <!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.4.2</version>
        </dependency>

複製程式碼

  3.2 建立會話

  Curator除了使用一般方法建立會話外,還可以使用fluent風格進行建立。

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Create_Session_Sample {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
        client.start();
        System.out.println("Zookeeper session1 established. ");
        CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();
        client1.start();
        System.out.println("Zookeeper session2 established. ");        
    }
}

複製程式碼

  執行結果: 

Zookeeper session1 established. 
Zookeeper session2 established. 

  值得注意的是session2會話含有隔離名稱空間,即客戶端對Zookeeper上資料節點的任何操作都是相對/base目錄進行的,這有利於實現不同的Zookeeper的業務之間的隔離。

  3.3 建立節點

  通過使用Fluent風格的介面,開發人員可以進行自由組合來完成各種型別節點的建立。  

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Create_Node_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        System.out.println("success create znode: " + path);
    }
}

複製程式碼

  執行結果: 

success create znode: /zk-book/c1

  其中,也建立了/zk-book/c1的父節點/zk-book節點。

  3.4 刪除節點  

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Del_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book/c1";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
        client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);
        System.out.println("success delete znode " + path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

複製程式碼

  執行結果: 

init
success delete znode /zk-book/c1

  結果表明成功刪除/zk-book/c1節點。

  3.5 獲取資料 

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Get_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));
    }
}

複製程式碼

  執行結果:  

init

  結果表明成功獲取了節點的資料。

  3.6 更新資料  

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

public class Set_Data_Sample {
    public static void main(String[] args) throws Exception {
        String path = "/zk-book";
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("Success set node for : " + path + ", new version: "
                + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
        try {
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}

複製程式碼

  執行結果:  

Success set node for : /zk-book, new version: 1
Fail set node due to KeeperErrorCode = BadVersion for /zk-book

  結果表明當攜帶資料版本不一致時,無法完成更新操作。

  3.7 非同步介面

  如同Zookeeper原生API提供了非同步介面,Curator也提供了非同步介面。在Zookeeper中,所有的非同步通知事件處理都是由EventThread這個執行緒來處理的,EventThread執行緒用於序列處理所有的事件通知,其可以保證對事件處理的順序性,但是一旦碰上覆雜的處理單元,會消耗過長的處理時間,從而影響其他事件的處理,Curator允許使用者傳入Executor例項,這樣可以將比較複雜的事件處理放到一個專門的執行緒池中去。 

複製程式碼

package com.hust.grid.leesf.curator.examples;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class Create_Node_Background_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    static CountDownLatch semaphore = new CountDownLatch(2);
    static ExecutorService tp = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {
        client.start();
        System.out.println("Main thread: " + Thread.currentThread().getName());

        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                System.out.println();
                semaphore.countDown();
            }
        }, tp).forPath(path, "init".getBytes());

        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());
                semaphore.countDown();
            }
        }).forPath(path, "init".getBytes());

        semaphore.await();
        tp.shutdown();
    }
}

複製程式碼

  執行結果:

Main thread: main
event[code: -110, type: CREATE], Thread of processResult: main-EventThread
event[code: 0, type: CREATE], Thread of processResult: pool-3-thread-1

  其中,建立節點的事件由執行緒池自己處理,而非預設執行緒處理。

  Curator除了提供很便利的API,還提供了一些典型的應用場景,開發人員可以使用參考更好的理解如何使用Zookeeper客戶端,所有的都在recipes包中,只需要在pom.xml中新增如下依賴即可

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.4.2</version>
</dependency>

  3.8 節點監聽  

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class NodeCache_Sample {
    static String path = "/zk-book/nodecache";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
        final NodeCache cache = new NodeCache(client, path, false);
        cache.start(true);
        cache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));
            }
        });
        client.setData().forPath(path, "u".getBytes());
        Thread.sleep(1000);
        client.delete().deletingChildrenIfNeeded().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

複製程式碼

  執行結果:  

Node data update, new data: u

  當節點資料變更後收到了通知。NodeCache不僅可以監聽資料節點的內容變更,也能監聽指定節點是否存在。

  3.9 子節點監聽 

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class PathChildrenCache_Sample {
    static String path = "/zk-book";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(5000).build();

    public static void main(String[] args) throws Exception {
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, path, true);
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("CHILD_ADDED," + event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    System.out.println("CHILD_UPDATED," + event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    System.out.println("CHILD_REMOVED," + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
        });
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000);
        client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path + "/c1");
        Thread.sleep(1000);
        client.delete().forPath(path);
        Thread.sleep(Integer.MAX_VALUE);
    }
}

複製程式碼

  執行結果:

CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1

  監聽節點的子節點,包括新增、資料變化、刪除三類事件。

  3.10 Master選舉

  藉助Zookeeper,開發者可以很方便地實現Master選舉功能,其大體思路如下:選擇一個根節點,如/master_select,多臺機器同時向該節點建立一個子節點/master_select/lock,利用Zookeeper特性,最終只有一臺機器能夠成功建立,成功的那臺機器就是Master。

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_MasterSelect {
    static String master_path = "/curator_recipes_master_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println("成為Master角色");
                Thread.sleep(3000);
                System.out.println("完成Master操作,釋放Master權利");
            }
        });
        selector.autoRequeue();
        selector.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}

複製程式碼

  執行結果:

成為Master角色
完成Master操作,釋放Master權利
成為Master角色

  以上結果會反覆迴圈,並且當一個應用程式完成Master邏輯後,另外一個應用程式的相應方法才會被呼叫,即當一個應用例項成為Master後,其他應用例項會進入等待,直到當前Master掛了或者推出後才會開始選舉Master。

  3.11 分散式鎖

  為了保證資料的一致性,經常在程式的某個執行點需要進行同步控制。以流水號生成場景為例,普通的後臺應用通常採用時間戳方式來生成流水號,但是在使用者量非常大的情況下,可能會出現併發問題。 

複製程式碼

package com.hust.grid.leesf.curator.examples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class Recipes_NoLock {
    public static void main(String[] args) throws Exception {
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.err.println("生成的訂單號是 : " + orderNo);
                }
            }).start();
        }
        down.countDown();
    }
}

複製程式碼

  執行結果: 

複製程式碼

生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|591
生成的訂單號是 : 16:29:10|590
生成的訂單號是 : 16:29:10|592
生成的訂單號是 : 16:29:10|591

複製程式碼

  結果表示訂單號出現了重複,即普通的方法無法滿足業務需要,因為其未進行正確的同步。可以使用Curator來實現分散式鎖功能。

複製程式碼

package com.hust.grid.leesf.curator.examples;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_Lock {
    static String lock_path = "/curator_recipes_lock_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
        final CountDownLatch down = new CountDownLatch(1);
        for (int i = 0; i < 30; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        down.await();
                        lock.acquire();
                    } catch (Exception e) {
                    }
                    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                    String orderNo = sdf.format(new Date());
                    System.out.println("生成的訂單號是 : " + orderNo);
                    try {
                        lock.release();
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        down.countDown();
    }
}

複製程式碼

  執行結果:

複製程式碼

生成的訂單號是 : 16:31:50|293
生成的訂單號是 : 16:31:50|319
生成的訂單號是 : 16:31:51|278
生成的訂單號是 : 16:31:51|326
生成的訂單號是 : 16:31:51|402
生成的訂單號是 : 16:31:51|420
生成的訂單號是 : 16:31:51|546
生成的訂單號是 : 16:31:51|602
生成的訂單號是 : 16:31:51|626
生成的訂單號是 : 16:31:51|656
生成的訂單號是 : 16:31:51|675
生成的訂單號是 : 16:31:51|701
生成的訂單號是 : 16:31:51|708
生成的訂單號是 : 16:31:51|732
生成的訂單號是 : 16:31:51|763
生成的訂單號是 : 16:31:51|785
生成的訂單號是 : 16:31:51|805
生成的訂單號是 : 16:31:51|823
生成的訂單號是 : 16:31:51|839
生成的訂單號是 : 16:31:51|853
生成的訂單號是 : 16:31:51|868
生成的訂單號是 : 16:31:51|884
生成的訂單號是 : 16:31:51|897
生成的訂單號是 : 16:31:51|910
生成的訂單號是 : 16:31:51|926
生成的訂單號是 : 16:31:51|939
生成的訂單號是 : 16:31:51|951
生成的訂單號是 : 16:31:51|965
生成的訂單號是 : 16:31:51|972
生成的訂單號是 : 16:31:51|983

複製程式碼

  結果表明此時已經不存在重複的流水號。

  3.12 分散式計數器

  分散式計數器的典型應用是統計系統的線上人數,藉助Zookeeper也可以很方便實現分散式計數器功能:指定一個Zookeeper資料節點作為計數器,多個應用例項在分散式鎖的控制下,通過更新節點的內容來實現計數功能。 

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

public class Recipes_DistAtomicInt {
    static String distatomicint_path = "/curator_recipes_distatomicint_path";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path,
                new RetryNTimes(3, 1000));
        AtomicValue<Integer> rc = atomicInteger.add(8);
        System.out.println("Result: " + rc.succeeded());
    }
}

複製程式碼

  執行結果:

Result: true

  結果表明已經將資料成功寫入資料節點中。

  3.13 分散式Barrier

  如同JDK的CyclicBarrier,Curator提供了DistributedBarrier來實現分散式Barrier。  

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class Recipes_Barrier {
    static String barrier_path = "/curator_recipes_barrier_path";
    static DistributedBarrier barrier;

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        CuratorFramework client = CuratorFrameworkFactory.builder()
                                .connectString("127.0.0.1:2181")
                                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                        client.start();
                        barrier = new DistributedBarrier(client, barrier_path);
                        System.out.println(Thread.currentThread().getName() + "號barrier設定");
                        barrier.setBarrier();
                        barrier.waitOnBarrier();
                        System.err.println("啟動...");
                    } catch (Exception e) {
                    }
                }
            }).start();
        }
        Thread.sleep(2000);
        barrier.removeBarrier();
    }
}

複製程式碼

  執行結果:

複製程式碼

Thread-1號barrier設定
Thread-2號barrier設定
Thread-4號barrier設定
Thread-3號barrier設定
Thread-0號barrier設定
啟動...
啟動...
啟動...
啟動...
啟動...

複製程式碼

  結果表明通過DistributedBarrier可以實現類似於CyclicBarrier的分散式Barrier功能。

四、Curator工具類

  4.1 ZKPaths

  其提供了簡單的API來構建znode路徑、遞迴建立、刪除節點等。   

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZKPaths.PathAndNode;
import org.apache.zookeeper.ZooKeeper;

public class ZKPaths_Sample {
    static String path = "/curator_zkpath_sample";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();

        System.out.println(ZKPaths.fixForNamespace(path, "sub"));
        System.out.println(ZKPaths.makePath(path, "sub"));
        System.out.println(ZKPaths.getNodeFromPath("/curator_zkpath_sample/sub1"));

        PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkpath_sample/sub1");
        System.out.println(pn.getPath());
        System.out.println(pn.getNode());

        String dir1 = path + "/child1";
        String dir2 = path + "/child2";
        ZKPaths.mkdirs(zookeeper, dir1);
        ZKPaths.mkdirs(zookeeper, dir2);
        System.out.println(ZKPaths.getSortedChildren(zookeeper, path));

        ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
    }
}

複製程式碼

  執行結果: 

複製程式碼

/curator_zkpath_sample/sub
/curator_zkpath_sample/sub
sub1
/curator_zkpath_sample
sub1
[child1, child2]

複製程式碼

  藉助ZKPaths可快速方便的完成節點的建立等操作。

  4.2 EnsurePath

  其提供了一種能夠確保資料節點存在的機制,當上層業務希望對一個數據節點進行操作時,操作前需要確保該節點存在。 

複製程式碼

package com.hust.grid.leesf.curator.examples;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.EnsurePath;

public class EnsurePathDemo {
    static String path = "/zk-book/c1";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    public static void main(String[] args) throws Exception {
        client.start();
        client.usingNamespace("zk-book");

        EnsurePath ensurePath = new EnsurePath(path);
        ensurePath.ensure(client.getZookeeperClient());
        ensurePath.ensure(client.getZookeeperClient());

        EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/c1");
        ensurePath2.ensure(client.getZookeeperClient());
    }
}

複製程式碼

  EnsurePath採取瞭如下節點建立方式,試圖建立指定節點,如果節點已經存在,那麼就不進行任何操作,也不對外丟擲異常,否則正常建立資料節點。

五、總結

  本篇介紹了使用Zookeeper的開源客戶端如何操作Zookeeper的方法,相應的原始碼也已經上傳至github,謝謝各位園友的觀看~

出處:http://www.cnblogs.com/leesf456/