Java API 操作 zookeeper
阿新 • • 發佈:2018-12-13
1.需要的jar包: 1.1 zookeeper的核心包:zookeeper-3.4.5.jar 1.2 log4j的jar包:log4j-1.2.15.jar 1.3 netty的jar包:netty-3.2.2.Final.jar 1.4 slf4j的jar包:slf4j-api-1.6.1.jar 和 slf4j-log4j12-1.6.1.jar 在zookeeper的壓縮檔案裡面就有: https://pan.baidu.com/s/1imnS-78bqSorM5tkI4SkEQ
程式碼:
package com.thp.bigdata; import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; /** * zookeeper的客戶端 資料的增刪改查,已經對資料的監聽跟響應 * * @author tommy * */ public class SimpleZkClient { // zookeeper叢集裡面只要有半數存活就可用 /** * 注意點: 如果是直接使用的 ip地址的話,那麼我測試的時候總會報錯,ConnectionLoss 的錯誤 * 在host檔案裡面配置了IP地址 然後使用主機名的話,程式是可以執行的 */ // private static final String connectString = "192.168.17.150:2181,192.168.17.151:2181,192.168.17.153:2181"; private static final String connectString = "bd1:2181,bd2:2181,bd3:2181"; // 超時時間 private static final int sessionTimeout = 2000; ZooKeeper zkClient = null; @Before public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { // zookeeper可以根據事件作出相應的響應 @Override public void process(WatchedEvent event) { // 收到事件通知後的回撥函式 (應該是我們自己的事件處理邏輯) System.out.println(event.getType() + " -- " + event.getPath()); // 事件的型別 -- 事件發生的節點 } }); } /** * 資料的增刪改查 * * @throws InterruptedException * @throws KeeperException */ // 建立資料節點到zookeeper中 @Test public void testCreate() throws KeeperException, InterruptedException { // 第一個引數 要建立節點的路徑 // 第二個引數 data 資料可以是任意的 只需要轉換成byte型別的陣列就可以 // 第三個引數是 許可權: Ids.CREATOR_ALL_ACL --- 是一個列舉 可以選擇對應的許可權 一般選擇 // OPEN_ACL_UNSAFE // 第四個引數 節點的型別 四種可供選擇 String nodeCreated = zkClient.create("/eclipse3", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 獲取子節點 @Test public void getChildren() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/", true); for(String child : children) { System.out.println(child); } // 睡一段時間 可以檢視到監聽的變化 在zookeeper客戶端 進行修改 // 注意 : 這個監聽器只監聽一次 Thread.sleep(Long.MAX_VALUE); // NodeChildrenChanged -- / } }
第二個測試程式碼 可以監聽zookeeper上面節點的變化,但是隻會監聽一次,之後就不再監聽 這樣是不符合業務的需求的,我們應該需要實時的監聽
我們需要改進程式碼:
@Before public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { // zookeeper可以根據事件作出相應的響應 @Override public void process(WatchedEvent event) { // 收到事件通知後的回撥函式 (應該是我們自己的事件處理邏輯) System.out.println(event.getType() + " -- " + event.getPath()); // 事件的型別 -- 事件發生的節點 try { // getChildren 只是為了 設定這個監聽 相當於又監聽了 zkClient.getChildren("/", true); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } }); }
判斷節點是否存在:
// 判斷znode是否存在
@Test
public void testExist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/eclipse1", false);
System.out.println(stat == null ? "not exist" : "exist");
}
刪除節點:
// 刪除znode資料 @Test public void deleteZnode() throws KeeperException, InterruptedException, UnsupportedEncodingException { // 引數2是指定刪除的版本 -1 表示刪除所有的版本 zkClient.delete("/eclipse1", -1); }
修改節點資料:
// 修改znode資料
@Test
public void setData() throws KeeperException, InterruptedException, UnsupportedEncodingException {
zkClient.setData("/app1", "i lost you".getBytes(), -1);
byte[] data = zkClient.getData("/app1", false, null);
System.out.println(new String(data));
}
問題:ConnesctionLoss 連線丟失: 原因: 開啟zookeeper的配置檔案 /root/apps/zookeeper-3.4.5/conf 裡面配置的時候使用的就是主機名,所以我們在使用Java API 的時候連線也需要使用主機名,否則就會連線拒絕 是繫結在名字上的而不是繫結在IP上的,兩端得是一致的