curator使用自增長znode生成id
public class CuratorTest { static Object ob = new Object(); private static CuratorFramework curatorFrameworkClient; private static RetryPolicy retryPolicy; private static ExecutorService executorService; private static String IP_TOSTRING = "192.168.0.170:2181"; private static String ROOT = "/curtor"; private static String NODE_NAME = "id-generator"; static { retryPolicy = new ExponentialBackoffRetry(1000, 3); curatorFrameworkClient = CuratorFrameworkFactory .builder() .connectString(IP_TOSTRING) .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); curatorFrameworkClient.start(); //新增監聽事件 curatorFrameworkClient.getCuratorListenable().addListener( new CuratorListener(){ @Override public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { //do you job } } ); try { executorService = Executors.newFixedThreadPool(10); //請先判斷父節點/root節點是否存在 Stat stat = curatorFrameworkClient.checkExists().forPath(ROOT); if (stat == null) { curatorFrameworkClient.create().withMode(CreateMode.PERSISTENT).forPath(ROOT, null); } } catch (Exception e) { e.printStackTrace(); } } public String generateId() { String backPath = ""; String fullPath = ROOT.concat("/").concat(NODE_NAME); try { // 關鍵點:建立持久順序節點 backPath = curatorFrameworkClient.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(fullPath, null); //為防止生成的節點浪費系統資源,故生成後非同步刪除此節點 String finalBackPath = backPath; executorService.execute(() -> { try { curatorFrameworkClient.delete().forPath(finalBackPath); synchronized (ob) { ob.notifyAll(); } } catch (Exception e) { e.printStackTrace(); } }); String ID = this.splitID(backPath); System.out.println("生成的ID=" + ID); } catch (Exception e) { e.printStackTrace(); } return backPath; } public String splitID(String path) { int index = path.lastIndexOf(NODE_NAME); if (index >= 0) { index += NODE_NAME.length(); return index <= path.length() ? path.substring(index) : ""; } return path; } public static void main(String[] args){ CuratorTest tst = new CuratorTest(); tst.generateId(); try { synchronized (ob) { ob.wait(); } executorService.shutdown(); curatorFrameworkClient.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }
原始碼解析:
1.入口類
curatorFrameworkClient = CuratorFrameworkFactory
.builder()
.connectString(IP_TOSTRING)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
2.CuratorFrameworkFactory.builder()
使用內部類 public static class Builder()
3.build()方法
返回 new CuratorFrameworkImpl(this); this代表builder類
4.CuratorFrameworkImpl類
//zk客戶端實現類
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
processEvent(event);
}
}, builder.getRetryPolicy(), builder.canBeReadOnly());
//監聽
listeners = new ListenerContainer<CuratorListener>();
//不能解決的錯誤監聽
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
//後臺的一些操作
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
//自己不配置執行緒池的話 使用自帶的執行緒池
threadFactory = getThreadFactory(builder);
//connectionState管理器 就是zk connection的狀態管理器
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
//壓縮器
compressionProvider = builder.getCompressionProvider();
//許可權控制器
aclProvider = builder.getAclProvider();
//CuratorFramework狀態管理 通過cas控制狀態eumn
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
5. new CuratorZookeeperClient()
//主要的成員變數
private final ConnectionState state;
private final AtomicReference<RetryPolicy> retryPolicy = new AtomicReference<RetryPolicy>();
//構造器
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
setRetryPolicy(retryPolicy);
--ConnectionState.class
//主要成員變數
private final HandleHolder zooKeeper;
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
//建構函式
if ( parentWatcher != null )
{
parentWatchers.offer(parentWatcher);
}
zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
--HandleHolder.class
//zookeeperFactory生成工廠
private final ZookeeperFactory zookeeperFactory;
//watcher
private final Watcher watcher;
6. curatorFrameworkClient.start();
connectionStateManager.start(); // 連線管理器(用於管理zkclient session關斷狀態) final ConnectionStateListener listener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState ) { logAsErrorConnectionErrors.set(true); } } }; this.getConnectionStateListenable().addListener(listener); client.start();//zk客戶端連線 executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { backgroundOperationsLoop(); return null; } });