聊聊分散式鎖的實現(二)
阿新 • • 發佈:2019-12-31
上一篇給大家介紹了基於redis的分散式鎖不知道有沒有給你解釋清楚,這次介紹一種基於zooKeeper的實現方式,本文只會介紹相關的zooKeeper知識,有興趣的同學可以自行學習。
基於zooKeeper實現的分散式鎖
一、相關概念
zookeeper的知識點在這裡就不詳細介紹了,下面列出一些跟實現分散式鎖相關的概念
- 臨時節點:臨時節點區別於持久節點的就是它只存在於會話期間,會話結束或者超時會被自動刪除;
- 有序節點:顧名思義就是有順序的節點,zookeeper會根據現有節點做一個序號順延,如第一個建立的節點是/xiamu/lock-00001,下一個節點就是/xiamu/lock-00002;
- 監聽器:監聽器的作用就是監聽一些事件的發生,比如節點資料變化、節點的子節點變化、節點的刪除;
二、臨時節點方案
基於zookeeper的臨時節點方案,主要利用了zookeeper的建立節點的原子性、臨時節點、監聽器等功能,大致上的思路如下:
- 客戶端加鎖時建立一個臨時節點,建立成功則加鎖成功。
- 加鎖失敗則建立一個監聽器用於監聽這個節點的變化,然後當前執行緒進入等待。
- 持有鎖的客戶端解鎖時會刪除這個節點,或者會話結束自動被刪除。
- 監聽器監聽到節點的刪除通知等待的客戶端去重新獲取鎖。
部分程式碼實現如下:
/**
* 加鎖程式碼實現
**/
public void lock(String path) throws Exception {
boolean hasLock = false ;
while (!hasLock) {
try {
this.createTemporaryNode(path,"data");
hasLock = true;
log.info("{}獲取鎖成功",Thread.currentThread().getName());
} catch (Exception e) {
synchronized (this) {
try {
zooKeeperClient.getData(path,event -> {
if (SyncConnected.equals(event.getState()) && NodeDeleted.equals(event.getType())) {
notifyWait();
}
},null);
wait();
} catch (KeeperException.NoNodeException ex) {
log.info("節點已不存在");
}
}
}
}
}
/**
* 喚醒等待鎖的執行緒
**/
public synchronized void notifyWait() {
notifyAll();
}
複製程式碼
這裡我是使用的是ZooKeeper的Java原生API實現,這段實現程式碼並不嚴謹,我只是為了為了描述相關邏輯;ZooKeeper的Java原生API存在一些問題如:客戶端斷開連線時需要手動去重新連線;監聽器只能使用一次,想要繼續使用需要重複註冊;上述程式碼實現中如果監聽器被節點的資料改變事件觸發了,那麼就無法再一次監聽節點刪除事件。推薦大家使用第三方開源框架Curator
三、臨時順序節點方案
臨時順序節點方案和上述方案的不同點在於:
- 這裡所有的客戶端都能建立臨時順序節點,只有加鎖路徑下第一個節點才能獲取鎖;
- 獲取鎖失敗的客戶端並不監聽獲取鎖的客戶端的節點,而是監聽自己的前一個節點;
- 具有可重入性;
在這裡我們使用Curator已有的輪子來實現這個方案,並跟著原始碼來分析一下主要思路
InterProcessMutex lock = curatorLock.getCuratorLock(path);
/**
* curator獲取鎖
*/
public InterProcessMutex getCuratorLock(String path) {
return new InterProcessMutex(curatorClient,path);
}
/**
* curator方式加鎖
* @param lock 鎖
*/
public void curatorLock(InterProcessMutex lock) {
try {
lock.acquire();
log.info("{}獲取鎖成功",Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* curator方式釋放鎖
* @param lock 鎖
*/
public void curatorReleaseLock(InterProcessMutex lock) {
if (null != lock && lock.isAcquiredInThisProcess()) {
try {
lock.release();
log.info("{}釋放鎖成功",Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製程式碼
這種實現是不是非常方便呢,實際上主要邏輯就是之前講過的那些,都封裝在內部了,這裡簡單呼叫一下API即可實現。下面我們來看看acquire()和release()方法的原始碼分析實現方式。
原始碼分析
// 首先我們看acquire()方法的物件InterProcessMutex
// 從它的構造方法我們看下來可以得知這個鎖的基礎路徑就是我們傳入的path,鎖的名字暫時是lock-開頭
public InterProcessMutex(CuratorFramework client,String path,LockInternalsDriver driver) {
this(client,path,LOCK_NAME(lock-),1,driver);
}
InterProcessMutex(CuratorFramework client,String lockName,int maxLeases,LockInternalsDriver driver) {
basePath = PathUtils.validatePath(path);
internals = new LockInternals(client,driver,lockName,maxLeases);
}
複製程式碼
@Override
public void acquire() throws Exception {
if ( !internalLock(-1,null) ) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time,TimeUnit unit) throws Exception {
// 獲取當前加鎖執行緒
Thread currentThread = Thread.currentThread();
// 從一個ConcurrentMap快取中嘗試獲取當前執行緒資訊
LockData lockData = threadData.get(currentThread);
// 如果map中存在這個執行緒則說明當前執行緒已加鎖成功,加鎖次數加一,返回加鎖成功
if ( lockData != null ) {
lockData.lockCount.incrementAndGet();
return true;
}
// 嘗試加鎖並返回加鎖路徑
String lockPath = internals.attemptLock(time,unit,getLockNodeBytes());
// 加鎖成功
if ( lockPath != null ) {
LockData newLockData = new LockData(currentThread,lockPath);
// 構造一個加鎖資料並加入快取map
threadData.put(currentThread,newLockData);
return true;
}
return false;
}
private static class LockData {
// 當前加鎖執行緒
final Thread owningThread;
// 加鎖path
final String lockPath;
// 加鎖次數
final AtomicInteger lockCount = new AtomicInteger(1);
}
String attemptLock(long time,TimeUnit unit,byte[] lockNodeBytes) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try {
// 在加鎖路徑下建立臨時順序節點並返回路徑
ourPath = driver.createsTheLock(client,localLockNodeBytes);
// 獲取鎖
hasTheLock = internalLockLoop(startMillis,millisToWait,ourPath);
}
catch ( KeeperException.NoNodeException e ) {
// 會話超時會導致找不到鎖定節點,重新嘗試連線(允許重試)
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++,System.currentTimeMillis() - startMillis,RetryLoop.getDefaultRetrySleeper()) ) {
// 連線成功重新嘗試加鎖
isDone = false;
} else {
// 連線失敗丟擲異常
throw e;
}
}
}
// 獲取鎖成功返回加鎖路徑
if ( hasTheLock ) {
return ourPath;
}
return null;
}
// 在加鎖路徑下建立一個臨時順序節點並返回路徑
@Override
public String createsTheLock(CuratorFramework client,byte[] lockNodeBytes) throws Exception {
String ourPath;
if ( lockNodeBytes != null ) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path,lockNodeBytes);
} else {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
private boolean internalLockLoop(long startMillis,Long millisToWait,String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 客戶端已啟動並且沒有獲取鎖則迴圈重試
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
// 獲取從小到大排序的加鎖路徑下的子節點列表
List<String> children = getSortedChildren();
// 獲取當前節點的序列號
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 判斷是否能獲取鎖,返回是否成功和需要監聽的路徑
PredicateResults predicateResults = driver.getsTheLock(client,children,sequenceNodeName,maxLeases);
if ( predicateResults.getsTheLock() ) {
haveTheLock = true;
} else {
// 需要監聽節點的完整路徑
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try {
// 監聽節點
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
// 設定了超時等待時間
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 等待時間到了退出獲取
if ( millisToWait <= 0 ) {
doDelete = true; // timed out - delete our node
break;
}
// 超時等待
wait(millisToWait);
} else {
// 進入等待
wait();
}
} catch ( KeeperException.NoNodeException e ) {
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e ) {
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
} finally {
if ( doDelete ) {
// 等待時間到了沒有獲取鎖或則丟擲異常則刪除自己的節點
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
@Override
public PredicateResults getsTheLock(CuratorFramework client,List<String> children,String sequenceNodeName,int maxLeases) throws Exception {
// 獲取當前的index
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName,ourIndex);
// 如果當前的index比上一個小則獲得鎖
boolean getsTheLock = ourIndex < maxLeases;
// 如果沒有獲得鎖則獲取前一個節點的路徑
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch,getsTheLock);
}
// 監聽器,事件觸發時喚醒等待的執行緒
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
@Override
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
// 根據當前執行緒從快取map中獲取加鎖資訊
LockData lockData = threadData.get(currentThread);
// 如果沒有資訊則說明沒有獲得到鎖
if ( lockData == null ) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// 減少加鎖次數一
int newLockCount = lockData.lockCount.decrementAndGet();
// 如果還有加鎖次數則返回
if ( newLockCount > 0 ) {
return;
}
if ( newLockCount < 0 ) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 釋放鎖
internals.releaseLock(lockData.lockPath);
} finally {
// 從快取map中移除加鎖資訊
threadData.remove(currentThread);
}
}
final void releaseLock(String lockPath) throws Exception {
// 移除監聽
client.removeWatchers();
revocable.set(null);
// 刪除節點
deleteOurPath(lockPath);
}
複製程式碼
整體流程:加鎖時對某個路徑建立臨時順序節點,如果當前已經獲取了鎖,那麼加鎖次數加一;否則如果建立的臨時節點是當前路徑下第一個節點那麼加鎖成功;否則找到當前加鎖路徑下的子節點列表,找到自己的前一個節點並監聽然後進入等待,如果前一個節點釋放了鎖或者當前會話失效那麼節點刪除觸發監聽事件,註冊監聽的執行緒喚醒重新獲取鎖。