zookeeper 共享鎖問題—— 監聽最小目錄節點時,並準備向監聽器列表中註冊監聽器時,最小目錄節點被刪除。
前言: zookeeper 分佈鎖的原來是 如圖:
使用zookeeper 來實現分散式鎖,發現一個問題,當客戶端A執行緒發出命令刪除最小的目錄lock_001被刪除時,並不會立刻刪除,因為命令是通過網路協議傳輸過去的,中間會產生一定的時間段,雖然這個時間段很小很小,但是在這個時間段內會發生一件特殊的事:此時會有另外一個客戶端B執行緒 在A執行緒 沒有傳送到zookeeper伺服器的時候,B執行緒查詢了最小目錄lock_001並且還存在,此時B執行緒準備向監聽器列表新增監聽器lock_001的watcher,就在此時執行緒A的命令執行了,lock_001 目錄刪除了,但是B執行緒還以為lock_001存在,並在監聽器列表新增監聽器lock_001的watcher,然後B執行緒等待進入wait() 等待,直到lock_001 節點被刪除的時候對應的watcher被呼叫,在process中 將B執行緒 notify 喚醒,B執行緒才能執行,可是此時的lock_001已經被刪除了,那麼B執行緒將會一直等待下去。
按順序啟動: ShareLock1 ShareLock2 ShareLock3 ShareLock4 : 這4個類的邏輯相同,
/ElecLock 下面的子節點都是臨時有序的節點。
ShareLock1 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_001 並獲得ElecLock中的子節點中的最小節點是否等於自己穿件的節點,如果等於則執行業務邏輯
如果不是則監聽最小的目錄,在監聽器中重新執行這個邏輯判斷
ShareLock2 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_002
ShareLock3 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_003
ShareLock4 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_004
ShareLock2 ShareLock3 ShareLock4 與 ShareLock1 一樣,唯一的區別就是 ShareLock1 中模擬業務邏輯消耗了20秒
在ShareLock1 類中,判斷建立的節點是否是ElecLock 中的最小節點,是最小的,並執行邏輯,模擬業務邏輯的消耗時間是20秒,當執行完以後刪除lock_001,
ShareLock2 ShareLock3 ShareLock4 中的註冊的監聽器被觸發,同理ShareLock2 是最小的 開始執行,但是此時沒有模擬業務邏輯的消耗時間是20秒,只是簡單的列印一下,業務開始,業務結束,刪除節點lock_002,此時上述所寫的問題出現了,
ShareLock3,ShareLock4 這兩個下的業務執行緒,因為判斷 節點 lock_003 ,lock_003 不是為最小的,並且開始向 最小的lock_002 註冊監聽器,並且還註冊成功了,但是註冊成功的同時,lock_002也被刪掉了,所以造成ShareLock3,ShareLock4 永久的等待。(本人常事不監聽最小目錄換為監聽父目錄ElecLock,這種情況任然存在。)
下面是我的程式碼:
package com.jvm.others.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
/**利用zookeeper實現共享鎖
* 思想:在zookeeper中的一個Lock目錄下注冊臨時ephemeral節點,每個節點為一把鎖
* @author pc
*
*/
public class ShareLock {
static ZooKeeper keeper = null;
static String parentNode="/ElecLock";
static{
try {
keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() { //監聽與伺服器建立連線後的回撥函式process
@Override
public void process(WatchedEvent event) {
//不處理邏輯
System.out.println(this.toString()+"-----------預設監聽器---------------");
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void RegisterLock() throws Exception {
//建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如 lock_0000000007,來生成 真正要建立的節點名稱
String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
callBack(create);
System.out.println(create);
}
//重複監聽或者執行業務邏輯
private void callBack(final String create) throws KeeperException,
InterruptedException {
//獲得所有鎖節點
List<String> children = keeper.getChildren(parentNode, false);
String[] array = children.toArray(new String[children.size()]);
Arrays.sort(array);
//獲得最小的鎖節點
String minNode = array[0];
minNode=parentNode+"/"+minNode;
//判斷當前建立的鎖節點是否為最小鎖節點
if(minNode.equals(create)){
//如果是進行業務操作
synchronized (keeper) {
System.out.println("業務邏輯 開始 執行");
Thread.sleep(20000);// 模擬執行邏輯的時間
keeper.notify();
System.out.println("業務邏輯 開始 完畢");
System.out.println("刪除此節點"+ create);
keeper.delete(create, -1 );
}
}else{
//如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
//如果節點不存在,則返回null, 並且watcher 不回被觸發
Stat exists = keeper.exists(minNode, new Watcher() {
@Override
public void process(WatchedEvent watchedevent) {
try {
System.out.println("重新監聽");
callBack(create);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
/**
* 高併發注意這種情況,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
* 此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
* 可以避免 這種特殊情況
*
* 第二種特殊情況: 當我們獲得了zookeeper中的資料最小的minNode,在執行到keeper.exists(minNode, new Watcher()時,並判斷當前minNode節點存在,準備向註冊監聽器列表中註冊監聽器時,
* minNode 節點在這一時刻被刪除了,此時程式還以為minNode 節點存在,並且向註冊監聽器列表中註冊事件,當註冊完畢時,並返回結果,此時程式等待 minNode 被刪除時 監聽器被執行,
* 但是 minNode 早已經不存在,所以,此時的執行緒會一直處於等待的狀態。並且業務邏輯永遠不會被執行。
*/
System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
callBack(create);
}
synchronized (keeper) {
System.out.println(this.toString()+" wait end 業務邏輯執行 等待");
keeper.wait();
System.out.println(this.toString()+" wait end 執行緒 結束");
}
}
}
public static void main(String[] args) throws Exception {
ShareLock shareLock = new ShareLock();
shareLock.RegisterLock();
}
}
package com.jvm.others.zookeeper;
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
public class ShareLock2 {
static ZooKeeper keeper = null;
static String parentNode="/ElecLock";
static{
try {
keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() { //監聽與伺服器建立連線後的回撥函式process
@Override
public void process(WatchedEvent event) {
//不處理邏輯
System.out.println(this.toString()+"-----------預設監聽器---------------");
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void RegisterLock() throws Exception {
// TODO Auto-generated method stub
//建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如 lock_0000000007,來生成 真正要建立的節點名稱
String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
callBack(create);
System.out.println(create);
}
//重複監聽或者執行業務邏輯
private void callBack(final String create) throws KeeperException,
InterruptedException {
//獲得所有鎖節點
List<String> children = keeper.getChildren(parentNode, false);
String[] array = children.toArray(new String[children.size()]);
Arrays.sort(array);
//獲得最小的鎖節點
String minNode = array[0];
minNode=parentNode+"/"+minNode;
System.out.println("當前最小的節點是: " + minNode);
//判斷當前建立的鎖節點是否為最小鎖節點
if(minNode.equals(create)){
//如果是進行業務操作
synchronized (keeper) {
System.out.println("業務邏輯 開始 執行");
keeper.notify();
System.out.println("業務邏輯 開始 完畢");
System.out.println("刪除此節點"+ create);
keeper.delete(create, -1 );
}
}else{
//如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
//如果節點不存在,則返回null, 並且watcher 不回被觸發
Stat exists = keeper.exists(minNode, new Watcher() {
@Override
public void process(WatchedEvent watchedevent) {
try {
System.out.println("重新監聽");
callBack(create);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
/**
* 高併發注意這種情況,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
* 此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
* 可以避免 這種特殊情況
*/
System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
callBack(create);
}
synchronized (keeper) {
System.out.println(this.toString()+" wait end 業務邏輯執行 等待");
keeper.wait();
System.out.println(this.toString()+" wait end 執行緒 結束");
}
}
}
public static void main(String[] args) throws Exception {
ShareLock2 shareLock = new ShareLock2();
shareLock.RegisterLock();
}
}
package com.jvm.others.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
/**利用zookeeper實現共享鎖
* 思想:在zookeeper中的一個Lock目錄下注冊臨時ephemeral節點,每個節點為一把鎖
* @author pc
*
*/
public class ShareLock3 {
static ZooKeeper keeper = null;
static String parentNode="/ElecLock";
static{
try {
keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() { //客戶端與伺服器建立連線後註冊監聽器並回調函式process,和預設執行監聽器的回掉函式,如keeper.getChildren(parentNode, true); 這裡的true指 使用預設監聽器,如果parentNode 下發生了 增加刪除 會觸發預設監聽器
@Override
public void process(WatchedEvent event) {
//不處理邏輯
System.out.println(this.toString()+"--------------預設監聽器------------");
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void RegisterLock() throws Exception {
//建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如 lock_0000000007,來生成 真正要建立的節點名稱
String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
callBack(create);
System.out.println(create);
}
//重複監聽或者執行業務邏輯
private void callBack(final String create) throws KeeperException,
InterruptedException {
//獲得所有鎖節點
List<String> children = keeper.getChildren(parentNode, false);
String[] array = children.toArray(new String[children.size()]);
Arrays.sort(array);
//獲得最小的鎖節點
String minNode = array[0];
minNode=parentNode+"/"+minNode;
System.out.println("當前最小的節點是: " + minNode);
//判斷當前建立的鎖節點是否為最小鎖節點
if(minNode.equals(create)){
//如果是進行業務操作
synchronized (keeper) {
System.out.println("業務邏輯 開始 執行");
keeper.notify();
System.out.println("業務邏輯 開始 完畢");
System.out.println("刪除此節點"+ create);
keeper.delete(create, -1 );
}
}else{
//如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
//如果節點不存在,則返回null, 並且watcher 不回被觸發,而卻會發生下面所說的情況。
Stat exists = keeper.exists(minNode, new Watcher() {
@Override
public void process(WatchedEvent watchedevent) {
try {
System.out.println("重新監聽");
callBack(create);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
/**
* 高併發注意這種情況1,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
* 此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
* 可以避免 這種特殊情況
*
*
*/
System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
callBack(create);
}
synchronized (keeper) {
System.out.println(this.toString()+" wait end 業務邏輯執行 等待");
keeper.wait();
System.out.println(this.toString()+" wait end 執行緒 結束");
}
}
}
public static void main(String[] args) throws Exception {
ShareLock3 shareLock = new ShareLock3();
shareLock.RegisterLock();
}
}
package com.jvm.others.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
/**利用zookeeper實現共享鎖
* 思想:在zookeeper中的一個Lock目錄下注冊臨時ephemeral節點,每個節點為一把鎖
* @author pc
*
*/
public class ShareLock4 {
static ZooKeeper keeper = null;
static String parentNode="/ElecLock";
static{
try {
keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() { //監聽與伺服器建立連線後的回撥函式process
@Override
public void process(WatchedEvent event) {
//不處理邏輯
System.out.println(this.toString()+"-----------預設監聽器---------------");
}
});
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void RegisterLock() throws Exception {
// TODO Auto-generated method stub
//建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如 lock_0000000007,來生成 真正要建立的節點名稱
String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
callBack(create);
System.out.println(create);
}
//重複監聽或者執行業務邏輯
private void callBack(final String create) throws KeeperException,
InterruptedException {
//獲得所有鎖節點
List<String> children = keeper.getChildren(parentNode, false);
String[] array = children.toArray(new String[children.size()]);
Arrays.sort(array);
//獲得最小的鎖節點
String minNode = array[0];
minNode=parentNode+"/"+minNode;
System.out.println("當前最小的節點是: " + minNode);
//判斷當前建立的鎖節點是否為最小鎖節點
if(minNode.equals(create)){
//如果是進行業務操作
synchronized (keeper) {
System.out.println("業務邏輯 開始 執行");
keeper.notify();
System.out.println("業務邏輯 開始 完畢");
System.out.println("刪除此節點"+ create);
keeper.delete(create, -1 );
}
}else{
//如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
//如果節點不存在,則返回null, 並且watcher 不回被觸發
// exists 只判斷當前節點(不包含子節點)在 刪除增加更新時才會觸發 watcher
Stat exists = keeper.exists(minNode, new Watcher() {
@Override
public void process(WatchedEvent watchedevent) {
try {
System.out.println("重新監聽");
callBack(create);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
/**
* 高併發注意這種情況,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
* 此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
* 可以避免 這種特殊情況
*
* 第二種特殊情況: 當我們獲得了zookeeper中的資料最小的minNode,在執行到keeper.exists(minNode, new Watcher()時,並判斷當前minNode節點存在,準備向註冊監聽器列表中註冊監聽器時,
* minNode 節點在這一時刻被刪除了,此時程式還以為minNode 節點存在,並且向註冊監聽器列表中註冊事件,當註冊完畢時,並返回結果,此時程式等待 minNode 被刪除時 監聽器被執行,
* 但是 minNode 早已經不存在,所以,此時的執行緒會一直處於等待的狀態。並且業務邏輯永遠不會被執行。
*/
System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
callBack(create);
}
synchronized (keeper) {
System.out.println(this.toString()+" wait end 業務邏輯執行 等待");
keeper.wait();
System.out.println(this.toString()+" wait end 執行緒 結束");
}
}
}
public static void main(String[] args) throws Exception {
ShareLock4 shareLock = new ShareLock4();
shareLock.RegisterLock();
}
}
歡迎大神指點小弟。