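Curator is a ZooKeeper client library originally open-sourced by Netflix (now an Apache project). It provides a solid implementation of distributed locks through APIs such as InterProcessMutex, and it also ships other common recipes such as leader election and distributed queues. The lock recipes live in the org.apache.curator.framework.recipes.locks package: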

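1 InterProcessMutex: distributed reentrant exclusive lock
2 InterProcessSemaphoreMutex: distributed exclusive lock (not reentrant)
3 InterProcessReadWriteLock: distributed read-write lock
4 InterProcessMultiLock: a container that manages multiple locks as a single entity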

1 Problems a distributed lock must solve

1 Lock release
2 Blocking locks
3 Reentrant locks
4 Single point of failure
5 Timeouts

2 The InterProcessLock interface

public interface InterProcessLock
{
    /**
     * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    public void acquire() throws Exception;

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
    public boolean acquire(long time, TimeUnit unit) throws Exception;

    /**
     * Perform one release of the mutex.
     *
     * @throws Exception ZK errors, interruptions
     */
    public void release() throws Exception;

    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
    boolean isAcquiredInThisProcess();
}


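The interface has three implementations:

1 InterProcessMultiLock: a container that manages multiple locks as a single entity
2 InterProcessMutex: distributed reentrant exclusive lock
3 InterProcessSemaphoreMutex: distributed exclusive lock (not reentrant)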
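
The interface contract says that every successful acquire must be balanced by exactly one release. A minimal sketch of that calling pattern follows. The names LockLike, LocalLock, and LockTemplate are hypothetical and exist only for this illustration; the lock is an in-process stand-in backed by a JDK ReentrantLock, not a Curator lock, so the block runs without a ZooKeeper server.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical interface that mirrors the shape of InterProcessLock (not the Curator type).
interface LockLike {
    void acquire() throws Exception;
    boolean acquire(long time, TimeUnit unit) throws Exception;
    void release() throws Exception;
}

// In-process stand-in backed by a JDK ReentrantLock; assumption made so the sketch is runnable.
final class LocalLock implements LockLike {
    private final ReentrantLock delegate = new ReentrantLock();

    public void acquire() {
        delegate.lock();
    }

    public boolean acquire(long time, TimeUnit unit) throws InterruptedException {
        return delegate.tryLock(time, unit);
    }

    public void release() {
        delegate.unlock();
    }

    // Number of holds the current thread has on the lock.
    int holdCount() {
        return delegate.getHoldCount();
    }
}

final class LockTemplate {
    private LockTemplate() {
    }

    // Runs the task while holding the lock; returns null if the lock was not acquired in time.
    static <T> T callWithLock(LockLike lock, long time, TimeUnit unit, Callable<T> task) throws Exception {
        if (!lock.acquire(time, unit)) {
            return null; // timed out: nothing was acquired, so there is nothing to release
        }
        try {
            return task.call();
        } finally {
            lock.release(); // exactly one release per successful acquire
        }
    }
}
```

With a real Curator lock the same try/finally shape applies; only the lock object changes.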

3 InterProcessMutex


    public InterProcessMutex(CuratorFramework client, String path)
    {
        this(client, path, new StandardLockInternalsDriver());
    }

    /**
     * @param client client
     * @param path   the path to lock
     * @param driver lock driver
     */
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
    {
        this(client, path, LOCK_NAME, 1, driver);
    }

A custom LockInternalsDriver can be passed in to change how the distributed lock is driven.

3.1 Internal members

    // Core implementation of acquiring and releasing the lock: creating and deleting ZooKeeper nodes, etc.
    private final LockInternals internals;
    // Lock path
    private final String basePath;
    // Per-thread cache of the locks currently held
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    private static final String LOCK_NAME = "lock-";

    private static class LockData
    {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        private LockData(Thread owningThread, String lockPath)
        {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
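
The threadData map and the lockCount counter are what make the lock reentrant: a thread that already holds the lock only bumps a counter, and the remote node is deleted only when the counter drops back to zero. The sketch below models just that bookkeeping under simplifying assumptions: ReentrancySketch and Hold are hypothetical names, the ZooKeeper calls are replaced by plain counters, the lock path is a made-up value, and the class does not provide mutual exclusion between threads.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of per-thread reentrancy bookkeeping (not Curator code).
final class ReentrancySketch {
    // Plays the role of LockData: the path held by a thread plus its reentrancy count.
    private static final class Hold {
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        Hold(String lockPath) {
            this.lockPath = lockPath;
        }
    }

    private final ConcurrentMap<Thread, Hold> threadData = new ConcurrentHashMap<>();

    // Count how often the simulated remote lock node was created or deleted.
    final AtomicInteger remoteAcquires = new AtomicInteger();
    final AtomicInteger remoteReleases = new AtomicInteger();

    void acquire() {
        Thread current = Thread.currentThread();
        Hold hold = threadData.get(current);
        if (hold != null) {
            hold.lockCount.incrementAndGet(); // re-entering: no remote call needed
            return;
        }
        // Placeholder for the real remote acquisition; the path is a hypothetical value.
        String lockPath = "/locks/demo/lock-" + remoteAcquires.incrementAndGet();
        threadData.put(current, new Hold(lockPath));
    }

    void release() {
        Thread current = Thread.currentThread();
        Hold hold = threadData.get(current);
        if (hold == null) {
            throw new IllegalMonitorStateException("Current thread does not own the lock");
        }
        if (hold.lockCount.decrementAndGet() > 0) {
            return; // an outer acquire still holds the lock
        }
        remoteReleases.incrementAndGet(); // placeholder for deleting the lock node
        threadData.remove(current);
    }

    boolean isHeldByCurrentThread() {
        return threadData.containsKey(Thread.currentThread());
    }
}
```

Because each entry is only ever touched by the thread that owns it, the map needs no extra locking around the counter updates.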

3.2 Acquiring the lock: acquire

    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        // 1 Look in the cache first; a hit means this is a re-entrant acquire, so increment the count
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }

        // 2 Acquire the lock through internals
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            // 2.1 Put it into the cache
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }

    // LockInternals.attemptLock
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        // 1 Basic state
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;

        // 2 Loop until the attempt either finishes or runs out of retries
        while ( !isDone )
        {
            isDone = true;

            try
            {
                // 2.1 Create an ephemeral sequential node
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 2.2 Try to get the lock
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }

        if ( hasTheLock )
        {
            return ourPath;
        }

        return null;
    }

    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }

    // The internalLockLoop method
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }

                                wait(millisToWait);
                            }
                            else
                            {
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
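
The decision made by driver.getsTheLock inside the loop comes down to position in the sorted child list: a node whose index is below maxLeases holds the lock, and any other node watches the node maxLeases places ahead of it, so a release wakes only one waiter instead of all of them. The sketch below illustrates that rule on plain strings. It is a simplified model, not Curator's implementation: LockPredicateSketch, Result, and the node names in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of the "do we hold the lock, and if not, whom do we watch" check.
final class LockPredicateSketch {
    private LockPredicateSketch() {
    }

    // Outcome of the check: either we hold the lock, or we know which node to watch.
    static final class Result {
        final boolean getsTheLock;
        final String nodeToWatch; // null when getsTheLock is true

        Result(boolean getsTheLock, String nodeToWatch) {
            this.getsTheLock = getsTheLock;
            this.nodeToWatch = nodeToWatch;
        }
    }

    // Returns the text after the lock name, i.e. the zero-padded sequence number.
    static String sequencePart(String nodeName, String lockName) {
        int index = nodeName.lastIndexOf(lockName);
        return (index >= 0) ? nodeName.substring(index + lockName.length()) : nodeName;
    }

    static Result check(List<String> children, String ourNode, String lockName, int maxLeases) {
        // Sort by sequence number only, ignoring whatever prefix precedes the lock name.
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing((String name) -> sequencePart(name, lockName)));

        int ourIndex = sorted.indexOf(ourNode);
        if (ourIndex < 0) {
            throw new IllegalStateException("Our node is missing: " + ourNode);
        }
        if (ourIndex < maxLeases) {
            return new Result(true, null);
        }
        // Watch the node maxLeases positions ahead of us rather than the head of the queue.
        return new Result(false, sorted.get(ourIndex - maxLeases));
    }
}
```

For example, with the children "_c_b-lock-0000000002", "_c_a-lock-0000000003", and "_c_c-lock-0000000001" and maxLeases set to 1, the node ending in 0000000001 holds the lock and the node ending in 0000000003 watches the one ending in 0000000002.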

4 InterProcessSemaphoreMutex


5 InterProcessMultiLock

6 InterProcessReadWriteLock

A distributed read-write lock. It does not implement the InterProcessLock interface.

7 InterProcessSemaphoreV2

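InterProcessSemaphoreV2 is a distributed semaphore (the original InterProcessSemaphore is deprecated). It does not implement the InterProcessLock interface. Conceptually it behaves like a bucket of tokens (leases, in Curator's terms): a thread that wants to run takes a token from the bucket; if one is available it proceeds, otherwise it blocks, and when it finishes it puts the token back.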
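
The take-a-token, run, return-the-token cycle can be illustrated with the JDK's in-process java.util.concurrent.Semaphore. This is only an analogy under stated assumptions: LeaseSketch is a hypothetical name, and the sketch limits threads inside one JVM, whereas InterProcessSemaphoreV2 coordinates leases across processes through ZooKeeper.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// In-process analogy for lease-based limiting (not Curator code).
final class LeaseSketch {
    private final Semaphore leases;

    LeaseSketch(int maxLeases) {
        this.leases = new Semaphore(maxLeases);
    }

    // Blocks until a lease is free, runs the task, then always returns the lease.
    <T> T runWithLease(Supplier<T> task) {
        leases.acquireUninterruptibly();
        try {
            return task.get();
        } finally {
            leases.release();
        }
    }

    // Number of leases that are currently free.
    int availableLeases() {
        return leases.availablePermits();
    }
}
```

Returning the lease in a finally block matters for the same reason as with locks: a task that throws would otherwise shrink the pool permanently.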



