Using ZooKeeper Distributed Locks
阿新 · Published: 2018-12-12
Since our company adopted the dubbo + zookeeper stack, ZooKeeper-based distributed locks inevitably came along with it, so I spent some time getting a rough understanding of them. This is my own research, so corrections from more experienced readers are welcome.
First I will explain my own understanding of ZooKeeper distributed locks; after that I will walk through an implementation by others that I find quite good.
1. A dubbo service provider in the back end exposes an interface to front-end consumers, and a corresponding node is created in ZooKeeper; say this node is named /lock.
2. When multiple clients contend, each one creates an ephemeral sequential node under /lock. Only the client whose node sorts first obtains the lock; the others wait.
3. When the first client releases the lock, its node is deleted. Clients watching that node are notified, and the remaining clients contend for the lock again.
4. As described, this causes a herd effect: every waiting client is woken at once. The optimization is for each client to set a watch only on the child node immediately before its own. Once the node just ahead of mine is gone, I contend for the lock; otherwise there are still others ahead of me, and I keep queuing.
5. Run the business logic.
6. When the business logic finishes, delete the corresponding ephemeral node, releasing the lock.
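The "watch only your predecessor" rule in steps 2-4 can be sketched as a small standalone helper (WatchSelector and nodeToWatch are hypothetical names, not Curator API): given the sorted list of sequential children under the lock node, the owner of the first child holds the lock, and every other client watches only the child immediately before its own.

```java
import java.util.Arrays;
import java.util.List;

public class WatchSelector {

    // Returns null if ourNode sorts first (we hold the lock); otherwise the
    // name of the predecessor node that this client should set a watch on.
    static String nodeToWatch(List<String> sortedChildren, String ourNode) {
        int index = sortedChildren.indexOf(ourNode);
        if (index <= 0) {
            return null; // first in line: we own the lock
        }
        return sortedChildren.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "lock-0000000001", "lock-0000000002", "lock-0000000003");
        // The first child holds the lock; later children each watch one predecessor.
        System.out.println(nodeToWatch(children, "lock-0000000001"));
        System.out.println(nodeToWatch(children, "lock-0000000003"));
    }
}
```

Because each waiter watches exactly one znode, a release wakes only one client instead of the whole herd.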
That covers the logical flow of a ZooKeeper distributed lock. Next, let's look at Curator, which provides a ready-made implementation of it.
First, the overall flow:
public class ZKLockTestTread extends Thread {

    private Logger logger = LoggerFactory.getLogger(getClass());
    private final String ZKLOCK_NODE_PATH = "/zklock/readfile";

    public ZKLockTestTread() {
    }

    @Override
    public void run() {
        AbstractMutexLock lock = null;
        boolean lockflag = false;
        try {
            lock = ZKMutexLockFactory.getZKMutexLock(ZKLOCK_NODE_PATH);
            // Specifying the ZooKeeper address explicitly:
            // lock = ZKMutexLockFactory.getZKMutexLock("127.0.0.1:8080", ZKLOCK_NODE_PATH);
            // lock.acquire(); // contend for the lock, waiting indefinitely
            lockflag = lock.acquire(10, TimeUnit.SECONDS); // contend for the lock with a 10-second timeout
            if (lockflag) {
                // Acquired the distributed lock: run the task
                logger.info("SUCCESS thread [" + Thread.currentThread().getName() + "] acquired the distributed lock, running the task");
                Thread.sleep(1000000000);
            } else {
                // Did not acquire the distributed lock: skip the task
                logger.info("FAILURE thread [" + Thread.currentThread().getName() + "] did not acquire the distributed lock, skipping the task");
            }
        } catch (PaasException e) {
            logger.error("Thread [" + Thread.currentThread().getName() + "] error acquiring the distributed lock: " + e.getMessage(), e);
        } catch (Exception e) {
            logger.error("Thread [" + Thread.currentThread().getName() + "] execution error: " + e.getMessage(), e);
        } finally {
            if (lock != null && lockflag) {
                try {
                    lock.release();
                    logger.info("Thread [" + Thread.currentThread().getName() + "] released the distributed lock OK");
                } catch (Exception e) {
                    logger.error("Thread [" + Thread.currentThread().getName() + "] error releasing the distributed lock: " + e.getMessage(), e);
                }
            }
        }
    }
}
Here the node path is specified:
AbstractMutexLock lock=ZKMutexLockFactory.getZKMutexLock(ZKLOCK_NODE_PATH);
The lock is acquired with:
boolean lockflag=lock.acquire(10, TimeUnit.SECONDS);
The return value indicates whether the lock was obtained. Stepping into the code, we first reach the interface:
/**
 * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
 * to {@link #release()}
 *
 * @param time time to wait
 * @param unit time unit
 * @return true if the mutex was acquired, false if not
 * @throws Exception ZK errors, connection interruptions
 */
public boolean acquire(long time, TimeUnit unit) throws Exception;
In other words, the call blocks until the mutex becomes available or the timeout expires, and every successful acquire must be balanced by a call to release(). Going one level deeper into the implementation:
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
internalLock is where the ZooKeeper lock is actually invoked. One more note: if time is -1 (with a null unit), the call blocks indefinitely while the lock is held by someone else.
Next, into internalLock:
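The "-1 means wait forever" convention comes from how the (time, unit) pair is converted to a millisecond budget: a null unit yields a null budget, which the wait loop treats as "no deadline". A minimal sketch of that conversion (WaitMillis and millisToWait are hypothetical names, mirroring the expression used inside attemptLock):

```java
import java.util.concurrent.TimeUnit;

public class WaitMillis {

    // A null unit means no deadline was given: the caller will wait forever.
    // Otherwise the requested time is normalized to milliseconds.
    static Long millisToWait(long time, TimeUnit unit) {
        return (unit != null) ? unit.toMillis(time) : null;
    }

    public static void main(String[] args) {
        System.out.println(millisToWait(10, TimeUnit.SECONDS)); // finite budget in ms
        System.out.println(millisToWait(-1, null));             // null: wait forever
    }
}
```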
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
/*
Note on concurrency: a given lockData instance
can be only acted on by a single thread so locking isn't necessary
*/
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
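The reentrancy bookkeeping in internalLock above can be isolated into a small self-contained sketch (ReentrantBookkeeping is a hypothetical class; the ZooKeeper round-trip is replaced by a stub): the first acquisition by a thread stores a LockData entry, later acquisitions by the same thread only bump lockCount, and release() must be called a matching number of times before the lock is truly given up.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ReentrantBookkeeping {

    static class LockData {
        final Thread owner;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);

        LockData(Thread owner, String lockPath) {
            this.owner = owner;
            this.lockPath = lockPath;
        }
    }

    private final ConcurrentMap<Thread, LockData> threadData = new ConcurrentHashMap<>();

    boolean acquire(String lockPath) {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data != null) {
            data.lockCount.incrementAndGet(); // re-entering: no ZooKeeper call needed
            return true;
        }
        // In Curator this is where attemptLock() creates the ephemeral sequential node.
        threadData.put(current, new LockData(current, lockPath));
        return true;
    }

    boolean release() {
        Thread current = Thread.currentThread();
        LockData data = threadData.get(current);
        if (data == null) {
            return false; // this thread does not hold the lock
        }
        if (data.lockCount.decrementAndGet() > 0) {
            return true; // still held re-entrantly
        }
        threadData.remove(current); // last release: the node would be deleted here
        return true;
    }

    int holdCount() {
        LockData data = threadData.get(Thread.currentThread());
        return (data == null) ? 0 : data.lockCount.get();
    }
}
```

This is also why the map can be read without locking: each LockData entry is only ever touched by the thread that owns it, exactly as the comment in internalLock notes.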
attemptLock does the actual work of contending for the ZooKeeper lock:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
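The while (!isDone) structure above is a retry loop: if creating the lock node fails with a recoverable error (a NoNodeException after, say, a session expiry), the whole attempt is redone as long as the retry policy permits; otherwise the exception propagates. A standalone sketch of that shape (RetryLoopSketch is hypothetical, with IllegalStateException standing in for KeeperException.NoNodeException):

```java
import java.util.concurrent.Callable;

public class RetryLoopSketch {

    interface RetryPolicy {
        boolean allowRetry(int retryCount);
    }

    // Runs the action, retrying on the recoverable exception type while the
    // policy still permits; any final failure is rethrown to the caller.
    static <T> T callWithRetry(Callable<T> action, RetryPolicy policy) throws Exception {
        int retryCount = 0;
        while (true) {
            try {
                return action.call();
            } catch (IllegalStateException e) {
                if (!policy.allowRetry(retryCount++)) {
                    throw e; // retries exhausted: propagate, like attemptLock does
                }
            }
        }
    }
}
```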
Inside it, internalLockLoop blocks and waits until the lock is obtained:
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
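One detail worth isolating from internalLockLoop is the timeout bookkeeping: each time the thread wakes from wait(), the elapsed time is subtracted from the remaining budget (millisToWait -= elapsed; startMillis is reset), and when the budget reaches zero or below, doDelete is set and the node is removed. A minimal sketch of that accounting, with the clock passed in explicitly so it can be tested (WaitBudget is a hypothetical helper, not Curator API):

```java
public class WaitBudget {
    private long remainingMillis;
    private long lastCheck;

    WaitBudget(long totalMillis, long nowMillis) {
        this.remainingMillis = totalMillis;
        this.lastCheck = nowMillis;
    }

    // Subtracts elapsed time since the last check, mirroring
    // millisToWait -= (now - startMillis); startMillis = now;
    // Returns the milliseconds still available, or <= 0 when timed out.
    long update(long nowMillis) {
        remainingMillis -= (nowMillis - lastCheck);
        lastCheck = nowMillis;
        return remainingMillis;
    }
}
```

Because wait() can wake spuriously or on an unrelated watch event, recomputing the remaining budget on every iteration is what keeps the overall acquire(10, TimeUnit.SECONDS) deadline accurate across multiple wake-ups.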