Curator Distributed Lock: Source Code Analysis
阿新 • Published: 2019-01-04
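Curator is a Java/JVM client library for Apache ZooKeeper. The official website puts it vividly: Curator is to ZooKeeper what Guava is to Java. Guava, as we know, is Google's open-source Java library; it is highly optimized and, used well, can greatly improve the efficiency and quality of our code.
So using Curator presupposes an understanding of ZooKeeper.
Now that distributed applications are everywhere, distributed locks remain a hot topic. Let's look at how Curator operates ZooKeeper and how it implements a distributed lock.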
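import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

// 1. Retry policy: base sleep time 1 s, at most 3 retries (see the official docs for defaults)
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 2. Create the client through the factory; address is the ZooKeeper connection string ("host:port")
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
// 3. Start the client, which opens the connection
client.start();
// 4. Create a distributed lock bound to the ZooKeeper path /testRoot;
//    Curator creates the path if it is missing (see creatingParentContainersIfNeeded below)
final InterProcessMutex mutex = new InterProcessMutex(client, "/testRoot");
// Usage pattern: maxWait and waitUnit (a TimeUnit) bound how long acquire() may block
if ( mutex.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        mutex.release();
    }
}
Clearly, the place to start is the acquire method: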
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
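This method acquires the lock. It takes two parameters: the maximum time to wait and the unit of that time. It blocks until the lock becomes available or the time runs out, and returns true only if the lock was acquired. The lock is reentrant, so every successful acquire must be matched by exactly one release.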
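Within a single JVM, java.util.concurrent.locks.ReentrantLock has the same contract: tryLock(time, unit) waits at most the given time and reports whether it succeeded, the lock is reentrant, and each successful acquisition needs its own unlock(). The sketch below uses it purely as a local analogue to make the counting rule concrete; the class name ReentrantContractDemo is hypothetical, this is not Curator code, and it gives no exclusion across processes.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM analogue of the acquire/release contract (not Curator code).
// Returns the hold count at three points: nested, after inner release, after outer release.
class ReentrantContractDemo {
    static int[] holdCounts() {
        ReentrantLock lock = new ReentrantLock();
        int[] counts = new int[3];
        try {
            if (lock.tryLock(1, TimeUnit.SECONDS)) {             // outer acquire, bounded wait
                try {
                    if (lock.tryLock(1, TimeUnit.SECONDS)) {     // the same thread re-enters
                        try {
                            counts[0] = lock.getHoldCount();     // acquired twice
                        } finally {
                            lock.unlock();                       // balances the inner acquire
                        }
                    }
                    counts[1] = lock.getHoldCount();             // still held once
                } finally {
                    lock.unlock();                               // balances the outer acquire
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        counts[2] = lock.getHoldCount();                         // fully released
        return counts;
    }
}
```

Calling holdCounts() returns {2, 1, 0}: the count rises with each nested acquire and returns to zero only after the same number of releases.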
Next, step into internalLock:
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    // the current thread
    Thread currentThread = Thread.currentThread();
    // the LockData bound to the current thread, if any
    LockData lockData = threadData.get(currentThread);
    // non-null means the current thread already holds the lock
    if ( lockData != null )
    {
        // re-entering: atomically increment the hold count
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // the key call that actually acquires the lock in ZooKeeper
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    // after the first acquisition, record the thread and its lock info in the map
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}
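The bookkeeping in internalLock can be sketched with plain JDK types. The names ThreadBoundLock, acquire and holdCount are hypothetical, and the Supplier argument stands in for internals.attemptLock(...), returning the created node path or null on timeout; only the map-plus-counter logic mirrors the source above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of internalLock's per-thread bookkeeping.
class ThreadBoundLock {
    static final class LockData {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1); // the first acquisition counts as 1
        LockData(String lockPath) { this.lockPath = lockPath; }
    }

    private final Map<Thread, LockData> threadData = new ConcurrentHashMap<>();

    // attempt stands in for the distributed part: returns a node path, or null on timeout.
    boolean acquire(Supplier<String> attempt) {
        Thread current = Thread.currentThread();
        LockData existing = threadData.get(current);
        if (existing != null) {               // re-entry: no ZooKeeper round trip
            existing.lockCount.incrementAndGet();
            return true;
        }
        String lockPath = attempt.get();      // first acquisition by this thread
        if (lockPath != null) {
            threadData.put(current, new LockData(lockPath));
            return true;
        }
        return false;                         // timed out
    }

    int holdCount() {
        LockData data = threadData.get(Thread.currentThread());
        return data == null ? 0 : data.lockCount.get();
    }
}
```

The design point is that re-entry never touches ZooKeeper: only a thread's first acquisition pays for creating a node and possibly waiting. Because the map is keyed by Thread, two threads in the same JVM still compete through ZooKeeper like two separate processes.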
Now look at attemptLock, which holds the key logic for acquiring the lock:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;
    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone )
    {
        isDone = true;
        try
        {
            // create our lock node
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // decide, based on the node just created, whether we hold the lock
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // retry the whole attempt only if the client's retry policy allows it
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }
    if ( hasTheLock )
    {
        return ourPath;
    }
    return null;
}
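The while (!isDone) loop retries the whole create-and-wait attempt for exactly one kind of failure, KeeperException.NoNodeException (for instance when validateOurIndex finds our own node missing from the children), and only while the client's retry policy agrees. The sketch reproduces that structure with JDK types only; RetryOnNoNode, its NoNodeException and its RetryPolicy are hypothetical stand-ins, and the RetrySleeper that Curator's real allowRetry receives is omitted.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of attemptLock's retry structure (not Curator's API).
class RetryOnNoNode {
    // Stand-in for KeeperException.NoNodeException.
    static class NoNodeException extends Exception {
        NoNodeException(String msg) { super(msg); }
    }

    // Stand-in for a retry policy; the real one also takes a sleeper.
    interface RetryPolicy {
        boolean allowRetry(int retryCount, long elapsedMs);
    }

    static <T> T run(Callable<T> attempt, RetryPolicy policy) throws Exception {
        final long startMillis = System.currentTimeMillis();
        int retryCount = 0;
        T result = null;
        boolean isDone = false;
        while (!isDone) {
            isDone = true;                     // assume this pass is the last one
            try {
                result = attempt.call();       // "create node + wait for the lock"
            } catch (NoNodeException e) {
                // only this failure is retried, and only while the policy allows it
                if (policy.allowRetry(retryCount++, System.currentTimeMillis() - startMillis)) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
        return result;
    }
}
```

Any other exception escapes immediately, so the retry policy never masks unrelated failures.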
The core of this code is the two commented lines inside the loop. The first, createsTheLock, creates the node; let's step into it:
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
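Anyone who has used ZooKeeper from Java will recognize this as a wrapper around node creation. The node created here is ephemeral and sequential (CreateMode.EPHEMERAL_SEQUENTIAL): it disappears when the client's session ends, and ZooKeeper appends an increasing sequence number to its name. creatingParentContainersIfNeeded() creates any missing parent nodes, and withProtection() adds a unique prefix to the node name so the client can still find its own node if the connection drops before the create call returns.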
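ZooKeeper formats the sequence suffix as a 10-digit, zero-padded counter. As far as I can tell from Curator's source, InterProcessMutex names its nodes "lock-" and withProtection() prepends "_c_" plus a UUID; treat both as assumptions. Because that UUID is random, sorting children as plain strings would not give creation order, so the children have to be sorted on the part after the lock name. The hypothetical LockNodeNames class below builds names of that shape and sorts them that way.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

// Sketch of protected sequential lock-node names and sorting them by sequence.
// Assumption: names look like "_c_<uuid>-lock-<10-digit sequence>".
class LockNodeNames {
    static final String LOCK_NAME = "lock-";

    // Mimics the name returned for a protected EPHEMERAL_SEQUENTIAL create.
    static String protectedSequentialName(UUID protectionId, long sequence) {
        return "_c_" + protectionId + "-" + LOCK_NAME + String.format("%010d", sequence);
    }

    // Keep only what follows "lock-", so the random prefix does not affect ordering.
    static String fixForSorting(String child) {
        int idx = child.lastIndexOf(LOCK_NAME);
        return idx >= 0 ? child.substring(idx + LOCK_NAME.length()) : child;
    }

    // Ascending by sequence number, in the spirit of getSortedChildren().
    static List<String> sortBySequence(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LockNodeNames::fixForSorting));
        return sorted;
    }
}
```

Zero padding is what makes a string comparison of the suffixes agree with numeric order.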
Next, internalLockLoop:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// loop until we hold the lock (or time out, or the client stops)
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// get the children sorted in ascending sequence order; the reason becomes clear below
List<String> children = getSortedChildren();
// our node's name without the parent path prefix
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
// check whether our node is the smallest; since the list is sorted ascending, looking at its position is enough
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
// we got the lock
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
// no lock yet: watch the node immediately ahead of ours, since its deletion may make it our turn
// full path of that preceding node
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
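The timed branch of internalLockLoop follows a standard monitor pattern: the thread sets a watch and calls wait() while holding the monitor, the watcher's only job is to wake it, and the loop then re-reads the children instead of trusting the event. The remaining time is recomputed on every pass and checked for <= 0 before calling wait, because Object.wait(0) means wait forever. The sketch isolates that pattern; TimedTurnWaiter and the isOurTurn callback (standing in for re-reading the children and calling getsTheLock) are hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of internalLockLoop's timed wait (not Curator code).
class TimedTurnWaiter {
    // Called by the watcher when the watched node changes, e.g. is deleted.
    synchronized void notifyFromWatcher() {
        notifyAll();
    }

    // True once isOurTurn holds; false if millisToWait runs out (null = wait forever).
    synchronized boolean await(BooleanSupplier isOurTurn, Long millisToWait) throws InterruptedException {
        long startMillis = System.currentTimeMillis();
        while (!isOurTurn.getAsBoolean()) {
            if (millisToWait != null) {
                millisToWait -= (System.currentTimeMillis() - startMillis);
                startMillis = System.currentTimeMillis();
                if (millisToWait <= 0) {
                    return false;          // timed out: the caller deletes its node
                }
                wait(millisToWait);        // never called with 0, which would mean "forever"
            } else {
                wait();
            }
        }
        return true;
    }
}
```

Since both methods synchronize on the same object, a notification cannot slip in between the check and the wait, and the loop also absorbs spurious wakeups.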
Step into getsTheLock to see how the decision is made:
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
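// find the index of our node among the sorted children
int ourIndex = children.indexOf(sequenceNodeName);
// validate ourIndex: a value below 0 means our node does not exist
validateOurIndex(sequenceNodeName, ourIndex);
// maxLeases is initialized to 1 when an InterProcessMutex is constructed, so ourIndex < maxLeases means ours is the smallest node
boolean getsTheLock = ourIndex < maxLeases;
// if ours is not the smallest, watch the node immediately preceding ours
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);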
return new PredicateResults(pathToWatch, getsTheLock);
}
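The decision in getsTheLock needs nothing but the sorted list, so it is easy to reproduce. In the hypothetical LockPredicate sketch below, an IllegalStateException stands in for the exception validateOurIndex throws. Watching only the immediate predecessor rather than the smallest node means each deletion wakes exactly one waiting client, which avoids a herd of clients all reacting to every release.

```java
import java.util.List;

// Sketch of getsTheLock(): win if ourIndex < maxLeases, otherwise watch the node
// maxLeases positions ahead (the immediate predecessor when maxLeases == 1).
class LockPredicate {
    static final class Result {
        final boolean getsTheLock;
        final String pathToWatch; // null when we hold the lock
        Result(boolean getsTheLock, String pathToWatch) {
            this.getsTheLock = getsTheLock;
            this.pathToWatch = pathToWatch;
        }
    }

    static Result evaluate(List<String> sortedChildren, String sequenceNodeName, int maxLeases) {
        int ourIndex = sortedChildren.indexOf(sequenceNodeName);
        if (ourIndex < 0) {
            // stand-in for the exception thrown when our node is missing
            throw new IllegalStateException("Sequential path not found: " + sequenceNodeName);
        }
        boolean getsTheLock = ourIndex < maxLeases;
        String pathToWatch = getsTheLock ? null : sortedChildren.get(ourIndex - maxLeases);
        return new Result(getsTheLock, pathToWatch);
    }
}
```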
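That is essentially the whole acquisition process; stepping through it in a debugger is well worth it.
The approach is the same as building the lock with the plain Java ZooKeeper API:
1. Check whether the current thread already holds the lock; if so, increment the hold count and return immediately.
2. Otherwise, create an ephemeral sequential node; if the node created by the current thread has the smallest sequence number, the thread holds the lock.
3. If not, watch the node immediately preceding it, and check again once that node is deleted.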
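These steps can be modeled inside one JVM to see the queue behaviour end to end. In the hypothetical InMemorySequentialLock below, a TreeSet of sequence numbers stands in for the lock node's sorted children, createNode for creating an ephemeral sequential node, and deleteNode for releasing. Two simplifications apply: there are no sessions (nothing is cleaned up if a thread dies), and every waiter is woken on each delete instead of watching only its predecessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Single-JVM model of the steps above (hypothetical; no ZooKeeper involved).
class InMemorySequentialLock {
    private final TreeSet<Long> children = new TreeSet<>();  // sorted "children"
    private final List<Long> grantOrder = new ArrayList<>();  // who got the lock, in order
    private long nextSequence = 0;

    // Step 2a: "create an ephemeral sequential node", returning its sequence number.
    synchronized long createNode() {
        long seq = nextSequence++;
        children.add(seq);
        return seq;
    }

    // Steps 2b and 3: wait until our node is the smallest child.
    synchronized void awaitTurn(long seq) throws InterruptedException {
        while (children.first() != seq) {
            wait();                                  // woken by deleteNode()
        }
        grantOrder.add(seq);
    }

    // Release: delete our node; in ZooKeeper this fires the successor's watch.
    synchronized void deleteNode(long seq) {
        children.remove(seq);
        notifyAll();
    }

    synchronized List<Long> grantOrder() {
        return new ArrayList<>(grantOrder);
    }
}
```

Because a waiter proceeds only when its node is the smallest, grants follow node-creation order regardless of the order in which the waiting threads were started.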
Where there is acquiring, there is releasing, and releasing is comparatively simple.
public void release() throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
// no lock data bound to the current thread means it does not own the lock: throw
if ( lockData == null )
{
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// decrement the count; the lock is reentrant, so if newLockCount > 0 it is still held and we return without deleting the node
int newLockCount = lockData.lockCount.decrementAndGet();
if ( newLockCount > 0 )
{
return;
}
// a negative count is clearly wrong, most likely release() was called more times than acquire()
if ( newLockCount < 0 )
{
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try
{ // release the lock: delete our node and remove watchers
internals.releaseLock(lockData.lockPath);
}
finally
{ // remove the current thread's lock data from the map
threadData.remove(currentThread);
}
}
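The release path mirrors the acquire-side bookkeeping. In the hypothetical ReleaseBookkeeping sketch below, the Consumer argument stands in for internals.releaseLock(lockPath), which deletes the ZooKeeper node; the counter checks and the finally block follow the source above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of release()'s bookkeeping (not Curator code).
class ReleaseBookkeeping {
    static final class LockData {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);
        LockData(String lockPath) { this.lockPath = lockPath; }
    }

    final Map<Thread, LockData> threadData = new ConcurrentHashMap<>();

    // deleteNode stands in for the call that deletes our ZooKeeper node.
    void release(Consumer<String> deleteNode) {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data == null) {
            throw new IllegalMonitorStateException("You do not own the lock");
        }
        int newLockCount = data.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            return;                           // still held re-entrantly: keep the node
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative");
        }
        try {
            deleteNode.accept(data.lockPath); // last release: delete our node
        } finally {
            threadData.remove(current);       // clear local state even if the delete throws
        }
    }
}
```

Clearing the map entry in a finally block keeps the thread's local state consistent even if the delete fails; the node itself is ephemeral, so ZooKeeper removes it when the session ends in any case.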