1. 程式人生 > >ZooKeeper框架Curator的分散式鎖原始碼分析

ZooKeeper框架Curator的分散式鎖原始碼分析

上一篇文章中,我們使用zookeeper的java api實現了分散式排他鎖。其實zookeeper有一個優秀的框架---Curator,提供了各種分散式協調的服務。Curator中有著更為標準、規範的分散式鎖實現。與其我們自己去實現,不如直接使用Curator。通過學習Curator的原始碼,我們也能瞭解實現分散式鎖更好的方式。

Curator中有各種分散式鎖,本文挑選其中一個---InterProcessMutex進行講解。

我們先看一下Curator程式碼中對於InterProcessMutex的註釋:

可重入的互斥鎖,跨JVM工作。使用ZooKeeper來控制鎖。所有JVM中的任何程序,只要使用同樣的鎖路徑,將會成為跨程序的一部分。此外,這個排他鎖是“公平的”,每個使用者按照申請的順序得到排他鎖。

可見InterProcessMutex和我們自己實現的例子都是一個排他鎖,此外還可以重入。 

如何使用InterProcessMutex

在分析InterProcessMutex程式碼前,我們先看一下它是如何使用的,下面程式碼簡單展示了InterProcessMutex的使用:

    public static void soldTickWithLock(CuratorFramework client) throws Exception {
        //建立分散式鎖, 鎖空間的根節點路徑為/curator/lock
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/locks");
        mutex.acquire();

        //獲得了鎖, 進行業務流程
        //代表複雜邏輯執行了一段時間
        int sleepMillis = (int) (Math.random() * 2000);
        Thread.sleep(sleepMillis);

        //完成業務流程, 釋放鎖
        mutex.release();
    }

使用方式和我們自己編寫的鎖是一樣的,首先通過mutex.acquire()獲取鎖,該方法會阻塞程序,直到獲取鎖,然後執行你的業務方法,最後通過 mutex.release()釋放鎖。

接下來我們進入正題,展開分析Curator關於分散式鎖的實現:

實現思路

Curator設計方式和之前我們自己實現的方式是類似的:

1、建立有序臨時節點

2、觸發“嘗試取鎖邏輯”,如果自己是臨時鎖節點序列的第一個,則取得鎖,獲取鎖成功。

3、如果自己不是序列中第一個,則監聽前一個鎖節點變更。同時阻塞執行緒。

4、當前一個鎖節點變更時,通過watcher恢復執行緒,然後再次到步驟2“嘗試取鎖邏輯”

如下圖所示:

 程式碼實現概述

Curator對於排它鎖的頂層實現邏輯在InterProcessMutex類中,它對客戶端暴露鎖的使用方法,如獲取鎖和釋放鎖等。但鎖的上述實現邏輯,是由他持有的LockInternals物件來具體實現的。LockInternals使用StandardLockInternalsDriver類中的方法來做一些處理。

簡單點解釋,我們打個比方,Curator好比是一家公司承接各種業務,InterProcessMutex是老闆,收到自己客戶(client)的需求後,分配給自己的下屬LockInternals去具體完成,同時給他一個工具StandardLockInternalsDriver,讓他在做任務的過程中使用。如下圖展示:

接下來我們將深入分析InterProcessMutex、LockInternals及StandardLockInternalsDriver類。

InterProcessMutex原始碼分析

InterProcessMutex類是curator中的排它鎖類,客戶端直接打交道的就是InterProcessMutex。所以我們從頂層開始,先分析InterProcessMutex。

實現介面

InterProcessMutex實現了兩個介面:

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>

InterProcessLock是分散式鎖介面,分散式鎖必須實現介面中的如下方法:

1、獲取鎖,直到鎖可用

public void acquire() throws Exception;

2、在指定等待的時間內獲取鎖。

public boolean acquire(long time, TimeUnit unit) throws Exception;

3、釋放鎖

public void release() throws Exception;

4、當前執行緒是否獲取了鎖

boolean isAcquiredInThisProcess();

以上方法也是InterProcessMutex暴露出來,供客戶端在使用分散式鎖時呼叫。

Revocable<T>,實現該介面的鎖,鎖是可以被撤銷的。本編文章重點講解鎖的實現機制,關於撤銷部分不做討論。

屬性

InterProcessMutex屬性如下:

型別 名稱 說明
LockInternals internals 鎖的實現都在該類中,InterProcessMutex通過此類的方法實現鎖
String basePath 鎖節點在zk中的根路徑
ConcurrentMap<Thread, LockData> threadData 執行緒和自己的鎖相關資料對映
String LOCK_NAME 常量,值為"lock-"。表示鎖節點的字首

它還有一個內部靜態類LockData,也是threadData中儲存的value,它定義了鎖的相關資料,包括鎖所屬執行緒,鎖的全路徑,和該執行緒加鎖的次數(InterProcessMutex為可重入鎖)。程式碼如下:

private static class LockData
{
    final Thread owningThread;
    final String lockPath;
    final AtomicInteger lockCount = new AtomicInteger(1);

    private LockData(Thread owningThread, String lockPath)
    {
        this.owningThread = owningThread;
        this.lockPath = lockPath;
    }
}

構造方法

InterProcessMutex有三個構造方法,根據入參不同,巢狀呼叫,最終呼叫的構造方法如下:

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}

可見構造方法最終初始化了兩個屬性,basePath被設定為我們傳入的值 "/curator/lock",這是鎖的根節點。此外就是初始化了internals,前面說過internals是真正實現鎖功能的物件。真正幹活的是internals。

構造完InterProcessMutex物件後,我們看看它是如何工作的。

方法

InterProcessMutex實現InterProcessLock介面,關於分散式鎖的幾個方法都在這個介面中,我們看看InterProcessMutex是如何實現的。

獲得鎖

獲得鎖有兩個方法,區別為是否限定了等待鎖的時間長度。其實最終都是呼叫的私有方法internalLock()。不限定等待時長的程式碼如下:

public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

可以看到internalLock()返回false時,只可能因為連線超時,否則會一直等待獲取鎖。

internalLock邏輯如下:

  1. 取得當前執行緒在threadData中的lockData
  2. 如果存在該執行緒的鎖資料,重入的話 lockData.lockCount加1,直接返回true。獲取鎖成功
  3. 如果不存在該執行緒的鎖資料,則通過internals.attemptLock()獲取鎖,此時執行緒被阻塞,直至獲得到鎖
  4. 鎖獲取成功後,把鎖的資訊儲存到threadData中。
  5. 如果沒能獲取到鎖,則返回false。

完整程式碼如下:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

釋放鎖

釋放鎖的方法為release(),邏輯如下:

從threadData中取得當前執行緒的鎖資料,有如下情況:

  1. 不存在,丟擲無此鎖的異常
  2. 存在,而且lockCount-1後大於零,說明該執行緒鎖重入了,所以直接返回,並不在zk中釋放。
  3. 存在,而且lockCount-1後小於零,說明有某種異常發生,直接拋異常
  4. 存在,而且lockCount-1等於零,這是無重入的正確狀態,需要做的就是從zk中刪除臨時節點,通過internals.releaseLock(),不管結果如何,在threadData中移除該執行緒的資料。

InterProcessMutex小結

分散式鎖主要用到的是上面兩個方法,InterProcessMutex還有些其他的方法,這裡就不做具體講解,可以自己看一下,實現都不復雜。

通過對InterProcessMutex的講解,相信我們已經對鎖的獲得和釋放有了瞭解,應該也意識到其實最終實現鎖的是LockInternals類,InterProcessMutex對鎖的核心實現邏輯,實際在LockInternals中。那麼接下來我們將重點講解LockInternals。

LockInternals原始碼分析

Curator通過zk實現分散式鎖的核心邏輯都在LockInternals中,我們按獲取鎖到釋放鎖的流程為指引,逐步分析LockInternals的原始碼。

獲取鎖

在InterProcessMutex獲取鎖的程式碼分析中,可以看到它是通過internals.attemptLock(time, unit, getLockNodeBytes());來獲取鎖的,那麼我們就以這個方法為入口。此方法的邏輯比較簡單,如下:

  1. 通過driver在zk上建立鎖節點,獲得鎖節點路徑。
  2. 通過internalLockLoop()方法阻塞程序,直到獲取鎖成功。

核心程式碼如下:

ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);

我們繼續分析internalLockLoop方法,獲取鎖的核心邏輯在此方法中。

internalLockLoop中通過while自旋,判斷鎖如果沒有被獲取,那麼將不斷的去嘗試獲取鎖。

while迴圈中邏輯如下:

  1. 通過driver檢視當前鎖節點序號是否排在第一位,如果排在第一位獲,說明取鎖成功,跳出迴圈
  2. 如果沒有排在第一位,則監聽自己的前序鎖節點,然後阻塞執行緒。

當前序節點釋放了鎖,監聽會被觸發,恢復執行緒,此時主執行緒又回到while中第一步。

重複以上邏輯,直至獲取到鎖(自己鎖的序號排在首位)。

internalLockLoop方法核心程式碼如下:

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
    List<String>        children = getSortedChildren();
    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    if ( predicateResults.getsTheLock() )
    {
        haveTheLock = true;
    }
    else
    {
        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

        synchronized(this)
        {
            try 
            {
                // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                if ( millisToWait != null )
                {
                    millisToWait -= (System.currentTimeMillis() - startMillis);
                    startMillis = System.currentTimeMillis();
                    if ( millisToWait <= 0 )
                    {
                        doDelete = true;    // timed out - delete our node
                        break;
                    }

                    wait(millisToWait);
                }
                else
                {
                    wait();
                }
            }
            catch ( KeeperException.NoNodeException e ) 
            {
                // it has been deleted (i.e. lock released). Try to acquire again
            }
        }
    }
}

獲取鎖的主要程式碼邏輯我們到這就已經分析完了,可見和我們自己的實現還是基本一樣的。此外上面提到了driver物件,也就是StandardLockInternalsDriver類,它提供了一些輔助的方法,比如說在zk建立鎖節點,判斷zk上鎖序列第一位是否為當前鎖,鎖序列的排序邏輯等。我們就不具體講解了。

釋放鎖

釋放鎖的邏輯很簡單,移除watcher,刪除鎖節點。程式碼如下:

final void releaseLock(String lockPath) throws Exception

{

client.removeWatchers();

revocable.set(null);

deleteOurPath(lockPath);

}

總結

至此,Curator中InterProcessMutex的原始碼分析全部完成。簡單回顧下,InterProcessMutex類封裝上層邏輯,對外暴露鎖的使用方法。而真正的鎖實現邏輯在LockInternals中,它通過對zk臨時有序鎖節點的建立和監控,判斷自己的鎖序號是否在首位,來實現鎖的獲取。此外它還結合StandardLockInternalsDriver提供的方法,共同實現了排他鎖。