1. 程式人生 > >Zookeeper客戶端Curator使用詳解

Zookeeper客戶端Curator使用詳解

前提

最近剛好用到了zookeeper,做了一個基於SpringBoot、Curator、Bootstrap寫了一個視覺化的Web應用:

歡迎使用和star。

簡介

Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連線重連、反覆註冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。
引子和趣聞:
Zookeeper名字的由來是比較有趣的,下面的片段摘抄自《從PAXOS到ZOOKEEPER分散式一致性原理與實踐》一書:
Zookeeper最早起源於雅虎的研究院的一個研究小組。在當時,研究人員發現,在雅虎內部很多大型的系統需要依賴一個類似的系統進行分散式協調,但是這些系統往往存在分散式單點問題。所以雅虎的開發人員就試圖開發一個通用的無單點問題的分散式協調框架。在立項初期,考慮到很多專案都是用動物的名字來命名的(例如著名的Pig專案),雅虎的工程師希望給這個專案也取一個動物的名字。時任研究院的首席科學家Raghu Ramakrishnan開玩笑說:再這樣下去,我們這兒就變成動物園了。此話一出,大家紛紛表示就叫動物園管理員吧——因為各個以動物命名的分散式元件放在一起,雅虎的整個分散式系統看上去就像一個大型的動物園了,而Zookeeper正好用來進行分散式環境的協調——於是,Zookeeper的名字由此誕生了。

Curator無疑是Zookeeper客戶端中的瑞士軍刀,它譯作"館長"或者''管理者'',不知道是不是開發小組有意而為之,筆者猜測有可能這樣命名的原因是說明Curator就是Zookeeper的館長(腦洞有點大:Curator就是動物園的園長)。
Curator包含了幾個包:
curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操作,例如重試策略等
curator-recipes:封裝了一些高階特性,如:Cache事件監聽、選舉、分散式鎖、分散式計數器、分散式Barrier等
Maven依賴(使用curator的版本:2.12.0,對應Zookeeper的版本為:3.4.x,如果跨版本會有相容性問題,很有可能導致節點操作失敗

):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Curator的基本Api

建立會話

1.使用靜態工程方法建立客戶端

一個例子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

newClient靜態工廠方法包含四個主要引數:

引數名 說明
connectionString 伺服器列表,格式host1:port1,host2:port2,...
retryPolicy 重試策略,內建有四種重試策略,也可以自行實現RetryPolicy介面
sessionTimeoutMs 會話超時時間,單位毫秒,預設60000ms
connectionTimeoutMs 連線建立超時時間,單位毫秒,預設60000ms

2.使用Fluent風格的Api建立會話

核心引數變為流式設定,一個列子如下:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

3.建立包含隔離名稱空間的會話

為了實現不同的Zookeeper業務之間的隔離,需要為每個業務分配一個獨立的名稱空間(NameSpace),即指定一個Zookeeper的根路徑(官方術語:為Zookeeper新增“Chroot”特性)。例如(下面的例子)當客戶端指定了獨立名稱空間為“/base”,那麼該客戶端對Zookeeper上的資料節點的操作都是基於該目錄進行的。通過設定Chroot可以將客戶端應用與Zookeeper服務端的一課子樹相對應,在多個應用共用一個Zookeeper叢集的場景下,這對於實現不同應用之間的相互隔離十分有意義。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

啟動客戶端

當建立會話成功,得到client的例項然後可以直接呼叫其start( )方法:

client.start();

資料節點操作

建立資料節點

Zookeeper的節點建立模式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化並且帶序列號
  • EPHEMERAL:臨時
  • EPHEMERAL_SEQUENTIAL:臨時並且帶序列號

**建立一個節點,初始內容為空 **

client.create().forPath("path");

注意:如果沒有設定節點屬性,節點建立模式預設為持久化節點,內容預設為空

建立一個節點,附帶初始化內容

client.create().forPath("path","init".getBytes());

建立一個節點,指定建立模式(臨時節點),內容為空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

建立一個節點,指定建立模式(臨時節點),附帶初始化內容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

建立一個節點,指定建立模式(臨時節點),附帶初始化內容,並且自動遞迴建立父節點

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

這個creatingParentContainersIfNeeded()介面非常有用,因為一般情況開發人員在建立一個子節點必須判斷它的父節點是否存在,如果不存在直接建立會丟擲NoNodeException,使用creatingParentContainersIfNeeded()之後Curator能夠自動遞迴建立所有所需的父節點。

刪除資料節點

刪除一個節點

client.delete().forPath("path");

注意,此方法只能刪除葉子節點,否則會丟擲異常。

刪除一個節點,並且遞迴刪除其所有的子節點

client.delete().deletingChildrenIfNeeded().forPath("path");

刪除一個節點,強制指定版本進行刪除

client.delete().withVersion(10086).forPath("path");

刪除一個節點,強制保證刪除

client.delete().guaranteed().forPath("path");

guaranteed()介面是一個保障措施,只要客戶端會話有效,那麼Curator會在後臺持續進行刪除操作,直到刪除節點成功。

注意:上面的多個流式介面是可以自由組合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

讀取資料節點資料

讀取一個節點的資料內容

client.getData().forPath("path");

注意,此方法返的返回值是byte[ ];

讀取一個節點的資料內容,同時獲取到該節點的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

更新資料節點資料

更新一個節點的資料內容

client.setData().forPath("path","data".getBytes());

注意:該介面會返回一個Stat例項

更新一個節點的資料內容,強制指定版本進行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

檢查節點是否存在

client.checkExists().forPath("path");

注意:該方法返回一個Stat例項,用於檢查ZNode是否存在的操作. 可以呼叫額外的方法(監控或者後臺處理)並在最後呼叫forPath( )指定要操作的ZNode

獲取某個節點的所有子節點路徑

client.getChildren().forPath("path");

注意:該方法的返回值為List<String>,獲得ZNode的子節點Path列表。 可以呼叫額外的方法(監控、後臺處理或者獲取狀態watch, background or get stat) 並在最後呼叫forPath()指定要操作的父ZNode

事務

CuratorFramework的例項包含inTransaction( )介面方法,呼叫此方法開啟一個ZooKeeper事務. 可以複合create, setData, check, and/or delete 等操作然後呼叫commit()作為一個原子操作提交。一個例子如下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

非同步介面

上面提到的建立、刪除、更新、讀取等方法都是同步的,Curator提供非同步介面,引入了BackgroundCallback介面用於處理非同步介面呼叫之後服務端返回的結果資訊。BackgroundCallback介面中一個重要的回撥值為CuratorEvent,裡面包含事件型別、響應嗎和節點的詳細資訊。

CuratorEventType

事件型別 對應CuratorFramework例項的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

響應碼(#getResultCode())

響應碼 意義
0 OK,即呼叫成功
-4 ConnectionLoss,即客戶端與服務端斷開連線
-110 NodeExists,即節點已經存在
-112 SessionExpired,即會話過期

一個非同步建立節點的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

注意:如果#inBackground()方法不指定executor,那麼會預設使用Curator的EventThread去進行非同步處理。

Curator食譜(高階特性)

提醒:首先你必須新增curator-recipes依賴,下文僅僅對recipes一些特性的使用進行解釋和舉例,不打算進行原始碼級別的探討

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

重要提醒:強烈推薦使用ConnectionStateListener監控連線的狀態,當連線狀態為LOST,curator-recipes下的所有Api將會失效或者過期,儘管後面所有的例子都沒有使用到ConnectionStateListener。

快取

Zookeeper原生支援通過註冊Watcher來進行事件監聽,但是開發者需要反覆註冊(Watcher只能單次註冊單次使用)。Cache是Curator中對事件監聽的包裝,可以看作是對事件監聽的本地快取檢視,能夠自動為開發者處理反覆註冊監聽。Curator提供了三種Watcher(Cache)來監聽結點的變化。

Path Cache

Path Cache用來監控一個ZNode的子節點. 當一個子節點增加, 更新,刪除時, Path Cache會改變它的狀態, 會包含最新的子節點, 子節點的資料和狀態,而狀態的更變將通過PathChildrenCacheListener通知。

實際使用時會涉及到四個類:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

通過下面的建構函式建立Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必須呼叫它的start方法,使用完後呼叫close方法。 可以設定StartMode來實現啟動的模式,

StartMode有下面幾種:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在呼叫start()之前會呼叫rebuild()
  3. POST_INITIALIZED_EVENT: 當Cache初始化資料後傳送一個PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)可以增加listener監聽快取的變化。

getCurrentData()方法返回一個List<ChildData>物件,可以遍歷所有的子節點。

設定/更新、移除其實是使用client (CuratorFramework)來操作, 不通過PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件型別:" + event.getType());
            if (null != event.getData()) {
                System.out.println("節點資料:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:如果new PathChildrenCache(client, PATH, true)中的引數cacheData值設定為false,則示例中的event.getData().getData()、data.getData()將返回null,cache將不會快取節點資料。

注意:示例中的Thread.sleep(10)可以註釋掉,但是註釋後事件監聽的觸發次數會不全,這可能與PathCache的實現原理有關,不能太過頻繁的觸發事件!

Node Cache

Node Cache與Path Cache類似,Node Cache只是監聽某一個特定的節點。它涉及到下面的三個類:

  • NodeCache - Node Cache實現類
  • NodeCacheListener - 節點監聽器
  • ChildData - 節點資料

注意:使用cache,依然要呼叫它的start()方法,使用完後呼叫close()方法。

getCurrentData()將得到節點當前的狀態,通過它的狀態可以得到當前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("節點資料:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("節點被刪除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)可以註釋,但是註釋後事件監聽的觸發次數會不全,這可能與NodeCache的實現原理有關,不能太過頻繁的觸發事件!

注意:NodeCache只能監聽一個節點的狀態變化。

Tree Cache

Tree Cache可以監控整個樹上的所有節點,類似於PathCache和NodeCache的組合,主要涉及到下面四個類:

  • TreeCache - Tree Cache實現類
  • TreeCacheListener - 監聽器類
  • TreeCacheEvent - 觸發的事件類
  • ChildData - 節點資料
public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件型別:" + event.getType() +
                        " | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中沒有使用Thread.sleep(10),但是事件觸發次數也是正常的。

注意:TreeCache在初始化(呼叫start()方法)的時候會回撥TreeCacheListener例項一個事TreeCacheEvent,而回調的TreeCacheEvent物件的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導致空指標異常,這裡應該主動處理並避免這種情況。

Leader選舉

在分散式計算中, leader elections是很重要的一個功能, 這個選舉過程是這樣子的: 指派一個程序作為組織者,將任務分發給各節點。 在任務開始前, 哪個節點都不知道誰是leader(領導者)或者coordinator(協調者). 當選舉演算法開始執行後, 每個節點最終會得到一個唯一的節點作為任務leader. 除此之外, 選舉還經常會發生在leader意外宕機的情況下,新的leader要被選舉出來。

在zookeeper叢集中,leader負責寫操作,然後通過Zab協議實現follower的同步,leader或者follower都可以處理讀操作。

Curator 有兩種leader選舉的recipe,分別是LeaderSelectorLeaderLatch

前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。後者是一旦選舉出Leader,除非有客戶端掛掉重新觸發選舉,否則不會交出領導權。某黨?

LeaderLatch

LeaderLatch有兩個建構函式:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的啟動:

leaderLatch.start( );

一旦啟動,LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然後其中一個最終會被選舉為leader,可以通過hasLeadership方法檢視LeaderLatch例項是否leader:

leaderLatch.hasLeadership( ); //返回true說明當前例項是leader

類似JDK的CountDownLatch, LeaderLatch在請求成為leadership會block(阻塞),一旦不使用LeaderLatch了,必須呼叫close方法。 如果它是leader,會釋放leadership, 其它的參與者將會選舉一個leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

異常處理: LeaderLatch例項可以增加ConnectionStateListener來監聽網路連線問題。 當 SUSPENDED 或 LOST 時, leader不再認為自己還是leader。當LOST後連線重連後RECONNECTED,LeaderLatch會刪除先前的ZNode然後重新建立一個。LeaderLatch使用者必須考慮導致leadership丟失的連線問題。 強烈推薦你使用ConnectionStateListener。

一個LeaderLatch的使用例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

可以新增test module的依賴方便進行測試,不需要啟動真實的zookeeper服務端:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

首先我們建立了10個LeaderLatch,啟動後它們中的一個會被選舉為leader。 因為選舉會花費一些時間,start後並不能馬上就得到leader。
通過hasLeadership檢視自己是否是leader, 如果是的話返回true。
可以通過.getLeader().getId()可以得到當前的leader的ID。
只能通過close釋放當前的領導權。
await是一個阻塞方法, 嘗試獲取leader地位,但是未必能上位。

LeaderSelector

LeaderSelector使用的時候主要涉及下面幾個類:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

核心類是LeaderSelector,它的建構函式如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

類似LeaderLatch,LeaderSelector必須start: leaderSelector.start(); 一旦啟動,當例項取得領導權時你的listener的takeLeadership()方法被呼叫。而takeLeadership()方法只有領導權被釋放時才返回。 當你不再使用LeaderSelector例項時,應該呼叫它的close方法。

異常處理 LeaderSelectorListener類繼承ConnectionStateListener。LeaderSelector必須小心連線狀態的改變。如果例項成為leader, 它應該響應SUSPENDED 或 LOST。 當 SUSPENDED 狀態出現時, 例項必須假定在重新連線成功之前它可能不再是leader了。 如果LOST狀態出現, 例項不再是leader, takeLeadership方法返回。

重要: 推薦處理方式是當收到SUSPENDED 或 LOST時丟擲CancelLeadershipException異常.。這會導致LeaderSelector例項中斷並取消執行takeLeadership方法的異常.。這非常重要, 你必須考慮擴充套件LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推薦的處理邏輯。

下面的一個例子摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

你可以在takeLeadership進行任務的分配等等,並且不要返回,如果你想要要此例項一直是leader的話可以加一個死迴圈。呼叫 leaderSelector.autoRequeue();保證在此例項釋放領導權之後還可能獲得領導權。 在這裡我們使用AtomicInteger來記錄此client獲得領導權的次數, 它是”fair”, 每個client有平等的機會獲得領導權。

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

對比可知,LeaderLatch必須呼叫close()方法才會釋放領導權,而對於LeaderSelector,通過LeaderSelectorListener可以對領導權進行控制, 在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。從而,LeaderSelector具有更好的靈活性和可控性,建議有LeaderElection應用場景下優先使用LeaderSelector。

分散式鎖

提醒:

1.推薦使用ConnectionStateListener監控連線的狀態,因為當連線LOST時你不再擁有鎖

2.分散式的鎖全域性同步, 這意味著任何一個時間點不會有兩個客戶端都擁有相同的鎖。

可重入共享鎖—Shared Reentrant Lock

Shared意味著鎖是全域性可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似,即可重入, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex來實現。 它的建構函式為:

public InterProcessMutex(CuratorFramework client, String path)

通過acquire()獲得鎖,並提供超時機制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

通過release()方法釋放鎖。 InterProcessMutex 例項可以重用。

Revoking ZooKeeper recipes wiki定義了可協商的撤銷機制。 為了撤銷mutex, 呼叫下面的方法:

public void makeRevocable(RevocationListener<T> listener)
將鎖設為可撤銷的. 當別的程序或執行緒想讓你釋放鎖時Listener會被呼叫。
Parameters:
listener - the listener

如果你請求撤銷當前的鎖, 呼叫attemptRevoke()方法,注意鎖釋放時RevocationListener將會回撥。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

二次提醒:錯誤處理 還是強烈推薦你使用ConnectionStateListener處理連線狀態的改變。 當連線LOST時你不再擁有鎖。

首先讓我們建立一個模擬的共享資源, 這個資源期望只能單執行緒的訪問,否則會有併發問題。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真實環境中我們會在這裡訪問/維護一個共享的資源
        //這個例子在使用鎖的情況下不會非法併發異常IllegalStateException
        //但是在無鎖的情況由於sleep了一段時間,很容易丟擲異常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

然後建立一個InterProcessMutexDemo類, 它負責請求鎖, 使用資源,釋放鎖這樣一個完整的訪問過程。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

程式碼也很簡單,生成10個client, 每個client重複執行10次 請求鎖–訪問資源–釋放鎖的過程。每個client都在獨立的執行緒中。 結果可以看到,鎖是隨機的被每個例項排他性的使用。

既然是可重用的,你可以在一個執行緒中多次呼叫acquire(),線上程擁有鎖時它總是返回true。

你不應該在多個執行緒中用同一個InterProcessMutex, 你可以在每個執行緒中都生成一個新的InterProcessMutex例項,它們的path都一樣,這樣它們可以共享同一個鎖。

不可重入共享鎖—Shared Lock

這個鎖和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味著它不能在同一個執行緒中重入。這個類是InterProcessSemaphoreMutex,使用方法和InterProcessMutex類似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥鎖");
        }
        System.out.println(clientName + " 已獲取到互斥鎖");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥鎖");
        }
        System.out.println(clientName + " 再次獲取到互斥鎖");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 獲取鎖幾次 釋放鎖也要幾次
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

執行後發現,有且只有一個client成功獲取第一個鎖(第一個acquire()方法返回true),然後它自己阻塞在第二個acquire()方法,獲取第二個鎖超時;其他所有的客戶端都阻塞在第一個acquire()方法超時並且丟擲異常。

這樣也就驗證了InterProcessSemaphoreMutex實現的鎖是不可重入的。

可重入讀寫鎖—Shared Reentrant Read Write Lock

類似JDK的ReentrantReadWriteLock。一個讀寫鎖管理一對相關的鎖。一個負責讀操作,另外一個負責寫操作。讀操作在寫鎖沒被使用時可同時由多個程序使用,而寫鎖在使用時不允許讀(阻塞)。

此鎖是可重入的。一個擁有寫鎖的執行緒可重入讀鎖,但是讀鎖卻不能進入寫鎖。這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 --->請求讀鎖--->釋放讀鎖 ---->釋放寫鎖。從讀鎖升級成寫鎖是不行的。

可重入讀寫鎖主要由兩個類實現:InterProcessReadWriteLockInterProcessMutex。使用時首先建立一個InterProcessReadWriteLock例項,然後再根據你的需求得到讀鎖或者寫鎖,讀寫鎖的型別是InterProcessMutex

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // 注意只能先得到寫鎖再得到讀鎖,不能反過來!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到寫鎖");
        }
        System.out.println(clientName + " 已得到寫鎖");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到讀鎖");
        }
        System.out.println(clientName + " 已得到讀鎖");
        try {
            resource.use(); // 使用資源
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " 釋放讀寫鎖");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

訊號量—Shared Semaphore

一個計數的訊號量類似JDK的Semaphore。 JDK中Semaphore維護的一組許可(permits),而Curator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數。第一種方式是使用者給定path並且指定最大LeaseSize。第二種方式使用者給定path並且使用SharedCountReader類。如果不使用SharedCountReader, 必須保證所有例項在多程序中使用相同的(最大)租約數量,否則有可能出現A程序中的例項持有最大租約數量為10,但是在B程序中持有的最大租約數量為20,此時租約的意義就失效了。

這次呼叫acquire()會返回一個租約物件。 客戶端必須在finally中close這些租約物件,否則這些租約會丟失掉。 但是, 但是,如果客戶端session由於某種原因比如crash丟掉, 那麼這些客戶端持有的租約會自動close, 這樣其它客戶端可以繼續使用這些租約。 租約還可以通過下面的方式返還:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

注意你可以一次性請求多個租約,如果Semaphore當前的租約不夠,則請求執行緒會被阻塞。 同時還提供了超時的過載方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的主要類包括下面幾個:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader
public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

首先我們先獲得了5個租約, 最後我們把它還給了semaphore。 接著請求了一個租約,因為semaphore還有5個租約,所以請求可以滿足,返回一個租約,還剩4個租約。 然後再請求一個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null(租約不足會阻塞到超時,然後返回null,不會主動丟擲異常;如果不設定超時時間,會一致阻塞)。

上面說講的鎖都是公平鎖(fair)。 總ZooKeeper的角度看, 每個客戶端都按照請求的順序獲得鎖,不存在非公平的搶佔的情況。

多共享鎖物件 —Multi Shared Lock

Multi Shared Lock是一個鎖的容器。 當呼叫acquire(), 所有的鎖都會被acquire(),如果請求失敗,所有的鎖都會被release。 同樣呼叫release時所有的鎖都被release(失敗被忽略)。 基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。

主要涉及兩個類:

  • InterProcessMultiLock
  • InterProcessLock

它的建構函式需要包含的鎖的集合,或者一組ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

新建一個InterProcessMultiLock, 包含一個重入鎖和一個非重入鎖。 呼叫acquire()後可以看到執行緒同時擁有了這兩個鎖。 呼叫release()看到這兩個鎖都被釋放了。

最後再重申一次, 強烈推薦使用ConnectionStateListener監控連線的狀態,當連線狀態為LOST,鎖將會丟失。

分散式計數器

顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數(SharedCount),一個用long來計數(DistributedAtomicLong)。

分散式int計數器—SharedCount

這個類使用int型別來計數。 主要涉及三個類。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在這個例子中,我們使用baseCount來監聽計數值(addListener方法來新增SharedCountListener )。 任意的SharedCount, 只要使用相同的path,都可以得到這個計數值。 然後我們使用5個執行緒為計數值增加一個10以內的隨機數。相同的path的SharedCount對計數值進行更改,將會回撥給baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

這裡我們使用trySetCount去設定計數器。 第一個引數提供當前的VersionedValue,如果期間其它client更新了此計數值, 你的更新可能不成功, 但是這時你的client更新了最新的值,所以失敗了你可以嘗試再更新一次。 而setCount是強制更新計數器的值

注意計數器必須start,使用完之後必須呼叫close關閉它。

強烈推薦使用ConnectionStateListener。 在本例中SharedCountListener擴充套件ConnectionStateListener

分散式long計數器—DistributedAtomicLong

再看一個Long型別的計數器。 除了計數的範圍比SharedCount大了之外, 它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。

可以從它的內部實現DistributedAtomicValue.trySet()中看出:

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此計數器有一系列的操作:

  • get(): 獲取當前值
  • increment(): 加一
  • decrement(): 減一
  • add(): 增加特定的值
  • subtract(): 減去特定的值
  • trySet(): 嘗試設定計數值
  • forceSet(): 強制設定計數值

必須檢查返回結果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作後的值。

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

分散式佇列

使用Curator也可以簡化Ephemeral Node (臨時節點)的操作。Curator也提供ZK Recipe的分散式佇列實現。 利用ZK的 PERSISTENTS_EQUENTIAL節點, 可以保證放入到佇列中的專案是按照順序排隊的。 如果單一的消費者從佇列中取資料, 那麼它是先入先出的,這也是佇列的特點。 如果你嚴格要求順序,你就的使用單一的消費者,可以使用Leader選舉只讓Leader作為唯一的消費者。

但是, 根據Netflix的Curator作者所說, ZooKeeper真心不適合做Queue,或者說ZK沒有實現一個好的Queue,詳細內容可以看 Tech Note 4, 原因有五:

  1. ZK有1MB 的傳輸限制。 實踐中ZNode必須相對較小,而佇列包含成千上萬的訊息,非常的大。
  2. 如果有很多節點,ZK啟動時相當的慢。 而使用queue會導致好多ZNode. 你需要顯著增大 initLimit 和 syncLimit.
  3. ZNode很大的時候很難清理。Netflix不得不建立了一個專門的程式做這事。
  4. 當很大量的包含成千上萬的子節點的ZNode時, ZK的效能變得不好
  5. ZK的資料庫完全放在記憶體中。 大量的Queue意味著會佔用很多的記憶體空間。

儘管如此, Curator還是建立了各種Queue的實現。 如果Queue的資料量不太多,資料量不太大的情況下,酌情考慮,還是可以使用的。

分散式佇列—DistributedQueue

DistributedQueue是最普通的一種佇列。 它設計以下四個類:

  • QueueBuilder - 建立佇列使用QueueBuilder,它也是其它佇列的建立類
  • QueueConsumer - 佇列中的訊息消費者介面
  • QueueSerializer - 佇列訊息序列化和反序列化介面,提供了對佇列中的物件的序列化和反序列化
  • DistributedQueue - 佇列實現類

QueueConsumer是消費者,它可以接收佇列的資料。處理佇列中的資料的程式碼邏輯可以放在QueueConsumer.consumeMessage()中。

正常情況下先將訊息從佇列中移除,再交給消費者消費。但這是兩個步驟,不是原子的。可以呼叫Builder的lockPath()消費者加鎖,當消費者消費資料時持有鎖,這樣其它消費者不能消費此訊息。如果消費失敗或者程序死掉,訊息可以交給其它程序。這會帶來一點效能的損失。最好還是單消費者模式使用佇列。

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clien