zookeeper學習筆記
zookeeper安裝與基本命令
ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
要安裝zookeeper首先需要安裝jdk,可以去ZooKpper官網下載最新的版本。
並解壓到指定目錄配置zoo.cfg , 在conf文件夾下復制一份新的zoo_sample.cfg並重新命名為zoo.cfg。
安裝步驟
# cd /opt # wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz# tar -xzvf zookeeper-3.4.8.tar.gz # mv zookeeper-3.4.8 zookeeper # cd zookeeper/ # cd conf/ # mv zoo_sample.cfg zoo.cfg
啟動zk要到bin目錄下,執行zkServer.sh start
就可以了。
停止當然是zkServer.sh stop
查看狀態zkServer.sh status
鏈接zk使用zkCli.sh -server 127.0.0.1:2181
輸入help回車後查看幫助信息
ZooKeeper -server host:port cmd argsstat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port
創建節點以及值
create /node1 value1
查看節點
使用ls指令查看當前ZK中所包含的內容ls /
查看節點中的值
get /node1
更新節點中的值
set /node2 value3
刪除節點
delete /node2
watch介紹
watch表示監聽事件,比如執行命令ls /node2
然後我們新開個窗口連上zk,在node2下面新建個子節點,建完後馬上之前加了watch的窗口就能收到新建的事件了。
這種場景適合用在配置變更的時候各個子節點都需要重新加載配置。
zoo.cfg配置信息詳解
# The number of milliseconds of each tick tickTime=2000 ##ZooKeeper的最小時間單元,單位毫秒(ms),默認值為3000 # The number of ticks that the initial # synchronization phase can take initLimit=10 ##Leader服務器等待Follower啟動並完成數據同步的時間,默認值10,表示tickTime的10倍 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 ##Leader服務器和Follower之間進行心跳檢測的最大延時時間,默認值5,表示tickTime的5倍 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/tmp/zookeeper ##ZooKeeper服務器存儲快照文件的目錄,必須配值,建議放置在var目錄下 # the port at which the clients will connect clientPort=2181 ## 服務器對外服務端口,默認值為2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1
zookeeper權限控制 ACL操作
CL全稱為Access Control List(訪問控制列表),用於控制資源的訪問權限。Zookeeper利用ACL策略控制節點的訪問權限,如節點數據讀寫、節點創建、節點刪除、讀取子節點列表、設置節點權限等。
Zookeeper的ACL,scheme、id、permission,通常表示為:
scheme:id:permission。
- schema代表授權策略
- id代表用戶
- permission代表權限
permission分為下面幾種:
- CREATE(c):創建子節點的權限
- READ(r):讀取節點數據的權限
- DELETE(d):刪除節點的權限
- WRITE(w):修改節點數據的權限
- ADMIN(a):設置子節點權限的權限
密碼權限控制
語法:digest:username:BASE64(SHA1(password)):權限信息
首先我們創建一個節點
create /test 10
創建完成之後設置權限
setAcl /test digest:zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=:crdwa
然後來驗證下權限是否可用
[zk: localhost:2181(CONNECTED) 2] get /test Authentication is not valid : /test
執行命令會發現沒有權限操作
進行認證操作後再次執行get操作就可以了
addauth digest zhangsan:123456
這邊需要註意的是密碼的生成方式,密碼是加密的,不是明文的,我們需要借助於Zookeeper中的一個類來加密密碼,如果是代碼中操作就直接加密了,在shell中就得單獨去加密了。
java -Djava.ext.dirs=/Users/zhangsan/Documents/java/zookeeper-3.4.7/lib -cp /Users/zhangsan/Documents/java/zookeeper-3.4.7/zookeeper-3.4.7.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider zhangsan:123456
輸出加密內容如下:
zhangsan:123456->zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=
IP權限控制
還有一種就是通過IP來控制節點的訪問權限,一般不建議使用,因為IP發生變動的可能性比較大
語法是:ip:IP信息:權限信息
創建一個節點
create /ip2 ip
設置IP訪問權限
setAcl /ip2 ip:192.168.31.139:r
Java連接zookeeper
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId <version>3.4.7</version> </dependency>
/** * 連接ZK測試 */ public class ConnTest { public static void main(String[] args) { try { // 建立連接 ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 1000 * 10, null); // 創建節點 zooKeeper.create("/connTest", "connTest".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); byte[] result = zooKeeper.getData("/connTest", false, new Stat()); System.out.println(new String(result)); //zooKeeper.delete("/connTest", -1); } catch (Exception e) { e.printStackTrace(); } } }
使用curator客戶端更方便操作zookeeper
<!--<dependency>--> <!--<groupId>org.apache.zookeeper</groupId>--> <!--<artifactId>zookeeper</artifactId>--> <!--<version>3.4.7</version>--> <!--</dependency>--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.4.0</version> </dependency>
/** * zk客戶端 * 測試監聽zk節點變化 */ public class CuratorTest { public static void main(String[] args) throws Exception { // 1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .sessionTimeoutMs(1000 * 10) .retryPolicy(retryPolicy) .build(); // 3 開啟連接 cf.start(); // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/yjh/aa","aa內容".getBytes()); // System.out.println(new String(cf.getData().forPath("/yjh/aa"))); // cf.setData().forPath("/yjh/aa", "修改aa內容".getBytes()); // System.out.println(new String(cf.getData().forPath("/yjh/aa"))); // cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/yjh"); // if (cf.checkExists().forPath("/tt") == null) { // cf.create().creatingParentsIfNeeded().forPath("/tt","tt".getBytes()); // } // byte[] data = cf.getData().usingWatcher(new Watcher() { // public void process(WatchedEvent event) { // System.out.println("節點監聽器 : " + event.getType().getIntValue() + "\t" + event.getPath()); // } // }).forPath("/tt"); // System.out.println(new String(data)); ExecutorService pool = Executors.newFixedThreadPool(2); final NodeCache nodeCache = new NodeCache(cf, "/test", false); nodeCache.start(true); nodeCache.getListenable().addListener( new NodeCacheListener() { public void nodeChanged() throws Exception { System.out.println(nodeCache.getCurrentData().getPath() + "數據改變了, 新的數據是: " + new String(nodeCache.getCurrentData().getData())); } }, pool ); Thread.sleep(Integer.MAX_VALUE); } }
zookeeper分布式鎖
/** * zk分布式鎖 */ public class LockTest { static int count = 2; public static void main(String[] args) { // 1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .sessionTimeoutMs(1000 * 10) .retryPolicy(retryPolicy) .build(); // 3 開啟連接 cf.start(); final InterProcessMutex lock = new InterProcessMutex(cf, "/mylock"); final CountDownLatch latch = new CountDownLatch(1); ExecutorService pool = Executors.newFixedThreadPool(20); for (int i = 0; i < 20; i++) { pool.execute(() -> { try { System.err.println(1); latch.await(); lock.acquire(); Thread.sleep(100); //synchronized (LockTest.class) { if (count > 0) { count--; lock.release(); System.out.println(count); } //} } catch (Exception e) { e.printStackTrace(); } }); } System.out.println("開始執行"); latch.countDown(); pool.shutdown(); } }
zookeeper作為服務註冊中心
首先編寫一個服務註冊中心類,監聽服務的註冊與停止
/** * 測試服務註冊中心 * 監聽服務註冊與停止 */ public class ServiceClient { public static void main(String[] args) { // client.get("http://localhost/get"); // client.get("http://localhost/get2"); // client.get("nginx地址"); ---> get./get2 // 從zk中獲取服務地址列表,選擇一個進行請求,本地執行負載均衡 // 1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .sessionTimeoutMs(1000 * 10) .retryPolicy(retryPolicy) .build(); // 3 開啟連接 cf.start(); // 開始監聽 try { final PathChildrenCache childrenCache = new PathChildrenCache(cf, "/service", true); childrenCache.start(StartMode.POST_INITIALIZED_EVENT); childrenCache.getListenable().addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED: " + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED: " + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED: " + event.getData().getPath()); break; default: break; } } } ); List<String> urls = cf.getChildren().forPath("/service"); for (String url : urls) { System.out.println(url); } Thread.sleep(200000000); } catch (Exception e) { e.printStackTrace(); } } }
另外編寫兩個服務類,用於模擬兩個服務,分別往zookeeper創建節點,相當於註冊服務,由上面的服務註冊中心進行監聽
用戶服務:
/** * 測試服務註冊中心 * 模擬用戶服務 */ public class UserServiceApplication { public static void main(String[] args) { // 1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .sessionTimeoutMs(1000 * 10) .retryPolicy(retryPolicy) .build(); // 3 開啟連接 cf.start(); try { cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.1", "".getBytes()); Thread.sleep(200000000); } catch (Exception e) { e.printStackTrace(); } } }
訂單服務:
/** * 測試服務註冊中心 * 模擬訂單服務 */ public class OrderServiceApplication { public static void main(String[] args) { // 1 重試策略:初試時間為1s 重試10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); // 2 通過工廠創建連接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString("localhost:2181") .sessionTimeoutMs(1000 * 10) .retryPolicy(retryPolicy) .build(); // 3 開啟連接 cf.start(); try { cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.2", "".getBytes()); Thread.sleep(200000000); } catch (Exception e) { e.printStackTrace(); } } }
然後分別啟動上面這三個類,服務註冊中心就可以監聽用戶服務和訂單服務的註冊和停止了。
ps:編寫一個簡單的zookeeper操作工具類
/** * Created by 唐哲 * 2018-06-24 16:26 * zk操作工具類 */ public class ZkUtils { private static final Integer ZK_SESSION_TIMEOUT = 10 * 1000; private static CountDownLatch latch = new CountDownLatch(1); private static ZkUtils instance = new ZkUtils(); private static ZooKeeper zk; public synchronized static ZkUtils getInstance(String host, int port) { if (zk == null) { connect(host, port); } return instance; } private static void connect(String host, int port) { String connectString = host + ":" + port; try { zk = new ZooKeeper(connectString, ZK_SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("已經觸發了" + event.getType() + "事件!"); // 判斷是否已連接ZK, 連接後計數器遞減 if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); // 若計數器不為0, 則等待 latch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public String addNode(String nodeName) { Stat stat; try { stat = zk.exists(nodeName, false); if (stat == null) { return zk.create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } return null; } public String addNode(String nodeName, String data) { Stat stat; try { stat = zk.exists(nodeName, false); if (stat == null) { return zk.create(nodeName, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } return null; } public String addNode(String nodeName, String data, List<ACL> acl, CreateMode createMode) { Stat stat; try { stat = zk.exists(nodeName, false); if (stat == null) { return zk.create(nodeName, data.getBytes(), acl, createMode); } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } return null; } public void removeNode(String nodeName) { try { zk.delete(nodeName, -1); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } public void removeNode(String nodeName, int version) { try { zk.delete(nodeName, version); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } public String setData(String nodeName, String data) { Stat stat; try { stat = zk.exists(nodeName, false); if (stat != null) { zk.setData(nodeName, data.getBytes(), -1); return data; } } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } return null; } /** * 監控數據節點變化 */ public void monitorDataUpdate(String nodeName) { try { zk.getData(nodeName, new Watcher() { @Override public void process(WatchedEvent event) { // 節點的值有修改 if(event.getType() == EventType.NodeDataChanged) { System.out.println(nodeName + "修改了值" + event.getPath()); // 觸發一次就失效,所以需要遞歸註冊 monitorDataUpdate(nodeName); } } }, new Stat()); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws InterruptedException { ZkUtils zkUtils = ZkUtils.getInstance("localhost", 2181); //zkUtils.removeNode("/test"); // String result = zkUtils.addNode("/test"); // System.out.println(result); // // result = zkUtils.addNode("/test", "10"); // System.out.println(result); String result = zkUtils.setData("/test", "hello"); System.out.println(result); zkUtils.monitorDataUpdate("/test"); CountDownLatch countDownLatch = new CountDownLatch(1); countDownLatch.await(); } }
zookeeper學習筆記