1. 程式人生 > >Java API操作ZK node

Java API操作ZK node

建立會話

  • 建立簡單連線
/**
 * 測試建立Zk會話
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Constructor_Usage_Simple implements Watcher {
    private static CountDownLatch connectedSemaphore=new CountDownLatch(1);

    public static void main(String[] args) throws Exception{
        ZooKeeper zk=new
ZooKeeper("192.168.99.215:2181",5000,new ZooKeeper_Constructor_Usage_Simple()); System.out.println(zk.getState()); connectedSemaphore.await(); System.out.println("zk session established"); } /** * 處理來自ZK服務端的watcher通知 * @param watchedEvent */ @Override public
void process(WatchedEvent watchedEvent) { System.out.println("receive watched event:"+watchedEvent); if(Event.KeeperState.SyncConnected==watchedEvent.getState()){ connectedSemaphore.countDown();//解除等待阻塞 } } }
  • 複用會話
/**
 * 複用sessionId和sessionPassword的會話
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Constructor_Usage_With_sid_password implements Watcher { private static CountDownLatch connectedSemaphore=new CountDownLatch(1); @Override public void process(WatchedEvent watchedEvent) { System.out.println("receive watched event:"+watchedEvent); if(Event.KeeperState.SyncConnected==watchedEvent.getState()){ connectedSemaphore.countDown(); } } public static void main(String[] args) throws Exception{ ZooKeeper zooKeeper=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Constructor_Usage_With_sid_password()); connectedSemaphore.await(); long sessionId=zooKeeper.getSessionId(); byte[] password=zooKeeper.getSessionPasswd(); /**使用錯誤的sessionID跟sessionPwd連連線測試[192.168.99.215 lhc-centos0]**/ ZooKeeper zkWrong=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Constructor_Usage_With_sid_password(),1l,"lhc".getBytes()); /**使用正確的來進行連線**/ ZooKeeper zkTrue=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Constructor_Usage_With_sid_password(),sessionId,password); Thread.sleep(Integer.MAX_VALUE); } }

建立節點

  • 使用同步API建立節點
/**
 * 使用同步API建立一個節點
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Create_API_Sync_Usage implements Watcher {
    private static CountDownLatch connectedSemaphore=new CountDownLatch(1);
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connectedSemaphore.countDown();
        }
    }

    public static void main(String[] args) throws Exception{
        ZooKeeper zooKeeper=new ZooKeeper("192.168.99.215:2181",5000,new ZooKeeper_Create_API_Sync_Usage());
        connectedSemaphore.await();
        String path1=zooKeeper.create("/zk-test1","lhc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);//臨時結點
        System.out.println(path1+" 建立成功!");

        String path2=zooKeeper.create("/zk-test2","lllhhhhhhhhhhhhhhhhc".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(path2+"  建立成功!");

    }
}
  • 使用非同步API建立一個節點
/**
 * 使用非同步API建立一個節點
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Create_API_ASync_Usage implements Watcher{
    private static CountDownLatch connectedSamphore=new CountDownLatch(1);

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(watchedEvent.getState()== Event.KeeperState.SyncConnected){
            connectedSamphore.countDown();
        }
    }

    public static void main(String[] args) throws Exception{
        ZooKeeper zk1=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_Create_API_ASync_Usage());
        connectedSamphore.await();
        zk1.create("/zk-test-1","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new IStringCallBack(),"i am a context");
        zk1.create("/zk-test-2","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,new IStringCallBack(),"i am a context");
        zk1.create("/zk-test-3","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallBack(),"i am a context");
        Thread.sleep(Integer.MAX_VALUE);
    }
}


/**
 * Created by liuhuichao on 2017/7/26.
 */
public class IStringCallBack implements AsyncCallback.StringCallback{

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("result:"+rc+"; path="+path+" ctx="+ctx+" name = "+name);
        }
}

刪除節點

/**
 * 刪除zk的持久結點
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeperDeleteNode implements Watcher {

    private  static CountDownLatch conntedSamphore=new CountDownLatch(1);
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }

    }

    public static void main(String[] args) throws Exception{
        /**同步刪除節點**/
        ZooKeeper zooKeeper=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeperDeleteNode());
        conntedSamphore.await();
        zooKeeper.delete("/zk-test-30000000014",0);
    }


}

讀取資料

  • 使用同步API獲取子節點列表
/**
 *獲取結點-同步
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {
    private  static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zooKeeper=null;

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
            try {
                System.out.println("--------------------------------------reget children:"+zooKeeper.getChildren(watchedEvent.getPath(),true));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }


    public static void main(String[] args) throws Exception{
        String path="/zk-test-1";
        zooKeeper=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_GetChildren_API_Sync_Usage());
        conntedSamphore.await();
        zooKeeper.create(path+"/test1","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        List<String> childrenList=zooKeeper.getChildren(path,true);
        System.out.println(childrenList);
        zooKeeper.create(path+"/test2","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
        Thread.sleep(Integer.MAX_VALUE);
    }

}


  • 使用非同步API獲取子節點列表
**
 * 非同步獲取結點
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {

    private static CountDownLatch connectedSemphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connectedSemphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
            try {
                System.out.println("node changed===="+zk.getChildren(watchedEvent.getPath(),true));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }


    }

    public static void main(String[] args) throws Exception{
        String path="/zk-test-1";
        zk=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_GetChildren_API_ASync_Usage());
        connectedSemphore.await();
        zk.create(path+"/test3","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.getChildren(path,true,new ICChild2Callback(),null);
        Thread.sleep(Integer.MAX_VALUE);
    }


}


/**
 * 非同步獲取結點回調介面
 * Created by liuhuichao on 2017/7/26.
 */
public class ICChild2Callback implements AsyncCallback.Children2Callback{

    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        System.out.println("get children zonde result:[reponse code:"+rc+" path="+path+" ctx="+ctx+"  childrenlist="+children+" stat="+stat);
    }
}
  • 使用同步API獲取結點資料
/**
 *
 * 同步獲取資料
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage  implements Watcher{

    private static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;
    private static Stat stat=new Stat();
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeCreated){
            System.out.println("node changed:"+watchedEvent.getPath());
        }

    }

    public static void main(String[] args) throws Exception{
        String path="/test-1";
       zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 連線成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children=zk.getChildren(path,new GetData_API_Sync_Usage());
        System.out.println("children node:"+children);
        zk.setData(path+"/lhc","memeda".getBytes(),-1);
        byte[] nodeValue=zk.getData(path+"/lhc",true,stat);
        System.out.println(new String(nodeValue));


    }
}
  • 使用非同步API獲取結點資料
/**
 *
 * 同步/非同步獲取資料
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage  implements Watcher{

    private static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;
    private static Stat stat=new Stat();
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeCreated){
            System.out.println("node changed:"+watchedEvent.getPath());
        }

    }

    public static void main(String[] args) throws Exception{
        String path="/test-1";
       zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 連線成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children=zk.getChildren(path,new GetData_API_Sync_Usage());
        System.out.println("children node:"+children);
        zk.setData(path+"/lhc","lllhc".getBytes(),-1);
        zk.getData(path+"/lhc",true,new IDataCallback(),null);//非同步獲取資料
        Thread.sleep(Integer.MAX_VALUE);
    }
}

/**
 * 非同步獲取node資料回撥
 * Created by liuhuichao on 2017/7/27.
 */
public class IDataCallback implements AsyncCallback.DataCallback {
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("rc="+rc+" ;path="+path+" ;ctx="+ctx+" ;data="+data+" ;stat="+stat);
        System.out.println("string data="+new String(data));
        System.out.println("max version="+stat.getVersion());
    }
}

更新資料

  • 同步設定資料
  zk.setData(path+"/lhc","lllhc".getBytes(),-1);//同步設定資料
  • 非同步設定資料
/**
 *
 * 同步/非同步獲取資料
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage  implements Watcher{

    private static CountDownLatch conntedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;
    private static Stat stat=new Stat();
    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            conntedSamphore.countDown();
        }else if(watchedEvent.getType()== Event.EventType.NodeCreated){
            System.out.println("node changed:"+watchedEvent.getPath());
        }

    }

    public static void main(String[] args) throws Exception{
        String path="/test-1";
       zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 連線成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children=zk.getChildren(path,new GetData_API_Sync_Usage());
        System.out.println("children node:"+children);
        //zk.setData(path+"/lhc","lllhc".getBytes(),-1);//同步設定資料
        zk.setData(path+"/lhc","lhc".getBytes(),-1,new IStatCallback(),null);
        zk.getData(path+"/lhc",true,new IDataCallback(),null);//非同步獲取資料
        Thread.sleep(Integer.MAX_VALUE);
    }
}



/**
 * 非同步設定資料回撥介面
 * Created by liuhuichao on 2017/7/27.
 */
public class IStatCallback implements AsyncCallback.StatCallback{
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("rc="+rc+" ;path="+path+" ;ctx="+ctx+" ;stat="+stat);
        if(rc==0){
            System.out.println("資料設定成功!");
        }
    }
}

檢測節點是否存在


/**
 * 檢測zk node
 * Created by liuhuichao on 2017/7/27.
 */
public class Exist_API_Sync_Usage implements Watcher{
    private static CountDownLatch connetedSamphore=new CountDownLatch(1);
    private static ZooKeeper zk=null;

    @Override
    public void process(WatchedEvent watchedEvent) {
        if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
            connetedSamphore.countDown();
        }else if(Event.EventType.NodeCreated==watchedEvent.getType()){
            System.out.println("node created=="+watchedEvent.getPath());
        }else if(Event.EventType.NodeDataChanged==watchedEvent.getType()){
            System.out.println("node changed=="+watchedEvent.getPath());
        }else if(Event.EventType.NodeDeleted==watchedEvent.getType()){
            System.out.println("node deleted=="+watchedEvent.getPath());
        }
    }

    public static void main(String[] args)throws Exception {
        String path="/test-1";
        zk =new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181",5000,new Exist_API_Sync_Usage());
        connetedSamphore.await();
        System.out.println("zk-19 連線成功!");
        Stat stat=zk.exists(path,new Exist_API_Sync_Usage());
        System.out.println("stat="+stat==null?"為空":"不為空");
        zk.setData(path,"".getBytes(),-1);
        Thread.sleep(Integer.MAX_VALUE);

    }
}