zookeeper原生客戶端
一:zookeeper常用客戶端
- zookeeper:官方提供的,原生的api,使用起來比較麻煩,比較底層,不夠直接,不建議使用。
- zkclient: 對原生api的封裝,開源專案(https://github.com/adyliu/zkclient),dubbo中使用的是這個。
- Apache Curator:Apache的開源專案,對原生客戶端zookeeper進行封裝,易於使用, 功能強大, 一般都是使用這個框架。
二:zookeeper原生客戶端
<dependency>
<groupId>org.apache.zookeeper</groupId >
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
public enum CreateMode {
/** 持久節點 */
PERSISTENT(0, false, false),
/** 持久順序節點 */
PERSISTENT_SEQUENTIAL(2, false, true),
/** 臨時節點(本次會話有效,會話結束後會自動刪除) */
EPHEMERAL(1, true , false),
/** 臨時順序節點 */
EPHEMERAL_SEQUENTIAL(3, true, true);
}
public class ZooKeeper {
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException;
// 不存在返回null
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException;
/**
* 重複建立同一個節點會拋異常
* 建立子節點必選先保證父節點已經建立好了
*/
public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException;
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException;
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException;
public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException;
/**
* 只能刪除一個節點,不支援遞迴的刪除某個路徑
* 刪除的節點不能包含子節點
*/
public void delete(String path, int version) throws InterruptedException, KeeperException;
// 新增認證資訊(類似於密碼)
public void addAuthInfo(String scheme, byte[] auth);
// ACL(Access Control List)設定節點訪問許可權列表,每個節點都可以設定訪問許可權,指定只有特定的客戶端才能訪問和操作節點。
public Stat setACL(String path, List<ACL> acl, int aclVersion) throws KeeperException, InterruptedException;
public List<ACL> getACL(String path, Stat stat) throws KeeperException, InterruptedException;
public synchronized void close() throws InterruptedException;
}
public class ZooDefs {
public interface Ids {
Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
Id AUTH_IDS = new Id("auth", "");
/** 這是一個完全開放的許可權,所有客戶端都有許可權 */
ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(31, ANYONE_ID_UNSAFE)));
/** 只有建立節點的客戶端才有所有許可權 */
ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(31, AUTH_IDS)));
/** 所有客戶端只有讀取的 */
ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(1, ANYONE_ID_UNSAFE)));
}
}
EventType
public static enum EventType {
None(-1),
NodeCreated(1),
NodeDeleted(2),
NodeDataChanged(3),
NodeChildrenChanged(4);
}
KeeperState
public static enum KeeperState {
Disconnected(0),
SyncConnected(3),
AuthFailed(4),
ConnectedReadOnly(5),
SaslAuthenticated(6),
Expired(-112);
}
注意:程式不能debug,只能run,如果debug會報錯org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /app1
原因是debug造成程式停止執行,導致會話過期
public class ZookeeperTest {
/** zookeeper地址 */
static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
/** session超時時間 */
static final int sessionTimeout = 5000;
/** 阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */
static final CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 事件狀態
Event.KeeperState keeperState = watchedEvent.getState();
if (Event.KeeperState.SyncConnected == keeperState) {
// 事件型別
Event.EventType eventType = watchedEvent.getType();
if (Event.EventType.None == eventType) {
// 連線成功建立,傳送訊號量,讓後續阻塞程式向下執行
countDownLatch.countDown();
System.out.println("連線成功:" + watchedEvent);
}
}
}
};
// 注意:ZooKeeper客戶端和伺服器會話的建立是一個非同步的過程,也就是說程式方法在處理完客戶端初始化後立即返回
// 也就是說可能並沒有真正構建好一個可用的會話,只有會話宣告週期處於SyncConnected時才算真正建立好會話
// 這也是為什麼要使用CountDownLatch來等待連線成功
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
// 新增認證資訊
// digest:是最常見的許可權控制模式,也更符合我們對許可權控制的認證,類似於使用者名稱和密碼的方式,zk會對形成的許可權標識先後進行兩次編碼處理,分別是SHA-1加密演算法、Base64編碼
zooKeeper.addAuthInfo("digest", "123456".getBytes());
// 如果沒有連線成功則進行阻塞,直到連線成功才繼續往下執行,連線時需要時間的,可能不會立即連線成功,肯能會等一兩秒之後才連線成功,
countDownLatch.await();
String app1Node = "/app1";
try {
Stat app1Stat = zooKeeper.exists(app1Node, false);
if (app1Stat == null) {
List<ACL> acls = new ArrayList<>(1);
for (ACL acl : ZooDefs.Ids.CREATOR_ALL_ACL) {
acls.add(acl);
}
// 建立節點(如果節點已存在則丟擲異常),返回節點名稱
// 該節點需要認證,只有認證的其它zk客戶端才能操作該節點
String app1NodePath = zooKeeper.create(app1Node, "app1".getBytes(), acls, CreateMode.PERSISTENT);
System.out.println("app1NodePath=" + app1NodePath);
Stat p1Stat = zooKeeper.exists(app1Node + "/p_1", false);
if (p1Stat == null) {
// 建立子節點(父節點必須存在,父節點不存在則拋異常)
// CreateMode.EPHEMERAL 表示臨時節點,當會話結束後會立即刪除
zooKeeper.create(app1Node + "/p_1", "p_1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(app1Node + "/p_2", "p_2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(app1Node + "/p_3", "p_3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
byte[] data = zooKeeper.getData(app1Node, false, null);
String app1NodeValue = new String(data);
System.out.println("app1NodeValue=" + app1NodeValue);
zooKeeper.setData(app1Node, "my p_1".getBytes(), -1);
List<String> children = zooKeeper.getChildren(app1Node, false);
for (String path : children) {
String fullPath = app1Node + "/" + path;
String nodeValue = new String(zooKeeper.getData(fullPath, false, null));
System.out.println(fullPath + "=" + nodeValue);
}
// version版本號 -1: 全刪除
zooKeeper.delete(app1Node + "/p_2", -1);
// Thread.sleep(10000);
} catch (Exception e) {
System.out.println(e);
} finally {
zooKeeper.close();
}
}
}
原生zookeeper客戶端使用注意:
ZooKeeper建立連線時是非同步建立的,有可能已經開始呼叫客戶端方法了,連線還沒有完全建立好,所以在建立連線時一般將非同步建立客戶端變成同步建立客戶端
session過期的問題: 在極端情況下,出現ZooKeeper session過期,客戶端需要自己去監聽該狀態並重新建立ZooKeeper例項
自動恢復(failover)的問題: 當client與一臺server的連線丟失,並試圖去連線另外一臺server時, client將回到初始連線模式
對可恢復異常的處理:當在server端建立一個有序ZNode,而在將節點名返回給客戶端時崩潰,此時client端丟擲可恢復的異常,使用者需要自己捕獲這些異常並進行重試
使用場景的問題:Zookeeper提供了一些標準的使用場景支援,但是ZooKeeper對這些功能的使用說明文件很少,而且很容易用錯.在一些極端場景下如何處理,zk並沒有給出詳細的文件說明.比如共享鎖服務,當伺服器端建立臨時順序節點成功,但是在客戶端接收到節點名之前掛掉了,如果不能很好的處理這種情況,將導致死鎖
三:Watcher示例
Watcher:監控者,監控某個節點對應的操作,也就是對某個節點操作時執行一下回調函式,Watcher#process()
如果某個動作(操作節點)需要進行監控需要設定某個動作的watch=true或者設定exists(path, true)。注意執行一次動作和一次watch繫結,如果想每次都需要監控,那麼每次動作都必須線上設定監控
public class ZookeeperWatcher implements Watcher {
/** zookeeper地址 */
private static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
/** session超時時間 */
private static final int sessionTimeout = 5000;
/** 阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
private AtomicInteger seq = new AtomicInteger();
private ZooKeeper zooKeeper;
private static final String PARENT_PATH = "/p";
private static final String CHILDEREN_PATH = "/p/c";
public void createConnection(String connectString, int sessionTimeout){
try {
this.releaseConnection();
zooKeeper = new ZooKeeper(connectString, sessionTimeout, this);
countDownLatch.await();
} catch (Exception e) {
}
}
public void releaseConnection() {
if (this.zooKeeper == null) return;
try {
this.zooKeeper.close();
System.out.println("斷開連線");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean createPath(String path, String data, boolean needWatch) {
try {
// 設定監控(由於zookeeper的監控都是一次性的,所以每次都需要重新設定監控)
System.out.println("新增節點");
this.zooKeeper.exists(path, needWatch);
this.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("[" + Thread.currentThread().getName() + "] 節點建立成功 path=" + path + ", content=" + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
public void writeData(String path, String data) {
System.out.println("修改節點");
try {
this.zooKeeper.setData(path, data.getBytes(), -1);
} catch (Exception e) {
e.printStackTrace();
}
}
public List<String> getChildren(String path, boolean needWatch) {
try {
System.out.println("獲取子節點");
return this.zooKeeper.getChildren(path, needWatch);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public void deleteAllTestPath(boolean needWatch) {
if (this.reSetWatch(CHILDEREN_PATH) != null) {
this.deleteNode(CHILDEREN_PATH);
}
if (this.reSetWatch(PARENT_PATH) != null) {
this.deleteNode(PARENT_PATH);
}
}
public void deleteNode(String path) {
try {
System.out.println("刪除節點:" + path);
this.zooKeeper.delete(path, -1);
} catch (Exception e) {
e.printStackTrace();
}
}
public Stat reSetWatch(String path){
try {
// 重新設定監控
return this.zooKeeper.exists(path, true);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("process watchedEvent=" + watchedEvent);
if (watchedEvent == null) return;
Event.KeeperState keeperState = watchedEvent.getState();
Event.EventType eventType = watchedEvent.getType();
String path = watchedEvent.getPath();
String prefix = "[Watcher(" + Thread.currentThread().getName() + ") -" + this.seq.incrementAndGet() + "] path=" + path + "\t";
System.out.println(prefix + "連線狀態:" + keeperState);
System.out.println(prefix + "事件:" + eventType);
if (Event.KeeperState.SyncConnected == keeperState) {
if (Event.EventType.None == eventType) {
System.out.println(prefix + "成功連線上ZK伺服器 》》》》》");
countDownLatch.countDown();
} else if (Event.EventType.NodeCreated == eventType) {
System.out.println(prefix + "建立節點");
this.sleep(100);
} else if (Event.EventType.NodeDataChanged == eventType) {
System.out.println(prefix + " 修改節點");
this.sleep(100);
} else if (Event.EventType.NodeChildrenChanged == eventType) {
System.out.println(prefix + " 修改子節點");
this.sleep(3000);
} else if (Event.EventType.NodeDeleted == eventType) {
System.out.println(prefix + " 刪除子節點 " + path);
this.sleep(3000);
}
} else if (Event.KeeperState.Disconnected == keeperState) {
System.out.println(prefix + " 連線斷開 XXXXX");
} else if (Event.KeeperState.AuthFailed == keeperState) {
System.out.println(prefix + " 許可權檢查失敗");
} else if (Event.KeeperState.Expired == keeperState) {
System.out.println(prefix + " 會話過期");
}
System.out.println("--------------------------------------------------------------------");
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ZookeeperWatcher zkWatcher = new ZookeeperWatcher();
// 第一次觸發watcher 建立連線會觸發
zkWatcher.createConnection(connectString, sessionTimeout);
Thread.sleep(1000);
// true:表示需要監控, 會觸發第二次watcher
if (zkWatcher.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {
Thread.sleep(1000);
// 重新建立監控(每次監控完就會斷開,下次要想還監控必須手動再次設定監控)
zkWatcher.reSetWatch(PARENT_PATH);
// 第三次觸發監控(因監控是一次性的,createPath監控用完了就斷開監控了,而上面又再次建立了監控,所以writeData能被監控到)
zkWatcher.writeData(PARENT_PATH, System.currentTimeMillis() + "");
// 監控父節點(當下面的程式碼執行時,父節點增加了一個子節點也屬於父節點發生了變化)
zkWatcher.getChildren(PARENT_PATH, true);
// 監控子節點,會觸發監控
zkWatcher.createPath(CHILDEREN_PATH, System.currentTimeMillis() + "", true);
// 觸發兩次監控
zkWatcher.deleteAllTestPath(true);
}
zkWatcher.releaseConnection();
Thread.sleep(10000);
}
}