zookeeper - 通過java程式碼連線zookeeper(2)
阿新 • • 發佈:2018-12-04
首先建立一個Maven專案
<?xml version="1.0" encoding="UTF-8"?>
<!-- pom.xml: Maven build descriptor for the ZooKeeper client demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>code</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.11</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <!-- No <type>pom</type> here: with that type Maven resolves only the POM,
             so the ZooKeeper client jar itself would not be put on the classpath. -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.4-beta</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
</project>
package com.amber;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Minimal ZooKeeper client demo:
 * <ul>
 *   <li>connect to ZooKeeper</li>
 *   <li>create a znode</li>
 *   <li>read a znode's value</li>
 *   <li>close the connection</li>
 * </ul>
 */
public class ZookeeperDemo {

    /** ZooKeeper server address in {@code host:port} form. */
    private static final String CONNECT_STRING = "127.0.0.1:2181";

    /** Session timeout, also used as the upper bound for the initial connect wait. */
    private static final int SESSION_TIMEOUT_MS = 20 * 1000;

    private ZooKeeper zookeeper;

    /**
     * Opens a session to the ZooKeeper server and waits until it is connected.
     *
     * <p>The {@code ZooKeeper} constructor returns before the session is established,
     * so a default watcher is registered and this method blocks until it sees
     * {@code SyncConnected}. A non-null default watcher is supplied on purpose:
     * session events are dispatched to it, and a {@code null} watcher cannot
     * receive them.
     *
     * @return the connected client (also kept in this instance for later calls)
     * @throws IOException if the client cannot be created, the connection is not
     *                     established within the session timeout, or the wait is
     *                     interrupted
     */
    public ZooKeeper zkConnect() throws IOException {
        final CountDownLatch connected = new CountDownLatch(1);
        zookeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        try {
            if (!connected.await(SESSION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                close();
                throw new IOException("Timed out connecting to ZooKeeper at " + CONNECT_STRING);
            }
        } catch (InterruptedException e) {
            close();
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while connecting to ZooKeeper at " + CONNECT_STRING, e);
        }
        return zookeeper;
    }

    /**
     * Creates a znode with an open (unrestricted) ACL.
     *
     * @param path    path of the znode
     * @param value   data stored in the znode
     * @param watcher unused; kept only so existing callers keep compiling
     * @param node    creation mode of the znode
     * @throws KeeperException      if the server rejects the request
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void createZnode(String path, byte[] value, Watcher watcher, CreateMode node)
            throws KeeperException, InterruptedException {
        zookeeper.create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, node);
    }

    /**
     * Reads the data of a znode and decodes it as UTF-8 text.
     *
     * @param path path of the znode
     * @return the znode's data as a string; an empty string when the client
     *         returns no data for the node
     * @throws KeeperException      if the server rejects the request
     * @throws InterruptedException if the calling thread is interrupted
     */
    public String getZnodeValue(String path) throws KeeperException, InterruptedException {
        // Second argument: whether to leave a watch (not used here).
        // Third argument: receives the znode's stat structure.
        byte[] data = zookeeper.getData(path, false, new Stat());
        if (data == null) {
            return "";
        }
        // Decode with an explicit charset instead of the platform default.
        return new String(data, StandardCharsets.UTF_8);
    }

    /**
     * Closes the session if one was opened. Safe to call when not connected.
     */
    public void close() {
        if (zookeeper == null) {
            return;
        }
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            System.err.println("Interrupted while closing the ZooKeeper session: " + e);
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZookeeperDemo zookeeperDemo = new ZookeeperDemo();
        try {
            // Open the connection.
            ZooKeeper zooKeeper = zookeeperDemo.zkConnect();
            // Create the znode only if it is missing, so the demo can be run repeatedly.
            if (zooKeeper.exists("/amber", false) == null) {
                zookeeperDemo.createZnode(
                        "/amber", "hahaha".getBytes(StandardCharsets.UTF_8), null, CreateMode.PERSISTENT);
            }
            // Read the znode's value.
            String znodeValue = zookeeperDemo.getZnodeValue("/amber");
            System.out.println(znodeValue);
        } finally {
            // Always release the session, even when an operation above fails.
            zookeeperDemo.close();
        }
    }
}
通過上面的程式碼就可以實現通過java程式碼操控zookeeper.但是你可能有疑惑的是
- CreateMode.PERSISTENT是什麼
- watcher是什麼
Znode的四種類型
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.apache.zookeeper; import org.apache.yetus.audience.InterfaceAudience.Public; import org.apache.zookeeper.KeeperException.BadArgumentsException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Public public enum CreateMode { PERSISTENT(0, false, false), PERSISTENT_SEQUENTIAL(2, false, true), EPHEMERAL(1, true, false), EPHEMERAL_SEQUENTIAL(3, true, true); private static final Logger LOG = LoggerFactory.getLogger(CreateMode.class); private boolean ephemeral; private boolean sequential; private int flag; private CreateMode(int flag, boolean ephemeral, boolean sequential) { this.flag = flag; this.ephemeral = ephemeral; this.sequential = sequential; } public boolean isEphemeral() { return this.ephemeral; } public boolean isSequential() { return this.sequential; } public int toFlag() { return this.flag; } public static CreateMode fromFlag(int flag) throws KeeperException { switch(flag) { case 0: return PERSISTENT; case 1: return EPHEMERAL; case 2: return PERSISTENT_SEQUENTIAL; case 3: return EPHEMERAL_SEQUENTIAL; default: String errMsg = "Received an invalid flag value: " + flag + " to convert to a CreateMode"; LOG.error(errMsg); throw new BadArgumentsException(errMsg); } } }CreateMode
znode分四種類型
PERSISTENT
持久節點 對應命令 create path value
PERSISTENT_SEQUENTIAL 順序自動編號持久化節點(有序持久節點),建立時會在節點名稱後面自動附加一個由父節點維護的遞增序號 create -s path value
EPHEMERAL 臨時節點, 客戶端session超時這類節點就會被自動刪除 create -e path value
EPHEMERAL_SEQUENTIAL 臨時自動編號節點 (臨時有序節點) create -s -e path value
CreateMode是一個列舉型別,裡面有四個物件分別是.表示的就是Znode的型別
PERSISTENT(0, false, false),            // 持久節點
PERSISTENT_SEQUENTIAL(2, false, true),  // 持久有序節點
EPHEMERAL(1, true, false),              // 臨時節點:只存在於建立它的客戶端session中,session結束(關閉或超時)後就會被自動刪除
EPHEMERAL_SEQUENTIAL(3, true, true);    // 臨時有序節點:session結束後同樣會被自動刪除
在建立持久節點(PERSISTENT)的時候,應該注意因為znode的path是不允許重複的,因此在建立持久節點之前,應先判斷節點是否存在。但是持久有序節點(PERSISTENT_SEQUENTIAL)會自動在path後面附加一個10位數的遞增序號(由父節點維護,並不是dataVersion),因此不會出現路徑重複的問題
if (zooKeeper.exists(path, false) == null) { zookeeperWatchDemo.createZnode(path, value.getBytes(), null, CreateMode.PERSISTENT); }
watcher
Zookeeper支援釋出訂閱功能,引入了watcher機制進行監聽。當資料發生變動以後,可以及時通知客戶端資料已經變動,並且把相應的事件通知給註冊了Watcher的Client。
watcher的特性:
- Watcher一次性觸發器:只能監聽一次,是一個一次性的動作。如果需要監聽多次,那麼應該在回撥中重新註冊Watcher(例如在process()裡再次呼叫getData()並傳入Watcher)
- 可以使用系統預設的watcher,也可以自定義Watcher.自定義Watcher必須實現
org.apache.zookeeper.Watcher介面
- Zookeeper的getData(),getChildren(),exists()都可以設定Watch選項。當watch為false的時候,或者watcher為null的時候代表不開啟watch
new Watcher() { @Override public void process(WatchedEvent watchedEvent) { triggerWatch(path); }
package com.amber;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * ZooKeeper watch demo: reads a znode, leaves a data watch on it, and prints
 * the old and new value every time the znode's data changes.
 */
public class ZookeeperWatchDemo {

    /** ZooKeeper server address in {@code host:port} form. */
    private static final String CONNECT_STRING = "127.0.0.1:2181";

    /** Session timeout, also used as the upper bound for the initial connect wait. */
    private static final int SESSION_TIMEOUT_MS = 20 * 1000;

    private ZooKeeper zookeeper;

    /**
     * Last value seen for the watched znode. Written by the thread that calls
     * {@link #getZnodeValue(String)} and by the watcher callback, hence volatile.
     */
    private volatile String oldValue = "";

    /**
     * Opens a session to the ZooKeeper server and waits until it is connected.
     *
     * <p>The {@code ZooKeeper} constructor returns before the session is established,
     * so a non-null default watcher is registered and this method blocks until it
     * sees {@code SyncConnected}.
     *
     * @return the connected client (also kept in this instance for later calls)
     * @throws IOException if the client cannot be created, the connection is not
     *                     established within the session timeout, or the wait is
     *                     interrupted
     */
    public ZooKeeper zkConnect() throws IOException {
        final CountDownLatch connected = new CountDownLatch(1);
        zookeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT_MS, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            }
        });
        try {
            if (!connected.await(SESSION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                closeQuietly();
                throw new IOException("Timed out connecting to ZooKeeper at " + CONNECT_STRING);
            }
        } catch (InterruptedException e) {
            closeQuietly();
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while connecting to ZooKeeper at " + CONNECT_STRING, e);
        }
        return zookeeper;
    }

    /**
     * Creates a znode with an open (unrestricted) ACL.
     *
     * @param path    path of the znode
     * @param value   data stored in the znode
     * @param watcher unused; kept only so existing callers keep compiling
     * @param node    creation mode of the znode
     * @throws KeeperException      if the server rejects the request
     * @throws InterruptedException if the calling thread is interrupted
     */
    public void createZnode(String path, byte[] value, Watcher watcher, CreateMode node)
            throws KeeperException, InterruptedException {
        zookeeper.create(path, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, node);
    }

    /**
     * Reads the znode's value, remembers it as the baseline for change detection,
     * and leaves a data watch on the znode.
     *
     * @param path path of the znode
     * @return the znode's data decoded as UTF-8; empty when the client returns no data
     * @throws KeeperException      if the server rejects the request
     * @throws InterruptedException if the calling thread is interrupted
     */
    public String getZnodeValue(final String path) throws KeeperException, InterruptedException {
        String value = decode(zookeeper.getData(path, newDataWatcher(path), new Stat()));
        oldValue = value;
        return value;
    }

    /**
     * Re-reads the znode (which also re-arms the one-shot watch) and reports
     * whether its value differs from the last value seen.
     *
     * @param path path of the znode
     * @return {@code true} when the value changed; {@code false} when it is
     *         unchanged or could not be read
     */
    public boolean triggerWatch(String path) {
        final String current;
        try {
            current = decode(zookeeper.getData(path, newDataWatcher(path), new Stat()));
        } catch (KeeperException e) {
            // A failed read is not a value change; report it and keep the old baseline.
            System.err.println("Failed to read znode " + path + ": " + e);
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }

        if (oldValue.equals(current)) {
            System.out.println("no change");
            return false;
        }
        System.out.println("oldvalue: " + oldValue + "new value: " + current);
        oldValue = current;
        return true;
    }

    /**
     * Builds the one-shot data watcher for {@code path}. Only data-change events
     * trigger a re-read; other events are ignored so that they do not register
     * an additional watcher on top of the one still in place.
     */
    private Watcher newDataWatcher(final String path) {
        return new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDataChanged) {
                    triggerWatch(path);
                }
            }
        };
    }

    /** Decodes znode data as UTF-8, mapping missing data to an empty string. */
    private static String decode(byte[] data) {
        return data == null ? "" : new String(data, StandardCharsets.UTF_8);
    }

    /** Closes the session after a failed connect without masking the original failure. */
    private void closeQuietly() {
        try {
            zookeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        ZookeeperWatchDemo zookeeperWatchDemo = new ZookeeperWatchDemo();
        ZooKeeper zooKeeper = zookeeperWatchDemo.zkConnect();
        try {
            String path = "/amberas";
            String value = "hahahahaha";
            // Create the znode only if it is missing, so the demo can be run repeatedly.
            if (zooKeeper.exists(path, false) == null) {
                zookeeperWatchDemo.createZnode(
                        path, value.getBytes(StandardCharsets.UTF_8), null, CreateMode.PERSISTENT);
            }
            String znodeValue = zookeeperWatchDemo.getZnodeValue(path);
            System.out.println(znodeValue);
            // Keep the process alive so watch callbacks can be observed.
            Thread.sleep(1000 * 60 * 50);
        } finally {
            // Always release the session, even when an operation above fails.
            zooKeeper.close();
        }
    }
}