Curator--2 基於ZK實現的分散式鎖
技術標籤:分散式
curator是Netflix公司開源的一個ZooKeeper客戶端封裝。curator 很好的實現了分散式鎖,curator 提供了InterProcessMutex 這樣一個 api。除了分散式鎖之外,還提供了 leader 選舉、分散式佇列等常用的功能。org.apache.curator.framework.recipes.locks包下。
Curator的幾種鎖方案:
1 InterProcessMutex:分散式可重入排它鎖
2 InterProcessSemaphoreMutex:分散式排它鎖
3 InterProcessReadWriteLock:分散式讀寫鎖
1 分散式鎖要解決的問題
1 鎖釋放:
使用Zookeeper可以有效的解決鎖無法釋放的問題,因為在建立鎖的時候,客戶端會在ZK中建立一個臨時節點,一旦客戶端獲取到鎖之後突然掛掉(Session連線斷開),那麼這個臨時節點就會自動刪除掉。其他客戶端就可以再次獲得鎖。
2 阻塞鎖
使用Zookeeper可以實現阻塞的鎖,客戶端可以通過在ZK中建立順序節點,並且在節點上繫結監聽器,一旦節點有變化,Zookeeper會通知客戶端,客戶端可以檢查自己建立的節點是不是當前所有節點中序號最小的,如果是,那麼自己就獲取到鎖,便可以執行業務邏輯了。
3 可重入鎖
使用Zookeeper也可以有效的解決不可重入的問題,客戶端在建立節點的時候,把當前客戶端的主機資訊和執行緒資訊直接寫入到節點中,下次想要獲取鎖的時候和當前最小的節點中的資料比對一下就可以了。如果和自己的資訊一樣,那麼自己直接獲取到鎖,如果不一樣就再建立一個臨時的順序節點,參與排隊。
4 單點問題
使用Zookeeper可以有效的解決單點問題,ZK是叢集部署的,只要叢集中有半數以上的機器存活,就可以對外提供服務。
5 超時問題
2 InterProcessLock介面
public interface InterProcessLock
{
/**
* Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
public void acquire() throws Exception;
/**
* Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
* to {@link #release()}
*
* @param time time to wait
* @param unit time unit
* @return true if the mutex was acquired, false if not
* @throws Exception ZK errors, connection interruptions
*/
public boolean acquire(long time, TimeUnit unit) throws Exception;
/**
* Perform one release of the mutex.
*
* @throws Exception ZK errors, interruptions
*/
public void release() throws Exception;
/**
* Returns true if the mutex is acquired by a thread in this JVM
*
* @return true/false
*/
boolean isAcquiredInThisProcess();
}
實現InterProcessLock介面的類:
1 InterProcessMultiLock 將多個鎖作為單個實體管理的容器
2 InterProcessMutex 分散式可重入排它鎖
3 InterProcessSemaphoreMutex 分散式排它鎖
3 InterProcessMutex
分散式可重入排它鎖
public InterProcessMutex(CuratorFramework client, String path)
{
this(client, path, new StandardLockInternalsDriver());
}
/**
* @param client client
* @param path the path to lock
* @param driver lock driver
*/
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
{
this(client, path, LOCK_NAME, 1, driver);
}
LockInternalsDriver 可自定義lock驅動實現分散式鎖
3.1 內部成員
// 申請鎖與釋放鎖的核心實現,zk節點建立刪除等
private final LockInternals internals;
// 鎖路徑
private final String basePath;
// 內部快取鎖的容器;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
private static final String LOCK_NAME = "lock-";
private static class LockData
{
final Thread owningThread;
final String lockPath;
//鎖重入計數
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
3.2 獲得鎖acquire
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
//1 嘗試從快取中獲取,成功則重入,計數加1
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
//2 通過internals獲得鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
// 2.1 放入快取
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
org.apache.curator.framework.recipes.locks.LockInternals#attemptLock實現
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
// 1狀態基本資訊
//記錄當前時間
final long startMillis = System.currentTimeMillis();
//記錄鎖等待時間
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//臨時順序節點的data
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
//節點不存在重試次數
int retryCount = 0;
//臨時順序節點path
String ourPath = null;
//當前path是否獲取到鎖
boolean hasTheLock = false;
//flag
boolean isDone = false;
// 2 迴圈獲得鎖
while ( !isDone )
{
isDone = true;
try
{
//1 建立臨時順序節點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//2 獲取鎖
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
//獲取到鎖,返回path
if ( hasTheLock )
{
return ourPath;
}
return null;
}
//createsTheLock方法
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
// internalLockLoop方法
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
//沒有鎖
boolean haveTheLock = false;
//不刪除
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
//自旋,直到獲取到鎖
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//獲取到所有的子節點並排序
List<String> children = getSortedChildren();
//當前節點名稱
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//判斷是否可以拿到鎖,前一個path:當前path是否獲取到鎖
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
//如果拿到鎖,返回
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//前一個path
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//增加watcher,實現就是notifyAll,如果節點不存在丟擲節點不存在異常
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//如果有等待時間
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
//無限等待
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
4 InterProcessSemaphoreMutex
是一種不可重入的分散式互斥鎖。
5 InterProcessMultiLock
6 InterProcessReadWriteLock
分散式讀寫鎖 沒有實現InterProcessLock介面
7 InterProcessSemaphoreV2
InterProcessSemaphoreV2 訊號量(InterProcessSemaphore已廢棄) 沒有實現InterProcessLock介面。基於令牌桶演算法,當一個執行緒要執行的時候就去桶裡面獲取令牌,如果有足夠的令牌那麼我就執行如果沒有那麼我就阻塞,當執行緒執行完畢也要將令牌放回桶裡。
官方文件:http://curator.apache.org/curator-recipes/shared-semaphore.html
參考
1 https://www.jianshu.com/p/5fa6a1464076