1. 程式人生 > 其它 >JUC鎖框架原始碼閱讀-AQS+Zookeeper實現分散式鎖

JUC鎖框架原始碼閱讀-AQS+Zookeeper實現分散式鎖

介紹

1.建立一個永久節點

2.競爭鎖的時候同樣的的key 所有執行緒都往永久節點插入指定key name的臨時節點(節點不允許重複只會有一個插入成功)

3.插入失敗的開啟對永久節點的監聽

4.當時獲得鎖的執行緒down機或者刪除會觸發監聽。然後嘗試獲取CLH第一個執行緒節點 嘗試重新獲取鎖

程式碼已上傳github:https://github.com/aa310958153/zookeeper-lock

注:僅僅用於熟悉AQS如果要用到Zookeeper分散式鎖直接使用Curator基於對Zookeeper鎖的實現

InterProcessMutex:分散式可重入排它鎖
InterProcessSemaphoreMutex:分散式排它鎖
InterProcessReadWriteLock:分散式讀寫鎖
InterProcessMultiLock:將多個鎖作為單個實體管理的容器

加鎖

 @Override
        protected boolean tryAcquire(int acquires) {
            //獲取當前執行緒
            final Thread current = Thread.currentThread();
            //獲取鎖狀態
            int c = getState();
            //等於0表示 當前空閒狀態可以嘗試獲取 <1>zkLock加鎖
            if (c == 0) {
                if (zkLock()&&compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    
return true; } } //可重入判斷 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc);
return true; } return false; }

<1>zkLock

 /**
         * zookeeper儲存臨時節點實現加鎖
         * @return
         */
        public boolean zkLock() {
            String path= getLockPath();
            boolean haveLock=false;
            try {
                curatorFramework
                        .create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath(path, syncValue.get().getValue().getBytes(StandardCharsets.UTF_8));

                haveLock= true;
            } catch (org.apache.zookeeper.KeeperException.NodeExistsException e) {//重複的標識未獲取到鎖
                haveLock=false;
            } catch (Exception e) {
                e.printStackTrace();
                haveLock= false;
            }
            /**
             * 未獲取到鎖監聽永久節點
             */
            if(!haveLock){
                TreeCache treeCache = new TreeCache(CuratorClient.getCurator(), SYN_SWITCH_ZK_NODE);
                try {
                    treeCache.start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                if(!syncValue.get().isAddListener) {
                    treeCache.getListenable().addListener(new TreeCacheListener() {
                        @Override
                        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                            ChildData eventData = event.getData();
                            switch (event.getType()) {
                                case NODE_ADDED:
                                    //System.out.println(path + "節點新增:" + eventData.getPath() + "\t新增資料為:" + new String(eventData.getData()));
                                    break;
                                case NODE_UPDATED:
                                  //  System.out.println(eventData.getPath() + "節點資料更新\t更新資料為:" + new String(eventData.getData()) + "\t版本為:" + eventData.getStat().getVersion());
                                    break;
                                case NODE_REMOVED:
                                    //監聽到節點刪除。表示鎖正常釋放 或者持有鎖的服務斷開連線
                                    //獲得第一個阻塞執行緒 喚醒 嘗試獲取鎖
                                    Thread firstThread=getFirstQueuedThread();
                                    if( firstThread!=null) {
                                        LockSupport.unpark(firstThread);
                                    }
                                    break;
                                default:
                                    break;
                            }
                        }
                    });
                    syncValue.get().setAddListener(true);
                }
            }
            return haveLock;
        }

釋放鎖

  @Override
        protected boolean tryRelease(int releases) {
            //狀態-1 大於0的數字表示可重入加了多少次鎖
            int c = getState() - releases;
            //如果加鎖執行緒非當前執行緒丟擲異常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            //根據節點儲存的值校驗是否是當前執行緒加鎖。如果不是丟擲異常
            String value;
            try {
                value= new String(curatorFramework.getData().forPath(getLockPath()));
            } catch (Exception e) {
                e.printStackTrace();
                throw new IllegalMonitorStateException();
            }
            if(value==null||!value.equals(syncValue.get().getValue())){
                throw new IllegalMonitorStateException();
            }
            boolean free = false;
            //當c等於0表示最後一次呼叫unlock 則進行鎖的釋放
            if (c == 0) {
                free = true;
                //獲得鎖的執行緒設定為null
                setExclusiveOwnerThread(null);
                String path= getLockPath();
                try {
                    //刪除節點 會觸發節點監聽
                    curatorFramework.delete().forPath(path);
                } catch (Exception e) {
                    throw new IllegalMonitorStateException();
                }
                syncValue.remove();
            }
            //設定state
            setState(c);

            return free;
        }