Java API操作ZK node
阿新 • • 發佈:2019-01-31
建立會話
- 建立簡單連線
/**
 * Demonstrates opening a basic ZooKeeper session.
 *
 * <p>The class acts as its own {@link Watcher}: {@code main} blocks on a latch
 * that {@link #process} releases when a SyncConnected event is received.
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Constructor_Usage_Simple implements Watcher {

    /** Opened by {@link #process} once the client reports SyncConnected. */
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * Handles watcher notifications coming from the ZooKeeper server.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("receive watched event:" + watchedEvent);
        Event.KeeperState state = watchedEvent.getState();
        if (state != Event.KeeperState.SyncConnected) {
            return;
        }
        // Unblock main(), which is waiting for the session to be established.
        connectedSemaphore.countDown();
    }

    /**
     * Creates the client, prints its state right after construction, then
     * waits for the watcher to signal that the session is connected.
     */
    public static void main(String[] args) throws Exception {
        Watcher watcher = new ZooKeeper_Constructor_Usage_Simple();
        ZooKeeper zk = new ZooKeeper("192.168.99.215:2181", 5000, watcher);
        System.out.println(zk.getState());
        connectedSemaphore.await();
        System.out.println("zk session established");
    }
}
- 複用會話
/**
 * Demonstrates re-using a session by passing a sessionId and session password
 * to the ZooKeeper constructor.
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Constructor_Usage_With_sid_password implements Watcher {

    /** Shared latch; opened on the first SyncConnected event seen by any instance. */
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * Connects once to obtain a session id/password, then opens two more
     * clients: one with made-up credentials and one with the real ones.
     * Sleeps afterwards so the watcher output can be observed.
     */
    public static void main(String[] args) throws Exception {
        final String connectString = "lhc-centos0:2181";

        ZooKeeper zooKeeper = new ZooKeeper(
                connectString, 5000, new ZooKeeper_Constructor_Usage_With_sid_password());
        connectedSemaphore.await();

        long sessionId = zooKeeper.getSessionId();
        byte[] password = zooKeeper.getSessionPasswd();

        // Connection attempt with a wrong sessionId and password
        // (hosts used while testing: 192.168.99.215 / lhc-centos0).
        ZooKeeper zkWrong = new ZooKeeper(
                connectString, 5000, new ZooKeeper_Constructor_Usage_With_sid_password(),
                1L, "lhc".getBytes());

        // Connection attempt with the correct sessionId and password.
        ZooKeeper zkTrue = new ZooKeeper(
                connectString, 5000, new ZooKeeper_Constructor_Usage_With_sid_password(),
                sessionId, password);

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Prints each event and opens the latch when the state is SyncConnected.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("receive watched event:" + watchedEvent);
        boolean connected = Event.KeeperState.SyncConnected == watchedEvent.getState();
        if (connected) {
            connectedSemaphore.countDown();
        }
    }
}
建立節點
- 使用同步API建立節點
/**
 * Creates znodes with the synchronous {@code create} API.
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Create_API_Sync_Usage implements Watcher {

    /** Opened by {@link #process} when the client reports SyncConnected. */
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);

    /**
     * Waits for the connection, then creates one EPHEMERAL node and one
     * EPHEMERAL_SEQUENTIAL node, printing the path returned for each.
     */
    public static void main(String[] args) throws Exception {
        Watcher watcher = new ZooKeeper_Create_API_Sync_Usage();
        ZooKeeper zooKeeper = new ZooKeeper("192.168.99.215:2181", 5000, watcher);
        connectedSemaphore.await();

        // Ephemeral node.
        byte[] firstPayload = "lhc".getBytes();
        String path1 = zooKeeper.create(
                "/zk-test1", firstPayload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println(path1 + " 建立成功!");

        // Ephemeral sequential node.
        byte[] secondPayload = "lllhhhhhhhhhhhhhhhhc".getBytes();
        String path2 = zooKeeper.create(
                "/zk-test2", secondPayload, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(path2 + " 建立成功!");
    }

    /**
     * Opens the latch when the event state is SyncConnected.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        connectedSemaphore.countDown();
    }
}
- 使用非同步API建立一個節點
/**
 * Creates znodes with the asynchronous {@code create} API.
 * Created by liuhuichao on 2017/7/25.
 */
public class ZooKeeper_Create_API_ASync_Usage implements Watcher {

    /** Opened by {@link #process} when the client reports SyncConnected. */
    private static CountDownLatch connectedSamphore = new CountDownLatch(1);

    /**
     * Waits for the connection, submits three asynchronous create requests
     * (two PERSISTENT, one PERSISTENT_SEQUENTIAL) and then sleeps so the
     * callbacks have time to print their results.
     */
    public static void main(String[] args) throws Exception {
        Watcher watcher = new ZooKeeper_Create_API_ASync_Usage();
        ZooKeeper zk1 = new ZooKeeper("lhc-centos0:2181", 5000, watcher);
        connectedSamphore.await();

        final String context = "i am a context";

        zk1.create("/zk-test-1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT, new IStringCallBack(), context);
        zk1.create("/zk-test-2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT, new IStringCallBack(), context);
        zk1.create("/zk-test-3", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL, new IStringCallBack(), context);

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Opens the latch when the event state is SyncConnected.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }
        connectedSamphore.countDown();
    }
}
/**
 * Callback for asynchronous create requests; prints the arguments it receives.
 * Created by liuhuichao on 2017/7/26.
 */
public class IStringCallBack implements AsyncCallback.StringCallback {

    /**
     * Prints the result of an asynchronous call on a single line.
     *
     * @param rc   result code of the call
     * @param path path that was passed to the asynchronous call
     * @param ctx  context object that was passed to the asynchronous call
     * @param name name reported back for the node
     */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        StringBuilder line = new StringBuilder();
        line.append("result:").append(rc);
        line.append("; path=").append(path);
        line.append(" ctx=").append(ctx);
        line.append(" name = ").append(name);
        System.out.println(line.toString());
    }
}
刪除節點
/**
 * Deletes a persistent znode.
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeperDeleteNode implements Watcher {

    /** Opened by {@link #process} when the client reports SyncConnected. */
    private static CountDownLatch conntedSamphore = new CountDownLatch(1);

    /**
     * Waits for the connection and then deletes one node synchronously,
     * passing 0 as the expected version.
     */
    public static void main(String[] args) throws Exception {
        Watcher watcher = new ZooKeeperDeleteNode();
        ZooKeeper zooKeeper = new ZooKeeper("lhc-centos0:2181", 5000, watcher);
        conntedSamphore.await();

        // Synchronous delete.
        final String target = "/zk-test-30000000014";
        zooKeeper.delete(target, 0);
    }

    /**
     * Opens the latch when the event state is SyncConnected.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        conntedSamphore.countDown();
    }
}
讀取資料
- 使用同步API獲取子節點列表
/**
 * Lists the children of a znode with the synchronous API and lists them
 * again every time a child-change notification arrives.
 * Created by liuhuichao on 2017/7/26.
 */
public class ZooKeeper_GetChildren_API_Sync_Usage implements Watcher {

    /** Opened by {@link #process} when the session-connected event arrives. */
    private static CountDownLatch conntedSamphore = new CountDownLatch(1);

    /** Client shared between {@code main} and the watcher callback. */
    private static ZooKeeper zooKeeper = null;

    /**
     * Dispatches events received while the client is connected.
     *
     * <p>Node events are delivered with state SyncConnected as well, so the
     * event type has to be inspected inside the connected check; treating the
     * two tests as mutually exclusive would make the child-change branch
     * unreachable.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }
        if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
            // Pure connection-state event: the session is established.
            conntedSamphore.countDown();
        } else if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                // Passing true asks for the watch to be set again, so later changes are reported too.
                System.out.println("--------------------------------------reget children:"
                        + zooKeeper.getChildren(watchedEvent.getPath(), true));
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Creates a first child, lists the children with a watch, then creates a
     * second child so the watcher is triggered. Sleeps afterwards to keep the
     * process alive for the notification.
     */
    public static void main(String[] args) throws Exception {
        String path = "/zk-test-1";
        zooKeeper = new ZooKeeper("lhc-centos0:2181", 5000, new ZooKeeper_GetChildren_API_Sync_Usage());
        conntedSamphore.await();
        zooKeeper.create(path + "/test1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        List<String> childrenList = zooKeeper.getChildren(path, true);
        System.out.println(childrenList);
        zooKeeper.create(path + "/test2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
- 使用非同步API獲取子節點列表
**
* 非同步獲取結點
* Created by liuhuichao on 2017/7/26.
*/
public class ZooKeeper_GetChildren_API_ASync_Usage implements Watcher {
private static CountDownLatch connectedSemphore=new CountDownLatch(1);
private static ZooKeeper zk=null;
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
connectedSemphore.countDown();
}else if(watchedEvent.getType()== Event.EventType.NodeChildrenChanged){
try {
System.out.println("node changed===="+zk.getChildren(watchedEvent.getPath(),true));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception{
String path="/zk-test-1";
zk=new ZooKeeper("lhc-centos0:2181",5000,new ZooKeeper_GetChildren_API_ASync_Usage());
connectedSemphore.await();
zk.create(path+"/test3","".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getChildren(path,true,new ICChild2Callback(),null);
Thread.sleep(Integer.MAX_VALUE);
}
}
/**
 * Callback for asynchronous get-children requests; prints what it receives.
 * Created by liuhuichao on 2017/7/26.
 */
public class ICChild2Callback implements AsyncCallback.Children2Callback {

    /**
     * Prints the result of an asynchronous get-children call on one line.
     *
     * @param rc       result code of the call
     * @param path     path that was passed to the asynchronous call
     * @param ctx      context object that was passed to the asynchronous call
     * @param children child names reported for the node
     * @param stat     stat reported for the node
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        StringBuilder line = new StringBuilder();
        line.append("get children zonde result:[reponse code:").append(rc);
        line.append(" path=").append(path);
        line.append(" ctx=").append(ctx);
        line.append(" childrenlist=").append(children);
        line.append(" stat=").append(stat);
        System.out.println(line.toString());
    }
}
- 使用同步API獲取節點資料
/**
 * Reads a znode's data with the synchronous API.
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage implements Watcher {

    /** Opened by {@link #process} when the session-connected event arrives. */
    private static CountDownLatch conntedSamphore = new CountDownLatch(1);

    /** Client created in {@code main}. */
    private static ZooKeeper zk = null;

    /** Receives the node's stat from the synchronous {@code getData} call. */
    private static Stat stat = new Stat();

    /**
     * Dispatches events received while the client is connected.
     *
     * <p>Node events are delivered with state SyncConnected as well, so the
     * event type has to be inspected inside the connected check. Watches set
     * through {@code getData} report data changes (NodeDataChanged), not
     * NodeCreated, so that is the type handled here.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }
        if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
            // Pure connection-state event: the session is established.
            conntedSamphore.countDown();
        } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            System.out.println("node changed:" + watchedEvent.getPath());
        }
    }

    /**
     * Lists the children of {@code /test-1}, overwrites the data of
     * {@code /test-1/lhc} (version -1) and reads it back synchronously.
     */
    public static void main(String[] args) throws Exception {
        String path = "/test-1";
        zk = new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181", 5000, new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 連線成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children = zk.getChildren(path, new GetData_API_Sync_Usage());
        System.out.println("children node:" + children);
        zk.setData(path + "/lhc", "memeda".getBytes(), -1);
        byte[] nodeValue = zk.getData(path + "/lhc", true, stat);
        System.out.println(new String(nodeValue));
    }
}
- 使用非同步API獲取節點資料
/**
 * Reads a znode's data with the asynchronous API.
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage implements Watcher {

    /** Opened by {@link #process} when the session-connected event arrives. */
    private static CountDownLatch conntedSamphore = new CountDownLatch(1);

    /** Client created in {@code main}. */
    private static ZooKeeper zk = null;

    /**
     * Dispatches events received while the client is connected.
     *
     * <p>Node events are delivered with state SyncConnected as well, so the
     * event type has to be inspected inside the connected check. Watches set
     * through {@code getData} report data changes (NodeDataChanged), not
     * NodeCreated, so that is the type handled here.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }
        if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
            // Pure connection-state event: the session is established.
            conntedSamphore.countDown();
        } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            System.out.println("node changed:" + watchedEvent.getPath());
        }
    }

    /**
     * Lists the children of {@code /test-1}, overwrites the data of
     * {@code /test-1/lhc} (version -1), requests the data asynchronously
     * (printed by {@code IDataCallback}) and then sleeps so the callback and
     * any notifications can be observed.
     */
    public static void main(String[] args) throws Exception {
        String path = "/test-1";
        zk = new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181", 5000, new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 連線成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children = zk.getChildren(path, new GetData_API_Sync_Usage());
        System.out.println("children node:" + children);
        zk.setData(path + "/lhc", "lllhc".getBytes(), -1);
        // Asynchronous read; the result is delivered to IDataCallback.
        zk.getData(path + "/lhc", true, new IDataCallback(), null);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
/**
 * Callback for asynchronous get-data requests; prints what it receives.
 * Created by liuhuichao on 2017/7/27.
 */
public class IDataCallback implements AsyncCallback.DataCallback {

    /**
     * Prints the raw callback arguments, then the data as text and the node
     * version when they are available.
     *
     * <p>{@code data} and {@code stat} are checked for null before use: a
     * failed call (for example, the node does not exist) reports no payload,
     * and a node may also hold no data, so dereferencing them unconditionally
     * would throw a NullPointerException inside the callback.
     *
     * @param rc   result code of the call (0 means success)
     * @param path path that was passed to the asynchronous call
     * @param ctx  context object that was passed to the asynchronous call
     * @param data node data, possibly null
     * @param stat node stat, possibly null
     */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        System.out.println("rc=" + rc + " ;path=" + path + " ;ctx=" + ctx + " ;data=" + data + " ;stat=" + stat);
        if (data != null) {
            System.out.println("string data=" + new String(data));
        }
        if (stat != null) {
            System.out.println("max version=" + stat.getVersion());
        }
    }
}
更新資料
- 同步設定資料
// Set the data synchronously; -1 is passed as the expected version.
zk.setData(path + "/lhc", "lllhc".getBytes(), -1);
- 非同步設定資料
/**
 * Updates a znode's data with the asynchronous API and reads it back
 * asynchronously.
 * Created by liuhuichao on 2017/7/27.
 */
public class GetData_API_Sync_Usage implements Watcher {

    /** Opened by {@link #process} when the session-connected event arrives. */
    private static CountDownLatch conntedSamphore = new CountDownLatch(1);

    /** Client created in {@code main}. */
    private static ZooKeeper zk = null;

    /**
     * Dispatches events received while the client is connected.
     *
     * <p>Node events are delivered with state SyncConnected as well, so the
     * event type has to be inspected inside the connected check. Watches set
     * through {@code getData} report data changes (NodeDataChanged), not
     * NodeCreated, so that is the type handled here.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }
        if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
            // Pure connection-state event: the session is established.
            conntedSamphore.countDown();
        } else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
            System.out.println("node changed:" + watchedEvent.getPath());
        }
    }

    /**
     * Lists the children of {@code /test-1}, submits an asynchronous
     * {@code setData} on {@code /test-1/lhc} (result printed by
     * {@code IStatCallback}), submits an asynchronous {@code getData}
     * (result printed by {@code IDataCallback}) and then sleeps so the
     * callbacks can run.
     */
    public static void main(String[] args) throws Exception {
        String path = "/test-1";
        zk = new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181", 5000, new GetData_API_Sync_Usage());
        conntedSamphore.await();
        System.out.println("zk-19 連線成功!");
        //zk.create(path+"/lhc", "".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        List<String> children = zk.getChildren(path, new GetData_API_Sync_Usage());
        System.out.println("children node:" + children);
        //zk.setData(path+"/lhc","lllhc".getBytes(),-1); // synchronous variant
        // Asynchronous write; the result is delivered to IStatCallback.
        zk.setData(path + "/lhc", "lhc".getBytes(), -1, new IStatCallback(), null);
        // Asynchronous read; the result is delivered to IDataCallback.
        zk.getData(path + "/lhc", true, new IDataCallback(), null);
        Thread.sleep(Integer.MAX_VALUE);
    }
}
/**
 * Callback for asynchronous set-data requests.
 * Created by liuhuichao on 2017/7/27.
 */
public class IStatCallback implements AsyncCallback.StatCallback {

    /**
     * Prints the callback arguments and, when the result code is 0, a
     * success message.
     *
     * @param rc   result code of the call
     * @param path path that was passed to the asynchronous call
     * @param ctx  context object that was passed to the asynchronous call
     * @param stat stat reported for the node
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        StringBuilder line = new StringBuilder();
        line.append("rc=").append(rc);
        line.append(" ;path=").append(path);
        line.append(" ;ctx=").append(ctx);
        line.append(" ;stat=").append(stat);
        System.out.println(line.toString());

        if (rc != 0) {
            return;
        }
        System.out.println("資料設定成功!");
    }
}
檢測節點是否存在
/**
 * Checks whether a znode exists and watches it for creation, data change and
 * deletion.
 * Created by liuhuichao on 2017/7/27.
 */
public class Exist_API_Sync_Usage implements Watcher {

    /** Opened by {@link #process} when the session-connected event arrives. */
    private static CountDownLatch connetedSamphore = new CountDownLatch(1);

    /** Client created in {@code main}. */
    private static ZooKeeper zk = null;

    /**
     * Dispatches events received while the client is connected.
     *
     * <p>Node events are delivered with state SyncConnected as well, so the
     * event type has to be inspected inside the connected check; treating the
     * two tests as mutually exclusive would make every node-event branch
     * unreachable.
     *
     * @param watchedEvent the event delivered by the client library
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected != watchedEvent.getState()) {
            return;
        }
        if (Event.EventType.None == watchedEvent.getType() && null == watchedEvent.getPath()) {
            // Pure connection-state event: the session is established.
            connetedSamphore.countDown();
        } else if (Event.EventType.NodeCreated == watchedEvent.getType()) {
            System.out.println("node created==" + watchedEvent.getPath());
        } else if (Event.EventType.NodeDataChanged == watchedEvent.getType()) {
            System.out.println("node changed==" + watchedEvent.getPath());
        } else if (Event.EventType.NodeDeleted == watchedEvent.getType()) {
            System.out.println("node deleted==" + watchedEvent.getPath());
        }
    }

    /**
     * Calls {@code exists} on {@code /test-1} with a watcher, prints whether
     * a stat was returned, then updates the node's data (version -1) and
     * sleeps so notifications can be observed.
     */
    public static void main(String[] args) throws Exception {
        String path = "/test-1";
        zk = new ZooKeeper("rc-zkp-datn-rse-nmg-ooz-woasis:2181", 5000, new Exist_API_Sync_Usage());
        connetedSamphore.await();
        System.out.println("zk-19 連線成功!");
        Stat stat = zk.exists(path, new Exist_API_Sync_Usage());
        // Parentheses are required: '+' binds tighter than '==', so without them the
        // concatenated string is compared with null and the first branch can never be taken.
        System.out.println("stat=" + (stat == null ? "為空" : "不為空"));
        zk.setData(path, "".getBytes(), -1);
        Thread.sleep(Integer.MAX_VALUE);
    }
}