zooker分散式鎖實現
阿新 • • 發佈:2021-10-13
思路
- 使用zookeeper建立以鎖資源為根節點。每次申請鎖時,建立有序的臨時節點。
- zookeeper建立有序節點時,會自動建立節點編號,所以取最小序節點為鎖標誌。
- 其他申請均輪詢監聽前一節點,形成單鏈表式資料結構,當監聽到前一節點釋放或銷燬時,自動獲得鎖資源。
難點
為什麼要監聽前一節點狀態?
如果後續多個節點都監聽最小序節點,形成單一節點被多個客戶端監聽,會給伺服器造成很大的壓力。
程式碼
public class Zk implements Lock { private static CountDownLatch cdl = new CountDownLatch(1); private static final String IP_PORT = "127.0.0.1:2181"; private static final String Z_NODE = "/LOCK"; private volatile String beforePath; private volatile String path; private final ZkClient zkClient = new ZkClient(IP_PORT); public Zk() { if (!zkClient.exists(Z_NODE)) { zkClient.createPersistent(Z_NODE); } } @Override public void lock() { if (tryLock()) { System.out.println("獲得鎖"); } else { // 嘗試加鎖 // 進入等待 監聽 waitForLock(); // 再次嘗試 lock(); } } @Override public void lockInterruptibly() { } @Override public boolean tryLock() { synchronized(this) { // 第一次就進來建立自己的臨時節點 if (StringUtils.isBlank(path)) { path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock"); } // 對節點排序 List<String> children = zkClient.getChildren(Z_NODE); Collections.sort(children); // 當前的是最小節點就返回加鎖成功 if (path.equals(Z_NODE + "/" + children.get(0))) { return true; } else { // 不是最小節點 就找到自己的前一個 依次類推 釋放也是一樣 int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1)); beforePath = Z_NODE + "/" + children.get(i - 1); } return false; } } public void waitForLock() { IZkDataListener listener = new IZkDataListener() { public void handleDataChange(String s, Object o) { } public void handleDataDeleted(String s) { System.out.println(Thread.currentThread().getName() + ":監聽到節點刪除事件!---------------------------"); cdl.countDown(); } }; // 監聽 this.zkClient.subscribeDataChanges(beforePath, listener); if (zkClient.exists(beforePath)) { try { System.out.println("加鎖失敗 等待"); cdl.await(); } catch (InterruptedException e) { e.printStackTrace(); } } // 釋放監聽 zkClient.unsubscribeDataChanges(beforePath, listener); } @Override public boolean tryLock(long time, TimeUnit unit) { return false; } @Override public void unlock() { zkClient.delete(path); } @Override public Condition newCondition() { return null; } }
public class ZkLock { static AtomicInteger atomicInteger = new AtomicInteger(10); static AtomicInteger atomicInteger1 = new AtomicInteger(0); private static final int NUM = 15; private static final Zk zk = new Zk(); public static void main(String[] args) { try { for (int i = 0; i < NUM; i++) { new Thread(() -> { try { zk.lock(); Thread.sleep(1000); if (atomicInteger.decrementAndGet() > 0) { System.out.println("執行中" + atomicInteger1.incrementAndGet()); } } catch (InterruptedException e) { e.printStackTrace(); } finally { zk.unlock(); System.out.println("釋放鎖"); } }).start(); } } catch (Exception e) { e.printStackTrace(); } } }