Learning ZooKeeper Curator (Create, Read, Update, Delete) (1)
阿新 • Published: 2017-11-05
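This walkthrough is based mainly on the official Curator examples on GitHub: https://github.com/apache/curator/tree/master/curator-examples.
The ZooKeeper version used is zookeeper-3.4.9 (you need to pick a Curator version that is compatible with it).
Source code: https://gitee.com/zhangjunqing/spring-boot/tree/master/zookeeper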
(1) The pom file is as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>zookeeper</name>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-recipes</artifactId>
          <version>2.7.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-test</artifactId>
          <version>2.7.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-x-discovery</artifactId>
          <version>2.7.0</version>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.9.2</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.25</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
    </project>
(2) The program that connects to ZooKeeper through Curator:

    package com.topsec.framework;

    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class CreateClientExamples {

        // Create a simple CuratorFramework
        public static CuratorFramework createSimple(String connectionString) {
            // these are reasonable arguments for the ExponentialBackoffRetry. The first
            // retry will wait 1 second - the second will wait up to 2 seconds - the
            // third will wait up to 4 seconds.
            ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

            // The simplest way to get a CuratorFramework instance. This will use default values.
            // The only required arguments are the connection string and the retry policy
            return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        }

        // Create a CuratorFramework with custom options
        public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy,
                                                         int connectionTimeoutMs, int sessionTimeoutMs) {
            // using the CuratorFrameworkFactory.builder() gives fine grained control
            // over creation options. See the CuratorFrameworkFactory.Builder javadoc
            // for details
            return CuratorFrameworkFactory.builder()
                    .connectString(connectionString)
                    .retryPolicy(retryPolicy)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    // etc. etc.
                    .build();
        }
    }
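The comment in createSimple describes the retry waits as growing ceilings: 1 second, then up to 2 seconds, then up to 4 seconds. As a rough sketch of that schedule only, the plain-Java class below computes those ceilings for a base sleep of 1000 ms and 3 retries. The class and method names (`BackoffSchedule`, `ceilingMs`, `ceilings`) are made up for illustration, and this is not Curator's internal formula; the real policy may randomize the actual wait.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: reproduces the wait ceilings described in the comment,
// not the internal computation of ExponentialBackoffRetry.
class BackoffSchedule {

    // Upper bound in milliseconds for the wait before retry number retryIndex (0-based).
    static long ceilingMs(long baseSleepMs, int retryIndex) {
        return baseSleepMs << retryIndex; // baseSleepMs * 2^retryIndex
    }

    // Ceilings for every retry allowed by maxRetries.
    static List<Long> ceilings(long baseSleepMs, int maxRetries) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < maxRetries; i++) {
            result.add(ceilingMs(baseSleepMs, i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same arguments as new ExponentialBackoffRetry(1000, 3) in createSimple
        System.out.println(ceilings(1000, 3)); // prints [1000, 2000, 4000]
    }
}
```

With a larger retry count the ceiling doubles each time, which is why a cap on the maximum sleep is usually worth setting for long-running clients.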
(3) Example code for creating, deleting, updating, and reading nodes:

    package com.topsec.framework;

    import java.util.List;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.framework.api.CuratorEvent;
    import org.apache.curator.framework.api.CuratorListener;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.Watcher;

    public class CrudExamples {

        public static void create(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this will create the given ZNode with the given data
            client.create().forPath(path, payload);
        }

        public static void createEphemeral(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this will create the given EPHEMERAL ZNode with the given data
            client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
        }

        public static String createEphemeralSequential(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this will create the given EPHEMERAL-SEQUENTIAL ZNode with the given data using Curator protection.
            /*
             * Protection Mode:
             *
             * It turns out there is an edge case that exists when creating sequential-ephemeral nodes.
             * The creation can succeed on the server, but the server can crash before the created node
             * name is returned to the client. However, the ZK session is still valid so the ephemeral
             * node is not deleted. Thus, there is no way for the client to determine what node was
             * created for them.
             *
             * Even without sequential-ephemeral, however, the create can succeed on the server but the
             * client (for various reasons) will not know it. Putting the create builder into protection
             * mode works around this. The name of the node that is created is prefixed with a GUID.
             * If node creation fails the normal retry mechanism will occur. On the retry, the parent
             * path is first searched for a node that has the GUID in it. If that node is found, it is
             * assumed to be the lost node that was successfully created on the first try and is
             * returned to the caller.
             */
            return client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);
        }

        public static void setData(CuratorFramework client, String path, byte[] payload) throws Exception {
            // set data for the given node
            client.setData().forPath(path, payload);
        }

        public static void setDataAsync(CuratorFramework client, String path, byte[] payload) throws Exception {
            // this is one method of getting event/async notifications
            CuratorListener listener = new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    // examine event for details; getWatchedEvent() is only populated for WATCHED events
                    System.out.println("Event received: type=" + event.getType()
                            + ", path=" + event.getPath()
                            + ", watchedEvent=" + event.getWatchedEvent());
                }
            };
            client.getCuratorListenable().addListener(listener);

            // set data for the given node asynchronously. The completion notification
            // is done via the CuratorListener.
            client.setData().inBackground().forPath(path, payload);
        }

        public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
            // this is another method of getting notification of an async completion
            client.setData().inBackground(callback).forPath(path, payload);
        }

        public static void delete(CuratorFramework client, String path) throws Exception {
            // delete the given node
            client.delete().forPath(path);
        }

        public static void guaranteedDelete(CuratorFramework client, String path) throws Exception {
            // delete the given node and guarantee that it completes
            /*
             * Guaranteed Delete
             *
             * Solves this edge case: deleting a node can fail due to connection issues. Further, if
             * the node was ephemeral, the node will not get auto-deleted as the session is still
             * valid. This can wreak havoc with lock implementations.
             *
             * When guaranteed is set, Curator will record failed node deletions and attempt to delete
             * them in the background until successful. NOTE: you will still get an exception when the
             * deletion fails. But, you can be assured that as long as the CuratorFramework instance
             * is open attempts will be made to delete the node.
             */
            client.delete().guaranteed().forPath(path);
        }

        public static List<String> watchedGetChildren(CuratorFramework client, String path) throws Exception {
            // Get children and set a watcher on the node. The watcher notification will come
            // through the CuratorListener (see setDataAsync() above).
            return client.getChildren().watched().forPath(path);
        }

        public static List<String> watchedGetChildren(CuratorFramework client, String path, Watcher watcher) throws Exception {
            // Get children and set the given watcher on the node.
            return client.getChildren().usingWatcher(watcher).forPath(path);
        }
    }
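The Protection Mode comment in createEphemeralSequential describes a recovery step: on retry, the parent's children are searched for a node name that contains the client's GUID. The plain-Java sketch below models only that lookup on an in-memory list of child names. The class `ProtectionSketch`, its methods, and the `"_c_" + GUID + "-"` naming scheme are assumptions made for illustration; it does not call Curator and should not be read as its implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Toy model of the protection-mode lookup; no ZooKeeper or Curator involved.
class ProtectionSketch {

    // Assumed naming scheme for this sketch: "_c_" + GUID + "-" + base name.
    static String protectedName(String guid, String baseName) {
        return "_c_" + guid + "-" + baseName;
    }

    // On retry, scan the parent's children for a node that carries our GUID.
    static Optional<String> findLostNode(List<String> children, String guid) {
        return children.stream()
                .filter(child -> child.contains(guid))
                .findFirst();
    }

    public static void main(String[] args) {
        String guid = UUID.randomUUID().toString();
        // A sequential node gets a numeric suffix appended by the server.
        List<String> children = Arrays.asList(
                protectedName(UUID.randomUUID().toString(), "tempSeqNode") + "0000000001",
                protectedName(guid, "tempSeqNode") + "0000000002");
        // If the first create succeeded but the reply was lost, the node is found here.
        System.out.println(findLostNode(children, guid).orElse("not found"));
    }
}
```

Because the GUID is generated on the client before the create is sent, the client can recognize its own node even though it never received the server-assigned sequence number.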
(4) Test program:

    package com.topsec.framework;

    import org.apache.curator.framework.CuratorFramework;

    public class FrameworkTest {

        // ZooKeeper cluster
        public static final String ZOOKEEPERSTRING = "192.168.99.129:2181,192.168.99.153:2181,192.168.99.171:2181";

        public static void main(String[] args) {
            CuratorFramework curatorFramework = null;
            try {
                curatorFramework = CreateClientExamples.createSimple(ZOOKEEPERSTRING);
                curatorFramework.start();

                // Create a node: the parent node must already exist, and the node itself must not exist yet
                // CrudExamples.create(curatorFramework, "/zookeeperTest/number2", "test1".getBytes());

                // Create an ephemeral node
                // CrudExamples.createEphemeral(curatorFramework, "/zookeeperTest/tempNode", "tempNode".getBytes());

                // Create an ephemeral sequential node
                // CrudExamples.createEphemeralSequential(curatorFramework, "/zookeeperTest/tempSeqNode", "tempSeqNode".getBytes());

                // Update a node's data
                // CrudExamples.setData(curatorFramework, "/zookeeperTest/number2", "number2".getBytes());

                // Set data asynchronously
                // CrudExamples.setDataAsync(curatorFramework, "/zookeeperTest/number2", "number3".getBytes());

                // Delete a node
                // CrudExamples.delete(curatorFramework, "/zookeeperTest/number2");

                // Guaranteed delete
                CrudExamples.guaranteedDelete(curatorFramework, "/zookeeperTest/number2");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (curatorFramework != null) {
                    curatorFramework.close();
                }
            }
        }
    }
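The test program ends with a call to guaranteedDelete, whose comment in CrudExamples explains the behaviour: a failed delete still throws, but the path is recorded and retried in the background until it succeeds. A minimal sketch of that bookkeeping, in plain Java with no Curator calls, might look like the following. The class `GuaranteedDeleteSketch`, the `Predicate<String>` standing in for the real delete call, and the explicit `retryFailedDeletes()` trigger are all hypothetical stand-ins for illustration.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Toy model of "record failed deletes and retry them later"; not Curator's code.
class GuaranteedDeleteSketch {
    private final Set<String> failedDeletes = new LinkedHashSet<>();
    private final Predicate<String> deleter; // hypothetical: returns true when the delete succeeded

    GuaranteedDeleteSketch(Predicate<String> deleter) {
        this.deleter = deleter;
    }

    // Try once; on failure remember the path and still report the failure to the caller.
    void delete(String path) {
        if (!deleter.test(path)) {
            failedDeletes.add(path);
            throw new IllegalStateException("delete failed, will be retried: " + path);
        }
    }

    // Stand-in for the background task: retry every path recorded so far.
    void retryFailedDeletes() {
        for (Iterator<String> it = failedDeletes.iterator(); it.hasNext();) {
            if (deleter.test(it.next())) {
                it.remove(); // succeeded this time, stop tracking it
            }
        }
    }

    int pendingCount() {
        return failedDeletes.size();
    }
}
```

In this model the caller still has to handle the exception from the first attempt, which matches the NOTE in the original comment; the guarantee only covers the eventual cleanup.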
Supplementary notes
Methods provided by CuratorFramework:
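Method | Description |
---|---|
create() | Begins a create operation. Call additional methods (such as mode or background) and finish by calling forPath() to specify the ZNode to operate on |
delete() | Begins a delete operation. Call additional methods (version or background) and finish by calling forPath() to specify the ZNode to operate on |
checkExists() | Begins an operation that checks whether a ZNode exists. Call additional methods (watch or background) and finish by calling forPath() to specify the ZNode to operate on |
getData() | Begins an operation that gets a ZNode's data. Call additional methods (watch, background, or get stat) and finish by calling forPath() to specify the ZNode to operate on |
setData() | Begins an operation that sets a ZNode's data. Call additional methods (version or background) and finish by calling forPath() to specify the ZNode to operate on |
getChildren() | Begins an operation that gets a ZNode's list of children. Call additional methods (watch, background, or get stat) and finish by calling forPath() to specify the ZNode to operate on |
inTransaction() | Begins an atomic ZooKeeper transaction. Combine create, setData, check, and/or delete operations, then call commit() to submit them as a single atomic operation |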
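
The inTransaction() entry says that several operations are combined and then committed as one atomic unit. To make the all-or-nothing idea concrete, here is a toy in-memory model in plain Java: operations are queued through chained calls and applied to a copy of the store, and the copy replaces the store only if every operation succeeds. `ToyTransaction` and its methods are hypothetical names for this sketch; the chaining only loosely resembles Curator's fluent style and none of this is Curator or ZooKeeper code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy all-or-nothing transaction over a Map of path -> data.
class ToyTransaction {
    private final Map<String, String> store;
    private final List<Consumer<Map<String, String>>> ops = new ArrayList<>();

    ToyTransaction(Map<String, String> store) {
        this.store = store;
    }

    ToyTransaction create(String path, String data) {
        ops.add(m -> {
            if (m.containsKey(path)) throw new IllegalStateException("node exists: " + path);
            m.put(path, data);
        });
        return this;
    }

    ToyTransaction setData(String path, String data) {
        ops.add(m -> {
            if (!m.containsKey(path)) throw new IllegalStateException("no node: " + path);
            m.put(path, data);
        });
        return this;
    }

    ToyTransaction delete(String path) {
        ops.add(m -> {
            if (m.remove(path) == null) throw new IllegalStateException("no node: " + path);
        });
        return this;
    }

    // Apply every queued operation to a copy; publish the copy only if all succeed.
    boolean commit() {
        Map<String, String> copy = new HashMap<>(store);
        try {
            for (Consumer<Map<String, String>> op : ops) {
                op.accept(copy);
            }
        } catch (IllegalStateException e) {
            return false; // one operation failed, so the original store is left untouched
        }
        store.clear();
        store.putAll(copy);
        return true;
    }
}
```

For example, queuing a create for "/b" followed by a setData on a path that does not exist makes commit() return false, and "/b" is never visible in the store.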
The event types and the methods available on each event are as follows:
Event Type | Event Methods |
---|---|
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GETDATA | getResultCode(), getPath(), getStat() and getData() |
SETDATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
WATCHED | getWatchedEvent() |