Zookeeper之路
阿新 • • 發佈:2021-06-10
zookeeper之路
1.伺服器命令
#zk所在目錄
/usr/local/zookeeper-3.4.6/bin
[root@localhost zookeeper-3.4.6]# cd bin
#啟動zk
[root@localhost bin]# ./zkServer.sh start
#停止zk
[root@localhost bin]# ./zkServer.sh stop
#檢視zk狀態
[root@localhost bin]# ./zkServer.sh status
2.客戶端命令
#啟動客戶端 [root@localhost bin]# ./zkCli.sh -server localhost:2181 #退出到linux [zk: localhost:2181(CONNECTED) 1] quit #檢視節點 [zk: localhost:2181(CONNECTED) 1] ls / [zookeeper] [zk: localhost:2181(CONNECTED) 2] ls /zookeeper [quota] #建立帶有資料的節點 [zk: localhost:2181(CONNECTED) 3] create /app1 helianthus #獲取節點中的資料 [zk: localhost:2181(CONNECTED) 5] get /app1 helianthus cZxid = 0x4 ctime = Wed Jun 09 13:41:34 CST 2021 mZxid = 0x4 mtime = Wed Jun 09 13:41:34 CST 2021 pZxid = 0x4 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 10 numChildren = 0 #更新節點中的資料 [zk: localhost:2181(CONNECTED) 12] set /app2 annuusl001 cZxid = 0x6 ctime = Wed Jun 09 13:45:15 CST 2021 mZxid = 0x7 mtime = Wed Jun 09 13:45:29 CST 2021 pZxid = 0x6 cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 10 numChildren = 0 #刪除節點 [zk: localhost:2181(CONNECTED) 13] delete /app2 #建立子節點 [zk: localhost:2181(CONNECTED) 18] create /app1/p1 data1 Created /app1/p1 [zk: localhost:2181(CONNECTED) 19] create /app1/p2 data2 Created /app1/p2 [zk: localhost:2181(CONNECTED) 20] ls /app1 [p1, p2] [zk: localhost:2181(CONNECTED) 21] #刪除帶有子節點的節點 [zk: localhost:2181(CONNECTED) 28] rmr /app1 [zk: localhost:2181(CONNECTED) 28] deleteall /app1 #建立臨時節點(客戶端關閉節點自動刪除) [zk: localhost:2181(CONNECTED) 3] create -e /app2 data1 #建立持久化順序節點(節點會被自動加序號) [zk: localhost:2181(CONNECTED) 1] create -s /node 1 Created /node0000000004 [zk: localhost:2181(CONNECTED) 2] create -s /node 2 Created /node0000000005 #建立臨時順序節點 [zk: localhost:2181(CONNECTED) 14] create -s -e /app3 data1
3.ZK JavaAPI操作(Curator API)
3.1 maven工程建立
3.1.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sjx</groupId> <artifactId>curator-zk</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <!--curator--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <!--zookeeper--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <!--日誌--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> </dependencies> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> </project>
3.2 測試 CRUD
package com.sjx.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * CRUD examples against a ZooKeeper server using the Curator client.
 * Each test opens a fresh connection in {@link #testConnect()} and
 * closes it in {@link #close()}.
 */
public class CuratorTest {

    // Field (rather than local) so every @Test method reuses the same connection
    private CuratorFramework client;

    /**
     * Open the connection before each test.
     */
    @Before
    public void testConnect(){
        /*
        Create a new client
        Params:
            connectString       ZooKeeper server address and port
            sessionTimeoutMs    session timeout, in ms
            connectionTimeoutMs connection timeout, in ms
            retryPolicy         retry policy
        Returns:
            client
        */
        // Retry policy: exponential back-off, base sleep 3000 ms, up to 10 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        // First way: static factory method
        /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.172.133:2181", 60 * 1000, 15 * 1000, retryPolicy); */
        // Second way: fluent builder style
        client = CuratorFrameworkFactory.builder().connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("helianthus") // creates a /helianthus node under the root; every node this client creates lives beneath it
                .build();
        // Open the connection
        client.start();
    }

    /*============================================create=====================================================*/
    /**
     * Create nodes: create, persistent / ephemeral / sequential, with data
     * 1. basic create:          create().forPath("/app1");
     * 2. create with data:      create().forPath("/app3","haha".getBytes());
     * 3. set the node type:     create().withMode(CreateMode.EPHEMERAL).forPath("/app4");
     * 4. create nested /app5/p1: create().creatingParentsIfNeeded().forPath("/app5/p1");
     */
    @Test
    public void testCreate() throws Exception {
        // 1. basic create
        // If no data is supplied, the client's IP address is stored as the node data by default
        String path = client.create().forPath("/app1");
        System.out.println(path);
    }

    @Test
    public void testCreate2() throws Exception {
        // 2. create a node together with its data
        // If no data is supplied, the client's IP address is stored as the node data by default
        String path = client.create().forPath("/app3","haha".getBytes());
        System.out.println(path);
    }

    @Test
    public void testCreate3() throws Exception {
        // 3. create a node with an explicit type
        // Default type is persistent; here we test the ephemeral type EPHEMERAL
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app4");
        System.out.println(path);
    }

    @Test
    public void testCreate4() throws Exception {
        // 4. create nested nodes /app5/p1
        // creatingParentsIfNeeded(): create the parent node if it does not exist yet
        String path = client.create().creatingParentsIfNeeded().forPath("/app5/p1");
        System.out.println(path);
    }

    /*===============================================get=====================================================*/
    /**
     * Query nodes
     * 1. read node data:       get   -> getData().forPath("/app1");
     * 2. list children:        ls    -> getChildren().forPath("/");
     * 3. read node status:     ls -s -> getData().storingStatIn(status).forPath("/app1");
     */
    @Test
    public void testGet1() throws Exception{
        // 1. read node data: get
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data));
    }

    @Test
    public void testGet2() throws Exception{
        // 2. list children: ls
        // because of the namespace, "/" here is equivalent to /helianthus
        List<String> path = client.getChildren().forPath("/");
        System.out.println(path);
    }

    @Test
    public void testGet3() throws Exception{
        Stat status=new Stat();
        System.out.println(status);
        // 3. read node status information: ls -s (Curator fills the Stat passed in)
        client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }

    /*===========================================set=========================================================*/
    /**
     * Update data
     * 1. basic update:        setData().forPath("/app1","test".getBytes());
     * 2. update by version:   setData().withVersion(version).forPath("/app1","test001".getBytes());
     *    The version comes from a prior query; ZooKeeper rejects the write if the
     *    node changed in between, so no other client or thread can interfere —
     *    this is what makes the update atomic (compare-and-set).
     */
    @Test
    public void testSet() throws Exception{
        client.setData().forPath("/app1","test".getBytes());
    }

    @Test
    public void testSet2() throws Exception{
        Stat status=new Stat();
        // query first so we know the node's current data version
        client.getData().storingStatIn(status).forPath("/app1");
        int version=status.getVersion();
        System.out.println(version);
        client.setData().withVersion(version).forPath("/app1","test001".getBytes());
    }

    /*==============================================delete===================================================*/
    /**
     * Delete nodes
     * delete / deleteAll
     * 1. delete a single node:          delete().forPath("/app3");
     * 2. delete a node with children:   delete().deletingChildrenIfNeeded().forPath("/app5");
     * 3. guaranteed delete — guards against network jitter, essentially by retrying:
     *                                   delete().guaranteed().forPath("/app2");
     * 4. completion callback:           inBackground
     */
    @Test
    public void testDelete() throws Exception{
        // 1. delete a single node
        client.delete().forPath("/app3");
    }

    @Test
    public void testDelete2() throws Exception{
        // 2. delete a node together with its children
        client.delete().deletingChildrenIfNeeded().forPath("/app5");
    }

    @Test
    public void testDelete3() throws Exception{
        // 3. delete that must eventually succeed (retries in the background)
        client.delete().guaranteed().forPath("/app2");
    }

    @Test
    public void testDelete4() throws Exception{
        // 4. delete with a completion callback
        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
                System.out.println("我被刪除了");
                System.out.println(curatorEvent);
            }
        }).forPath("/app1");
    }

    /*================================================close===================================================*/
    /** Close the connection after each test. */
    @After
    public void close(){
        if(client!=null){
            client.close();
        }
    }
}
3.3 Watcher 監聽
package com.sjx.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * Watcher examples using Curator's cache recipes:
 * NodeCache (one node), PathChildrenCache (children of a node),
 * TreeCache (a node itself plus all of its descendants).
 */
public class CuratorWatcherTest {

    // Field (rather than local) so every @Test method reuses the same connection
    private CuratorFramework client;

    /**
     * Open the connection before each test.
     */
    @Before
    public void testConnect(){
        // Retry policy: exponential back-off, base sleep 3000 ms, up to 10 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder().connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .namespace("helianthus") // creates a /helianthus node under the root; every node this client creates lives beneath it
                .build();
        // Open the connection
        client.start();
    }

    /** Close the connection after each test. */
    @After
    public void close(){
        if(client!=null){
            client.close();
        }
    }

    /*===================================NodeCache: watch a single node==============================================*/
    /**
     * Demonstrates NodeCache: watch a single node.
     */
    @Test
    public void testNodeCache() throws Exception {
        // 1. create the NodeCache object
        final NodeCache nodeCache = new NodeCache(client, "/app3");
        // 2. register the listener
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("節點變化了");
                // read the node's data after the change
                byte[] data = nodeCache.getCurrentData().getData();
                System.out.println(new String(data));
            }
        });
        // Lambda form of the listener:
        //nodeCache.getListenable().addListener(()-> System.out.println("節點變化了"));
        // 3. start watching; true pre-loads the cached data on start
        nodeCache.start(true);
        // Block forever so the listener keeps firing.
        // (A bare `while (true) {}` busy-waits and pins a CPU core; sleeping
        // blocks the thread without burning cycles — the callbacks run on
        // Curator's own threads.)
        Thread.sleep(Long.MAX_VALUE);
    }

    /*===========================PathChildrenCache: watch all children of a node===================================*/
    /**
     * Demonstrates PathChildrenCache: watch all children of a node.
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        // 1. create the PathChildrenCache object (true = cache node data too)
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app5",true);
        // 2. bind the listener
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("子節點變化了");
                System.out.println(event);
                // watch for child data updates and read the new data
                // 1. get the event type
                PathChildrenCacheEvent.Type type = event.getType();
                // 2. only react to CHILD_UPDATED events
                if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                    System.out.println("資料變了");
                    byte[] data = event.getData().getData();
                    System.out.println(new String(data));
                }
            }
        });
        // 3. start watching
        pathChildrenCache.start();
        // Block forever without busy-waiting (see testNodeCache)
        Thread.sleep(Long.MAX_VALUE);
    }

    /*==============================TreeCache: watch a node itself and all of its children=====================================*/
    /**
     * Demonstrates TreeCache: watch a node itself and all of its children.
     */
    @Test
    public void testTreeCache() throws Exception {
        // 1. create the TreeCache object
        TreeCache treeCache = new TreeCache(client,"/");
        // 2. bind the listener
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                System.out.println("節點變化了");
                System.out.println(event);
            }
        });
        // 3. start watching
        treeCache.start();
        // Block forever without busy-waiting (see testNodeCache)
        Thread.sleep(Long.MAX_VALUE);
    }
}
4.分散式鎖
4.1 分散式鎖原理
4.2 分散式鎖API
4.3 Ticket12306
package com.sjx.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
 * Simulated ticket seller: multiple threads (or processes) decrement a shared
 * ticket count, serialized by a ZooKeeper distributed lock (InterProcessMutex).
 */
public class Ticket12306 implements Runnable{

    private int tickets=10; // ticket count in the "database"
    private InterProcessMutex lock; // distributed lock

    /** Connects to ZooKeeper and prepares the lock on /lock. */
    public Ticket12306() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        // open the connection
        client.start();
        lock=new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while (true){
            // BUG FIX: acquire(timeout, unit) returns false when the lock could
            // NOT be obtained within the timeout. The original code ignored the
            // return value, so on timeout it (a) entered the critical section
            // without holding the lock — breaking mutual exclusion — and
            // (b) called release() on a lock it never held, which throws
            // IllegalMonitorStateException.
            boolean acquired = false;
            try {
                // try to acquire the lock, waiting at most 3 seconds
                acquired = lock.acquire(3, TimeUnit.SECONDS);
                if(acquired && tickets>0){
                    System.out.println(Thread.currentThread()+":"+tickets);
                    Thread.sleep(100);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // release the lock — but only if this thread actually owns it
                if (acquired) {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
4.4 LockTest
package com.sjx.curator;

/**
 * Launches two competing "booking platform" threads that share a single
 * Ticket12306 task — and therefore a single distributed lock — so their
 * ticket sales are serialized through ZooKeeper.
 */
public class LockTest {
    public static void main(String[] args) {
        // one shared task: both threads sell from the same ticket pool
        Ticket12306 seller = new Ticket12306();
        // create the clients
        Thread ctrip = new Thread(seller, "攜程");
        Thread fliggy = new Thread(seller, "飛豬");
        ctrip.start();
        fliggy.start();
    }
}
4.5 鎖核心程式碼
// distributed lock
private InterProcessMutex lock;
// create the client
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.172.133:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy)
.build();
// open the connection
client.start();
lock=new InterProcessMutex(client,"/lock");
// acquire the lock
// NOTE(review): acquire(timeout, unit) returns a boolean — check it before
// entering the critical section, and only release() when it returned true
lock.acquire(3, TimeUnit.SECONDS);
// release the lock
lock.release();
5. ZK 叢集
5.1 叢集介紹
![ZK 叢集介紹](https://img2020.cnblogs.com/blog/2414441/202106/2414441-20210610131044581-1131233733.png)
5.2 叢集搭建
#上傳zk的壓縮包到伺服器
#建立叢集目錄
[root@localhost /]# mkdir /usr/local/zookeeper-cluster
#解壓
[root@localhost local]# tar -zxvf zookeeper-3.4.6.tar.gz
#建立zk目錄
[root@localhost local]# cp -r zookeeper-3.4.6 /usr/local/zookeeper-cluster/zookeeper-1
[root@localhost local]# cp -r zookeeper-3.4.6 /usr/local/zookeeper-cluster/zookeeper-2
[root@localhost local]# cp -r zookeeper-3.4.6 /usr/local/zookeeper-cluster/zookeeper-3
#這個data目錄需要是空的
[root@localhost /]# mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
[root@localhost /]# mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
[root@localhost /]# mkdir /usr/local/zookeeper-cluster/zookeeper-3/data
#修改配置檔案
mv /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
mv /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
mv /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
#更改資料目錄(偽叢集還需要改埠號)
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
#配置叢集
# BUG FIX: `echo 1>/path/myid` parses `1>` as a redirection of file descriptor 1,
# so `echo` runs with NO arguments and each myid file ends up containing only a
# newline — the cluster then cannot identify its servers. A space before `>` is
# required so the literal server id is written.
echo 1 > /usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 > /usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 > /usr/local/zookeeper-cluster/zookeeper-3/data/myid
#修改配置
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
#server.伺服器id=伺服器ip地址:伺服器之間通訊埠:伺服器之間選舉投票埠
server.1=localhost:2881:3881
server.2=localhost:2882:3882
server.3=localhost:2883:3883
#啟動叢集
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
#檢視叢集狀態
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status