zookeeper Apache Curator
1 簡介
Curator是Netflix公司開源的一套Zookeeper客戶端框架。瞭解過Zookeeper原生API都會清楚其複雜度。Curator幫助我們在其基礎上進行封裝、實現一些開發細節,包括接連重連、反覆註冊Watcher和NodeExistsException等。目前已經作為Apache的頂級專案出現,是最流行的Zookeeper客戶端之一。從編碼風格上來講,它提供了基於Fluent的程式設計風格支援。除此之外,Curator還提供了Zookeeper的各種應用場景:Recipe、共享鎖服務、Master選舉機制和分散式計數器等。
2 依賴
專案在GitHub上的開源地址隨著從Netflix轉移到Apache也發生了變化。地址為:https://github.com/apache/curator。我在實際操作中選取如下的版本:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.1</version> </dependency>
3 建立客戶端
在curator中,CuratorFramework表示zk的客戶端。curator提供瞭如下的兩種方式去建立:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) { return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy); } public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { return builder() .connectString(connectString) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .retryPolicy(retryPolicy).build(); }
該方法newClient()是CuratorFrameworkFactory的方法,通過工場的方式去建立。connectString表示連線zookeeper的地址,retryPolicy表示重連策略。ExponentialBackoffRetry實現了該介面。
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);
第一個引數表示休眠多長時間再次重連。第二個引數表示重連的次數。如下的程式碼就是新建了一個會話:
public CuratorFramework getZKClient(){
/**
* baseSleepTimeMs 休眠1s後重連
* maxRetries 重連的最大次數
*/
RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);
return CuratorFrameworkFactory.newClient("127.0.0.1:2182", retryPolicy);
}
4 建立節點測試
使用create建立節點,預設建立的是永久節點,value為機器的ip
public void createNode(){
CuratorFramework client = getZKClient();
client.start();
try {
client.create().forPath("/curator");
} catch (Exception e) {
e.printStackTrace();
}
}
大家可以看到我的驗證結果。
5 獲取資料和更新資料
public void getData(){
CuratorFramework client = getZKClient();
client.start();
// 包含狀態查詢
Stat stat = new Stat();
try {
/**
* 查詢資料
* 普通查詢
* 狀態查詢
*/
byte[] bytes = client.getData().forPath("/curator");
byte[] bytes1 = client.getData().storingStatIn(stat).forPath("/curator");
/**
* 更新資料
* 普通更新
* 指定版本更新
*/
Stat stat1 = client.setData().forPath("/curator", "新內容".getBytes());
Stat stat2 = client.setData().withVersion(1).forPath("/curator");
System.out.println(new String(bytes));
System.out.println(new String(bytes1));
System.out.println(stat1);
System.out.println(stat2);
} catch (Exception e) {
e.printStackTrace();
}
}
6 Curator分散式鎖之生成流水號
在分散式系統中,為了保證資料的一致性,往往需要進行同步控制,比如減庫存、唯一流水號生成等。Curator對Zookeeper進行了封裝,實現了分散式鎖的功能,提供了執行緒的同步控制。同時,Curator也提供了多種鎖機制。下面對通過時間戳生成流水號的場景進行逐步分析。
程式碼通過一個迴圈連續打印出10個時間戳。這裡沒有使用多執行緒,但分析下面的列印結果就會發現,其實在同一時刻會生成多個相同的流水號,執行時間在毫秒級別。
public static void main(String[] args) {
for(int i=0; i< 10; i++){
SimpleDateFormat sdf = new SimpleDateFormat("yyyyDDmm HH:mm:ss|SSS");
String orderNo = sdf.format(new Date());
System.out.println(orderNo);
}
}
結果如下:
201831413 16:13:10|794
201831413 16:13:10|796
201831413 16:13:10|796
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|797
201831413 16:13:10|798
201831413 16:13:10|798
觀察上述生成的時間流水號,10個流水號,但是隻有794,796,797,798。這四個,其餘的都是重複的。上面生成的流水號重複的可能性不大,一旦出現高併發,那麼重複的訂單號就會大量出現,當然也有其他方案進行解決,本篇文章就不再進行討論。下說說如何通過分散式鎖來解決此問題。
分散式鎖示例
下面的程式碼利用Curator的分散式鎖來實現在同一時刻只會生成一個唯一的流水號。
public class ZKLock {
/** 節點名稱*/
private static final String path = "/lock_path";
public static void main(String[] args) {
/** 獲取客戶端*/
CuratorFramework client = getClient();
/** 獲取分散式鎖*/
InterProcessMutex lock = new InterProcessMutex(client,path);
/** 單個執行緒開始執行程式*/
final CountDownLatch countDownLatch = new CountDownLatch(1);
final long startTime = new Date().getTime();
for(int i=0;i<10;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
/** 獲取鎖*/
lock.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss|SSS");
System.out.println(sdf.format(new Date()));
/** 釋放鎖*/
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("顯示此執行緒大概花費時間(等待+執行):" + (new Date().getTime() - startTime) + "ms");
}
}).start();
}
System.out.println("建立執行緒花費時間:" + (new Date().getTime() - startTime) + "ms");
countDownLatch.countDown();
}
private static CuratorFramework getClient() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2182")
.retryPolicy(retryPolicy)
.sessionTimeoutMs(6000)
.connectionTimeoutMs(3000)
.namespace("demo")
.build();
client.start();
return client;
}
}
列印結果為:
建立執行緒花費時間:2ms
20181110 16:33:31|776
顯示此執行緒大概花費時間(等待+執行):374ms
20181110 16:33:31|796
顯示此執行緒大概花費時間(等待+執行):394ms
20181110 16:33:31|814
顯示此執行緒大概花費時間(等待+執行):404ms
20181110 16:33:31|829
顯示此執行緒大概花費時間(等待+執行):419ms
20181110 16:33:31|841
顯示此執行緒大概花費時間(等待+執行):434ms
20181110 16:33:31|858
顯示此執行緒大概花費時間(等待+執行):449ms
20181110 16:33:31|874
顯示此執行緒大概花費時間(等待+執行):472ms
20181110 16:33:31|895
顯示此執行緒大概花費時間(等待+執行):485ms
20181110 16:33:31|905
顯示此執行緒大概花費時間(等待+執行):493ms
20181110 16:33:31|912
顯示此執行緒大概花費時間(等待+執行):502ms
仔細觀察可發現,通過多執行緒的訪問,列印的時間戳卻是唯一的。這裡使用InterProcessMutex類來進行處理分散式鎖,實現了一個生產唯一流水號的功能。
注意事項
在上面的程式碼中,列印了每步操作的時間,其中訪問的zookeeper伺服器是遠端伺服器。從列印的時間我們可以看出,通過這種方式生成唯一流水號並不能支撐很大的併發量。每次操作都需要通過網路訪問,zookeeper的節點操作等,會花費大量的時間。另外,由於精確到毫秒,因此一秒鐘最多也只能處理999個請求。
同時,在分散式環境中上面的示例還是會出現重複的可能性的,比如兩個伺服器的時間不一致,即兩個伺服器相差10ms,恰好第一個執行完,第二個執行的間隙也是10ms,那麼第二個生成的訂單號還是有可能跟第一個重複的,雖然這種概率及其小。
以上通過示例演示了Curator的分散式鎖功能,根據具體的業務需求可選擇不同的業務場景來使用。
參考:https://blog.csdn.net/wo541075754/article/details/71173552