1. 程式人生 > >Zookeeper客戶端Curator使用指南

Zookeeper客戶端Curator使用指南

what is Curator ?

Curator是zookeeper分散式協調服務的java客戶端庫,它包裝了一系列操作zk的高階API和實用庫,是的操作zk變得更加容易和可靠。例如使用原生zk的API實現分散式鎖的話,程式碼量多,複雜,使用Curator後就相對簡單的多,很多底層的api都直接封裝好了,開箱即用,學習成本低。

Getting Started

1、使用Curator之前,你需要引入maven依賴

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-framework</artifactId>
   <version>2.8.0</version>
</dependency>

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.8.0</version>
</dependency>

2、例項化Curator,你可以通過CuratorFrameworkFactory類提供的來產生一個CuratorFramework物件

CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)

zookeeperConnectionString就是連線的ip:埠資訊,retryPolicy是重試策略,Curator提供了三種常用的重試策略,這裡不詳述

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

或者你也可以使用鏈式呼叫來例項化curator

CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
        .connectString(zookeeperConnectionString)
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy)
        .build();

curatorFramework.start();

How to use Curator API operate ZK ?

curator操作zk的api主要包括 節點的增刪改查、節點判斷、節點監聽等,下面的程式碼演示瞭如何使用基本的curator api

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* 類描述:zookeeper客戶端curator使用demo
*/
public class CuratorDemo {

    private static final Logger logger = LoggerFactory.getLogger(CuratorDemo.class);

    private static final String NODE_PATH = "/node_8";
    private static final String CONNECT_TOSTRING = "10.200.121.46:2181";

    /*建立執行緒池,供給非同步使用curator時呼叫*/
    public static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        try {
        
/*重試策略一:重試三次,每重試一次,重試的間隔時間會越來越大

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
*/
/*重試策略二:最多重試三次,每次重試間隔1s

        RetryPolicy retryPolicy1 = new RetryNTimes(3,1000);
*/

/*重試策略三:最大重試時間總和不超過5s,每次重試間隔為1s*/
            RetryPolicy retryPolicy2 = new RetryUntilElapsed(5000, 1000);

/*
        
/*      方式一建立zookeeper連線
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(CONNECT_TOSTRING,5000,5000,retryPolicy2);
*/

            /*方式二建立zookeeper連線*/
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                    .connectString(CONNECT_TOSTRING)
                    .sessionTimeoutMs(5000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(retryPolicy2)
                    .build();

            curatorFramework.start();

        /*建立節點資料*/        
 curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(NODE_PATH, "456".getBytes());

        /*刪除節點(包含子節點)*/            curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath(NODE_PATH);

        /*獲取子節點*/
        List<String> strings = curatorFramework.getChildren().forPath(NODE_PATH);

        /*獲取節點資料內容*/
        byte[] bytes = curatorFramework.getData().forPath(NODE_PATH);
        System.out.println(new String(bytes));
        
       /*獲取節點資料內容+狀態資訊*/
       Stat stat = new Stat();
        byte[] result = curatorFramework.getData().storingStatIn(stat).forPath(NODE_PATH);
        System.out.println(new String(result));

        /*修改節點資料內容*/
        curatorFramework.setData().forPath(NODE_PATH, "123".getBytes());

       /*判斷節點是否存在*/
        Stat stat1 = curatorFramework.checkExists().forPath(NODE_PATH);

        /*非同步操作,以判斷節點是否存在為例,注意使用執行緒池以便節省單個執行緒的建立銷燬開銷,及最後執行緒的關閉*/
        curatorFramework.checkExists().inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {

                    Object context = curatorEvent.getContext();   //這裡的上下文就是 傳遞進去的"123456"

                }
            }, "12345", executorService).forPath(NODE_PATH);

          /*設定節點事件監聽*/
          final NodeCache nodeCache = new NodeCache(curatorFramework, NODE_PATH);
            nodeCache.start();
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    byte[] result = nodeCache.getCurrentData().getData();
                    logger.info("事件監聽result=" + new String(result));
                }
            });

            /*設定子節點事件監聽*/
            final PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, NODE_PATH, true);
            childrenCache.start();
            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                    switch (type) {
                        case CHILD_ADDED:
                            logger.info("");
                        case CHILD_UPDATED:
                            logger.info("");
                        case CHILD_REMOVED:
                            logger.info("");
                        default:
                            break;
                    }
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

}

使用Curator實現Master選舉

Curator中包裝的master選舉包含兩種,Leader Latch和Leader Election,原理採用的是zk的節點特性,即多個客戶端同時建立同一節點,zk保證只有一個客戶端能建立成功,成功的客戶端即為master節點,再master節點的機器上執行業務。Leader  Latcher裡面即包裝了zk建立節點、設定監聽,對應的操作均包裝在LeaderLatch類中。LeaderLatch將隨機選出一個master

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
* 類描述:基於Curator的Leader Latch實現的master選舉
* 建立人:simonsfan
*/
@Component
public class CuratorLeaderLatch {

    private static CuratorFramework curatorFramework;

    private static LeaderLatch leaderLatch;

    private static final String path = "/root/leaderlatch";

    private static final String connectStr = "10.200.121.46:2181,10.200.121.159:2181,10.200.121.168:2181";

    static {
        curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectString(connectStr)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .connectionTimeoutMs(5000)
                .sessionTimeoutMs(5000)
                .build();

        curatorFramework.start();
        leaderLatch = new LeaderLatch(curatorFramework, path.concat("/testtast"));
    }

    @Lazy
    @Scheduled(cron = "")
    public void testTask() {
        //是master節點的執行業務流程,使用leaderLatch.hasLeadership()方法判斷是否為leader,true表示是master節點
        if (!leaderLatch.hasLeadership()) return;
        //TODO something

    }
    
}

使用Curator實現分散式鎖

這裡講的的是Shared Reentrant Lock(共享可重入鎖,推薦使用,Curator還封裝了其他型別的鎖:共享不可重入鎖之類的):全域性同步的、公平的分散式共享重入式鎖,可保證在任意同一時刻,只有一個客戶端持有鎖。使用到的類是InterProcessMutex

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* 類描述:Curator實現的分散式鎖
* 建立人:simonsfan
*/
public class DistributedLock {

    private static CuratorFramework curatorFramework;

    private static InterProcessMutex interProcessMutex;

    private static final String connectString = "10.200.121.46:2181,10.200.121.43:2181,10.200.121.167:2181";

    private static final String root = "/root";

    private static ExecutorService executorService;

    private String lockName;

    public String getLockName() {
        return lockName;
    }

    public void setLockName(String lockName) {
        this.lockName = lockName;
    }

    static {
        curatorFramework = CuratorFrameworkFactory.builder().connectString(connectString).connectionTimeoutMs(5000).sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        executorService = Executors.newCachedThreadPool();
        curatorFramework.start();
    }

    public DistributedLock(String lockName) {
        this.lockName = lockName;
        interProcessMutex = new InterProcessMutex(curatorFramework, root.concat(lockName));
    }

    /*上鎖*/
    public void tryLock() {
        int count = 0;
        try {
            while (!interProcessMutex.acquire(1, TimeUnit.SECONDS)) {
                count++;
                if (count > 3) {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /*釋放*/
    public void releaseLock() {
        try {
            if (interProcessMutex != null) {
                interProcessMutex.release();
            }
            curatorFramework.delete().inBackground(new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {

                }
            }, executorService).forPath(root.concat(lockName));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}