1. 程式人生 > >curator使用自增長znode生成id

curator使用自增長znode生成id

/**
 * Demo: generates increasing IDs by creating a persistent sequential znode through
 * Apache Curator and reading the sequence suffix the server appends to the node name.
 *
 * <p>Each created znode is only needed for its sequence suffix, so it is deleted
 * asynchronously on a small thread pool right after creation.
 *
 * <p>All ZooKeeper state (client, retry policy, executor) is static and initialised once
 * when the class is loaded; loading this class therefore attempts a ZooKeeper connection.
 */
public class CuratorTest {

    /**
     * Monitor that is notified after every asynchronous delete attempt.
     *
     * <p>{@link #main(String[])} no longer waits on it (a bare {@code wait()} could miss a
     * notification that fired first); it is retained only so that package-local code that
     * still synchronises on it keeps working.
     */
    static Object ob = new Object();

    /** ZooKeeper connect string (host:port) used by this demo. */
    private static final String IP_TOSTRING = "192.168.0.170:2181";

    /** Parent znode under which the sequential nodes are created. */
    private static final String ROOT = "/curtor";

    /** Name prefix of every sequential node; the server appends the sequence number. */
    private static final String NODE_NAME = "id-generator";

    /** Upper bound, in seconds, that {@code main} waits for pending deletes to finish. */
    private static final long DELETE_DRAIN_TIMEOUT_SECONDS = 30L;

    private static final RetryPolicy retryPolicy;

    private static final CuratorFramework curatorFrameworkClient;

    /** Pool that runs the asynchronous znode deletions. */
    private static final ExecutorService executorService;

    static {
        retryPolicy = new ExponentialBackoffRetry(1000, 3);
        curatorFrameworkClient = CuratorFrameworkFactory
                .builder()
                .connectString(IP_TOSTRING)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
        curatorFrameworkClient.start();
        // Register a listener for client events.
        curatorFrameworkClient.getCuratorListenable().addListener(new CuratorListener() {

            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                // do your job
            }
        });

        executorService = Executors.newFixedThreadPool(10);

        try {
            // The parent node must exist before children can be created under it.
            Stat stat = curatorFrameworkClient.checkExists().forPath(ROOT);
            if (stat == null) {
                curatorFrameworkClient.create().withMode(CreateMode.PERSISTENT).forPath(ROOT, null);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a persistent sequential znode under {@code ROOT}, prints the sequence suffix
     * as the generated ID, and schedules the znode for asynchronous deletion.
     *
     * <p>Note that the value returned is the full path of the created znode, not the bare
     * ID; pass it to {@link #splitID(String)} to obtain the ID itself.
     *
     * @return the full path of the created znode, or an empty string if creation failed
     */
    public String generateId() {
        String backPath = "";

        String fullPath = ROOT.concat("/").concat(NODE_NAME);
        try {
            // Key point: a persistent *sequential* node gets a server-assigned suffix.
            backPath = curatorFrameworkClient.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(fullPath, null);
            // The node itself is of no further use, so delete it asynchronously to avoid
            // accumulating znodes on the server.
            final String createdPath = backPath;
            executorService.execute(() -> {
                try {
                    curatorFrameworkClient.delete().forPath(createdPath);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // Signal on success and on failure alike, so a waiter is never left
                    // blocked because the delete threw.
                    synchronized (ob) {
                        ob.notifyAll();
                    }
                }
            });
            String id = this.splitID(backPath);
            System.out.println("生成的ID=" + id);

        } catch (Exception e) {
            e.printStackTrace();
        }
        return backPath;
    }

    /**
     * Extracts the part of {@code path} that follows the last occurrence of the node-name
     * prefix, i.e. the sequence suffix of a node created by {@link #generateId()}.
     *
     * @param path znode path to split; must not be {@code null}
     * @return the text after the last {@code NODE_NAME} occurrence (possibly empty), or
     *         {@code path} unchanged when it does not contain {@code NODE_NAME}
     */
    public String splitID(String path) {
        int prefixStart = path.lastIndexOf(NODE_NAME);
        if (prefixStart < 0) {
            return path;
        }
        // prefixStart + NODE_NAME.length() can never exceed path.length(), because the
        // prefix was found inside path, so no extra bounds check is required.
        return path.substring(prefixStart + NODE_NAME.length());
    }

    /**
     * Generates one ID, waits for the pending asynchronous delete to finish, then releases
     * the executor and the ZooKeeper client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorTest tst = new CuratorTest();
        try {
            tst.generateId();
            // Draining the executor waits for every queued delete. Unlike wait()/notify(),
            // it cannot miss a task that already finished and cannot hang if a delete fails.
            executorService.shutdown();
            if (!executorService.awaitTermination(DELETE_DRAIN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            // Preserve the interrupt status for whoever runs this thread.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the ZooKeeper connection, even when interrupted.
            curatorFrameworkClient.close();
        }
    }

}

原始碼解析:

1.入口類
 curatorFrameworkClient = CuratorFrameworkFactory
                    .builder()
                    .connectString(IP_TOSTRING)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy)
                    .build();
2.CuratorFrameworkFactory.builder()
    使用靜態內部類 public static class Builder

3.build()方法
    返回 new CuratorFrameworkImpl(this); this 代表目前的 Builder 例項

4.CuratorFrameworkImpl類
     //zk客戶端實現類
     this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
                processEvent(event);
            }
        }, builder.getRetryPolicy(), builder.canBeReadOnly());
        //監聽
        listeners = new ListenerContainer<CuratorListener>();
        //不能解決的錯誤監聽
        unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
        //後臺的一些操作
        backgroundOperations = new DelayQueue<OperationAndData<?>>();
        namespace = new NamespaceImpl(this, builder.getNamespace());
        //自己不配置執行緒工廠(ThreadFactory)的話 使用預設的執行緒工廠
        threadFactory = getThreadFactory(builder);
        //connectionState管理器 就是zk connection的狀態管理器
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
        //壓縮器
        compressionProvider = builder.getCompressionProvider();
        //許可權控制器
        aclProvider = builder.getAclProvider();
        //CuratorFramework狀態管理 通過cas控制狀態enum
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);


5. new CuratorZookeeperClient()
    //主要的成員變數
    private final ConnectionState                   state;
    private final AtomicReference<RetryPolicy>      retryPolicy = new AtomicReference<RetryPolicy>();
    //構造器
     state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly);
        setRetryPolicy(retryPolicy);

    --ConnectionState.class
        //主要成員變數
          private final HandleHolder zooKeeper;
           private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
        //建構函式
         if ( parentWatcher != null )
        {
            parentWatchers.offer(parentWatcher);
        }

        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);

        --HandleHolder.class
            //用於建立ZooKeeper例項的工廠
            private final ZookeeperFactory zookeeperFactory;
            //watcher
            private final Watcher watcher;
  

6. curatorFrameworkClient.start();

connectionStateManager.start(); // 連線管理器(用於管理zkclient session關斷狀態)

final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
        {
            logAsErrorConnectionErrors.set(true);
        }
    }
};

this.getConnectionStateListenable().addListener(listener);

client.start();//zk客戶端連線

executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners, 1 for background ops

executorService.submit(new Callable<Object>()
{
    @Override
    public Object call() throws Exception
    {
        backgroundOperationsLoop();
        return null;
    }
});