跟著例項學習ZooKeeper的用法: 計數器
這一篇文章我們將學習使用Curator來實現計數器。 顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數,一個用long來計數。
SharedCount
這個類使用int型別來計數。 主要涉及三個類。
- SharedCount
- SharedCountReader
- SharedCountListener
SharedCount
代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。
例子程式碼:
package com.colobu.zkrecipe.counter;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.SharedCountListener;
import org.apache.curator.framework.recipes.shared.SharedCountReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import com.google.common.collect.Lists;
public class SharedCounterExample implements SharedCountListener{
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
final Random rand = new Random();
SharedCounterExample example = new SharedCounterExample();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
SharedCount baseCount = new SharedCount(client, PATH, 0);
baseCount.addListener(example);
baseCount.start();
List<SharedCount> examples = Lists.newArrayList();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final SharedCount count = new SharedCount(client, PATH, 0);
examples.add(count);
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
count.start();
Thread.sleep(rand.nextInt(10000));
System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
for (int i = 0; i < QTY; ++i) {
examples.get(i).close();
}
baseCount.close();
}
}
@Override
public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
System.out.println("State changed: " + arg1.toString());
}
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("Counter's value is changed to " + newCount);
}
}
在這個例子中,我們使用baseCount
來監聽計數值(addListener
方法)。 任意的SharedCount, 只要使用相同的path,都可以得到這個計數值。 然後我們使用5個執行緒為計數值增加一個10以內的隨機數。
count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))
這裡我們使用trySetCount
去設定計數器。 第一個引數提供當前的VersionedValue,如果期間其它client更新了此計數值, 你的更新可能不成功, 但是這時你的client更新了最新的值,所以失敗了你可以嘗試再更新一次。 而setCount
是強制更新計數器的值。
注意計數器必須start
,使用完之後必須呼叫close
關閉它。
在這裡再重複一遍前面講到的, 強烈推薦你監控ConnectionStateListener
, 儘管我們的有些例子沒有監控它。 在本例中SharedCountListener
擴充套件了ConnectionStateListener
。 這一條針對所有的Curator recipes都適用,後面的文章中就不專門提示了。
DistributedAtomicLong
再看一個Long型別的計數器。 除了計數的範圍比SharedCount
大了之外, 它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex
方式來更新計數值。 還記得InterProcessMutex
是什麼嗎? 它是我們前面跟著例項學習ZooKeeper的用法: 分散式鎖 講的分散式可重入鎖。 這和上面的計數器的實現有顯著的不同。
可以從它的內部實現DistributedAtomicValue.trySet
中看出端倪。
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
tryWithMutex(result, makeValue);
}
return result;
}
此計數器有一系列的操作:
- get(): 獲取當前值
- increment(): 加一
- decrement(): 減一
- add(): 增加特定的值
- subtract(): 減去特定的值
- trySet(): 嘗試設定計數值
- forceSet(): 強制設定計數值
你必須檢查返回結果的succeeded()
, 它代表此操作是否成功。 如果操作成功, preValue()
代表操作前的值, postValue()
代表操作後的值。
我們下面的例子中使用5個執行緒對計數器進行加一操作,如果成功,將操作前後的值打印出來。
package com.colobu.zkrecipe.counter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import com.google.common.collect.Lists;
public class DistributedAtomicLongExample {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
List<DistributedAtomicLong> examples = Lists.newArrayList();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
examples.add(count);
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
try {
//Thread.sleep(rand.nextInt(1000));
AtomicValue<Long> value = count.increment();
//AtomicValue<Long> value = count.decrement();
//AtomicValue<Long> value = count.add((long)rand.nextInt(20));
System.out.println("succeed: " + value.succeeded());
if (value.succeeded())
System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
}
}
}