1. 程式人生 > >Curator分散式鎖原始碼分析

Curator分散式鎖原始碼分析

Curator是Apache ZooKeeper的Java / JVM客戶端庫,官網有個圖很形象。

pic

curator對於zookeeper來說就像Guava之餘java.我們知道Guava是谷歌開源的java類庫,該庫經過高度優化,運用得當可極大提高我們的程式碼效率和質量。

所以,用Curator的前提是瞭解zookeeper,

在現在分散式應用大行其道的時代,分散式鎖一直是熱點問題。現在我們一起來看下curator是如果操作zookeeper,以及如何實現分散式鎖的。

//1、制定重試策略:初試時間為1s 重試3次  預設值參考官網
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 
 //2、通過工廠建立連線
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
  //3、開啟連線
        client.start();
//4 新建一個分散式鎖物件,既然是在zookeeper中,當然要在節點中,這裡節點已存在,不存在的自行建立
        final InterProcessMutex mutex = new InterProcessMutex(client, "/testRoot"); 
//這是程式碼結構
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}

很明顯,我們需要進入acquire方法

 @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception
    {
        return internalLock(time, unit);
    }

這個方法的作用就是獲取鎖,這方法還有兩個引數,分別是等待的時間和單位。該方法會一直阻塞知道鎖可用或者時間過期,並且該鎖是可重入鎖,每一個獲取鎖操作必須對應一個釋放鎖操作。

繼續進入internalLock方法

private boolean internalLock(long time, TimeUnit unit) throws Exception
	    {
	        
		  	//取得當前執行緒
	        Thread currentThread = Thread.currentThread();
	        //取出與當前執行緒繫結的LockData物件
	        LockData lockData = threadData.get(currentThread);
	        //如果物件不為空,說明當前執行緒已經擁有鎖
	        if ( lockData != null )
	        {
	            //每一次重入都原子計數加1
	            lockData.lockCount.incrementAndGet();
	            return true;
	        }
	        //獲得鎖的關鍵程式碼
	        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
	        //第一次獲得鎖後,將當前執行緒和鎖資訊放入map中
	        if ( lockPath != null )
	        {
	            LockData newLockData = new LockData(currentThread, lockPath);
	            threadData.put(currentThread, newLockData);
	            return true;
	        }

	        return false;
	    }

繼續看獲取鎖的關鍵程式碼attemptLock

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
	    {
	        final long      startMillis = System.currentTimeMillis();
	        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
	        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
	        int             retryCount = 0;

	        String          ourPath = null;
	        boolean         hasTheLock = false;
	        boolean         isDone = false;
	        while ( !isDone )
	        {
	            isDone = true;

	            try
	            {	//建立一個節點
	                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
	                //根據建立的節點來判斷是否取得鎖
	                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
	            }
	            catch ( KeeperException.NoNodeException e )
	            {
	                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
	                {
	                    isDone = false;
	                }
	                else
	                {
	                    throw e;
	                }
	            }
	        }

	        if ( hasTheLock )
	        {
	            return ourPath;
	        }

	        return null;
	    }

這段程式碼中核心的就是中間加了註釋的那兩行,一個是建立節點的createsTheLock方法,進入該方法

  public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

用過java操作zookeeper的同學就能看出這就是對建立節點方法的封裝,這裡建立的是臨時順序節點。

再看internalLockLoop方法

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
            //呼叫後一直獲取知道拿到鎖
            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {	//獲得子節點,這個方法將子節點進行了從小到大的排序,為什麼從小到大,看下面就知道了
                List<String>        children = getSortedChildren();
                //得到當前節點名去掉父節點的字首
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                //這個方法判斷當前執行緒建立的節點是否是最小的,因為之前按從小到大排序了,所以和第一元素比較就行啦
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                //獲得了鎖
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {	//如果沒有獲得鎖,就監聽當前最小值的節點(鎖在它身上所以要盯著它)
                	//這裡得到目前最小節點的值
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }

進入getsTheLock方法檢視獲取具體細節

   public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
	    {
		   //在已排序的節點中查詢自己建立節點的下標
	        int             ourIndex = children.indexOf(sequenceNodeName);
	        //驗證ourIndex是否小於0 ,小於0表示該接節點不存在
	        validateOurIndex(sequenceNodeName, ourIndex);
	        //如果比maxLeases小, 則說明是最小的節點  maxLeases在建立鎖InterProcessMutex示例時被初始化為1
	        boolean         getsTheLock = ourIndex < maxLeases;
	        //如果不是最小的,則需要監聽離你最近的節點
	        String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
	        return new PredicateResults(pathToWatch, getsTheLock);
	    }

獲取鎖的過程基本就是這樣,建議大家debug跟蹤下。

思路其實和java操作zookeeper一樣:
判斷是否已經擁有鎖,如果擁有技術加1直接返回
如果不是,則嘗試建立節點,如果當前執行緒建立的節點值是最小的,則獲取鎖
如果不是最小,則監聽利你最近的那個節點。

有獲取鎖就有釋放鎖,相對來說釋放鎖要簡單些。

public void release() throws Exception
    {
        /*
            Note on concurrency: a given lockData instance
            can be only acted on by a single thread so locking isn't necessary
         */

        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
		//如果沒有資料與當前執行緒繫結,則丟擲異常
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
		//技術減一,由於鎖是可重入的,所以如果newLockCount>0,則不能刪除節點直接返回。
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }//如果為負數則顯然不正常,可能是獲取一次釋放多次的緣故
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {	//釋放鎖,刪除節點,移除監聽,
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {	//從map移除當前執行緒的鎖資訊
            threadData.remove(currentThread);
        }
    }