【推薦】zookeeper典型應用場景: 分散式計數器
一、技術介紹
zookeeper有很多典型應用場景,應用在分散式系統中,這裡介紹其分散式計數器應用。本文將討論如何使用Curator來實現計數器。 顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。
Curator有兩個計數器:SharedCount計數器,DistributedAtomicLong計數器(如int等其它型別計數器類似)
-
SharedCount
這個類使用int型別來計數。 主要涉及三個類。
- SharedCount
- SharedCountReader
- SharedCountListener
SharedCount
代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。
-
DistributedAtomicLong
再看一個Long型別的計數器。 除了計數的範圍比SharedCount
大了之外, 它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex
succeeded()
, 它代表此操作是否成功。 如果操作成功, preValue()
代表操作前的值,postValue()
代表操作後的值。
具體實現步驟:
二、maven依賴
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
三、SharedCount計數器
SharedCount計數器在操作失敗時,需要自己去重試,程式碼如下:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
public class ZookeeperSharedCountMain {
public static void main(String[] args) throws Exception {
//建立zookeeper客戶端
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
//指定鎖路徑
String lockPath = "/zkLockRoot/lock_2";
//建立分散式計數器,初始為0
SharedCount sharedCount = new SharedCount(client, lockPath, 0);
//定義監聽器
SharedCountListener sharedCountListener = new SharedCountListener() {
@Override
public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {
System.out.println("new cnt : " + i);
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("stated change : " + connectionState);
}
};
//新增監聽器
sharedCount.addListener(sharedCountListener);
sharedCount.start();
//生成執行緒池
ExecutorService executor = Executors.newCachedThreadPool();
Consumer<SharedCount> consumer = (SharedCount count) -> {
try {
List<Callable<Boolean>> callList = new ArrayList<>();
Callable<Boolean> call = () -> {
boolean result = false;
try {
//資料更新,若失敗,則重試10次,每次間隔30毫秒
for(int i=0; i<10;i++){
//獲取當前版本資料
VersionedValue<Integer> oldVersion = sharedCount.getVersionedValue();
int newCnt = oldVersion.getValue() + 1;
result = sharedCount.trySetCount(oldVersion, newCnt);
if(result){
System.out.println(Thread.currentThread().getName()
+ " oldVersion:"+oldVersion.getVersion()
+" oldCnt:"+oldVersion.getValue()
+" newCnt:"+sharedCount.getCount()
+" result:"+result);
break;
}
TimeUnit.MILLISECONDS.sleep(30);
}
} catch (Exception e) {
} finally {
}
return result;
};
//5個執行緒
for (int i = 0; i < 5; i++) {
callList.add(call);
}
List<Future<Boolean>> futures = executor.invokeAll(callList);
} catch (Exception e) {
}
};
//測試分散式int型別的計數器
consumer.accept(sharedCount);
System.out.println("final cnt : " + sharedCount.getCount());
sharedCount.close();
executor.shutdown();
}
}
輸出:
stated change : CONNECTED
pool-3-thread-5 oldVersion:0 oldCnt:0 newCnt:1 result:true
new cnt : 1
pool-3-thread-3 oldVersion:1 oldCnt:1 newCnt:2 result:true
new cnt : 2
pool-3-thread-1 oldVersion:2 oldCnt:2 newCnt:3 result:true
new cnt : 3
pool-3-thread-2 oldVersion:3 oldCnt:3 newCnt:4 result:true
new cnt : 4
pool-3-thread-4 oldVersion:4 oldCnt:4 newCnt:5 result:true
final cnt : 5
四、DistributedAtomicLong計數器
DistributedAtomicLong計數器可以自己設定重試的次數與間隔,程式碼如下:
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
public class ZookeeperDistributedAtomicLongMain {
public static void main(String[] args) throws Exception {
//建立zookeeper客戶端
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
//指定鎖路徑
String lockPath = "/zkLockRoot/lock_3";
//分散式計數器,失敗時重試10,每次間隔30毫秒
DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(client, lockPath, new RetryNTimes(10, 30));
//生成執行緒池
ExecutorService executor = Executors.newCachedThreadPool();
Consumer<DistributedAtomicLong> consumer = (DistributedAtomicLong count) -> {
try {
List<Callable<Boolean>> callList = new ArrayList<>();
Callable<Boolean> call = () -> {
boolean result = false;
try {
AtomicValue<Long> val = count.increment();
System.out.println("old cnt: "+val.preValue()+" new cnt : "+ val.postValue()+" result:"+val.succeeded());
result = val.succeeded();
} catch (Exception e) {
} finally {
}
return result;
};
//5個執行緒
for (int i = 0; i < 5; i++) {
callList.add(call);
}
List<Future<Boolean>> futures = executor.invokeAll(callList);
} catch (Exception e) {
}
};
//測試計數器
consumer.accept(distributedAtomicLong);
System.out.println("final cnt : " + distributedAtomicLong.get().postValue());
executor.shutdown();
}
}
輸出:
old cnt: 0 new cnt : 1 result:true
old cnt: 1 new cnt : 2 result:true
old cnt: 2 new cnt : 3 result:true
old cnt: 3 new cnt : 4 result:true
old cnt: 4 new cnt : 5 result:true
final cnt : 5
如果在DistributedAtomicLong的構造方法引數中,RetryNTimes重試次數不夠,比如是3,你會發現並不一定每次加數都會成功。顯然這裡是用了樂觀鎖機制,它並不保證操作一定成功(它在重試這麼多次中都沒有成功獲得鎖,導致操作沒有執行),所以我們有必要通過呼叫 av.succeeded() 來檢視此次加數是否成功。