1. 程式人生 > >zookeeper之Curator框架(CRUD/事務/選舉/鎖)的使用

zookeeper之Curator框架(CRUD/事務/選舉/鎖)的使用

Curator框架是最好用,最流行的zookeeper的客戶端。

它有以下三個優點

1.提供了一套非常友好的操作API;

2. 提供一些高階特性(包括但不僅限於前篇文章中提到的)的封裝

3.易測試

maven依賴如下

[html] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. <dependency>
  2.     <groupId>org.apache.curator</groupId>
  3.     <artifactId>curator-recipes</artifactId>
  4.     <version>2.5.0</version
    >
  5. </dependency>
<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-recipes</artifactId>  
    <version>2.5.0</version>  
</dependency>  

按照官方給出的文件和包結構,可以輕鬆的看出Curator功能分兩大類,一是對zookeeper的一些基本命令的封裝,比如增刪改查。是他的framework模組,一個是他的高階特性,即recipes模組。

一、framework模組

Curator提供了一套Fluent風格的操作API。這在很多指令碼類語言裡比較流行。

比如他建立client的程式碼是這樣

[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. CuratorFramework client = builder.connectString("192.168.11.56:2180")    
  2.         .sessionTimeoutMs(30000)    
  3.         .connectionTimeoutMs(30000)    
  4.         .canBeReadOnly(false)    
  5.         .retryPolicy(new
     ExponentialBackoffRetry(1000, Integer.MAX_VALUE))    
  6.         .namespace(namespace)    
  7.         .defaultData(null)    
  8.         .build();    
  9. client.start();    
CuratorFramework client = builder.connectString("192.168.11.56:2180")  
        .sessionTimeoutMs(30000)  
        .connectionTimeoutMs(30000)  
        .canBeReadOnly(false)  
        .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))  
        .namespace(namespace)  
        .defaultData(null)  
        .build();  
client.start();  

 一路點到底,這就是所謂的Fluent風格。 

我們再看增刪改查的

[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. publicclass CrudExamples {    
  2.     privatestatic CuratorFramework client = ClientFactory.newClient();    
  3.     privatestaticfinal String PATH = "/crud";    
  4.     publicstaticvoid main(String[] args) {    
  5.         try {    
  6.             client.start();    
  7.             client.create().forPath(PATH, "I love messi".getBytes());    
  8.             byte[] bs = client.getData().forPath(PATH);    
  9.             System.out.println("新建的節點,data為:" + new String(bs));    
  10.             client.setData().forPath(PATH, "I love football".getBytes());    
  11.             // 由於是在background模式下獲取的data,此時的bs可能為null  
  12.             byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);    
  13.             System.out.println("修改後的data為" + new String(bs2 != null ? bs2 : newbyte[0]));    
  14.             client.delete().forPath(PATH);    
  15.             Stat stat = client.checkExists().forPath(PATH);    
  16.             // Stat就是對zonde所有屬性的一個對映, stat=null表示節點不存在!  
  17.             System.out.println(stat);    
  18.         } catch (Exception e) {    
  19.             e.printStackTrace();    
  20.         } finally {    
  21.             CloseableUtils.closeQuietly(client);    
  22.         }    
  23.     }    
  24. }    
public class CrudExamples {  
    private static CuratorFramework client = ClientFactory.newClient();  
    private static final String PATH = "/crud";  
  
    public static void main(String[] args) {  
        try {  
            client.start();  
  
            client.create().forPath(PATH, "I love messi".getBytes());  
  
            byte[] bs = client.getData().forPath(PATH);  
            System.out.println("新建的節點,data為:" + new String(bs));  
  
            client.setData().forPath(PATH, "I love football".getBytes());  
  
            // 由於是在background模式下獲取的data,此時的bs可能為null  
            byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);  
            System.out.println("修改後的data為" + new String(bs2 != null ? bs2 : new byte[0]));  
  
            client.delete().forPath(PATH);  
            Stat stat = client.checkExists().forPath(PATH);  
  
            // Stat就是對zonde所有屬性的一個對映, stat=null表示節點不存在!  
            System.out.println(stat);  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            CloseableUtils.closeQuietly(client);  
        }  
    }  
}  

 常用介面有

create()增

delete(): 刪

checkExists(): 判斷是否存在

setData():  改

getData(): 查

所有這些方法都以forpath()結尾,輔以watch(監聽),withMode(指定模式),和inBackground(後臺執行)等方法來使用。

此外,Curator還支援事務,一組crud操作同生同滅。程式碼如下

  1. /** 
  2.  * 事務操作 
  3.  *  
  4.  * @author shencl 
  5.  */  
  6. public class TransactionExamples {  
  7.     private static CuratorFramework client = ClientFactory.newClient();  
  8.     public static void main(String[] args) {  
  9.         try {  
  10.             client.start();  
  11.             // 開啟事務  
  12.             CuratorTransaction transaction = client.inTransaction();  
  13.             Collection<CuratorTransactionResult> results = transaction.create()  
  14.                     .forPath("/a/path""some data".getBytes()).and().setData()  
  15.                     .forPath("/another/path""other data".getBytes()).and().delete().forPath("/yet/another/path")  
  16.                     .and().commit();  
  17.             for (CuratorTransactionResult result : results) {  
  18.                 System.out.println(result.getForPath() + " - " + result.getType());  
  19.             }  
  20.         } catch (Exception e) {  
  21.             e.printStackTrace();  
  22.         } finally {  
  23.             // 釋放客戶端連線  
  24.             CloseableUtils.closeQuietly(client);  
  25.         }  
  26.     }  
  27. }  

 這段的程式碼的執行結果,由於最後一步delete的節點不存在,所以整個事務commit失敗。失敗的原因會放在Collection<CuratorTransactionResult>中,非常友好。

好了framework部分的內容就這麼多,是不是特別簡單呢。下面就來看看recipes包的內容吧。。

Recipes部分提供的功能官網列的很詳細,點選這裡。注意文章第一段:Curator宣稱,Recipes模組實現了除二階段提交之外的所有zookeeper特性。

二、Recipes模組

主要有

Elections(選舉),Locks(鎖),Barriers(關卡),Atomic(原子量),Caches,Queues等

1、 Elections

選舉主要依賴於LeaderSelector和LeaderLatch2個類。前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。後者是一旦選舉出Leader,除非有客戶端掛掉重新觸發選舉,否則不會交出領導權。某黨?

這兩者在實現上是可以切換的,直接上程式碼,怎麼切換註釋裡有。由於篇幅所限,這裡僅貼出基於LeaderSelector的選舉,更多程式碼見附件


[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. /**  
  2.  * 本類基於leaderSelector實現,所有存活的client會公平的輪流做leader  
  3.  * 如果不想頻繁的變化Leader,需要在takeLeadership方法裡阻塞leader的變更! 或者使用 {@link}  
  4.  * LeaderLatchClient  
  5.  */
  6. publicclass LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {    
  7.     privatefinal String name;    
  8.     privatefinal LeaderSelector leaderSelector;    
  9.     privatefinal String PATH = "/leaderselector";    
  10.     public LeaderSelectorClient(CuratorFramework client, String name) {    
  11.         this.name = name;    
  12.         leaderSelector = new LeaderSelector(client, PATH, this);    
  13.         leaderSelector.autoRequeue();    
  14.     }    
  15.     publicvoid start() throws IOException {    
  16.         leaderSelector.start();    
  17.     }    
  18.     @Override
  19.     publicvoid close() throws IOException {    
  20.         leaderSelector.close();    
  21.     }    
  22.     /**  
  23.      * client成為leader後,會呼叫此方法  
  24.      */
  25.     @Override
  26.     publicvoid takeLeadership(CuratorFramework client) throws Exception {    
  27.         int waitSeconds = (int) (5 * Math.random()) + 1;    
  28.         System.out.println(name + "是當前的leader");    
  29.         try {    
  30.             Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));    
  31.         } catch (InterruptedException e) {    
  32.             Thread.currentThread().interrupt();    
  33.         } finally {    
  34.             System.out.println(name + " 讓出領導權\n");    
  35.         }    
  36.     }    
/** 
 * 本類基於leaderSelector實現,所有存活的client會公平的輪流做leader 
 * 如果不想頻繁的變化Leader,需要在takeLeadership方法裡阻塞leader的變更! 或者使用 {@link} 
 * LeaderLatchClient 
 */  
public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {  
    private final String name;  
    private final LeaderSelector leaderSelector;  
    private final String PATH = "/leaderselector";  
  
    public LeaderSelectorClient(CuratorFramework client, String name) {  
        this.name = name;  
        leaderSelector = new LeaderSelector(client, PATH, this);  
        leaderSelector.autoRequeue();  
    }  
  
    public void start() throws IOException {  
        leaderSelector.start();  
    }  
  
    @Override  
    public void close() throws IOException {  
        leaderSelector.close();  
    }  
  
    /** 
     * client成為leader後,會呼叫此方法 
     */  
    @Override  
    public void takeLeadership(CuratorFramework client) throws Exception {  
        int waitSeconds = (int) (5 * Math.random()) + 1;  
        System.out.println(name + "是當前的leader");  
        try {  
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
        } finally {  
            System.out.println(name + " 讓出領導權\n");  
        }  
    }  

[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. /**  
  2.  * leader選舉  
  3.  *   
  4.  * @author shencl  
  5.  */
  6. publicclass LeaderSelectorExample {    
  7.     publicstaticvoid main(String[] args) {    
  8.         List<CuratorFramework> clients = Lists.newArrayList();    
  9.         List<LeaderSelectorClient> examples = Lists.newArrayList();    
  10.         try {    
  11.             for (int i = 0; i < 10; i++) {    
  12.                 CuratorFramework client = ClientFactory.newClient();    
  13.                 LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);    
  14.                 clients.add(client);    
  15.                 examples.add(example);    
  16.                 client.start();    
  17.                 example.start();    
  18.             }    
  19.             System.out.println("----------先觀察一會選舉的結果-----------");    
  20.             Thread.sleep(10000);    
  21.             System.out.println("----------關閉前5個客戶端,再觀察選舉的結果-----------");    
  22.             for (int i = 0; i < 5; i++) {    
  23.                 clients.get(i).close();    
  24.             }    
  25.             // 這裡有個小技巧,讓main程式一直監聽控制檯輸入,非同步的程式碼就可以一直在執行。不同於while(ture)的是,按回車或esc可退出  
  26.             new BufferedReader(new InputStreamReader(System.in)).readLine();    
  27.         } catch (Exception e) {    
  28.             e.printStackTrace();    
  29.         } finally {    
  30.             for (LeaderSelectorClient exampleClient : examples) {    
  31.                 CloseableUtils.closeQuietly(exampleClient);    
  32.             }    
  33.             for (CuratorFramework client : clients) {    
  34.                 CloseableUtils.closeQuietly(client);    
  35.             }    
  36.         }    
  37.     }    
  38. }    
/** 
 * leader選舉 
 *  
 * @author shencl 
 */  
public class LeaderSelectorExample {  
  
    public static void main(String[] args) {  
  
        List<CuratorFramework> clients = Lists.newArrayList();  
        List<LeaderSelectorClient> examples = Lists.newArrayList();  
        try {  
            for (int i = 0; i < 10; i++) {  
                CuratorFramework client = ClientFactory.newClient();  
                LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);  
                clients.add(client);  
                examples.add(example);  
  
                client.start();  
                example.start();  
            }  
  
            System.out.println("----------先觀察一會選舉的結果-----------");  
            Thread.sleep(10000);  
  
            System.out.println("----------關閉前5個客戶端,再觀察選舉的結果-----------");  
            for (int i = 0; i < 5; i++) {  
                clients.get(i).close();  
            }  
  
            // 這裡有個小技巧,讓main程式一直監聽控制檯輸入,非同步的程式碼就可以一直在執行。不同於while(ture)的是,按回車或esc可退出  
            new BufferedReader(new InputStreamReader(System.in)).readLine();  
  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            for (LeaderSelectorClient exampleClient : examples) {  
                CloseableUtils.closeQuietly(exampleClient);  
            }  
            for (CuratorFramework client : clients) {  
                CloseableUtils.closeQuietly(client);  
            }  
        }  
    }  
}  

2、locks

curator lock相關的實現在recipes.locks包裡。頂級介面都是InterProcessLock。我們直接看最有代表性的InterProcessReadWriteLock 程序內部讀寫鎖(可重入讀寫鎖)。什麼叫可重入,什麼叫讀寫鎖。不清楚的先查好資料吧。總之讀寫鎖一定是成對出現的。    簡易傳送門

我們先定義兩個任務,可並行的執行的,和互斥執行的。

[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. /**  
  2.  * 並行任務  
  3.  *   
  4.  * @author shencl  
  5.  */
  6. publicclass ParallelJob implements Runnable {    
  7.     privatefinal String name;    
  8.     privatefinal InterProcessLock lock;    
  9.     // 鎖等待時間  
  10.     privatefinalint wait_time = 5;    
  11.     ParallelJob(String name, InterProcessLock lock) {    
  12.         this.name = name;    
  13.         this.lock = lock;    
  14.     }    
  15.     @Override
  16.     publicvoid run() {    
  17.         try {    
  18.             doWork();    
  19.         } catch (Exception e) {    
  20.             // ingore;  
  21.         }    
  22.     }    
  23.     publicvoid doWork() throws Exception {    
  24.         try {    
  25.             if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {    
  26.                 System.err.println(name + "等待" + wait_time + "秒,仍未能獲取到lock,準備放棄。");    
  27.             }    
  28.             // 模擬job執行時間0-4000毫秒  
  29.             int exeTime = new Random().nextInt(4000);    
  30.             System.out.println(name + "開始執行,預計執行時間= " + exeTime + "毫秒----------");    
  31.             Thread.sleep(exeTime);    
  32.         } catch (Exception e) {    
  33.             e.printStackTrace();    
  34.         } finally {    
  35.             lock.release();    
  36.         }    
  37.     }    
  38. }    
/** 
 * 並行任務 
 *  
 * @author shencl 
 */  
public class ParallelJob implements Runnable {  
  
    private final String name;  
  
    private final InterProcessLock lock;  
  
    // 鎖等待時間  
    private final int wait_time = 5;  
  
    ParallelJob(String name, InterProcessLock lock) {  
        this.name = name;  
        this.lock = lock;  
    }  
  
    @Override  
    public void run() {  
        try {  
            doWork();  
        } catch (Exception e) {  
            // ingore;  
        }  
    }  
  
    public void doWork() throws Exception {  
        try {  
            if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {  
                System.err.println(name + "等待" + wait_time + "秒,仍未能獲取到lock,準備放棄。");  
            }  
            // 模擬job執行時間0-4000毫秒  
            int exeTime = new Random().nextInt(4000);  
            System.out.println(name + "開始執行,預計執行時間= " + exeTime + "毫秒----------");  
            Thread.sleep(exeTime);  
        } catch (Exception e) {  
            e.printStackTrace();  
        } finally {  
            lock.release();  
        }  
    }  
}  

[java] view plaincopyprint?在CODE上檢視程式碼片派生到我的程式碼片
  1. /**  
  2.  * 互斥任務  
  3.  *   
  4.  * @author shencl  
  5.  */
  6. publicclass MutexJob implements Runnable {    
  7.     privatefinal String name;    
  8.     privatefinal InterProcessLock lock;    
  9.     // 鎖等待時間  
  10.     privatefinalint wait_time = 10;    
  11.     MutexJob(String name, InterProcessLock lock) {    
  12.         this.name = name;    
  13.         this.lock = lock;    
  14.     }    
  15.     @Override
  16.     publicvoid run() {    
  17.         try {    
  18.             doWork();    
  19.         } catch (Exception e) {    
  20.             // ingore;  
  21.         }    
  22.     }    
  23.     publicvoid doWork() throws Exception {    
  24.         try {    
  25.             if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {    
  26.                 System.err.println(name + "等待" + wait_time + "秒,仍未能獲取到lock,準備放棄。");    
  27.             }    
  28.             // 模擬job執行時間0-2000毫秒  
  29.             int exeTime = new Random().nextInt(2000);    
  30.             System.out.println(name + "開始執行,預計執行時間= " + exeTime + "毫秒----------"<