1. 程式人生 > >【zookeeper】Apache curator的使用及zk分布式鎖實現

【zookeeper】Apache curator的使用及zk分布式鎖實現

sets finally tac -- ont zkcli 單節點 基本操作 新建

上篇,本篇主要講Apache開源的curator的使用,有了curator,利用Java對zookeeper的操作變得極度便捷.

其實在學之前我也有個疑慮,我為啥要學curator,撇開漲薪這些外在的東西,就單技術層面來講,學curator能幫我做些什麽?這就不得不從zookeeper說起,上篇我已經大篇幅講了zk是做什麽的了,但真正要靠zk去實現多服務器自動拉取更新的配置文件等功能是非常難的,如果沒有curator,直接去寫的話基本上能把你累哭,就好比連Mybatis或者jpa都沒有,讓你用原生的代碼去寫個網站一樣,你可以把curator當做一個比較強大的工具,有了它操作zk不再是事,說這麽多,是時候進入正題了:

curator 官網:http://curator.apache.org

使用curator去實現的幾塊內容:


學習目錄:
1.使用curator建立與zk的連接
2.使用curator添加/遞歸添加節點
3.使用curator刪除/遞歸刪除節點
4.使用curator創建/驗證 ACL(訪問權限列表)
5.使用curator監聽 單個/父 節點的變化(watch事件)
---------------------------------------------
6.基於curator實現zookeeper分布式鎖(需要掌握基本的多線程知識)

前置條件:已掌握zookeeper的基本操作,對zookeeper有所了解,如果沒有掌握請翻閱我前面的博客去學習.
本節所需要引入的依賴有以下三個,建議直接全部引入即可:

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
1.通過curator建立與zk的連接

需要準備連接zk的url,建議直接寫成工具類,因為接下來會頻繁用到,功能類似於jdbc.

public class ZkClientUtil {
private static final int BASE_SLEEP_TIME_MS = 5000; //定義失敗重試間隔時間 單位:毫秒
private static final int MAX_RETRIES = 3; //定義失敗重試次數
private static final int SESSION_TIME_OUT = 1000000; //定義會話存活時間,根據業務靈活指定 單位:毫秒
private static final String ZK_URI = "192.168.174.132:2181";//你自己的zkurl和端口號
private static final String NAMESPACE = "laohan_jianshen";
//工作空間,可以不指定,建議指定,功能類似於項目包,之後創建的所有的節點都會在該工作空間下,方便管理

public static CuratorFramework build(){
//創建比較簡單,鏈式編程,很爽,基本上指定點參數就OK了
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES);//重試策略
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString(ZK_URI)
.retryPolicy(retryPolicy)
.namespace(NAMESPACE)
.sessionTimeoutMs(SESSION_TIME_OUT)
.build();
return client;
}
}
2.通過curator添加/遞歸添加節點

//通過上一步獲取到的client,直接啟動該client,值得註意的是client在使用前必須先啟動:
client.start;
client
.create()//創建節點
.withMode(CreateMode.xxx)//節點屬性:永久節點/臨時節點/有序節點 通過CreateMode.即可看到
.withACL(ZooDefs.Ids.xxx)//節點訪問權限,通過Ids.即可看到 默認是OPEN_ACL_UNSAFE(開放不安全權限)
.forPath("/yourpath","yourdata".getBytes());//指明你的節點路徑,數據可以不指定,數據必須是byte[]
創建遞歸節點:

//比如我想一次性創建/yourpath/a/b/c/1/2/3...這樣的節點,如果按傳統方法會累死你
//curator可以一次性創建好,只需要在創建時添加creatingParentsIfNeeded即可.
client
.create()//創建節點
.creatingParentsIfNeeded()//創建父節點,如果需要的話

...
3.使用curator刪除/遞歸刪除節點

client
.delete() //刪除
.guaranteed()//保證一定幫你刪了它
.withVersion(0)//指定要刪節點的版本號
.forPath("/yourpath")//指定要刪節點的路徑
遞歸刪除:

//比如我當前的節點結構是這樣:/yourpath/a/b/c/1/2/3 我想刪除a節點下面的所有目錄
//傳統方法累死個人,現在只需要添加deletingChildrenIfNeeded即可
client
.delete() //刪除
.deletingChildrenIfNeeded()//如果它有兒子都給刪了...
4.使用curator創建/驗證 ACL(訪問權限列表)

//為了保證安全,有時需要對節點的訪問權限做一些限制,否則可能會引起重要信息泄露/篡改/刪除等
//節點ACL的創建方式有兩種,一種是使用ZK提供的,一種是自定義的
//1.ZK提供的,比較簡單,拿來即用,在創建節點時指明withACL即可
client
.create()
.withACL(ZooDefs.Ids.READ_ACL_UNSAFE)//指明該節點是只讀節點,還有其他屬性,可以通過Ids.查看
//創建自定義ACL,需要自己new Id(),並指明是否是加密的,然後賬號和密碼是多少,加密策略使用zk提供的:
List<ACL> aclList = new ArrayList<ACL>();
ACL acl1 = new ACL(ZooDefs.Perms.READ,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
ACL acl2 = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:123456")));
aclList.add(acl1);
aclList.add(acl2);
//如此我就創建好了兩種不同的權限賬號,user只能對該節點有讀的權限,但root用戶對該節點有所有權限
//ACL驗證,創建好節點之後,可以在服務器的zk安裝目錄的bin目錄下 連接客戶端./zkCli
//然後通過ls /該目錄 查看是否可以訪問 正常是不能訪問的 會提示權限不夠
//下面我們通過curator去連接,要想訪問該節點需要在創建client時就指明賬號和密碼:
CuratorFramework client = CuratorFrameworkFactory
.builder()
.authorization("digest","root:123456".getBytes())//指明使用了加密,用戶名和密碼用:隔開,以byte[]輸入
//如此,接下來通過該client可以對剛剛創建的節點具有所有權限,如果登錄的是user,則只具有讀權限.
5.通過curator創建單個節點及其父節點的watch事件

由於zk的watch事件是只能被觸發一次的,觸發完即銷毀監聽,這顯然不是我們想要的,在實際開發中更多的場景是需要對某個節點持續監聽,所以這裏我只介紹創建持續監聽的單節點/父節點

//對單個節點創建watch事件
//定義NodeCache,指明被監聽節點的路徑:
final NodeCache nodeCache = new NodeCache(client,"/yourpath");
nodeCache.start(true);//開啟
nodeCache
.getCurrentData()//可以獲取該監聽節點的數據
.getPath();//可以獲取該監聽節點的路徑

//對指定父節點創建watch事件,只要其任何一個子節點,或子節點的子節點...發生變化,就會觸發watch事件.
//定義PathChildrenCache,指明要watch的目錄
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"yourpath");
//啟動,啟動策略有三種:同步,異步提交,異步 用的比較多的就是下面這種,用StartMode.可以查看到
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//對該節點創建監聽器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
//TODO 可以通過PathChildrenCacheEvent.拿到你想要的數據和路徑等
}
});
至此,curator的常用內容已學習完畢,建議每個都親自操作一下,為之後的自動配置和分布式鎖操作打下基礎.

6.基於curator實現zookeeper分布式鎖

先來了解下分布式鎖應用場景:

比如我有一個電商商城,為了提高系統的服務量,業務被拆分小了,分別部署在不同的服務器上,下單成功後訂單系統通知庫存系統要及時減庫存,但此時並發極高,庫存系統還沒來得及減庫存,又有新的人進來了,讀取了庫存,數據被臟讀了,於是他也下單成功了,但發貨的時候發現庫存不夠了,於是鬧得兩家都不開心...這時候就需要分布式鎖來解決.

為了演示這樣的場景,我寫了個小項目來模擬這種場景:

我數據庫裏電腦庫存只有10臺,然後寫了兩個下單頁面,每個訂單買8臺電腦,讓線程適當休眠來模擬高並發下的延遲,於是我幾乎同時訪問了這兩個下單頁面,最後兩邊都提示我下單成功了,但數據庫裏的庫存數量變成了-6

這顯然不是我想要的結果,正確的應該是,這兩個人裏只有一個人下單成功,另外一個人下單失敗,提示庫存不足,最後數據庫裏剩2臺電腦庫存.

為了解決這個問題,就必須讓第一個人下單時,第二個人不能下單,只能等到第一個人下單完成後方可下單,或者第二個人下單時,第一個人不能下單,只能等到第二人下單完成方可下單,由於兩套系統是分開部署的,不能像以前那樣用同步鎖/同步代碼塊Synchronized來解決了,這個時候就需要引出分布式鎖,分布式鎖可以用Redis或者zookeeper等實現,這篇主要講一下用zk去實現.

思路:提供一把全局的鎖,所有來購買的請求競爭這一把鎖,誰先拿到這把鎖,誰就有資格執行下單,沒搶到鎖的請求被掛起,等待有鎖的請求完成下單後釋放鎖,然後喚醒被掛起的請求繼續去競爭這把鎖...

可以把這把鎖當做是zk上的一個節點,所有請求發起時,創建該節點,第一個創建該節點成功的請求就意味著獲得了鎖,其他請求創建都會拋出異常,然後捕獲該異常,用全局的countDownLatch將該請求掛起,等獲得鎖的節點完成下單後,把該節點刪除(釋放鎖),然後計數器-1,把掛起的線程都喚醒,繼續去競爭該鎖...

下面就順著這個思路一起去實現分布式鎖:

這裏默認使用上面已經寫好的連接ZK的工具類來創建client.

public class ZkLockUtil {
//分布式鎖,用於掛起當前線程,等待上一把分布式鎖釋放
private static CountDownLatch DISTRIBUTE_LOCK = new CountDownLatch(1);
//分布式鎖的總結點名
private final static String ZK_LOCK_PROJECT = "zk-lock";
//分布式鎖節點名
private final static String DISTRIBUTE_LOCK_NAME = "distribute-lock";
/**
* 獲取分布式鎖
*/
public static void getLock() {
CuratorFramework client = ZkClientUtil.build();
client.start();
while (true) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/" + ZK_LOCK_PROJECT + "/" + DISTRIBUTE_LOCK_NAME);
System.out.println("獲取分布式鎖成功...");
return;
} catch (Exception e) {
try {
//如果沒有獲取到鎖,需要重新設置同步資源值
if (DISTRIBUTE_LOCK.getCount() <= 0) {
DISTRIBUTE_LOCK = new CountDownLatch(1);
}
System.out.println("獲取分布式鎖失敗,等待他人釋放鎖中...");
DISTRIBUTE_LOCK.await();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}

/**
* 釋放鎖資源
*/
public static void release(String path) {
CuratorFramework client = ZkClientUtil.build();
client.start();
try {
client.delete().forPath(path);
System.out.println("鎖釋放成功...");
} catch (Exception e) {
System.out.println("釋放鎖失敗...");
e.printStackTrace();
} finally {
client.close();
}
}

/**
* 為指定路徑節點創建watch,觀察鎖狀態
*/
public static void addWatcher2Path(final String path) throws Exception {
CuratorFramework client = ZkClientUtil.build();
client.start();
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, true);
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
System.out.println("創建觀察者成功...");
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String nodePath = pathChildrenCacheEvent.getData().getPath();
System.out.println("上一會話已釋放鎖或會話已斷開...節點路徑為:" + nodePath);
if (nodePath.contains(DISTRIBUTE_LOCK_NAME)) {
DISTRIBUTE_LOCK.countDown();
System.out.println("釋放計數器,計數器值為:"+DISTRIBUTE_LOCK.getCount()+"讓當前請求來獲取分布式鎖...");
}
}
}
});
}
}
下面來測試一下,有空的話你可以寫一個類似我這種下單的模式去測試,如果時間緊寫個測試類模擬也無妨:

public class Test {
public static void main(String[] args) {
final ExecutorService threadpool = Executors.newCachedThreadPool();
System.out.println("開始購買...");
for (int i = 0; i <2 ; i++) {
threadpool.execute(new Runnable() {
public void run() {
System.out.println("我是線程:"+Thread.currentThread().getName()+"我開始搶購了...");
ZkLockUtil.getLock();
System.out.println(Thread.currentThread().getName()+":我正在瘋狂的剁手購買中...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+":我買完了,有請下一位...");
try {
ZkLockUtil.addWatcher2Path("/zk-lock");
System.out.println("添加完畢...");
ZkLockUtil.release("/zk-lock/distribute-lock");
System.out.println("釋放完畢...");
Thread.sleep(1000);

} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
測試結果可以看出,在其中一個線程購買時,另外一個線程被掛起等待...

或者用我這種2個服務一起下單模式的:

@RestController
public class SellController {
@Autowired
private SellRepository sellRepository;
@RequestMapping("sell")
public String sell(@RequestParam("num")Integer num){
String msg = "下單成功!";
try {
Thread.sleep(3000);
ZkLockUtil.getLock();
Sell sell = sellRepository.getOne(1);
int remain = sell.getStock()-num;
sell.setStock(remain);
if (remain >= 0){
sellRepository.save(sell);
}else{
msg = "下單失敗,庫存不足...";
}
ZkLockUtil.addWatcher2Path("/zk-lock");
ZkLockUtil.release("/zk-lock/distribute-lock");
} catch (Exception e) {
e.printStackTrace();
msg = "下單異常...";
}
return msg;
}
測試結果:

數據庫中電腦庫存由原來的10變為2,達到期望效果:

但這樣就大功告成了嗎? 其實我覺得沒有,原因是因為我在測試的時候發現一個問題,當我在瀏覽器中按住F5進行刷新頁面(模擬高並發下的請求頻率),在一開始一切都是正常的,刷一陣子之後八爾哥就出來了:

找了一下原因,發現問題出在了這裏:

由於我不斷的刷新頁面,就意味著不斷的去獲取鎖和釋放鎖,當鎖被釋放後計數器減1,會去喚醒線程去競爭鎖,然後這個時候來沒來的及喚醒,新的請求又進來了,此時新請求創建鎖成功了,被喚醒的線程又搶不到鎖了,但計數器仍處於0的狀態,它會繼續去創建鎖,此時又有新的請求不斷進來,不斷創建鎖...導致zk認為你是在不斷的進行重復操作,於是它就把連接給退出了,然後又有新請求進來了,又要重新建立連接:

新建立的連接又會像上面一樣在連綿不斷的請求中斷開,這樣頻繁的連接和斷開,重復數次之後,ZK直接關閉了連接,導致後臺無限報錯...

為了解決這個問題,我搜羅各大網站,最後沒有找到什麽可以參考的東西,我甚至開始懷疑是我寫的鎖有問題,但後來我在apache curator官網找到了解釋,其實curator已經幫我們封裝好了一套分布式鎖,可以直接拿來用的:

於是我直接調用了zk封裝好的這一套分布式鎖去做測試,普通情況下跟我自己寫的分布式鎖沒啥兩樣,效果一樣,然後我繼續用F5模擬高並發下的情況,結果跟我自己的鎖如出一轍...也是報同樣的錯,錯誤原因也一樣,最後閱讀了下源碼,其實自己寫的跟apache寫的沒啥兩樣,原理都一樣,只是人家封裝的更方便別人使用一些...

@RequestMapping("plus")
public String sellPlus(@RequestParam("num")int num) throws Exception {
CuratorFramework client = ZkClientUtil.build();
client.start();
String lockPath = "/plus_lock";
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
String msg = "下單成功!";
if ( lock.acquire(3, TimeUnit.SECONDS) )
{
try
{
Sell sell = sellRepository.getOne(1);
int remain = sell.getStock()-num;
if (remain >= 0){
sell.setStock(remain);
sellRepository.save(sell);
}else {
msg = "下單失敗,庫存不足...";
}
}
finally
{
lock.release();
}
}
return msg;
}
我總不能去懷疑apache 寫的鎖也有問題吧,那問題應該就出在了zk建立連接或者zk過濾連接的機制上,應該是zk以為那些頻繁關閉又連接的請求是被攻擊或者無效的請求,所以強制關閉這些連接,目前尚未去研究zk的連接機制,也不清楚研究了是否能解決該問題,所以基於zk的分布式鎖就講到這裏,同時在我心裏它已經不是做分布式鎖的首選了,我會考慮使用redis或者其它分布式鎖去解決,尤其是在高並發的情況下,感興趣的可以繼續關註本博,在redis系列教程中,我會講如何用redis實現分布式鎖.
---------------------
作者:老漢健身
來源:CSDN
原文:https://blog.csdn.net/lovexiaotaozi/article/details/83382128
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

【zookeeper】Apache curator的使用及zk分布式鎖實現