ZooKeeper分散式鎖與程式碼實現
阿新 • • 發佈:2019-01-26
1.zk的核心機制之一:分散式鎖
分散式鎖能夠在一組程序之間提供互斥機制,使得在任何時候只有一個程序可以持有鎖。分散式鎖可以用於在大型分散式系統中實現領導者選舉,在任何時間點,持有鎖的那個程序就是系統的領導者。注意:不要將zookeeper自己的領導者選舉和使用ZooKeeper基本操作實現的一般領導者選舉服務混為一談。事實上,zookeepr自己的領導者選舉機制是不對外公開的。
2.鎖的具體實現
程式碼實現一個分散式鎖。
- 客戶端A
public class DistributedClient {
// 超時時間
private static final int SESSION_TIMEOUT = 5000;
// zookeeper server列表
private String hosts = "localhost:4180,localhost:4181,localhost:4182";
private String groupNode = "locks";
private String subNode = "sub";
private ZooKeeper zk;
// 當前client建立的子節點
private String thisPath;
// 當前client等待的子節點
private String waitPath;
private CountDownLatch latch = new CountDownLatch(1);
/**
* 連線zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 連線建立時, 開啟latch, 喚醒wait在該latch上的執行緒
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();
}
// 發生了waitPath的刪除事件
if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {
doSomething();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 等待連線建立
latch.await();
// 建立子節點
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會, 讓結果更清晰一些
Thread.sleep(10);
// 注意, 沒有必要監聽"/locks"的子節點的變化情況
List<String> childrenNodes = zk.getChildren("/" + groupNode, false);
// 列表中只有一個子節點, 那肯定就是thisPath, 說明client獲得鎖
if (childrenNodes.size() == 1) {
doSomething();
} else {
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
// never happened
} else if (index == 0) {
// inddx == 0, 說明thisNode在列表中最小, 當前client獲得鎖
doSomething();
} else {
// 獲得排名比thisPath前1位的節點
this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);
// 在waitPath上註冊監聽器, 當waitPath被刪除時, zookeeper會回撥監聽器的process方法
zk.getData(waitPath, true, new Stat());
}
}
}
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 將thisPath刪除, 監聽thisPath的client將獲得通知
// 相當於釋放鎖
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
DistributedClient dl = new DistributedClient();
dl.connectZookeeper();
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}
Thread.sleep(Long.MAX_VALUE);
}
}
- 分散式多程序模式實現:
public class DistributedClientMy {
// 超時時間
private static final int SESSION_TIMEOUT = 5000;
// zookeeper server列表
private String hosts = "spark01:2181,spark02:2181,spark03:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
// 當前client建立的子節點
private volatile String thisPath;
/**
* 連線zookeeper
*/
public void connectZookeeper() throws Exception {
zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
try {
// 子節點發生變化
if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
// thisPath是否是列表中的最小節點
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
// 排序
Collections.sort(childrenNodes);
if (childrenNodes.indexOf(thisNode) == 0) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 建立子節點
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
// wait一小會, 讓結果更清晰一些
Thread.sleep(new Random().nextInt(1000));
// 監聽子節點的變化
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
// 列表中只有一個子節點, 那肯定就是thisPath, 說明client獲得鎖
if (childrenNodes.size() == 1) {
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
/**
* 共享資源的訪問邏輯寫在這個方法中
*/
private void doSomething() throws Exception {
try {
System.out.println("gain lock: " + thisPath);
Thread.sleep(2000);
// do something
} finally {
System.out.println("finished: " + thisPath);
// 將thisPath刪除, 監聽thisPath的client將獲得通知
// 相當於釋放鎖
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedClientMy dl = new DistributedClientMy();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}