1. 程式人生 > >zookeeper學習筆記

zookeeper學習筆記

單獨 bytes child imu snap rgs pes listen ise

zookeeper安裝與基本命令

ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。

要安裝zookeeper首先需要安裝jdk,可以去ZooKpper官網下載最新的版本。

並解壓到指定目錄配置zoo.cfg , 在conf文件夾下復制一份新的zoo_sample.cfg並重新命名為zoo.cfg。

安裝步驟

# cd /opt
# wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz 
# tar -xzvf zookeeper-3.4.8.tar.gz # mv zookeeper-3.4.8 zookeeper # cd zookeeper/ # cd conf/ # mv zoo_sample.cfg zoo.cfg

啟動zk要到bin目錄下,執行zkServer.sh start就可以了。

停止當然是zkServer.sh stop

查看狀態zkServer.sh status

鏈接zk使用zkCli.sh -server 127.0.0.1:2181

輸入help回車後查看幫助信息

ZooKeeper -server host:port cmd args
    
stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [
-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port

創建節點以及值

create /node1 value1

查看節點

使用ls指令查看當前ZK中所包含的內容ls /

查看節點中的值

get /node1

更新節點中的值

set /node2 value3

刪除節點

delete /node2

watch介紹

watch表示監聽事件,比如執行命令ls /node2

然後我們新開個窗口連上zk,在node2下面新建個子節點,建完後馬上之前加了watch的窗口就能收到新建的事件了。

這種場景適合用在配置變更的時候各個子節點都需要重新加載配置。

zoo.cfg配置信息詳解

# The number of milliseconds of each tick
tickTime=2000 ##ZooKeeper的最小時間單元,單位毫秒(ms),默認值為3000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10 ##Leader服務器等待Follower啟動並完成數據同步的時間,默認值10,表示tickTime的10倍
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5 ##Leader服務器和Follower之間進行心跳檢測的最大延時時間,默認值5,表示tickTime的5倍
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/tmp/zookeeper ##ZooKeeper服務器存儲快照文件的目錄,必須配值,建議放置在var目錄下
# the port at which the clients will connect
clientPort=2181 ## 服務器對外服務端口,默認值為2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

zookeeper權限控制 ACL操作

CL全稱為Access Control List(訪問控制列表),用於控制資源的訪問權限。Zookeeper利用ACL策略控制節點的訪問權限,如節點數據讀寫、節點創建、節點刪除、讀取子節點列表、設置節點權限等。

Zookeeper的ACL,scheme、id、permission,通常表示為:

scheme:id:permission。
  • schema代表授權策略
  • id代表用戶
  • permission代表權限

permission分為下面幾種:

  • CREATE(c):創建子節點的權限
  • READ(r):讀取節點數據的權限
  • DELETE(d):刪除節點的權限
  • WRITE(w):修改節點數據的權限
  • ADMIN(a):設置子節點權限的權限

密碼權限控制

語法:digest:username:BASE64(SHA1(password)):權限信息
首先我們創建一個節點

create /test 10

創建完成之後設置權限

setAcl /test digest:zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=:crdwa

然後來驗證下權限是否可用

[zk: localhost:2181(CONNECTED) 2] get /test
Authentication is not valid : /test

執行命令會發現沒有權限操作

進行認證操作後再次執行get操作就可以了

addauth digest zhangsan:123456

這邊需要註意的是密碼的生成方式,密碼是加密的,不是明文的,我們需要借助於Zookeeper中的一個類來加密密碼,如果是代碼中操作就直接加密了,在shell中就得單獨去加密了。

java -Djava.ext.dirs=/Users/zhangsan/Documents/java/zookeeper-3.4.7/lib -cp /Users/zhangsan/Documents/java/zookeeper-3.4.7/zookeeper-3.4.7.jar org.apache.zookeeper.server.auth.DigestAuthenticationProvider zhangsan:123456

輸出加密內容如下:

zhangsan:123456->zhangsan:zwnqMhjMhpBo3CqM8qqH5mM73s8=

IP權限控制

還有一種就是通過IP來控制節點的訪問權限,一般不建議使用,因為IP發生變動的可能性比較大

語法是:ip:IP信息:權限信息

創建一個節點

create /ip2 ip

設置IP訪問權限

setAcl /ip2 ip:192.168.31.139:r

Java連接zookeeper

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId
    <version>3.4.7</version>
</dependency>
/**
 * 連接ZK測試
 */
public class ConnTest {

    public static void main(String[] args) {
        try {
            // 建立連接
            ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 1000 * 10, null);
            // 創建節點
            zooKeeper.create("/connTest", "connTest".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            byte[] result = zooKeeper.getData("/connTest", false, new Stat());
            System.out.println(new String(result));
            
            //zooKeeper.delete("/connTest", -1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

使用curator客戶端更方便操作zookeeper

        <!--<dependency>-->
            <!--<groupId>org.apache.zookeeper</groupId>-->
            <!--<artifactId>zookeeper</artifactId>-->
            <!--<version>3.4.7</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.4.0</version>
        </dependency>    
/**
 * zk客戶端
 * 測試監聽zk節點變化
 */
public class CuratorTest {

    public static void main(String[] args) throws Exception {
        // 1 重試策略:初試時間為1s 重試10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通過工廠創建連接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 開啟連接
        cf.start();
        
//        cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/yjh/aa","aa內容".getBytes());
//        System.out.println(new String(cf.getData().forPath("/yjh/aa")));
//        cf.setData().forPath("/yjh/aa", "修改aa內容".getBytes());
//        System.out.println(new String(cf.getData().forPath("/yjh/aa")));
//        cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/yjh");
        
//        if (cf.checkExists().forPath("/tt") == null) {
//            cf.create().creatingParentsIfNeeded().forPath("/tt","tt".getBytes());
//        }
//        byte[] data = cf.getData().usingWatcher(new Watcher() {  
//            public void process(WatchedEvent event) {
//                System.out.println("節點監聽器 : " + event.getType().getIntValue() + "\t" + event.getPath());  
//            }  
//        }).forPath("/tt");
//        System.out.println(new String(data));
        
         ExecutorService pool = Executors.newFixedThreadPool(2);
            
         final NodeCache nodeCache = new NodeCache(cf, "/test", false);
         nodeCache.start(true);
         nodeCache.getListenable().addListener(
            new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    System.out.println(nodeCache.getCurrentData().getPath() + "數據改變了, 新的數據是: " +
                        new String(nodeCache.getCurrentData().getData()));
                }
            }, 
            pool
            );
        Thread.sleep(Integer.MAX_VALUE);
    }

}

zookeeper分布式鎖

/**
 * zk分布式鎖
 */
public class LockTest {
    
    static int count = 2;

    public static void main(String[] args) {
        // 1 重試策略:初試時間為1s 重試10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通過工廠創建連接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 開啟連接
        cf.start();
        final InterProcessMutex lock = new InterProcessMutex(cf, "/mylock");
        final CountDownLatch latch = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(20);
        for (int i = 0; i < 20; i++) {
            pool.execute(() -> {
                try {
                    System.err.println(1);
                    latch.await();
                    lock.acquire();
                    Thread.sleep(100);
                    //synchronized (LockTest.class) {
                        if (count > 0) {
                            count--;
                            lock.release();
                            System.out.println(count);
                        }
                        
                    //}
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        System.out.println("開始執行");
        latch.countDown();
        pool.shutdown();
    }

}

zookeeper作為服務註冊中心

首先編寫一個服務註冊中心類,監聽服務的註冊與停止

/**
 * 測試服務註冊中心
 * 監聽服務註冊與停止
 */
public class ServiceClient {

    public static void main(String[] args) {
        // client.get("http://localhost/get");
        // client.get("http://localhost/get2");
        // client.get("nginx地址"); --->   get./get2
        
        // 從zk中獲取服務地址列表,選擇一個進行請求,本地執行負載均衡
        
        // 1 重試策略:初試時間為1s 重試10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通過工廠創建連接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 開啟連接
        cf.start();

        // 開始監聽
        try {
            final PathChildrenCache childrenCache = new PathChildrenCache(cf, "/service", true);
            childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
            childrenCache.getListenable().addListener(
                new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                            throws Exception {
                            switch (event.getType()) {
                            case CHILD_ADDED:
                                System.out.println("CHILD_ADDED: " + event.getData().getPath());
                                break;
                            case CHILD_REMOVED:
                                System.out.println("CHILD_REMOVED: " + event.getData().getPath());
                                break;
                            case CHILD_UPDATED:
                                System.out.println("CHILD_UPDATED: " + event.getData().getPath());
                                break;
                            default:
                                break;
                        }
                    }
                }
            );
            List<String> urls = cf.getChildren().forPath("/service");
            for (String url : urls) {
                System.out.println(url);
            }
            Thread.sleep(200000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

另外編寫兩個服務類,用於模擬兩個服務,分別往zookeeper創建節點,相當於註冊服務,由上面的服務註冊中心進行監聽

用戶服務:

/**
 * 測試服務註冊中心
 * 模擬用戶服務
 */
public class UserServiceApplication {

    public static void main(String[] args) {
        // 1 重試策略:初試時間為1s 重試10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通過工廠創建連接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 開啟連接
        cf.start();
        try {
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.1", "".getBytes());
            Thread.sleep(200000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

訂單服務:

/**
 * 測試服務註冊中心
 * 模擬訂單服務
 */
public class OrderServiceApplication {

    public static void main(String[] args) {
        // 1 重試策略:初試時間為1s 重試10次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
        // 2 通過工廠創建連接
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .sessionTimeoutMs(1000 * 10)
                .retryPolicy(retryPolicy)
                .build();
        // 3 開啟連接
        cf.start();
        try {
            cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/service/192.168.1.2", "".getBytes());
            Thread.sleep(200000000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

然後分別啟動上面這三個類,服務註冊中心就可以監聽用戶服務和訂單服務的註冊和停止了。

ps:編寫一個簡單的zookeeper操作工具類

/**
 * Created by 唐哲
 * 2018-06-24 16:26
 * zk操作工具類
 */
public class ZkUtils {

    private static final Integer ZK_SESSION_TIMEOUT = 10 * 1000;
    private static CountDownLatch latch = new CountDownLatch(1);
    private static ZkUtils instance = new ZkUtils();
    private static ZooKeeper zk;

    public synchronized static ZkUtils getInstance(String host, int port) {
        if (zk == null) {
            connect(host, port);
        }
        return instance;
    }

    private static void connect(String host, int port) {
        String connectString = host + ":" + port;
        try {
            zk = new ZooKeeper(connectString, ZK_SESSION_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("已經觸發了" + event.getType() + "事件!");
                    // 判斷是否已連接ZK, 連接後計數器遞減
                    if (event.getState() == Event.KeeperState.SyncConnected) {
                        latch.countDown();
                    }
                }
            });
            // 若計數器不為0, 則等待
            latch.await();
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String addNode(String nodeName) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String addNode(String nodeName, String data) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public String addNode(String nodeName, String data, List<ACL> acl, CreateMode createMode) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat == null) {
                return zk.create(nodeName, data.getBytes(), acl, createMode);
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void removeNode(String nodeName) {
        try {
            zk.delete(nodeName, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public void removeNode(String nodeName, int version) {
        try {
            zk.delete(nodeName, version);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public String setData(String nodeName, String data) {
        Stat stat;
        try {
            stat = zk.exists(nodeName, false);
            if (stat != null) {
                zk.setData(nodeName, data.getBytes(), -1);
                return data;
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 監控數據節點變化
     */
    public void monitorDataUpdate(String nodeName) {
        try {
            zk.getData(nodeName, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    // 節點的值有修改
                    if(event.getType() == EventType.NodeDataChanged) {
                        System.out.println(nodeName + "修改了值" + event.getPath());
                        // 觸發一次就失效,所以需要遞歸註冊
                        monitorDataUpdate(nodeName);
                    }
                }
            }, new Stat());
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ZkUtils zkUtils = ZkUtils.getInstance("localhost", 2181);
        //zkUtils.removeNode("/test");
        
//        String result = zkUtils.addNode("/test");
//        System.out.println(result);
//
//        result = zkUtils.addNode("/test", "10");
//        System.out.println(result);
        String result = zkUtils.setData("/test", "hello");
        System.out.println(result);

        zkUtils.monitorDataUpdate("/test");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }

}

zookeeper學習筆記