1. 程式人生 > 其它 >Curator典型應用場景之-分散式計數器

Curator典型應用場景之-分散式計數器

之前我們瞭解了基於Corator的分散式鎖之後,我們就很容易基於其實現一個分散式計數器,顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩種計數器。

SharedCount
這個類使用int型別來計數。 主要涉及三個類。

SharedCount
SharedCountReader
SharedCountListener

SharedCount代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。SharedCount必須呼叫start()方法開啟,使用完之後必須呼叫close關閉它。

SharedCount有以下幾個主要方法

/** 強制設定值 */
public void setCount(int newCount) throws Exception;
 /** 第一個引數提供當前的VersionedValue,如果期間其它client更新了此計數值, 
 * 你的更新可能不成功 更新不成功返回false 但可以通過getCount()讀取最新值*/
public boolean  trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception;
/** 獲取當前最新值 */
public
int getCount();

例子

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SharedCountCase { public static void main(String[] args) throws Exception { final int clientNum = 5; final String BASE_PATH = "/felixzh/counter"; CuratorFramework cfClient = CuratorFrameworkFactory.builder().connectString("felixzh:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); cfClient.start(); ExecutorService executorService = Executors.newFixedThreadPool(clientNum); for (int i = 0; i < clientNum; i++) { executorService.submit(() -> { try { SharedCount sharedCount = new SharedCount(cfClient, BASE_PATH, 0); sharedCount.addListener(new SharedCountListener() { @Override public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { //每個執行緒都能監聽到變化 System.out.println(sharedCount.getVersionedValue().getValue() + "," + newCount); } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); sharedCount.start(); boolean res = false; while (!res) { res = sharedCount.trySetCount(sharedCount.getVersionedValue(), sharedCount.getVersionedValue().getValue() + 1); } System.out.println("current value: " + sharedCount.getVersionedValue().getValue()); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(3_000); executorService.shutdown(); } }

程式執行,輸出以下結果:

current value: 91
current value: 92
current value: 93
current value: 94
current value: 95

DistributedAtomicInteger 和 DistributedAtomicLong

DistributedAtomicInteger和SharedCount計數範圍是一樣的,都是int型別,但是DistributedAtomicInteger和DistributedAtomicLong和上面的計數器的實現有顯著的不同,它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。 還記得InterProcessMutex是什麼嗎? 它是我們前面講的分散式可重入鎖。下面只講解DistributedAtomicLong。
可以從它的內部實現DistributedAtomicValue.trySet中看出端倪。

此計數器有一系列的操作:
get(): 獲取當前值
increment(): 加一
decrement(): 減一
add(): 增加特定的值
subtract(): 減去特定的值
trySet(): 嘗試設定計數值
forceSet(): 強制設定計數值
你必須檢查返回結果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作後的值。

例子

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class DistributedAtomicLongCase {
    public static void main(String[] args) throws Exception {
        CuratorFramework cfClient = CuratorFrameworkFactory.builder().connectString("felixzh:2181")
                .retryPolicy(new ExponentialBackoffRetry(100, 3)).build();
        cfClient.start();

        final int clientNum = 5;
        final String BASE_PATH = "/felixzh_distributed_count";

        ExecutorService executorService = Executors.newFixedThreadPool(clientNum);
        for (int i = 0; i < clientNum; i++) {

            executorService.submit(() -> {
                try {
                    final DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(cfClient, BASE_PATH, new RetryNTimes(3, 1000));
                    AtomicValue<Long> atomicValue = distributedAtomicLong.increment();
                    if (atomicValue.succeeded()) {
                        System.out.println("pre value: " + atomicValue.preValue() + "," + "post value: " + atomicValue.postValue());
                        System.out.println("current value: " + distributedAtomicLong.get().postValue());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }

        Thread.sleep(3_000);
        executorService.shutdown();
    }
}

程式執行,輸出以下結果:

pre value: 55,post value: 56
current value: 56
pre value: 56,post value: 57
current value: 57
pre value: 57,post value: 58
current value: 58
pre value: 58,post value: 59
current value: 59
pre value: 59,post value: 60
current value: 60
歡迎關注微信公眾號:大資料從業者