ZooKeeper的排他鎖、共享鎖
阿新 • • 發佈:2019-12-31
前言
最近一直在看《從Paxos到ZooKeeper》這本書,個人認為從分散式理論到ZooKeeper思想及實踐,這本書講的都很通俗易懂。對本菜鳥學習ZooKeeper幫助很大,本文將結合這本書的內容,根據分散式排他鎖、分散式共享鎖的思想,實現簡單demo(curator已經實現了各種分散式鎖,本文只是粗略實現思想)。
基本概念
- 分散式鎖是控制分散式系統之間同步訪問共享資源的一種方式。
- 排他鎖又稱為寫鎖或獨佔鎖,如果事務T1對資料物件O1加上了排他鎖,那麼整個加鎖期間,只允許事務T1對O1進行讀取和更新操作,其他任何事務都不能對O1進行任何操作 - 直到T1釋放了排他鎖。
- 共享鎖又稱為讀鎖,如果事務T1對資料物件O1加上了共享鎖,那麼當前事務只能對O1進行讀取操作,其他事務也只能對O1加共享鎖 - 直到O1上的共享鎖都被釋放。
- 排他鎖與共享鎖的區別在於,加上排他鎖後,資料物件只對一個事務可見,而加上共享鎖後,資料對所有事務都可見。
-
ZooKeeper的鎖可以理解為一個
Znode
臨時節點,按需要可以為臨時順序節點(EPHEMERAL_SEQUENTIAL)。 - Curator是一個開源的ZooKeeper客戶端,解決了很多ZooKeeper客戶端非常底層的細節開發工作,Guava is to Java what Curator is to ZooKeeper。
注:本文的原始碼實現基於Spring Boot
整合Curator
實現,整合請參考提供博文
排他鎖
- 鎖被定義為一個臨時節點,例如/exclusive_lock/lock(可以為任何ZooKeeper路徑)。
- 獲取鎖的過程為所有客戶端都試圖通過呼叫create()介面,建立/exclusive_lock/lock節點,ZooKeeper會保證只有一個客戶端建立成功,那麼就可以認為該客戶端獲取了鎖。
- 釋放鎖:已經提到,/exclusive_lock/lock是一個臨時節點,因此在以下情況下,都有可能釋放鎖。
當前獲取鎖的客戶端機器發生宕機,臨時節點被刪除。 正常執行完業務邏輯後,客戶端主動將建立的臨時節點刪除。
-
通知:由於通過
watch
機制watch
了/exclusive節點,一旦/exclusive的子節點發生變化,都會通知到客戶端,客戶端接到通知後,就會重複“獲取鎖”的流程。
@Component
public class ExclusiveLockByCurator implements InitializingBean,ILockByCurator {
private static final Logger LOGGER = LoggerFactory.getLogger(ExclusiveLockByCurator.class);
private static final String ROOT_LOCK_PATH = "exclusive_lock";
private CountDownLatch countDownLatch = new CountDownLatch(1);
private final CuratorFramework curatorFramework;
public ExclusiveLockByCurator(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework.usingNamespace("lock-namespace");
}
@Override
public void acquireDistributedLock(String path) {
String keyPath = "/" + ROOT_LOCK_PATH + "/" + path;
while (true) {
try {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(keyPath);
LOGGER.info("[ExclusiveLockByCurator acquireDistributedLock] success to acquire lock for path: {}",keyPath);
break;
} catch (Exception e) {
LOGGER.info("[ExclusiveLockByCurator acquireDistributedLock] failed to acquire lock for path: {}",keyPath);
LOGGER.info("try again... ...");
try {
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
countDownLatch.await();
} catch (Exception e1) {
LOGGER.error("[ExclusiveLockByCurator acquireDistributedLock] error",e1);
}
}
}
}
@Override
public boolean releaseDistributedLock(String path) {
try {
String keyPath = "/" + ROOT_LOCK_PATH + "/" + path;
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
}
} catch (Exception e) {
LOGGER.error("[ExclusiveLockByCurator acquireDistributedLock] failed to release lock");
return false;
}
return true;
}
private void addWatcher(String path) throws Exception {
String keyPath;
if (path.equals(ROOT_LOCK_PATH)) {
keyPath = "/" + path;
} else {
keyPath = "/" + ROOT_LOCK_PATH + "/" + path;
}
final PathChildrenCache cache = new PathChildrenCache(curatorFramework,keyPath,false);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.getListenable().addListener((curatorFramework,event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
String oldPath = event.getData().getPath();
LOGGER.info("[ExclusiveLockByCurator acquireDistributedLock] success to release lock for path: {}",oldPath);
if (oldPath.contains(path)) {
// 子節點被刪除 通知其他客戶端
countDownLatch.countDown();
}
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
String path = "/" + ROOT_LOCK_PATH;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
// 初始化後即watch
addWatcher(ROOT_LOCK_PATH);
LOGGER.info("[ExclusiveLockByCurator acquireDistributedLock] root path add watcher success");
} catch (Exception e) {
LOGGER.error("[ExclusiveLockByCurator acquireDistributedLock] connect zookeeper fail",e);
}
}
}
複製程式碼
共享鎖
-
鎖定義:依然以一個臨時節點來表示一個鎖,但是不同點在於共享鎖由一個格式為
/shared_lock/[Hostname]-請求型別-序號
(/shared_lock/192.168.0.1-R-0000000001)的臨時順序節點表示。 - 獲取鎖:所有客戶端都會在/shared_lock路徑下,建立上述格式臨時順序節點。
- 判斷讀寫順序:共享鎖規定,不同的事務可以同時對同一個資料進行讀取操作,而更新操作必須在當前沒有任何讀寫操作的時候進行。
- 因此,成功獲取鎖的條件分為讀請求、寫請求兩種情況。
對於讀請求:沒有比自己序號小的節點 或 所有比自己序號小的子節點全是讀請求 對於寫請求:沒有比自己序號小的節點
- 釋放鎖與排他鎖無區別。
對於通知,則有以下兩種方式,結合“羊群效應”,會逐一說明。
watch /shared_lock
- 就和排他鎖無區別了,/shared_lock子節點的變化會通知到所有其他客戶端,然後重新獲取/shared_lock下的子節點列表,計算是否輪到自己進行讀取或更新操作。
- 因此,整個分散式鎖的競爭過程中,大量的“Watcher”通知和“子節點列表獲取”兩個操作重複執行。
- 客戶端無端地接收到過多和自己並不相關的事件通知,叢集規模較大時,不僅會影響效能,如果同一時間有多個節點對應的客戶端完成事務或是事務中斷引起節點消失,ZooKeeper伺服器就會在短時間內向其餘客戶端傳送大量的事件通知 - 這就是羊群效應。
watch 優化
結合上述“羊群效應”,可以提出一個優化方案,watch操作在獲取子節點列表後,根據情況watch相應節點。
讀請求,只需要關心比自己序號小的最後一個寫請求節點,故註冊Watcher監聽 寫請求,只需要關心比自己序號小的最後一個節點,故註冊Watcher監聽
@Component
public class SharedLockByCurator implements InitializingBean {
private static final Logger LOGGER = LoggerFactory.getLogger(SharedLockByCurator.class);
private static final String ROOT_LOCK_PATH = "shared_lock";
private final CuratorFramework curatorFramework;
private static CountDownLatch countDownLatch = new CountDownLatch(1);
// 根據序號排序
private static Ordering<String> ordering = Ordering.from((Comparator<String>) (o1,o2) -> {
String[] s1 = o1.split("-");
String[] s2 = o2.split("-");
return StringUtils.compare(s1[1],s2[1]);
}).nullsLast();
public SharedLockByCurator(CuratorFramework curatorFramework) {
this.curatorFramework = curatorFramework.usingNamespace("lock-namespace");
}
/**
* 建立/shared_lock節點
*
* @date 15:25 2019-09-19
**/
@Override
public void afterPropertiesSet() {
String path = "/" + ROOT_LOCK_PATH;
try {
if (curatorFramework.checkExists().forPath(path) == null) {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(path);
}
} catch (Exception e) {
LOGGER.error("[SharedLockByCurator afterPropertiesSet] connect zookeeper fail",e);
}
}
public String acquireDistributedLock(String path,OpType type) {
String keyPath = "/" + ROOT_LOCK_PATH + "/" + path + "/" + type.getType() + "-";
String parentPath = "/" + ROOT_LOCK_PATH + "/" + path;
String thisPath = null;
try {
// 建立順序臨時節點
thisPath = curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(keyPath);
boolean haveTheLock = false;
while (!haveTheLock) {
// 獲取子節點列表
List<String> childPaths = curatorFramework.getChildren()
.forPath(parentPath);
List<String> sortedPaths = ordering.sortedCopy(childPaths);
// 獲取index
int index = sortedPaths.indexOf(thisPath.split("/")[3]);
if (index > 0) {
boolean flag = true;
// 讀操作時 所有序號比該操作小的為讀請求時才能獲取鎖
if (type == OpType.READ) {
for (int i = index - 1; i >= 0; i--) {
String temp = sortedPaths.get(i);
String tempType = temp.split("-")[0];
if (OpType.WRITE.getType().equals(tempType)) {
addWatcher(parentPath + "/" + sortedPaths.get(i));
flag = false;
break;
}
}
if (flag) {
LOGGER.info("[SharedLockByCurator acquireDistributedLock] success thread= {} path= {} index= {}",Thread.currentThread().getName(),thisPath,index);
haveTheLock = true;
}
} else {
addWatcher(parentPath + "/" + sortedPaths.get(index - 1));
}
if (countDownLatch.getCount() <= 0) {
countDownLatch = new CountDownLatch(1);
}
countDownLatch.await();
} else {
LOGGER.info("[SharedLockByCurator acquireDistributedLock] success thread= {} path= {} index= {}",index);
// 這裡,為了預防 別的客戶端沒有來得及watcher 這個客戶端已經release了。。。。筆者還在想
addWatcher(thisPath);
haveTheLock = true;
}
}
} catch (Exception e) {
LOGGER.error("[SharedLockByCurator acquireDistributedLock] error",e);
}
return thisPath;
}
private List<String> list = Collections.synchronizedList(Lists.newArrayList());
private void addWatcher(String path) throws Exception {
try {
// 粗略地實現 僅註冊一次watcher。。。
if (curatorFramework.checkExists().forPath(path) != null && !list.contains(path)) {
final NodeCache cache = new NodeCache(curatorFramework,path,false);
cache.start(true);
cache.getListenable().addListener(() -> {
LOGGER.info("[SharedLockByCurator nodeChanged] thread= {} path: {}",path);
countDownLatch.countDown();
});
list.add(path);
LOGGER.info("[SharedLockByCurator addWatcher] thread= {} success path= {} ",path);
}
} catch (Exception e) {
LOGGER.error("[SharedLockByCurator addWatcher] error! ",e);
}
}
public boolean releaseDistributedLock(String keyPath) {
try {
if (curatorFramework.checkExists().forPath(keyPath) != null) {
curatorFramework.delete().forPath(keyPath);
LOGGER.info("[SharedLockByCurator releaseDistributedLock] thread= {} success to release lock for path: {}",keyPath);
}
} catch (Exception e) {
LOGGER.error("[SharedLockByCurator releaseDistributedLock] failed to release lock",e);
return false;
}
return true;
}
}
複製程式碼
結語
說實話,共享鎖自己看書實現,遇到了很多問題,有幾個點還沒想明白,後續會結合分析curator
原始碼實現,優化我的第一版粗略實現。
突然,想起以前創業公司實習時,師傅對我說:“先實現一版再說,後續優化,比啥都沒有強”。。。
參考文獻
- 從Paxos到ZooKeeper
- www.jianshu.com/p/df99f8a37…