Curator典型應用場景之-分散式計數器
之前我們瞭解了基於Corator的分散式鎖之後,我們就很容易基於其實現一個分散式計數器,顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩種計數器。
SharedCount
這個類使用int型別來計數。 主要涉及三個類。
SharedCount
SharedCountReader
SharedCountListener
SharedCount代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。SharedCount必須呼叫start()方法開啟,使用完之後必須呼叫close關閉它。
SharedCount有以下幾個主要方法
/** 強制設定值 */ public void setCount(int newCount) throws Exception; /** 第一個引數提供當前的VersionedValue,如果期間其它client更新了此計數值, * 你的更新可能不成功 更新不成功返回false 但可以通過getCount()讀取最新值*/ public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception; /** 獲取當前最新值 */ publicint getCount();
例子
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.SharedCountListener; import org.apache.curator.framework.recipes.shared.SharedCountReader;import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SharedCountCase { public static void main(String[] args) throws Exception { final int clientNum = 5; final String BASE_PATH = "/felixzh/counter"; CuratorFramework cfClient = CuratorFrameworkFactory.builder().connectString("felixzh:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); cfClient.start(); ExecutorService executorService = Executors.newFixedThreadPool(clientNum); for (int i = 0; i < clientNum; i++) { executorService.submit(() -> { try { SharedCount sharedCount = new SharedCount(cfClient, BASE_PATH, 0); sharedCount.addListener(new SharedCountListener() { @Override public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { //每個執行緒都能監聽到變化 System.out.println(sharedCount.getVersionedValue().getValue() + "," + newCount); } @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { } }); sharedCount.start(); boolean res = false; while (!res) { res = sharedCount.trySetCount(sharedCount.getVersionedValue(), sharedCount.getVersionedValue().getValue() + 1); } System.out.println("current value: " + sharedCount.getVersionedValue().getValue()); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(3_000); executorService.shutdown(); } }
程式執行,輸出以下結果:
current value: 91
current value: 92
current value: 93
current value: 94
current value: 95
DistributedAtomicInteger 和 DistributedAtomicLong
DistributedAtomicInteger和SharedCount計數範圍是一樣的,都是int型別,但是DistributedAtomicInteger和DistributedAtomicLong和上面的計數器的實現有顯著的不同,它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。 還記得InterProcessMutex是什麼嗎? 它是我們前面講的分散式可重入鎖。下面只講解DistributedAtomicLong。
可以從它的內部實現DistributedAtomicValue.trySet中看出端倪。
此計數器有一系列的操作:
get(): 獲取當前值
increment(): 加一
decrement(): 減一
add(): 增加特定的值
subtract(): 減去特定的值
trySet(): 嘗試設定計數值
forceSet(): 強制設定計數值
你必須檢查返回結果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作後的值。
例子
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; public class DistributedAtomicLongCase { public static void main(String[] args) throws Exception { CuratorFramework cfClient = CuratorFrameworkFactory.builder().connectString("felixzh:2181") .retryPolicy(new ExponentialBackoffRetry(100, 3)).build(); cfClient.start(); final int clientNum = 5; final String BASE_PATH = "/felixzh_distributed_count"; ExecutorService executorService = Executors.newFixedThreadPool(clientNum); for (int i = 0; i < clientNum; i++) { executorService.submit(() -> { try { final DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(cfClient, BASE_PATH, new RetryNTimes(3, 1000)); AtomicValue<Long> atomicValue = distributedAtomicLong.increment(); if (atomicValue.succeeded()) { System.out.println("pre value: " + atomicValue.preValue() + "," + "post value: " + atomicValue.postValue()); System.out.println("current value: " + distributedAtomicLong.get().postValue()); } } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(3_000); executorService.shutdown(); } }
程式執行,輸出以下結果:
pre value: 55,post value: 56 current value: 56 pre value: 56,post value: 57 current value: 57 pre value: 57,post value: 58 current value: 58 pre value: 58,post value: 59 current value: 59 pre value: 59,post value: 60 current value: 60歡迎關注微信公眾號:大資料從業者