1. 程式人生 > >【推薦】zookeeper典型應用場景: 分散式計數器

【推薦】zookeeper典型應用場景: 分散式計數器

一、技術介紹

zookeeper有很多典型應用場景,應用在分散式系統中,這裡介紹其分散式計數器應用。本文將討論如何使用Curator來實現計數器。 顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。

Curator有兩個計數器:SharedCount計數器,DistributedAtomicLong計數器(如int等其它型別計數器類似)

  • SharedCount

這個類使用int型別來計數。 主要涉及三個類。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。

  • DistributedAtomicLong

再看一個Long型別的計數器。 除了計數的範圍比SharedCount大了之外, 它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex

方式來更新計數值, 這和上面的計數器的實現有顯著的不同。你必須檢查返回結果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值,postValue()代表操作後的值。

具體實現步驟:

二、maven依賴

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>

三、SharedCount計數器

SharedCount計數器在操作失敗時,需要自己去重試,程式碼如下:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class ZookeeperSharedCountMain {
    public static void main(String[] args) throws Exception {
        //建立zookeeper客戶端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        //指定鎖路徑
        String lockPath = "/zkLockRoot/lock_2";
        //建立分散式計數器,初始為0
        SharedCount sharedCount = new SharedCount(client, lockPath, 0);
        //定義監聽器
        SharedCountListener sharedCountListener = new SharedCountListener() {
            @Override
            public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {
                System.out.println("new cnt : " + i);
            }

            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("stated change : " + connectionState);
            }
        };
        //新增監聽器
        sharedCount.addListener(sharedCountListener);
        sharedCount.start();

        //生成執行緒池
        ExecutorService executor = Executors.newCachedThreadPool();
        Consumer<SharedCount> consumer = (SharedCount count) -> {
            try {
                List<Callable<Boolean>> callList = new ArrayList<>();
                Callable<Boolean> call = () -> {
                    boolean result = false;
                    try {
                        //資料更新,若失敗,則重試10次,每次間隔30毫秒
                        for(int i=0; i<10;i++){
                            //獲取當前版本資料
                            VersionedValue<Integer> oldVersion = sharedCount.getVersionedValue();
                            int newCnt = oldVersion.getValue() + 1;
                            result = sharedCount.trySetCount(oldVersion, newCnt);
                            if(result){
                                System.out.println(Thread.currentThread().getName()
                                        + "  oldVersion:"+oldVersion.getVersion()
                                        +"  oldCnt:"+oldVersion.getValue()
                                        +"  newCnt:"+sharedCount.getCount()
                                        +"  result:"+result);
                                break;
                            }
                            TimeUnit.MILLISECONDS.sleep(30);
                        }
                    } catch (Exception e) {
                    } finally {
                    }
                    return result;
                };
                //5個執行緒
                for (int i = 0; i < 5; i++) {
                    callList.add(call);
                }
                List<Future<Boolean>> futures = executor.invokeAll(callList);
            } catch (Exception e) {

            }
        };

        //測試分散式int型別的計數器
        consumer.accept(sharedCount);
        System.out.println("final cnt : " + sharedCount.getCount());
        sharedCount.close();

        executor.shutdown();
    }
}

輸出:

stated change : CONNECTED
pool-3-thread-5  oldVersion:0  oldCnt:0  newCnt:1  result:true
new cnt : 1
pool-3-thread-3  oldVersion:1  oldCnt:1  newCnt:2  result:true
new cnt : 2
pool-3-thread-1  oldVersion:2  oldCnt:2  newCnt:3  result:true
new cnt : 3
pool-3-thread-2  oldVersion:3  oldCnt:3  newCnt:4  result:true
new cnt : 4
pool-3-thread-4  oldVersion:4  oldCnt:4  newCnt:5  result:true
final cnt : 5

四、DistributedAtomicLong計數器

DistributedAtomicLong計數器可以自己設定重試的次數與間隔,程式碼如下:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

public class ZookeeperDistributedAtomicLongMain {
    public static void main(String[] args) throws Exception {
        //建立zookeeper客戶端
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        //指定鎖路徑
        String lockPath = "/zkLockRoot/lock_3";
        //分散式計數器,失敗時重試10,每次間隔30毫秒
        DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(client, lockPath, new RetryNTimes(10, 30));

        //生成執行緒池
        ExecutorService executor = Executors.newCachedThreadPool();
        Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> {
            try {
                List<Callable<Boolean>> callList = new ArrayList<>();
                Callable<Boolean> call = () -> {
                    boolean result = false;
                    try {
                        AtomicValue<Long> val = count.increment();
                        System.out.println("old cnt: "+val.preValue()+"   new cnt : "+ val.postValue()+"  result:"+val.succeeded());
                        result = val.succeeded();
                    } catch (Exception e) {
                    } finally {
                    }
                    return result;
                };
                //5個執行緒
                for (int i = 0; i < 5; i++) {
                    callList.add(call);
                }
                List<Future<Boolean>> futures = executor.invokeAll(callList);
            } catch (Exception e) {

            }
        };

        //測試計數器
        consumer.accept(distributedAtomicLong);
        System.out.println("final cnt : " + distributedAtomicLong.get().postValue());

        executor.shutdown();
    }
}

輸出:

old cnt: 0   new cnt : 1  result:true
old cnt: 1   new cnt : 2  result:true
old cnt: 2   new cnt : 3  result:true
old cnt: 3   new cnt : 4  result:true
old cnt: 4   new cnt : 5  result:true
final cnt : 5

如果在DistributedAtomicLong的構造方法引數中,RetryNTimes重試次數不夠,比如是3,你會發現並不一定每次加數都會成功。顯然這裡是用了樂觀鎖機制,它並不保證操作一定成功(它在重試這麼多次中都沒有成功獲得鎖,導致操作沒有執行),所以我們有必要通過呼叫 av.succeeded() 來檢視此次加數是否成功。