1. 程式人生 > 其它 >Zookeeper之路

Zookeeper之路

zookeeper之路

1.伺服器命令

#zk所在目錄
/usr/local/zookeeper-3.4.6/bin

[root@localhost zookeeper-3.4.6]# cd bin
#啟動zk
[root@localhost bin]# ./zkServer.sh start
#停止zk
[root@localhost bin]# ./zkServer.sh stop
#檢視zk狀態
[root@localhost bin]# ./zkServer.sh status

2.客戶端命令

#啟動客戶端
[root@localhost bin]# ./zkCli.sh -server localhost:2181

#退出到linux
[zk: localhost:2181(CONNECTED) 1] quit

#檢視節點
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 2] ls /zookeeper
[quota]

#建立帶有資料的節點
[zk: localhost:2181(CONNECTED) 3] create /app1 helianthus

#獲取節點中的資料
[zk: localhost:2181(CONNECTED) 5] get /app1
helianthus
cZxid = 0x4
ctime = Wed Jun 09 13:41:34 CST 2021
mZxid = 0x4
mtime = Wed Jun 09 13:41:34 CST 2021
pZxid = 0x4
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0

#更新節點中的資料
[zk: localhost:2181(CONNECTED) 12] set /app2 annuusl001
cZxid = 0x6
ctime = Wed Jun 09 13:45:15 CST 2021
mZxid = 0x7
mtime = Wed Jun 09 13:45:29 CST 2021
pZxid = 0x6
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 0

#刪除節點
[zk: localhost:2181(CONNECTED) 13] delete /app2

#建立子節點
[zk: localhost:2181(CONNECTED) 18] create /app1/p1 data1
Created /app1/p1
[zk: localhost:2181(CONNECTED) 19] create /app1/p2 data2 
Created /app1/p2
[zk: localhost:2181(CONNECTED) 20] ls /app1
[p1, p2]
[zk: localhost:2181(CONNECTED) 21] 

#刪除帶有子節點的節點(3.4.x 使用 rmr;3.5 及以上版本 rmr 已棄用,改用 deleteall,兩者擇一即可)
[zk: localhost:2181(CONNECTED) 28] rmr /app1
[zk: localhost:2181(CONNECTED) 28] deleteall /app1

#建立臨時節點(客戶端關閉節點自動刪除)
[zk: localhost:2181(CONNECTED) 3] create -e  /app2 data1

#建立持久化順序節點(節點會被自動加序號)
[zk: localhost:2181(CONNECTED) 1] create -s /node 1
Created /node0000000004
[zk: localhost:2181(CONNECTED) 2] create -s /node 2
Created /node0000000005

#建立臨時順序節點
[zk: localhost:2181(CONNECTED) 14] create -s -e /app3 data1

3.ZK JavaAPI操作(Curator API)

3.1 maven工程建立

3.1.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Curator/ZooKeeper examples in this article. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sjx</groupId>
    <artifactId>curator-zk</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- JUnit 4, used by the @Before/@Test/@After example classes. -->
        <!-- NOTE(review): no <scope>test</scope> is declared; confirm whether that is intentional. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- Curator recipes. Its transitive ZooKeeper dependency is excluded so that
             the ZooKeeper version declared explicitly below is the one on the classpath. -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- ZooKeeper client, pinned to the same 3.4.6 release as the server used in this article. -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <!-- Logging: SLF4J API plus the slf4j-log4j12 binding. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>
    <!-- Compile sources for, and target, Java 8. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

3.2 測試 CRUD

package com.sjx.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CreateModable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * JUnit 4 walkthrough of the basic Curator CRUD calls (create / get / set / delete).
 *
 * <p>Every test runs against a live ZooKeeper server: {@link #testConnect()} opens the
 * client before each test and {@link #close()} closes it afterwards. All node data is
 * encoded and decoded explicitly as UTF-8 so results do not depend on the JVM's
 * platform default charset.
 */
public class CuratorTest {

    // Held as a field so that every test method and the @After hook can reach it.
    private CuratorFramework client;

    /**
     * Builds and starts the Curator client used by each test.
     */
    @Before
    public void testConnect(){

        /*
        Builder parameters:
        connectString        ZooKeeper server address and port
        sessionTimeoutMs     session timeout in milliseconds
        connectionTimeoutMs  connection timeout in milliseconds
        retryPolicy          retry policy
        */

        // Retry policy: exponential back-off with a 3000 ms base sleep and at most 10 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);

        // Option 1: factory method.
        /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.172.133:2181",
                60 * 1000, 15 * 1000, retryPolicy);
        */

        // Option 2: fluent builder.
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                // Every path used through this client is placed under /helianthus.
                .namespace("helianthus")
                .build();

        // Open the connection.
        client.start();
    }

    /*============================================create=====================================================*/

    /*
     * Creating nodes (persistent / ephemeral / sequential / with data):
     * 1. basic create             create().forPath("/app1");
     * 2. create with data         create().forPath("/app3", bytes);
     * 3. choose the node type     create().withMode(CreateMode.EPHEMERAL).forPath("/app4");
     * 4. create a multi-level path create().creatingParentsIfNeeded().forPath("/app5/p1");
     */

    /**
     * 1. Basic create. When no data is supplied, the client's IP address is stored
     * as the node's data by default.
     */
    @Test
    public void testCreate() throws Exception {
        String path = client.create().forPath("/app1");
        System.out.println(path);
    }

    /**
     * 2. Create a node that carries explicit data.
     */
    @Test
    public void testCreate2() throws Exception {
        String path = client.create().forPath("/app3", "haha".getBytes(StandardCharsets.UTF_8));
        System.out.println(path);
    }

    /**
     * 3. Create a node with an explicit type. The default type is persistent;
     * this test creates an ephemeral node instead.
     */
    @Test
    public void testCreate3() throws Exception {
        String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app4");
        System.out.println(path);
    }

    /**
     * 4. Create a multi-level node /app5/p1; creatingParentsIfNeeded() creates the
     * parent node when it does not exist yet.
     */
    @Test
    public void testCreate4() throws Exception {
        String path = client.create().creatingParentsIfNeeded().forPath("/app5/p1");
        System.out.println(path);
    }

    /*===============================================get=====================================================*/

    /*
     * Querying nodes:
     * 1. read data (get)            getData().forPath("/app1");
     * 2. list children (ls)         getChildren().forPath("/");
     * 3. read node status (ls -s)   getData().storingStatIn(status).forPath("/app1");
     */

    /**
     * 1. Read a node's data and print it as UTF-8 text.
     */
    @Test
    public void testGet1() throws Exception{
        byte[] data = client.getData().forPath("/app1");
        System.out.println(new String(data, StandardCharsets.UTF_8));
    }

    /**
     * 2. List child nodes. Because of the client namespace, "/" here means /helianthus.
     */
    @Test
    public void testGet2() throws Exception{
        List<String> children = client.getChildren().forPath("/");
        System.out.println(children);
    }

    /**
     * 3. Read node status. The Stat is printed before and after the call so the
     * values filled in by storingStatIn() are visible.
     */
    @Test
    public void testGet3() throws Exception{
        Stat status = new Stat();
        System.out.println(status);
        client.getData().storingStatIn(status).forPath("/app1");
        System.out.println(status);
    }

    /*===========================================set=========================================================*/

    /*
     * Updating data:
     * 1. plain update          setData().forPath("/app1", bytes);
     * 2. update by version     setData().withVersion(version).forPath("/app1", bytes);
     *    The version comes from a preceding read; the point is to keep other clients
     *    or threads from interfering, so the read-modify-write behaves atomically.
     */

    /**
     * 1. Plain, unconditional update.
     */
    @Test
    public void testSet() throws Exception{
        client.setData().forPath("/app1", "test".getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 2. Update conditioned on the version that was just read.
     */
    @Test
    public void testSet2() throws Exception{
        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/app1");

        int version = status.getVersion();
        System.out.println(version);
        client.setData().withVersion(version).forPath("/app1", "test001".getBytes(StandardCharsets.UTF_8));
    }

    /*==============================================delete===================================================*/

    /*
     * Deleting nodes:
     * 1. delete a single node              delete().forPath("/app3");
     * 2. delete a node with its children   delete().deletingChildrenIfNeeded().forPath("/app5");
     * 3. guaranteed delete (retries under the hood, to survive network jitter)
     *                                      delete().guaranteed().forPath("/app2");
     * 4. delete with a callback            inBackground(...)
     */

    /**
     * 1. Delete a single node.
     */
    @Test
    public void testDelete() throws Exception{
        client.delete().forPath("/app3");
    }

    /**
     * 2. Delete a node together with its children.
     */
    @Test
    public void testDelete2() throws Exception{
        client.delete().deletingChildrenIfNeeded().forPath("/app5");
    }

    /**
     * 3. Guaranteed delete.
     */
    @Test
    public void testDelete3() throws Exception{
        client.delete().guaranteed().forPath("/app2");
    }

    /**
     * 4. Delete in the background and report the result through a callback.
     *
     * <p>The background call returns immediately, so the test waits (bounded) for the
     * callback; otherwise the @After hook could close the client before the callback
     * has had a chance to run.
     */
    @Test
    public void testDelete4() throws Exception{
        final java.util.concurrent.CountDownLatch callbackDone = new java.util.concurrent.CountDownLatch(1);

        client.delete().guaranteed().inBackground(new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent curatorEvent) throws Exception {
                try {
                    System.out.println("我被刪除了");
                    System.out.println(curatorEvent);
                } finally {
                    callbackDone.countDown();
                }
            }
        }).forPath("/app1");

        // Best-effort wait: give up after 30 s rather than hanging the test run forever.
        callbackDone.await(30, java.util.concurrent.TimeUnit.SECONDS);
    }

    /*================================================close===================================================*/

    /**
     * Closes the client after each test, if one was created.
     */
    @After
    public void close(){
        if(client!=null){
            client.close();
        }
    }
}

3.3 Watcher 監聽

package com.sjx.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * JUnit 4 demos of the three Curator cache recipes used as watchers:
 * NodeCache (one node), PathChildrenCache (the children of a node) and
 * TreeCache (a node and everything below it).
 *
 * <p>Each test registers a listener and then blocks so that events keep being
 * printed until the test is interrupted or the JVM is stopped.
 */
public class CuratorWatcherTest {

    // Held as a field so that every test method and the @After hook can reach it.
    private CuratorFramework client;

    /**
     * Builds and starts the Curator client used by each test.
     */
    @Before
    public void testConnect(){
        // Retry policy: exponential back-off with a 3000 ms base sleep and at most 10 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder()
                .connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                // Every path used through this client is placed under /helianthus.
                .namespace("helianthus")
                .build();

        // Open the connection.
        client.start();
    }

    /**
     * Closes the client after each test, if one was created.
     */
    @After
    public void close(){
        if(client!=null){
            client.close();
        }
    }

    /** Decodes node data as UTF-8 text; a node that carries no data yields an empty string. */
    private static String utf8(byte[] bytes) {
        return bytes == null ? "" : new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Parks the calling thread so listeners keep receiving events.
     * Sleeping replaces an empty {@code while (true)} loop, which would pin a CPU core.
     */
    private static void blockForever() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    /*===================================NodeCache: watch one node==============================================*/

    /**
     * Demonstrates NodeCache: watching a single node.
     */
    @Test
    public void testNodeCache() throws Exception {
        // 1. Create the NodeCache; try-with-resources closes it before the client is closed.
        try (final NodeCache nodeCache = new NodeCache(client, "/app3")) {
            // 2. Register the listener.
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("節點變化了");

                    // Print the node's data after the change. There is nothing to read
                    // when the cache holds no current data (e.g. the node was deleted).
                    ChildData current = nodeCache.getCurrentData();
                    if (current == null) {
                        return;
                    }
                    System.out.println(utf8(current.getData()));
                }
            });
            // Lambda form:
            // nodeCache.getListenable().addListener(() -> System.out.println("節點變化了"));

            // 3. Start watching; passing true loads the initial data into the cache on start.
            nodeCache.start(true);

            blockForever();
        }
    }

    /*===========================PathChildrenCache: watch all children of a node===================================*/

    /**
     * Demonstrates PathChildrenCache: watching all children of a node.
     */
    @Test
    public void testPathChildrenCache() throws Exception {
        // 1. Create the PathChildrenCache (third argument: also cache the children's data).
        try (PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app5", true)) {
            // 2. Bind the listener.
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    System.out.println("子節點變化了");
                    System.out.println(event);

                    // For data updates of a child, also print the new data.
                    PathChildrenCacheEvent.Type type = event.getType();
                    if (type == PathChildrenCacheEvent.Type.CHILD_UPDATED) {
                        System.out.println("資料變了");
                        System.out.println(utf8(event.getData().getData()));
                    }
                }
            });
            // 3. Start watching.
            pathChildrenCache.start();

            blockForever();
        }
    }

    /*==============================TreeCache: watch a node itself and all its descendants=====================================*/

    /**
     * Demonstrates TreeCache: watching a node itself and all nodes below it.
     */
    @Test
    public void testTreeCache() throws Exception {
        // 1. Create the TreeCache rooted at "/" (the client namespace root).
        try (TreeCache treeCache = new TreeCache(client, "/")) {
            // 2. Bind the listener.
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    System.out.println("節點變化了");
                    System.out.println(event);
                }
            });

            // 3. Start watching.
            treeCache.start();

            blockForever();
        }
    }
}

4.分散式鎖

4.1 分散式鎖原理

4.2 分散式鎖API

4.3 Ticket12306

package com.sjx.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
 * Ticket-selling demo: several threads share one instance and decrement a common
 * ticket counter, serialising access through a Curator {@link InterProcessMutex}
 * on the ZooKeeper path /lock.
 */
public class Ticket12306 implements Runnable{

    // Remaining tickets (stands in for a value stored in a database).
    private int tickets=10;

    // Distributed lock guarding every read/write of the ticket counter.
    private InterProcessMutex lock;

    /**
     * Connects to ZooKeeper and creates the distributed lock on /lock.
     */
    public Ticket12306() {
        // Retry policy: exponential back-off with a 3000 ms base sleep and at most 10 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        // Open the connection.
        client.start();

        lock=new InterProcessMutex(client,"/lock");
    }

    /**
     * Sells tickets one at a time until none are left.
     *
     * <p>The counter is only touched while the lock is held, and the lock is only
     * released when it was actually acquired: a timed-out acquire() returns false,
     * and releasing a lock that is not held would throw.
     */
    @Override
    public void run() {
        while (true){
            boolean acquired = false;
            try {
                // Try to take the lock, waiting at most 3 seconds.
                acquired = lock.acquire(3, TimeUnit.SECONDS);
                if (!acquired) {
                    // Timed out: do not touch the counter without the lock, try again.
                    continue;
                }
                if (tickets <= 0) {
                    // Sold out: stop instead of acquiring/releasing the lock forever.
                    break;
                }
                System.out.println(Thread.currentThread()+":"+tickets);
                Thread.sleep(100);
                tickets--;
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop selling.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release the lock, but only if this iteration really holds it.
                if (acquired) {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

4.4 LockTest

package com.sjx.curator;

/**
 * Entry point for the distributed-lock demo: two named threads, each standing in
 * for a ticket-selling client, run the same Ticket12306 instance concurrently.
 */
public class LockTest {
    public static void main(String[] args) {
        // One shared seller, so both threads compete for the same tickets and lock.
        final Ticket12306 sharedSeller = new Ticket12306();

        // Create every client thread first ...
        final Thread[] sellers = {
                new Thread(sharedSeller, "攜程"),
                new Thread(sharedSeller, "飛豬")
        };

        // ... then start them in declaration order.
        for (Thread seller : sellers) {
            seller.start();
        }
    }
}

4.5 鎖核心程式碼

// Distributed lock (excerpt of the lock-related lines; not a complete compilation unit).
private InterProcessMutex lock; 

// Create the client: exponential back-off retry policy built with (3000, 10).
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.172.133:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        // Open the connection.
        client.start();

// Create the lock on the ZooKeeper path /lock.
lock=new InterProcessMutex(client,"/lock");
// Acquire the lock, waiting at most 3 seconds.
// NOTE(review): the result of acquire() is ignored here; confirm against the Curator
// API whether release() is safe to call when the acquisition timed out.
lock.acquire(3, TimeUnit.SECONDS);
// Release the lock.
lock.release();

5. ZK 叢集

5.1 叢集介紹

(https://img2020.cnblogs.com/blog/2414441/202106/2414441-20210610131044581-1131233733.png)

5.2 叢集搭建

#上傳zk的壓縮包到伺服器
#建立叢集目錄
[root@localhost /]# mkdir /usr/local/zookeeper-cluster
#解壓
[root@localhost local]# tar -zxvf zookeeper-3.4.6.tar.gz
#建立zk目錄
[root@localhost local]# cp -r zookeeper-3.4.6 /usr/local/zookeeper-cluster/zookeeper-1
[root@localhost local]# cp -r zookeeper-3.4.6 /usr/local/zookeeper-cluster/zookeeper-2
[root@localhost local]# cp -r zookeeper-3.4.6 /usr/local/zookeeper-cluster/zookeeper-3
#這個data目錄需要是空的
[root@localhost /]# mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
[root@localhost /]# mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
[root@localhost /]# mkdir /usr/local/zookeeper-cluster/zookeeper-3/data
#修改配置檔案
mv /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
mv /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
mv /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
#更改資料目錄(三個例項的 zoo.cfg 都要把 dataDir 改成各自的 data 目錄,以下以 zookeeper-1 為例;偽叢集還需要把每個例項的 clientPort 改成不同的埠號,例如 2181、2182、2183)
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data
#配置叢集
# Write each server's id into its myid file.
# The space before '>' is required: `echo 1>file` is parsed as "redirect fd 1 to file",
# so echo would run with no arguments and the file would not contain the id.
echo 1 > /usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 > /usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 > /usr/local/zookeeper-cluster/zookeeper-3/data/myid
#修改配置
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
#server.伺服器id=伺服器ip地址:伺服器之間通訊埠:伺服器之間選舉投票埠
server.1=localhost:2881:3881
server.2=localhost:2882:3882
server.3=localhost:2883:3883
#啟動叢集
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
#檢視叢集狀態
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status