ZooKeeper框架Curator的分散式鎖原始碼分析
上一篇文章中,我們使用zookeeper的java api實現了分散式排他鎖。其實zookeeper有一個優秀的框架---Curator,提供了各種分散式協調的服務。Curator中有著更為標準、規範的分散式鎖實現。與其我們自己去實現,不如直接使用Curator。通過學習Curator的原始碼,我們也能瞭解實現分散式鎖更好的方式。
Curator中有各種分散式鎖,本文挑選其中一個---InterProcessMutex進行講解。
我們先看一下Curator程式碼中對於InterProcessMutex的註釋:
可重入的互斥鎖,跨JVM工作。使用ZooKeeper來控制鎖。所有JVM中的任何程序,只要使用同樣的鎖路徑,將會成為跨程序的一部分。此外,這個排他鎖是“公平的”,每個使用者按照申請的順序得到排他鎖。
可見InterProcessMutex和我們自己實現的例子都是一個排他鎖,此外還可以重入。
如何使用InterProcessMutex
在分析InterProcessMutex程式碼前,我們先看一下它是如何使用的,下面程式碼簡單展示了InterProcessMutex的使用:
public static void soldTickWithLock(CuratorFramework client) throws Exception { //建立分散式鎖, 鎖空間的根節點路徑為/curator/lock InterProcessMutex mutex = new InterProcessMutex(client, "/curator/locks"); mutex.acquire(); //獲得了鎖, 進行業務流程 //代表複雜邏輯執行了一段時間 int sleepMillis = (int) (Math.random() * 2000); Thread.sleep(sleepMillis); //完成業務流程, 釋放鎖 mutex.release(); }
使用方式和我們自己編寫的鎖是一樣的,首先通過mutex.acquire()獲取鎖,該方法會阻塞程序,直到獲取鎖,然後執行你的業務方法,最後通過 mutex.release()釋放鎖。
接下來我們進入正題,展開分析Curator關於分散式鎖的實現:
實現思路
Curator設計方式和之前我們自己實現的方式是類似的:
1、建立有序臨時節點
2、觸發“嘗試取鎖邏輯”,如果自己是臨時鎖節點序列的第一個,則取得鎖,獲取鎖成功。
3、如果自己不是序列中第一個,則監聽前一個鎖節點變更。同時阻塞執行緒。
4、當前一個鎖節點變更時,通過watcher恢復執行緒,然後再次到步驟2“嘗試取鎖邏輯”
如下圖所示:
程式碼實現概述
Curator對於排它鎖的頂層實現邏輯在InterProcessMutex類中,它對客戶端暴露鎖的使用方法,如獲取鎖和釋放鎖等。但鎖的上述實現邏輯,是由他持有的LockInternals物件來具體實現的。LockInternals使用StandardLockInternalsDriver類中的方法來做一些處理。
簡單點解釋,我們打個比方,Curator好比是一家公司承接各種業務,InterProcessMutex是老闆,收到自己客戶(client)的需求後,分配給自己的下屬LockInternals去具體完成,同時給他一個工具StandardLockInternalsDriver,讓他在做任務的過程中使用。如下圖展示:
接下來我們將深入分析InterProcessMutex、LockInternals及StandardLockInternalsDriver類。
InterProcessMutex原始碼分析
InterProcessMutex類是curator中的排它鎖類,客戶端直接打交道的就是InterProcessMutex。所以我們從頂層開始,先分析InterProcessMutex。
實現介面
InterProcessMutex實現了兩個介面:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
InterProcessLock是分散式鎖介面,分散式鎖必須實現介面中的如下方法:
1、獲取鎖,直到鎖可用
public void acquire() throws Exception;
2、在指定等待的時間內獲取鎖。
public boolean acquire(long time, TimeUnit unit) throws Exception;
3、釋放鎖
public void release() throws Exception;
4、當前執行緒是否獲取了鎖
boolean isAcquiredInThisProcess();
以上方法也是InterProcessMutex暴露出來,供客戶端在使用分散式鎖時呼叫。
Revocable<T>,實現該介面的鎖,鎖是可以被撤銷的。本編文章重點講解鎖的實現機制,關於撤銷部分不做討論。
屬性
InterProcessMutex屬性如下:
型別 | 名稱 | 說明 |
LockInternals | internals | 鎖的實現都在該類中,InterProcessMutex通過此類的方法實現鎖 |
String | basePath | 鎖節點在zk中的根路徑 |
ConcurrentMap<Thread, LockData> | threadData | 執行緒和自己的鎖相關資料對映 |
String | LOCK_NAME | 常量,值為"lock-"。表示鎖節點的字首 |
它還有一個內部靜態類LockData,也是threadData中儲存的value,它定義了鎖的相關資料,包括鎖所屬執行緒,鎖的全路徑,和該執行緒加鎖的次數(InterProcessMutex為可重入鎖)。程式碼如下:
private static class LockData
{
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
構造方法
InterProcessMutex有三個構造方法,根據入參不同,巢狀呼叫,最終呼叫的構造方法如下:
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
basePath = PathUtils.validatePath(path);
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
可見構造方法最終初始化了兩個屬性,basePath被設定為我們傳入的值 "/curator/lock",這是鎖的根節點。此外就是初始化了internals,前面說過internals是真正實現鎖功能的物件。真正幹活的是internals。
構造完InterProcessMutex物件後,我們看看它是如何工作的。
方法
InterProcessMutex實現InterProcessLock介面,關於分散式鎖的幾個方法都在這個介面中,我們看看InterProcessMutex是如何實現的。
獲得鎖
獲得鎖有兩個方法,區別為是否限定了等待鎖的時間長度。其實最終都是呼叫的私有方法internalLock()。不限定等待時長的程式碼如下:
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
可以看到internalLock()返回false時,只可能因為連線超時,否則會一直等待獲取鎖。
internalLock邏輯如下:
- 取得當前執行緒在threadData中的lockData
- 如果存在該執行緒的鎖資料,重入的話 lockData.lockCount加1,直接返回true。獲取鎖成功
- 如果不存在該執行緒的鎖資料,則通過internals.attemptLock()獲取鎖,此時執行緒被阻塞,直至獲得到鎖
- 鎖獲取成功後,把鎖的資訊儲存到threadData中。
- 如果沒能獲取到鎖,則返回false。
完整程式碼如下:
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
釋放鎖
釋放鎖的方法為release(),邏輯如下:
從threadData中取得當前執行緒的鎖資料,有如下情況:
- 不存在,丟擲無此鎖的異常
- 存在,而且lockCount-1後大於零,說明該執行緒鎖重入了,所以直接返回,並不在zk中釋放。
- 存在,而且lockCount-1後小於零,說明有某種異常發生,直接拋異常
- 存在,而且lockCount-1等於零,這是無重入的正確狀態,需要做的就是從zk中刪除臨時節點,通過internals.releaseLock(),不管結果如何,在threadData中移除該執行緒的資料。
InterProcessMutex小結
分散式鎖主要用到的是上面兩個方法,InterProcessMutex還有些其他的方法,這裡就不做具體講解,可以自己看一下,實現都不復雜。
通過對InterProcessMutex的講解,相信我們已經對鎖的獲得和釋放有了瞭解,應該也意識到其實最終實現鎖的是LockInternals類,InterProcessMutex對鎖的核心實現邏輯,實際在LockInternals中。那麼接下來我們將重點講解LockInternals。
LockInternals原始碼分析
Curator通過zk實現分散式鎖的核心邏輯都在LockInternals中,我們按獲取鎖到釋放鎖的流程為指引,逐步分析LockInternals的原始碼。
獲取鎖
在InterProcessMutex獲取鎖的程式碼分析中,可以看到它是通過internals.attemptLock(time, unit, getLockNodeBytes());來獲取鎖的,那麼我們就以這個方法為入口。此方法的邏輯比較簡單,如下:
- 通過driver在zk上建立鎖節點,獲得鎖節點路徑。
- 通過internalLockLoop()方法阻塞程序,直到獲取鎖成功。
核心程式碼如下:
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
我們繼續分析internalLockLoop方法,獲取鎖的核心邏輯在此方法中。
internalLockLoop中通過while自旋,判斷鎖如果沒有被獲取,那麼將不斷的去嘗試獲取鎖。
while迴圈中邏輯如下:
- 通過driver檢視當前鎖節點序號是否排在第一位,如果排在第一位獲,說明取鎖成功,跳出迴圈
- 如果沒有排在第一位,則監聽自己的前序鎖節點,然後阻塞執行緒。
當前序節點釋放了鎖,監聽會被觸發,恢復執行緒,此時主執行緒又回到while中第一步。
重複以上邏輯,直至獲取到鎖(自己鎖的序號排在首位)。
internalLockLoop方法核心程式碼如下:
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
獲取鎖的主要程式碼邏輯我們到這就已經分析完了,可見和我們自己的實現還是基本一樣的。此外上面提到了driver物件,也就是StandardLockInternalsDriver類,它提供了一些輔助的方法,比如說在zk建立鎖節點,判斷zk上鎖序列第一位是否為當前鎖,鎖序列的排序邏輯等。我們就不具體講解了。
釋放鎖
釋放鎖的邏輯很簡單,移除watcher,刪除鎖節點。程式碼如下:
final void releaseLock(String lockPath) throws Exception
{
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
總結
至此,Curator中InterProcessMutex的原始碼分析全部完成。簡單回顧下,InterProcessMutex類封裝上層邏輯,對外暴露鎖的使用方法。而真正的鎖實現邏輯在LockInternals中,它通過對zk臨時有序鎖節點的建立和監控,判斷自己的鎖序號是否在首位,來實現鎖的獲取。此外它還結合StandardLockInternalsDriver提供的方法,共同實現了排他鎖。