The Most Complete Walkthrough of ZooKeeper Distributed Locks on CSDN
1 Scenario
In a distributed application, multiple processes often provide the same service. These processes may run on the same machine or be spread across different machines. When such processes share resources, a distributed lock may be needed to serialize access to those resources.
2 Approach
When a process needs to access the shared data, it creates a sequence-type child node under the "/locks" node, called thisPath. When thisPath is the smallest of all the child nodes, that process has acquired the lock and may access the shared resource. When it finishes, it deletes thisPath, and the lock passes to the new smallest child node.
With the overall idea in place, one detail remains: how does a process know whether thisPath is the smallest of all the child nodes? After creating its node, it can fetch the child list via getChildren, find the node ranked one place before thisPath (call it waitPath), and register a watch on waitPath. When waitPath is deleted, the process is notified, which means it now holds the lock.
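A minimal sketch of this flow, assuming an already-connected ZooKeeper handle and a pre-created "/locks" root (the node-name prefix is illustrative; error handling and the re-check after the watcher fires are omitted):
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.*;

public class LockSketch {
    static void acquire(ZooKeeper zk) throws KeeperException, InterruptedException {
        // create thisPath as an ephemeral sequence node under /locks
        String thisPath = zk.create("/locks/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String thisNode = thisPath.substring(thisPath.lastIndexOf('/') + 1);
        List<String> children = zk.getChildren("/locks", false);
        Collections.sort(children);
        if (thisNode.equals(children.get(0))) {
            return; // thisPath is the smallest child: we hold the lock
        }
        // waitPath is the node ranked one place before thisPath
        String waitPath = children.get(children.indexOf(thisNode) - 1);
        zk.exists("/locks/" + waitPath, event -> {
            // waitPath was deleted: re-run this check; we may now hold the lock
        });
    }
}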
3 Algorithm
The lock operation:
First, for a given lock scenario, designate a root node in zookeeper under which the resource competition is recorded;
When each lock object is created, it lazily creates a node in zookeeper as its identifier in the competition (trick: the node is EPHEMERAL_SEQUENTIAL, an auto-numbered ephemeral node);
To lock, fetch all child nodes under the lock's root node, i.e., the identifiers currently competing for the resource;
Following the Fair (fairness) principle, sort by the auto-numbered suffix and take the node with the smallest number as the lock owner; if our own node id equals the owner id, the lock has been acquired and we return;
If our id is not the owner id, find the id ranked immediately before ours in the sorted order and watch for its release (an exist watcher), forming a chain of triggers;
The unlock operation:
Simply delete the node for our own id; the next node in the queue then receives the Watcher event and is woken to take the lock. In code this is a single delete, as sketched below:
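Continuing the sketch from section 2 (same zk handle and thisPath), the whole unlock is one call:
zk.delete(thisPath, -1); // version -1 skips the version check; the next queued node's watcher fires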
A few key points:
Choosing EPHEMERAL_SEQUENTIAL for the node matters.
The auto-numbering property makes it easy to build a Fair lock: each node wakes only the node behind it, forming a chain of triggers. This avoids the "thundering herd" effect (one lock release waking every waiting thread); the targeted wake-up improves performance.
The EPHEMERAL property matters too. Talking to zookeeper is a network operation with many uncontrollable factors; if the network drops, for example, the previous node's lock-release operation fails. An ephemeral node is bound to its session: once the session times out or exits abnormally, the node disappears, analogous to how a Thread waiting in ReentrantLock's queue is handled on interrupt.
Acquiring the lock is a blocking operation, while the corresponding Watcher is an asynchronous event, so a BooleanMutex (a shared-mode boolean latch) is used for notification; it also makes lock re-entry straightforward to handle. (Re-entry can be understood as repeated read operations, and lock release as a write takeover.)
Note:
Using EPHEMERAL introduces a risk: under abnormal conditions, high network latency can cause a session timeout; zookeeper then considers the client gone and destroys its id, and the next id in line can take the lock. At that point two processes may hold the lock and run the task at the same time, so setting the session timeout carefully is important (see the example after this list).
Using PERSISTENT instead carries a deadlock risk: if a process exits abnormally, its competing id is never deleted, and the next id can never acquire the lock.
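For reference, the session timeout is fixed when the ZooKeeper client connects; a short hypothetical example (the address and value are illustrative):
// The second constructor argument is the session timeout in milliseconds;
// tune it against the worst-case network latency you expect.
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, event -> {});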
4 Implementation
- DistributedLock.java source: the distributed lock
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
/**
* ZooKeeper distributed lock
*/
public class DistributedLock {
private static final int SESSION_TIMEOUT = 10000;
private static final int DEFAULT_TIMEOUT_PERIOD = 10000;
private static final String CONNECTION_STRING = "127.0.0.1:2180,127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
private static final byte[] data = {0x12, 0x34};
private ZooKeeper zookeeper;
private String root;
private String id;
private LockNode idName;
private String ownerId;
private String lastChildId;
private Throwable other = null;
private KeeperException exception = null;
private InterruptedException interrupt = null;
public DistributedLock(String root) {
try {
this.zookeeper = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, null);
this.root = root;
ensureExists(root);
} catch (IOException e) {
e.printStackTrace();
other = e;
}
}
/**
* Attempts to acquire the lock; blocking, but interruptible
*/
public void lock() throws Exception {
// initialization may already have failed
if (exception != null) {
throw exception;
}
if (interrupt != null) {
throw interrupt;
}
if (other != null) {
throw new Exception("Unexpected exception while acquiring the lock", other);
}
if (isOwner()) { // lock re-entry
return;
}
BooleanMutex mutex = new BooleanMutex();
acquireLock(mutex);
// to avoid losing the watcher after a zookeeper restart (which would deadlock), a timeout-based retry can be used
try {
// mutex.lockTimeOut(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MILLISECONDS); // block waiting for the state to become true
mutex.lock();
} catch (Exception e) {
e.printStackTrace();
if (!mutex.state()) {
lock();
}
}
if (exception != null) {
throw exception;
}
if (interrupt != null) {
throw interrupt;
}
if (other != null) {
throw new Exception("Unexpected exception while acquiring the lock", other);
}
}
/**
* Attempts to acquire the lock; does not block
*
* @throws InterruptedException
* @throws KeeperException
*/
public boolean tryLock() throws Exception {
// initialization may already have failed
if (exception != null) {
throw exception;
}
if (isOwner()) { // lock re-entry
return true;
}
acquireLock(null);
if (exception != null) {
throw exception;
}
if (interrupt != null) {
Thread.currentThread().interrupt();
}
if (other != null) {
throw new Exception("Unexpected exception during tryLock", other);
}
return isOwner();
}
/**
* Releases the lock
*/
public void unlock() throws KeeperException {
if (id != null) {
try {
zookeeper.delete(root + "/" + id, -1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (KeeperException.NoNodeException e) {
// do nothing
} finally {
id = null;
}
}
}
/**
* Checks whether the given path exists, creating it if it does not
* @param path
*/
private void ensureExists(final String path) {
try {
Stat stat = zookeeper.exists(path, false);
if (stat != null) {
return;
}
zookeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
exception = e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
interrupt = e;
}
}
/**
* Returns the root path of this lock
*/
public String getRoot() {
return root;
}
/**
* Checks whether this client currently owns the lock
*/
public boolean isOwner() {
return id != null && ownerId != null && id.equals(ownerId);
}
/**
* Returns the current node id
*/
public String getId() {
return this.id;
}
// ===================== helper methods =============================
/**
* Performs the lock operation; the mutex parameter controls whether the operation blocks
*/
private Boolean acquireLock(final BooleanMutex mutex) {
try {
do {
if (id == null) { // build a unique identifier for this lock holder
long sessionId = zookeeper.getSessionId();
String prefix = "x-" + sessionId + "-";
// first time through: create a node
String path = zookeeper.create(root + "/" + prefix, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
int index = path.lastIndexOf("/");
id = StringUtils.substring(path, index + 1);
idName = new LockNode(id);
}
if (id != null) {
List<String> names = zookeeper.getChildren(root, false);
if (names.isEmpty()) {
id = null; // abnormal case: recreate the node
} else {
// sort the nodes
SortedSet<LockNode> sortedNames = new TreeSet<>();
for (String name : names) {
sortedNames.add(new LockNode(name));
}
if (!sortedNames.contains(idName)) {
id = null; // reset to null and recreate
continue;
}
// take the first node as the ownerId
ownerId = sortedNames.first().getName();
if (mutex != null && isOwner()) {
mutex.unlock(); // update the state directly and return
return true;
} else if (mutex == null) {
return isOwner();
}
SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
if (!lessThanMe.isEmpty()) {
// watch the nearest node queued just ahead of us
LockNode lastChildName = lessThanMe.last();
lastChildId = lastChildName.getName();
// asynchronous watcher handling
Stat stat = zookeeper.exists(root + "/" + lastChildId, new Watcher() {
public void process(WatchedEvent event) {
acquireLock(mutex);
}
});
if (stat == null) {
acquireLock(mutex); // the node is already gone, so the watcher was never attached; re-trigger ourselves
}
} else {
if (isOwner()) {
mutex.unlock();
} else {
id = null; // our node may have timed out and gone away, hence id != ownerId
}
}
}
}
} while (id == null);
} catch (KeeperException e) {
exception = e;
if (mutex != null) {
mutex.unlock();
}
} catch (InterruptedException e) {
interrupt = e;
if (mutex != null) {
mutex.unlock();
}
} catch (Throwable e) {
other = e;
if (mutex != null) {
mutex.unlock();
}
}
if (isOwner() && mutex != null) {
mutex.unlock();
}
return Boolean.FALSE;
}
}
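Note that DistributedLock references a LockNode helper class that this post does not otherwise show. The following is a minimal sketch of what it must do, assuming node names follow the x-<sessionId>-<sequence> pattern created above: order by the zero-padded sequence suffix that ZooKeeper appends, and implement equals/hashCode so the TreeSet contains check works.
/**
 * Hypothetical sketch of the LockNode helper: wraps a node name of the form
 * "x-<sessionId>-<sequence>" and orders nodes by the sequence suffix.
 */
public class LockNode implements Comparable<LockNode> {
    private final String name;
    private final String sequence;

    public LockNode(String name) {
        this.name = name;
        // the sequence is the zero-padded suffix after the last '-'
        this.sequence = name.substring(name.lastIndexOf('-') + 1);
    }

    public String getName() {
        return name;
    }

    @Override
    public int compareTo(LockNode other) {
        // zero-padded suffixes compare correctly as strings
        return sequence.compareTo(other.sequence);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof LockNode && name.equals(((LockNode) o).name);
    }

    @Override
    public int hashCode() {
        return name.hashCode();
    }
}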
- BooleanMutex.java source: the boolean mutex (a shared-mode latch)
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* Boolean mutex built on AQS shared mode
*/
public class BooleanMutex {
private Sync sync;
public BooleanMutex() {
sync = new Sync();
set(false);
}
/**
* Blocks until the boolean becomes true
* @throws InterruptedException
*/
public void lock() throws InterruptedException {
sync.innerLock();
}
/**
* Blocks until the boolean becomes true, with a timeout
* @param timeout
* @param unit
* @throws InterruptedException
* @throws TimeoutException
*/
public void lockTimeOut(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
sync.innerLock(unit.toNanos(timeout));
}
public void unlock() {
set(true);
}
/**
* Resets the boolean state
* @param mutex
*/
public void set(boolean mutex) {
if (mutex) {
sync.innerSetTrue();
} else {
sync.innerSetFalse();
}
}
public boolean state() {
return sync.innerState();
}
/**
* Shared-mode synchronizer backing the boolean mutex
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;
/**
* State 1: wake every thread blocked while the state was FALSE
*/
private static final int TRUE = 1;
/**
* State 0: the current thread blocks and waits to be woken
*/
private static final int FALSE = 0;
/**
* Returns a value greater than 0 to proceed, less than 0 to block
*/
protected int tryAcquireShared(int arg) {
return getState() == 1 ? 1 : -1;
}
/**
* AQS hook: decides whether the shared release may proceed
*/
protected boolean tryReleaseShared(int ignore) {
// always true: release is always permitted
return true;
}
private boolean innerState() {
return getState() == 1;
}
private void innerLock() throws InterruptedException {
acquireSharedInterruptibly(0);
}
private void innerLock(long nanosTimeout) throws InterruptedException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
}
private void innerSetTrue() {
for (;;) {
int s = getState();
if (s == TRUE) {
return; // already TRUE, nothing to do
}
if (compareAndSetState(s, TRUE)) { // CAS the state to avoid concurrent true updates
releaseShared(0); // release the shared lock, waking the blocked threads
}
}
}
private void innerSetFalse() {
    for (;;) {
        int s = getState();
        if (s == FALSE) {
            return; // already FALSE, nothing to do
        }
        if (compareAndSetState(s, FALSE)) { // CAS the state to avoid concurrent false updates
            return;
        }
    }
}
}
}
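A quick usage sketch (a hypothetical demo class, not part of the original code): lock() blocks while the state is false, and a single unlock() from any thread wakes every waiter:
public class BooleanMutexDemo {
    public static void main(String[] args) throws InterruptedException {
        BooleanMutex mutex = new BooleanMutex();
        new Thread(() -> {
            try {
                Thread.sleep(100); // stand-in for the asynchronous watcher firing
            } catch (InterruptedException ignored) {
            }
            mutex.unlock(); // set(true): wakes all threads blocked in lock()
        }).start();
        mutex.lock(); // blocks until the state flips to true
        System.out.println("woken, state = " + mutex.state());
    }
}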
- Test class:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.zookeeper.KeeperException;
/**
* Distributed lock test
* @version 1.0
*/
public class DistributedLockTest {
public static void main(String [] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final int count = 50;
final CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
final DistributedLock node = new DistributedLock("/locks");
executor.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
// node.tryLock(); // non-blocking acquisition
node.lock(); // blocking acquisition
Thread.sleep(100);
System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
try {
node.unlock();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
Console output:
id: x-239027745716109354-0000000248 is leader: true
id: x-22854963329433645-0000000249 is leader: true
id: x-22854963329433646-0000000250 is leader: true
id: x-166970151413415997-0000000251 is leader: true
id: x-166970151413415998-0000000252 is leader: true
id: x-166970151413415999-0000000253 is leader: true
id: x-166970151413416000-0000000254 is leader: true
id: x-166970151413416001-0000000255 is leader: true
id: x-166970151413416002-0000000256 is leader: true
id: x-22854963329433647-0000000257 is leader: true
id: x-239027745716109355-0000000258 is leader: true
id: x-166970151413416003-0000000259 is leader: true
id: x-94912557367427124-0000000260 is leader: true
id: x-22854963329433648-0000000261 is leader: true
id: x-239027745716109356-0000000262 is leader: true
id: x-239027745716109357-0000000263 is leader: true
id: x-166970151413416004-0000000264 is leader: true
id: x-239027745716109358-0000000265 is leader: true
id: x-239027745716109359-0000000266 is leader: true
id: x-22854963329433649-0000000267 is leader: true
id: x-22854963329433650-0000000268 is leader: true
id: x-94912557367427125-0000000269 is leader: true
id: x-22854963329433651-0000000270 is leader: true
id: x-94912557367427126-0000000271 is leader: true
id: x-239027745716109360-0000000272 is leader: true
id: x-94912557367427127-0000000273 is leader: true
id: x-94912557367427128-0000000274 is leader: true
id: x-166970151413416005-0000000275 is leader: true
id: x-94912557367427129-0000000276 is leader: true
id: x-166970151413416006-0000000277 is leader: true
id: x-94912557367427130-0000000278 is leader: true
id: x-94912557367427131-0000000279 is leader: true
id: x-239027745716109361-0000000280 is leader: true
id: x-239027745716109362-0000000281 is leader: true
id: x-166970151413416007-0000000282 is leader: true
id: x-94912557367427132-0000000283 is leader: true
id: x-22854963329433652-0000000284 is leader: true
id: x-166970151413416008-0000000285 is leader: true
id: x-239027745716109363-0000000286 is leader: true
id: x-239027745716109364-0000000287 is leader: true
id: x-166970151413416009-0000000288 is leader: true
id: x-166970151413416010-0000000289 is leader: true
id: x-239027745716109365-0000000290 is leader: true
id: x-94912557367427133-0000000291 is leader: true
id: x-239027745716109366-0000000292 is leader: true
id: x-94912557367427134-0000000293 is leader: true
id: x-22854963329433653-0000000294 is leader: true
id: x-94912557367427135-0000000295 is leader: true
id: x-239027745716109367-0000000296 is leader: true
id: x-239027745716109368-0000000297 is leader: true
5 An Upgraded Version
With a distributed lock implemented, synchronization between processes is solved. But when the requirement involves multi-thread plus multi-process lock control, having every thread in a single JVM talk to zookeeper over the network gets expensive, so a two-layer distributed lock was built on top of DistributedLock.
The rough idea is a combination of ReentrantLock and DistributedLock:
When multiple threads in one JVM compete, a thread must first take the first-layer ReentrantLock;
The thread holding it then competes with threads in other JVMs for the distributed lock, and starts processing its task once it has acquired that too;
Release runs in the opposite direction: release the DistributedLock first, then the ReentrantLock. Consider what would happen if the ReentrantLock were released first: if contention on the ReentrantLock inside this JVM is high, lock competitors in other JVMs could easily be starved.
- DistributedReentrantLock.java source: multi-process + multi-thread distributed lock
import java.text.MessageFormat;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.zookeeper.KeeperException;
/**
* Multi-process + multi-thread distributed lock
*/
public class DistributedReentrantLock extends DistributedLock {
private static final String ID_FORMAT = "Thread[{0}] Distributed[{1}]";
private ReentrantLock reentrantLock = new ReentrantLock();
public DistributedReentrantLock(String root) {
super(root);
}
public void lock() throws Exception {
reentrantLock.lock(); // under multi-thread contention, take the first-layer lock first
super.lock();
}
public boolean tryLock() throws Exception {
    // take the first-layer lock first; release it again if the distributed lock is not obtained
    if (!reentrantLock.tryLock()) return false;
    if (super.tryLock()) return true;
    reentrantLock.unlock();
    return false;
}
public void unlock() throws KeeperException {
super.unlock();
reentrantLock.unlock(); // release the outer in-JVM lock last
}
@Override
public String getId() {
return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
}
@Override
public boolean isOwner() {
return reentrantLock.isHeldByCurrentThread() && super.isOwner();
}
}
- Test code:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.zookeeper.KeeperException;
/**
* @version 1.0
*/
public class DistributedReentrantLockTest {
public static void main(String [] args) {
ExecutorService executor = Executors.newCachedThreadPool();
final int count = 50;
final CountDownLatch latch = new CountDownLatch(count);
final DistributedReentrantLock lock = new DistributedReentrantLock("/locks"); // a single shared lock instance
for (int i = 0; i < count; i++) {
executor.submit(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
lock.lock();
Thread.sleep(100);
System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
try {
lock.unlock();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
}
}
6 Final Thoughts
Taking the idea one step further, a distributed read/write lock can be implemented along much the same lines. Rough approach:
Resource-competition identifiers: read_<auto-increment id> and write_<auto-increment id>;
Sort by the auto-increment id first. If the nodes at the front of the queue are all read markers, every one of those reads acquires the lock; if the node at the front is a write marker, only that first write node acquires the lock;
Watcher registration: a read node watches exists on the nearest write node ahead of it; a write node watches the nearest node ahead of it (read or write). A sketch of the grant decision follows.
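A sketch of that grant decision (a hypothetical helper; it assumes the children list is already sorted by sequence id and node names carry the read_/write_ prefixes above):
import java.util.List;

public class ReadWriteGrant {
    /** Returns true if the node named myNode may hold the lock right now. */
    static boolean canAcquire(List<String> sortedChildren, String myNode) {
        int myIndex = sortedChildren.indexOf(myNode);
        if (myNode.startsWith("write_")) {
            return myIndex == 0; // a writer must be at the head of the queue
        }
        for (int i = 0; i < myIndex; i++) {
            if (sortedChildren.get(i).startsWith("write_")) {
                return false; // a reader waits behind any earlier writer
            }
        }
        return true; // only readers ahead of us: all of them may read concurrently
    }
}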