zookeeper之Curator框架(CRUD/事務/選舉/鎖)的使用
它有以下三個優點
1.提供了一套非常友好的操作API;
2. 提供一些高階特性(包括但不僅限於前篇文章中提到的)的封裝
3.易測試
maven依賴如下
[html] view plaincopyprint?- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>2.5.0</version
- </dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.5.0</version>
</dependency>
按照官方給出的文件和包結構,可以輕鬆的看出Curator功能分兩大類,一是對zookeeper的一些基本命令的封裝,比如增刪改查。是他的framework模組,一個是他的高階特性,即recipes模組。
一、framework模組
Curator提供了一套Fluent風格的操作API。這在很多指令碼類語言裡比較流行。
比如他建立client的程式碼是這樣
[java] view plaincopyprint?- CuratorFramework client = builder.connectString("192.168.11.56:2180")
- .sessionTimeoutMs(30000)
- .connectionTimeoutMs(30000)
- .canBeReadOnly(false)
- .retryPolicy(new
- .namespace(namespace)
- .defaultData(null)
- .build();
- client.start();
CuratorFramework client = builder.connectString("192.168.11.56:2180")
.sessionTimeoutMs(30000)
.connectionTimeoutMs(30000)
.canBeReadOnly(false)
.retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
.namespace(namespace)
.defaultData(null)
.build();
client.start();
一路點到底,這就是所謂的Fluent風格。
我們再看增刪改查的
[java] view plaincopyprint?- publicclass CrudExamples {
- privatestatic CuratorFramework client = ClientFactory.newClient();
- privatestaticfinal String PATH = "/crud";
- publicstaticvoid main(String[] args) {
- try {
- client.start();
- client.create().forPath(PATH, "I love messi".getBytes());
- byte[] bs = client.getData().forPath(PATH);
- System.out.println("新建的節點,data為:" + new String(bs));
- client.setData().forPath(PATH, "I love football".getBytes());
- // 由於是在background模式下獲取的data,此時的bs可能為null
- byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);
- System.out.println("修改後的data為" + new String(bs2 != null ? bs2 : newbyte[0]));
- client.delete().forPath(PATH);
- Stat stat = client.checkExists().forPath(PATH);
- // Stat就是對zonde所有屬性的一個對映, stat=null表示節點不存在!
- System.out.println(stat);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- CloseableUtils.closeQuietly(client);
- }
- }
- }
public class CrudExamples {
private static CuratorFramework client = ClientFactory.newClient();
private static final String PATH = "/crud";
public static void main(String[] args) {
try {
client.start();
client.create().forPath(PATH, "I love messi".getBytes());
byte[] bs = client.getData().forPath(PATH);
System.out.println("新建的節點,data為:" + new String(bs));
client.setData().forPath(PATH, "I love football".getBytes());
// 由於是在background模式下獲取的data,此時的bs可能為null
byte[] bs2 = client.getData().watched().inBackground().forPath(PATH);
System.out.println("修改後的data為" + new String(bs2 != null ? bs2 : new byte[0]));
client.delete().forPath(PATH);
Stat stat = client.checkExists().forPath(PATH);
// Stat就是對zonde所有屬性的一個對映, stat=null表示節點不存在!
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
}
}
常用介面有
create()增
delete(): 刪
checkExists(): 判斷是否存在
setData(): 改
getData(): 查
所有這些方法都以forpath()結尾,輔以watch(監聽),withMode(指定模式),和inBackground(後臺執行)等方法來使用。
此外,Curator還支援事務,一組crud操作同生同滅。程式碼如下- /**
- * 事務操作
- *
- * @author shencl
- */
- public class TransactionExamples {
- private static CuratorFramework client = ClientFactory.newClient();
- public static void main(String[] args) {
- try {
- client.start();
- // 開啟事務
- CuratorTransaction transaction = client.inTransaction();
- Collection<CuratorTransactionResult> results = transaction.create()
- .forPath("/a/path", "some data".getBytes()).and().setData()
- .forPath("/another/path", "other data".getBytes()).and().delete().forPath("/yet/another/path")
- .and().commit();
- for (CuratorTransactionResult result : results) {
- System.out.println(result.getForPath() + " - " + result.getType());
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- // 釋放客戶端連線
- CloseableUtils.closeQuietly(client);
- }
- }
- }
這段的程式碼的執行結果,由於最後一步delete的節點不存在,所以整個事務commit失敗。失敗的原因會放在Collection<CuratorTransactionResult>中,非常友好。
好了framework部分的內容就這麼多,是不是特別簡單呢。下面就來看看recipes包的內容吧。。
Recipes部分提供的功能官網列的很詳細,點選這裡。注意文章第一段:Curator宣稱,Recipes模組實現了除二階段提交之外的所有zookeeper特性。
二、Recipes模組
主要有
Elections(選舉),Locks(鎖),Barriers(關卡),Atomic(原子量),Caches,Queues等
1、 Elections
選舉主要依賴於LeaderSelector和LeaderLatch2個類。前者是所有存活的客戶端不間斷的輪流做Leader,大同社會。後者是一旦選舉出Leader,除非有客戶端掛掉重新觸發選舉,否則不會交出領導權。某黨?
這兩者在實現上是可以切換的,直接上程式碼,怎麼切換註釋裡有。由於篇幅所限,這裡僅貼出基於LeaderSelector的選舉,更多程式碼見附件
[java] view plaincopyprint?
- /**
- * 本類基於leaderSelector實現,所有存活的client會公平的輪流做leader
- * 如果不想頻繁的變化Leader,需要在takeLeadership方法裡阻塞leader的變更! 或者使用 {@link}
- * LeaderLatchClient
- */
- publicclass LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
- privatefinal String name;
- privatefinal LeaderSelector leaderSelector;
- privatefinal String PATH = "/leaderselector";
- public LeaderSelectorClient(CuratorFramework client, String name) {
- this.name = name;
- leaderSelector = new LeaderSelector(client, PATH, this);
- leaderSelector.autoRequeue();
- }
- publicvoid start() throws IOException {
- leaderSelector.start();
- }
- @Override
- publicvoid close() throws IOException {
- leaderSelector.close();
- }
- /**
- * client成為leader後,會呼叫此方法
- */
- @Override
- publicvoid takeLeadership(CuratorFramework client) throws Exception {
- int waitSeconds = (int) (5 * Math.random()) + 1;
- System.out.println(name + "是當前的leader");
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- System.out.println(name + " 讓出領導權\n");
- }
- }
/**
* 本類基於leaderSelector實現,所有存活的client會公平的輪流做leader
* 如果不想頻繁的變化Leader,需要在takeLeadership方法裡阻塞leader的變更! 或者使用 {@link}
* LeaderLatchClient
*/
public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
private final String PATH = "/leaderselector";
public LeaderSelectorClient(CuratorFramework client, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, PATH, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
/**
* client成為leader後,會呼叫此方法
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
int waitSeconds = (int) (5 * Math.random()) + 1;
System.out.println(name + "是當前的leader");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " 讓出領導權\n");
}
}
[java] view plaincopyprint?
- /**
- * leader選舉
- *
- * @author shencl
- */
- publicclass LeaderSelectorExample {
- publicstaticvoid main(String[] args) {
- List<CuratorFramework> clients = Lists.newArrayList();
- List<LeaderSelectorClient> examples = Lists.newArrayList();
- try {
- for (int i = 0; i < 10; i++) {
- CuratorFramework client = ClientFactory.newClient();
- LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);
- clients.add(client);
- examples.add(example);
- client.start();
- example.start();
- }
- System.out.println("----------先觀察一會選舉的結果-----------");
- Thread.sleep(10000);
- System.out.println("----------關閉前5個客戶端,再觀察選舉的結果-----------");
- for (int i = 0; i < 5; i++) {
- clients.get(i).close();
- }
- // 這裡有個小技巧,讓main程式一直監聽控制檯輸入,非同步的程式碼就可以一直在執行。不同於while(ture)的是,按回車或esc可退出
- new BufferedReader(new InputStreamReader(System.in)).readLine();
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- for (LeaderSelectorClient exampleClient : examples) {
- CloseableUtils.closeQuietly(exampleClient);
- }
- for (CuratorFramework client : clients) {
- CloseableUtils.closeQuietly(client);
- }
- }
- }
- }
/**
* leader選舉
*
* @author shencl
*/
public class LeaderSelectorExample {
public static void main(String[] args) {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelectorClient> examples = Lists.newArrayList();
try {
for (int i = 0; i < 10; i++) {
CuratorFramework client = ClientFactory.newClient();
LeaderSelectorClient example = new LeaderSelectorClient(client, "Client #" + i);
clients.add(client);
examples.add(example);
client.start();
example.start();
}
System.out.println("----------先觀察一會選舉的結果-----------");
Thread.sleep(10000);
System.out.println("----------關閉前5個客戶端,再觀察選舉的結果-----------");
for (int i = 0; i < 5; i++) {
clients.get(i).close();
}
// 這裡有個小技巧,讓main程式一直監聽控制檯輸入,非同步的程式碼就可以一直在執行。不同於while(ture)的是,按回車或esc可退出
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (Exception e) {
e.printStackTrace();
} finally {
for (LeaderSelectorClient exampleClient : examples) {
CloseableUtils.closeQuietly(exampleClient);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
}
2、locks
curator lock相關的實現在recipes.locks包裡。頂級介面都是InterProcessLock。我們直接看最有代表性的InterProcessReadWriteLock 程序內部讀寫鎖(可重入讀寫鎖)。什麼叫可重入,什麼叫讀寫鎖。不清楚的先查好資料吧。總之讀寫鎖一定是成對出現的。 簡易傳送門
我們先定義兩個任務,可並行的執行的,和互斥執行的。
[java] view plaincopyprint?- /**
- * 並行任務
- *
- * @author shencl
- */
- publicclass ParallelJob implements Runnable {
- privatefinal String name;
- privatefinal InterProcessLock lock;
- // 鎖等待時間
- privatefinalint wait_time = 5;
- ParallelJob(String name, InterProcessLock lock) {
- this.name = name;
- this.lock = lock;
- }
- @Override
- publicvoid run() {
- try {
- doWork();
- } catch (Exception e) {
- // ingore;
- }
- }
- publicvoid doWork() throws Exception {
- try {
- if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
- System.err.println(name + "等待" + wait_time + "秒,仍未能獲取到lock,準備放棄。");
- }
- // 模擬job執行時間0-4000毫秒
- int exeTime = new Random().nextInt(4000);
- System.out.println(name + "開始執行,預計執行時間= " + exeTime + "毫秒----------");
- Thread.sleep(exeTime);
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- lock.release();
- }
- }
- }
/**
* 並行任務
*
* @author shencl
*/
public class ParallelJob implements Runnable {
private final String name;
private final InterProcessLock lock;
// 鎖等待時間
private final int wait_time = 5;
ParallelJob(String name, InterProcessLock lock) {
this.name = name;
this.lock = lock;
}
@Override
public void run() {
try {
doWork();
} catch (Exception e) {
// ingore;
}
}
public void doWork() throws Exception {
try {
if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
System.err.println(name + "等待" + wait_time + "秒,仍未能獲取到lock,準備放棄。");
}
// 模擬job執行時間0-4000毫秒
int exeTime = new Random().nextInt(4000);
System.out.println(name + "開始執行,預計執行時間= " + exeTime + "毫秒----------");
Thread.sleep(exeTime);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.release();
}
}
}
[java] view plaincopyprint?
- /**
- * 互斥任務
- *
- * @author shencl
- */
- publicclass MutexJob implements Runnable {
- privatefinal String name;
- privatefinal InterProcessLock lock;
- // 鎖等待時間
- privatefinalint wait_time = 10;
- MutexJob(String name, InterProcessLock lock) {
- this.name = name;
- this.lock = lock;
- }
- @Override
- publicvoid run() {
- try {
- doWork();
- } catch (Exception e) {
- // ingore;
- }
- }
- publicvoid doWork() throws Exception {
- try {
- if (!lock.acquire(wait_time, TimeUnit.SECONDS)) {
- System.err.println(name + "等待" + wait_time + "秒,仍未能獲取到lock,準備放棄。");
- }
- // 模擬job執行時間0-2000毫秒
- int exeTime = new Random().nextInt(2000);
- System.out.println(name + "開始執行,預計執行時間= " + exeTime + "毫秒----------"<