五、Zookeeper基於API操作Node節點
-
org.apache.zookeeper //客戶端主要類檔案
-
org.apache.zookeeper.data //
-
org.apache.zookeeper.server
-
org.apache.zookeeper.server.quorum
-
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency>
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class CreateSession implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1);/* 建立會話 */ public static void main(String[] args) throws IOException,InterruptedException { /* 客戶端可以通過建立一個zk例項來連線zk伺服器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 連線地址:IP:埠 sesssionTimeOut:會話超時時間:單位毫秒 Wather:監聽器(當特定事件觸發監聽時,zk會通過watcher通知到客戶端)*/ ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181",5000,new CreateSession()); System.out.println(zooKeeper.getState()); // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞 countDownLatch.await(); System.out.println("客戶端與服務端會話真正建立了"); } /* 回撥方法:處理來自伺服器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程式在CountDownLatch上的等待阻塞 System.out.println("process方法執行了..."); countDownLatch.countDown(); } } }
注意:ZooKeeper客戶端和服務端會話的建立是一個非同步的過程,也就是說在程式中,構造方法會在處理完客戶端初始化工作後立即返回,在大多數情況下,此時並沒有真正建立好一個可用的會話,在會話的生命週期中處於“ CONNECTING的狀態。當該會話真正建立完畢後 ZooKeeper服務端會向會話對應的客戶端傳送一個事件通知,以告知客戶端,客戶端只有在獲取這個通知之後,才算真正建立了會話
建立節點
節點介紹:
/**
* path :節點建立的路徑
* data[] :節點建立要儲存的資料,是個byte型別的
* acl :節點建立的許可權資訊(4種類型)
* ANYONE_ID_UNSAFE : 表示任何人
* AUTH_IDS :此ID僅可用於設定ACL。它將被客戶機驗證的ID替換。
* OPEN_ACL_UNSAFE :這是一個完全開放的ACL(常用)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予建立者身份驗證ID的所有許可權
* createMode :建立節點的型別(4種類型)
* PERSISTENT:持久節點
* PERSISTENT_SEQUENTIAL:持久順序節點
* EPHEMERAL:臨時節點
* EPHEMERAL_SEQUENTIAL:臨時順序節點
String node = zookeeper.create(path,data,acl,createMode);
*/
import org.apache.zookeeper.*; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class CreateNote implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立會話 */ public static void main(String[] args) throws IOException,InterruptedException,KeeperException { /* 客戶端可以通過建立一個zk例項來連線zk伺服器 new Zookeeper(connectString,Wather) connectString: 連線地址:IP:埠 sesssionTimeOut:會話超時時間:單位毫秒 Wather:監聽器(當特定事件觸發監聽時,zk會通過watcher通知到客戶端) */ zooKeeper = new ZooKeeper("127.0.0.1:2181",new CreateNote()); System.out.println(zooKeeper.getState()); // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞 //countDownLatch.await();\ Thread.sleep(Integer.MAX_VALUE); } /* 回撥方法:處理來自伺服器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程式在CountDownLatch上的等待阻塞 System.out.println("process方法執行了..."); // 建立節點 try { createNoteSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 建立節點的方法 */ private static void createNoteSync() throws KeeperException,InterruptedException { /** * path :節點建立的路徑 * data[] :節點建立要儲存的資料,是個byte型別的 * acl :節點建立的許可權資訊(4種類型) * ANYONE_ID_UNSAFE : 表示任何人 * AUTH_IDS :此ID僅可用於設定ACL。它將被客戶機驗證的ID替換。 * OPEN_ACL_UNSAFE :這是一個完全開放的ACL(常用)--> world:anyone * CREATOR_ALL_ACL :此ACL授予建立者身份驗證ID的所有許可權 * createMode :建立節點的型別(4種類型) * PERSISTENT:持久節點 * PERSISTENT_SEQUENTIAL:持久順序節點 * EPHEMERAL:臨時節點 * EPHEMERAL_SEQUENTIAL:臨時順序節點 String node = zookeeper.create(path,createMode); */ // 持久節點 String note_persistent = zooKeeper.create("/lg-persistent","持久節點內容".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); // 臨時節點 String note_ephemeral = zooKeeper.create("/lg-ephemeral","臨時節點內容".getBytes(),CreateMode.EPHEMERAL); // 持久順序節點 String note_persistent_sequential = zooKeeper.create("/lg-persistent_sequential","持久順序節點內容".getBytes(),CreateMode.PERSISTENT_SEQUENTIAL); System.out.println("建立的持久節點" + note_persistent); System.out.println("建立的臨時節點" + note_ephemeral); System.out.println("建立的持久順序節點" + note_persistent_sequential); } }
讀取節點:
public class GetNoteData implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立會話 */ public static void main(String[] args) throws IOException,new GetNoteData()); System.out.println(zooKeeper.getState()); // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞 //countDownLatch.await();\ Thread.sleep(Integer.MAX_VALUE); } /* 回撥方法:處理來自伺服器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { /* 子節點列表發生改變時,伺服器端會發生noteChildrenChanged事件通知 要重新獲取子節點列表,同時注意:通知是一次性的,需要反覆註冊監聽 */ if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){ List<String> children = null; try { children = zooKeeper.getChildren("/lg-persistent",true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(children); } // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程式在CountDownLatch上的等待阻塞 System.out.println("process方法執行了..."); // 獲取節點資料的方法 try { getNoteData(); // 獲取節點的子節點列表方法 getChildrens(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 獲取某個節點的內容 */ private void getNoteData() throws KeeperException,InterruptedException { /** * path : 獲取資料的路徑 * watch : 是否開啟監聽 * stat : 節點狀態資訊 * null: 表示獲取最新版本的資料 * zk.getData(path,watch,stat); */ byte[] data = zooKeeper.getData("/lg-persistent",false,null); System.out.println(new String(data)); } /* 獲取某個節點的子節點列表方法 */ public static void getChildrens() throws KeeperException,InterruptedException { /* path:路徑 watch:是否要啟動監聽,當子節點列表發生變化,會觸發監聽 zooKeeper.getChildren(path,watch); */ List<String> children = zooKeeper.getChildren("/lg-persistent",true); System.out.println(children); } }
刪除節點
import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.concurrent.CountDownLatch; public class DeleteNote implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立會話 */ public static void main(String[] args) throws IOException,new DeleteNote()); System.out.println(zooKeeper.getState()); // 計數工具類:CountDownLatch:不讓main方法結束,讓執行緒處於等待阻塞 //countDownLatch.await();\ Thread.sleep(Integer.MAX_VALUE); } /* 回撥方法:處理來自伺服器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程式在CountDownLatch上的等待阻塞 System.out.println("process方法執行了..."); // 刪除節點 try { deleteNoteSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 刪除節點的方法 */ private void deleteNoteSync() throws KeeperException,InterruptedException { /* zooKeeper.exists(path,watch) :判斷節點是否存在 zookeeper.delete(path,version) : 刪除節點 */ Stat stat = zooKeeper.exists("/lg-persistent/c1",false); System.out.println(stat == null ? "該節點不存在":"該節點存在"); if(stat != null){ zooKeeper.delete("/lg-persistent/c1",-1); } Stat stat2 = zooKeeper.exists("/lg-persistent/c1",false); System.out.println(stat2 == null ? "該節點不存在":"該節點存在"); } }
更新節點
/* 更新資料節點內容的方法 */ private void updateNoteSync() throws KeeperException,InterruptedException { /* path:路徑 data:要修改的內容 byte[] version:為-1,表示對最新版本的資料進行修改 zooKeeper.setData(path,version); */ byte[] data = zooKeeper.getData("/lg-persistent",null); System.out.println("修改前的值:" + new String(data)); //修改/lg-persistent 的資料 stat: 狀態資訊物件 Stat stat = zooKeeper.setData("/lg-persistent","客戶端修改了節點資料".getBytes(),-1); byte[] data2 = zooKeeper.getData("/lg-persistent",null); System.out.println("修改後的值:" + new String(data2)); }