zookeeper——客戶端Curator 的簡單操作
阿新 • • 發佈:2021-02-03
目錄
curator簡介
curator是Netfilx公司開源的一個zookeeper客戶端,後捐獻給apache,curator框架在zookeeper原生API介面上進行了包裝,解決了很多zookeeper客戶端非常底層的細節開發。提供zookeeper各種應用場景(比如:分散式鎖服務、叢集領導選舉、共享計數器、快取機制、分散式佇列等)的抽象封裝,實現了Fluent風格的API介面,是最好用,最流行的zookeeper的客戶端。
匯入依賴:
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.2.0</version> </dependency>
1.curator連線zookeeper
package com.cjian.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; public class CuratorConnection { public static void main(String[] args) { /** * 重連策略 * new RetryOneTime(3000) 三秒後重連一次,只重連一次 * new RetryNTimes(3, 3000); 每三秒重連一次,重連三次 * new RetryUntilElapsed(10000, 3000); 每三秒重連一次,總等待時間超過10秒後停止重連 * new ExponentialBackoffRetry(1000, 3);隨著重連次數的增加,重連的時間增加 baseSleepTimeMs*Math.max(1,random.nextInt(1<<retryCount+1)) * */ CuratorFramework client = CuratorFrameworkFactory.builder() //IP埠號 .connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183") //會話超時時間 .sessionTimeoutMs(5000) //重連機制 策略 .retryPolicy(new RetryOneTime(3000)) //名稱空間 父節點 .namespace("create") //構建連線物件 .build(); //開啟連線 client.start(); System.out.println(client.isStarted()); //關閉連線 client.close(); } }
2.建立節點
2.1簡單建立
@Test
public void create1() throws Exception {
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node1", "node1".getBytes());
System.out.println("建立結束");
}
[zk: 127.0.0.1:2182(CONNECTED) 3] ls /
[zookeeper]
[zk: 127.0.0.1:2182(CONNECTED) 4] ls /
[create, zookeeper]
[zk: 127.0.0.1:2182(CONNECTED) 5] get /create/node1
node1
cZxid = 0x300000025
ctime = Tue Feb 02 09:21:13 CST 2021
mZxid = 0x300000025
mtime = Tue Feb 02 09:21:13 CST 2021
pZxid = 0x300000025
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
2.2 自定義許可權
@Test
public void create2() throws Exception {
//自定義許可權列表
ArrayList<ACL> acls = new ArrayList<>();
Id id = new Id("ip", "127.0.0.1");
acls.add(new ACL(ZooDefs.Perms.ALL, id));
client.create()
.withMode(CreateMode.PERSISTENT)
.withACL(acls)
.forPath("/node2", "node2".getBytes());
System.out.println("建立結束");
}
[zk: 127.0.0.1:2182(CONNECTED) 7] getAcl /create/node2
'ip,'127.0.0.1
: cdrwa
2.3遞迴建立
@Test
public void create3() throws Exception {
//遞迴建立節點樹
client.create()
.creatingParentsIfNeeded()//支援遞迴建立節點
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath("/node3/node31", "node31".getBytes());
}
2.4非同步建立
@Test
public void create4() throws Exception {
//非同步方式建立節點
client.create()
.creatingParentsIfNeeded()//支援遞迴建立節點
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
throws Exception {
//節點路徑
System.out.println(curatorEvent.getPath());
//事件型別
System.out.println(curatorEvent.getType());
}
})
.forPath("/node4", "node4".getBytes());
Thread.sleep(5000);
System.out.println("建立結束");
}
/node4
CREATE
建立結束
3.修改節點
3.1 同步修改
@Test
public void set1() throws Exception {
client.setData()
.forPath("/node1", "node11".getBytes());
}
@Test
public void set2() throws Exception {
client.setData()
//指定版本號
.withVersion(-1)
.forPath("/node1", "node22".getBytes());
}
3.2 非同步修改
@Test
public void set3() throws Exception {
//非同步方式
client.setData()
//指定版本號
.withVersion(-1)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
throws Exception {
//節點路徑
System.out.println(curatorEvent.getPath());
//事件型別
System.out.println(curatorEvent.getType());
}
})
.forPath("/node1", "node33".getBytes());
Thread.sleep(5000);
System.out.println("修改結束");
}
/node1
SET_DATA
修改結束
4.刪除節點
4.1同步刪除
@Test
public void delete1() throws Exception {
client.delete().forPath("/node1");
}
@Test
public void delete2() throws Exception {
client.delete()
//版本號
.withVersion(-1)
.forPath("/node1");
}
@Test
public void delete3() throws Exception {
client.delete()
//刪除包含子節點的節點
.deletingChildrenIfNeeded()
.forPath("/node1");
}
4.2非同步刪除
@Test
public void delete4() throws Exception {
client.delete()
//刪除包含子節點的節點
.deletingChildrenIfNeeded()
.withVersion(-1)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
throws Exception {
//節點路徑
System.out.println(curatorEvent.getPath());
//事件型別
System.out.println(curatorEvent.getType());
}
})
.forPath("/node1");
Thread.sleep(5000);
System.out.println("刪除結束");
}
/node1
DELETE
刪除結束
5.檢視節點資料
5.1同步方式
@Test
public void getData1() throws Exception {
//讀取節點資料
byte[] node2s = client.getData().forPath("/node2");
System.out.println(new String(node2s));
}
@Test
public void getData2() throws Exception {
Stat stat = new Stat();
//讀取資料時讀取節點的屬性
byte[] node2s = client.getData()
.storingStatIn(stat).forPath("/node2");
System.out.println(new String(node2s));
System.out.println(stat.getVersion());
}
5.2非同步方式
@Test
public void getData3() throws Exception {
Stat stat = new Stat();
//讀取資料時讀取節點的屬性
byte[] node2s = client.getData()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
throws Exception {
//節點路徑
System.out.println(curatorEvent.getPath());
//事件型別
System.out.println(curatorEvent.getType());
//資料
System.out.println(new String(curatorEvent.getData()));
}
}).forPath("/node2");
Thread.sleep(5000);
}
/node2
GET_DATA
node2
6.檢視子節點
6.1同步方式
@Test
public void getChild1() throws Exception {
List<String> list = client.getChildren().forPath("/node1");
for (String s : list) {
System.out.println(s);
}
}
node11
6.2 非同步方式
@Test
public void getChild2() throws Exception {
client.getChildren()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
throws Exception {
//節點路徑
System.out.println(curatorEvent.getPath());
//事件型別
System.out.println(curatorEvent.getType());
List<String> list = curatorEvent.getChildren();
for (String s : list) {
System.out.println(s);
}
}
}).forPath("/node1");
Thread.sleep(5000);
}
/node1
CHILDREN
node11
7.是否存在
7.1同步方式
@Test
public void exists1() throws Exception {
Stat stat = client.checkExists().forPath("/node1");
System.out.println(stat.getVersion());
}
如果不存在,則stat為null
7.2非同步方式
@Test
public void exists2() throws Exception {
client.checkExists()
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent)
throws Exception {
//節點路徑
System.out.println(curatorEvent.getPath());
//事件型別
System.out.println(curatorEvent.getType());
System.out.println(curatorEvent.getStat().getVersion());
}
}).forPath("/node1");
}
/node1
EXISTS
0
8.監視器
curator提供了兩種watcher來監聽節點的變化
- NodeCache :之間聽某一個特定的節點,監聽節點的新增和修改
- PathChildrenCache:監聽一個znode的子節點,當一個子節點增加、更新、刪除時PathChildrenCache會改變它的狀態,會包含最新的子節點,子節點的資料和狀態
監視器可重複使用
8.1監視當前節點
@Test
public void watcher1() throws Exception {
//監視某個節點的資料變化
NodeCache nodeCache = new NodeCache(client, "/watcher");
//開啟監視器物件
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println(nodeCache.getCurrentData().getPath());
System.out.println(new String(nodeCache.getCurrentData().getData()));
}
});
Thread.sleep(60000);
//關閉監視器物件
nodeCache.close();
}
首先該節點是沒有的,當我們在命令視窗執行:create /create/watcher "111" 後,控制檯輸出:
/watcher
111
當我們在控制檯再次執行:set /create/watcher "222" 後,控制檯輸出:
/watcher
222
8.2 監聽子節點
@Test
public void watcher2() throws Exception {
//監視某個子節點的資料變化
//第三個引數為事件中是否可以獲取節點的資料
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher",true);
//開啟監視器物件
pathChildrenCache.start();
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
throws Exception {
System.out.println(pathChildrenCacheEvent.getType());
System.out.println(pathChildrenCacheEvent.getData().getPath());
System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
}
});
Thread.sleep(60000);
//關閉監視器物件
pathChildrenCache.close();
}
首先該節點是沒有的,當我們在命令視窗執行:create /create/watcher/watcher1 "11" 後,控制檯輸出:
CHILD_ADDED
/watcher/watcher1
11
當我們在控制檯再次執行:set /create/watcher/watcher1 "123" 後,控制檯輸出:
CHILD_UPDATED
/watcher/watcher1
123
當我們在控制檯再次執行:delete /create/watcher/watcher1 後,控制檯輸出:
CHILD_REMOVED
/watcher/watcher1
123
9.事務
//事務
@Test
public void tra1() throws Exception {
//開啟事務
client.inTransaction().create().forPath("/test1","test1".getBytes())
.and()
.setData().forPath("/notExistsNode","not exists".getBytes())
.and()
//提交事務
.commit();
}
10.分散式鎖
package com.cjian.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLock {
private static CuratorFramework client;
public static void main(String[] args) throws InterruptedException {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183")
.sessionTimeoutMs(5000)
.retryPolicy(retryPolicy)
//可選項
.namespace("lock")
.build();
client.start();
//排他鎖
InterProcessLock interProcessLock = new InterProcessMutex(client, "/lock1");
//讀寫鎖
//InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1");
Order order = new Order(0,interProcessLock);
for (int i = 0; i < 5; i++) {
new Thread(order).start();
}
Thread.sleep(10000);
System.out.println(order.getNum());
}
}
class Order implements Runnable{
private int num;
private InterProcessLock interProcessLock;
public Order(int num, InterProcessLock interProcessLock) {
this.num = num;
this.interProcessLock = interProcessLock;
}
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
interProcessLock.acquire();
num++;
interProcessLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}