1. 程式人生 > 其它 >zooker分散式鎖實現

zooker分散式鎖實現

思路

  1. 使用zookeeper建立以鎖資源為根節點。每次申請鎖時,建立有序的臨時節點。
  2. zookeeper建立有序節點時,會自動建立節點編號,所以取最小序節點為鎖標誌。
  3. 其他申請均輪詢監聽前一節點,形成單鏈表式資料結構,當監聽到前一節點釋放或銷燬時,自動獲得鎖資源。

難點

為什麼要監聽前一節點狀態?
如果後續多個節點都監聽最小序節點,形成單一節點被多個客戶端監聽,會給伺服器造成很大的壓力。

程式碼

public class Zk implements Lock {
    private static CountDownLatch cdl = new CountDownLatch(1);

    private static final String IP_PORT = "127.0.0.1:2181";
    private static final String Z_NODE = "/LOCK";

    private volatile String beforePath;
    private volatile String path;

    private final ZkClient zkClient = new ZkClient(IP_PORT);

    public Zk() {
        if (!zkClient.exists(Z_NODE)) {
            zkClient.createPersistent(Z_NODE);
        }
    }

    @Override
    public void lock() {
        if (tryLock()) {
            System.out.println("獲得鎖");
        } else {
            // 嘗試加鎖
            // 進入等待 監聽
            waitForLock();
            // 再次嘗試
            lock();
        }
    }

    @Override
    public void lockInterruptibly() {

    }

    @Override
    public boolean tryLock() {
        synchronized(this) {
            // 第一次就進來建立自己的臨時節點
            if (StringUtils.isBlank(path)) {
                path = zkClient.createEphemeralSequential(Z_NODE + "/", "lock");
            }

            // 對節點排序
            List<String> children = zkClient.getChildren(Z_NODE);
            Collections.sort(children);

            // 當前的是最小節點就返回加鎖成功
            if (path.equals(Z_NODE + "/" + children.get(0))) {
                return true;
            } else {
                // 不是最小節點 就找到自己的前一個 依次類推 釋放也是一樣
                int i = Collections.binarySearch(children, path.substring(Z_NODE.length() + 1));
                beforePath = Z_NODE + "/" + children.get(i - 1);
            }
            return false;
        }

    }

    public void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            public void handleDataChange(String s, Object o) {
            }

            public void handleDataDeleted(String s) {
                System.out.println(Thread.currentThread().getName() + ":監聽到節點刪除事件!---------------------------");
                cdl.countDown();
            }
        };
        // 監聽
        this.zkClient.subscribeDataChanges(beforePath, listener);
        if (zkClient.exists(beforePath)) {
            try {
                System.out.println("加鎖失敗 等待");
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // 釋放監聽
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        return false;
    }

    @Override
    public void unlock() {
        zkClient.delete(path);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}
public class ZkLock {
    static AtomicInteger atomicInteger = new AtomicInteger(10);
    static AtomicInteger atomicInteger1 = new AtomicInteger(0);
    private static final int NUM = 15;

    private static final Zk zk = new Zk();

    public static void main(String[] args) {
        try {
            for (int i = 0; i < NUM; i++) {
                new Thread(() -> {
                    try {
                        zk.lock();
                        Thread.sleep(1000);
                        if (atomicInteger.decrementAndGet() > 0) {
                            System.out.println("執行中" + atomicInteger1.incrementAndGet());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        zk.unlock();
                        System.out.println("釋放鎖");
                    }
                }).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}