ZooKeeper - 分散式鎖
阿新 • • 發佈:2018-11-28
ZooKeeper實現分散式鎖的優點:ZK可以建立持久節點、臨時節點和順序節點。臨時節點在會話結束之後,自動被刪除。即使發生宕機,只要超出心跳時間,就會斷開會話,從而刪除臨時節點,不會造成死鎖的現象。
- 指定競爭的資源,在ZK下生成持久節點。
- 在持久節點下,生成若干臨時順序節點,嘗試獲取鎖。
- 判斷該節點是否是序號最小的節點,若是,則獲取鎖;若不是,則阻塞。
- 通過ZK的Watcher監聽上一個節點的事件,滿足要求則解除阻塞。重複3操作。
- 最後所有的節點都獲得了鎖,再斷開ZK的會話連線,刪除所有臨時節點。
package com.mzs.lock; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.logging.Logger; public class DistributeLock implements Watcher, Lock { private static Logger logger = Logger.getLogger("com.mzs.lock.DistributeLock"); // 根節點路徑 private final String path = "/root"; private ZooKeeper zooKeeper; // 會話超時時間 private final static int SESSION_TIME_OUT = 50000; // 當前鎖 private String currentLock; // 上一個鎖 private String waitLock; // 競爭的資源 private String lockName; private CountDownLatch countDownLatch; public DistributeLock(String url, String lockName) { this.lockName = lockName; try { // 建立zookeeper會話 zooKeeper = new ZooKeeper(url, SESSION_TIME_OUT, this); // 根節點是否存在 Stat stat = zooKeeper.exists(path, false); if (stat == null) { // 建立持久節點,並擁有所有權 zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } /** * 加鎖(普通獲取或者重入獲取) */ @Override public void lock() { // 成功獲取 if (tryLock()) logger.info("[" + Thread.currentThread().getName() + "] --- get --- [" + lockName + "]"); else { try { // 等待獲取 waitLock(waitLock, SESSION_TIME_OUT); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 等待鎖 * @param waitTime 超時時間 * @return 是否成功等待 */ private boolean waitLock(String waitLock, long waitTime) throws KeeperException, InterruptedException { Stat stat = zooKeeper.exists(path + "/" + waitLock, true); // 等待鎖中有節點與之對應 if (stat != null) { countDownLatch = new CountDownLatch(1); try { logger.info("[" + Thread.currentThread().getName() + "] --- wait --- [" + path + "/" + waitLock + "]"); // 等待時長為waitTime毫秒 boolean bool = countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); String curentThreadName = Thread.currentThread().getName(); logger.info("[" + curentThreadName + "] --- " + (!bool ? "wait for timeout,stop to get the lock" : "got the lock in the valid time")); // 等待結束 countDownLatch = null; } catch (InterruptedException e) { e.printStackTrace(); } } return true; } /** * 加鎖(優先考慮中斷) * * @throws InterruptedException 被中斷異常 */ @Override public void lockInterruptibly() throws InterruptedException { lock(); } /** * 嘗試獲得鎖 * @return 是否獲得鎖 */ @Override public boolean tryLock() { String separateStr = "/"; String separateStr2 = "-"; // 臨時順序節點的路徑 String path_1 = path + separateStr + lockName + separateStr2; try { // 建立臨時順序節點 currentLock = zooKeeper.create(path_1, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); logger.info("[" + currentLock + "] --- " + " was created"); // 定義一個根節點下所有子節點組成的列表 List<String> nodeChildren = zooKeeper.getChildren(path, false); // 定義一個儲存包含lockName的節點的列表 List<String> nodeList = new ArrayList<>(); // 從nodeChildren中取出所有包含lockName的所有節點,加入到nodeList中 for (String nodeElem : nodeChildren) { String node = nodeElem.split(separateStr2)[0]; if (node.equals(lockName)) { nodeList.add(nodeElem); } } // 排序nodeList(從小到大) Collections.sort(nodeList); logger.info("[" + Thread.currentThread().getName() + "] --- corresponding the lock [" + currentLock + "]"); // 若當前鎖對應的是最小節點,則認為取得了鎖 if (currentLock.equals(path + separateStr + nodeList.get(0))) { return true; } // 通過當前鎖檢視它所對應的節點 String currentNode = currentLock.substring(currentLock.lastIndexOf(separateStr) + 1); // 二分查詢法查詢上一個節點 int index = Collections.binarySearch(nodeList, currentNode) - 1; // 將上一個節點標記一個等待鎖 waitLock = nodeList.get(index); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } // 表示獲取鎖失敗 return false; } /** * 超時時間內,嘗試獲得鎖 * * @param time 超時時間 * @param unit 時間單位 * @return 是否成功獲得鎖 */ @Override public boolean tryLock(long time, TimeUnit unit) { try { // 已經成功獲取鎖 if (tryLock()) return true; // 等待獲取鎖 return waitLock(waitLock, time); } catch (Exception e) { e.printStackTrace(); } return false; } /** * 釋放鎖 */ @Override public void unlock() { try { // 刪除當前鎖對應的節點 zooKeeper.delete(currentLock, -1); logger.info("[" + Thread.currentThread().getName() + "] --- release --- [" + currentLock + "]"); // 釋放當前鎖 currentLock = null; zooKeeper.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } @Override public Condition newCondition() { return null; } /** * 監聽節點 * @param event 節點事件 */ @Override public void process(WatchedEvent event) { if (countDownLatch != null) { countDownLatch.countDown(); } /*if (Event.KeeperState.SyncConnected == event.getState()) { if (Event.EventType.None == event.getType() && event.getPath() == null) { logger.info("establish the session"); } else if (Event.EventType.NodeCreated == event.getType()) { logger.info("node [" + event.getPath() + "] was created"); } else if (Event.EventType.NodeDeleted == event.getType()) { logger.info("node [" + event.getPath() + "] was deleted"); } else if (Event.EventType.NodeDataChanged == event.getType()) { logger.info("node [" + event.getPath() + "] data changed"); } else if (Event.EventType.NodeChildrenChanged == event.getType()) { logger.info("node [" + event.getPath() + "] children node changed"); } }*/ } }
package com.mzs.lock; import java.util.logging.Logger; public class TestDistributeLock { private static Logger logger = Logger.getLogger("com.mzs.lock.TestDistributeLock"); public static void main(String[] args) { for (int i = 0; i < 10; i++) { Thread thread = new Thread(new Runnable() { @Override public void run() { DistributeLock distributeLock = null; try { distributeLock = new DistributeLock("192.168.101.199:2181", "lock"); distributeLock.lock(); logger.info("[" + Thread.currentThread().getName() + "] --- running" ); } finally { if (distributeLock != null) distributeLock.unlock(); } } }); thread.start(); } } }
程式碼選自 --- https://www.cnblogs.com/liuyang0/p/6800538.html