zookeeper分散式鎖,解決了羊群效應, 真正的zookeeper 分散式鎖
阿新 • • 發佈:2020-11-14
zookeeper 實現分散式鎖,監聽前一個節點來避免羊群效應,
思路:很簡單,但是實現起來要麻煩一些, 而且我也是看了很多帖子,發現很多帖子的程式碼,下載下來逐步除錯之後發現,看起來是對的,但在併發情況下執行,就會出現問題. 有的在程式碼裡其實並沒有真正實現(監聽前一個節點),
接下來分享一個: 真正的zookeeper 分散式鎖: 這個也是別人的程式碼,自己只是搬運工,很遺憾,我自己寫的分散式鎖,看起來是對的,但是出現了併發問題,
package tech.codestory.zookeeper.aalvcai.base_I0Itec_ZK_lock; import lombok.extern.slf4j.Slf4j; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @author 邱潤澤 **/ @Slf4j public class ZookeeperLock { private String server = "127.0.0.1:2181"; private ZkClient zkClient; private static final String rootPath = "/qiurunze-lock1"; public ZookeeperLock() { zkClient = new ZkClient(server, 5000, 20000); buildRoot(); } // 構建根節點 public void buildRoot() { if (!zkClient.exists(rootPath)) { zkClient.createPersistent(rootPath); } } // 獲取鎖 public Lock lock(String lockId, long timeout) { // 建立臨時節點 Lock lockNode = createLockNode(lockId); lockNode = tryActiveLock(lockNode);// 嘗試啟用鎖 if (!lockNode.isActive()) { try { synchronized (lockNode) { lockNode.wait(timeout); // 執行緒鎖住 } } catch (InterruptedException e) { throw new RuntimeException(e); } } if (!lockNode.isActive()) { throw new RuntimeException(" lock timeout"); } return lockNode; } // 釋放鎖 public void unlock(Lock lock) { if (lock.isActive()) { zkClient.delete(lock.getPath()); } } // 嘗試啟用鎖 private Lock tryActiveLock(Lock lockNode) { // 獲取根節點下面所有的子節點 List<String> list = zkClient.getChildren(rootPath) .stream() .sorted() .map(p -> rootPath + "/" + p) .collect(Collectors.toList()); // 判斷當前是否為最小節點 log.info("Thread: {}, list : {}",Thread.currentThread().getName(),list); String firstNodePath = list.get(0); log.info("Thread: {}, firstNodePath: {}",Thread.currentThread().getName(),firstNodePath); // 最小節點是不是當前節點 if (firstNodePath.equals(lockNode.getPath())) { lockNode.setActive(true); } else { String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1); log.info("Thread: {},監聽的節點是: {}",Thread.currentThread().getName(),upNodePath); zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { } @Override public void handleDataDeleted(String dataPath) throws Exception { // 事件處理 與心跳 在同一個執行緒,如果Debug時佔用太多時間,將導致本節點被刪除,從而影響鎖邏輯。 System.out.println("節點刪除:" + dataPath); Lock lock = tryActiveLock(lockNode); synchronized (lockNode) { if (lock.isActive()) { lockNode.notify(); // 釋放了 } } zkClient.unsubscribeDataChanges(upNodePath, this); } }); } return lockNode; } public Lock createLockNode(String lockId) { String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w"); return new Lock(lockId, nodePath); } } class Test01{ static volatile int num = 0; static ZookeeperLock zookeeperLock = new ZookeeperLock(); public static void main(String[] args){ for (int i = 0; i < 10; i++) { new Thread(()->{ try { Lock zkLock = zookeeperLock.lock("lvcai", 5000 ); TimeUnit.MILLISECONDS.sleep(100); for (int j = 0; j < 10; j++) { num++; } System.out.println( "num的值是 : "+ num ); zookeeperLock.unlock(zkLock); } catch (Exception e) { e.printStackTrace(); } },"執行緒"+i).start(); } } } class Lock { private String lockId; private String path; private boolean active; public Lock(String lockId, String path) { this.lockId = lockId; this.path = path; } public Lock() { } public String getLockId() { return lockId; } public void setLockId(String lockId) { this.lockId = lockId; } public String getPath() { return path; } public void setPath(String path) { this.path = path; } public boolean isActive() { return active; } public void setActive(boolean active) { this.active = active; } }