1. 程式人生 > >Zookeeper(七)分散式鎖

Zookeeper(七)分散式鎖

獲取鎖實現思路:

1.     首先建立一個作為鎖目錄(znode),通常用它來描述鎖定的實體,稱為:/lock_node

2.     希望獲得鎖的客戶端在鎖目錄下建立znode,作為鎖/lock_node的子節點,並且節點型別為有序臨時節點(EPHEMERAL_SEQUENTIAL);

        例如:有兩個客戶端建立znode,分別為/lock_node/lock-1和/lock_node/lock-2

3.     當前客戶端呼叫getChildren(/lock_node)得到鎖目錄所有子節點,不設定watch,接著獲取小於自己(步驟2建立)的兄弟節點

4.     步驟3中獲取小於自己的節點不存在 && 最小節點與步驟2中建立的相同,說明當前客戶端順序號最小,獲得鎖,結束。

5.     客戶端監視(watch)相對自己次小的有序臨時節點狀態

6.     如果監視的次小節點狀態發生變化,則跳轉到步驟3,繼續後續操作,直到退出鎖競爭。

 public synchronized boolean lock() throws KeeperException, InterruptedException {
        if (isClosed()) {
            return false;
        }
        // 如果鎖目錄不存在, 建立鎖目錄   節點型別為永久型別
        ensurePathExists(dir);

        // 建立鎖節點,節點型別EPHEMERAL_SEQUENTIAL 
        // 如果不存在小於自己的節點   並且最小節點 與當前建立的節點相同  獲得鎖
        // 未獲得成功,對當前次小節點設定watch
        return (Boolean) retryOperation(zop);
    }

建立鎖目錄

    protected void ensurePathExists(String path) {
        ensureExists(path, null, acl, CreateMode.PERSISTENT);
    }
    protected void ensureExists(final String path, final byte[] data,
            final List<ACL> acl, final CreateMode flags) {
        try {
            retryOperation(new ZooKeeperOperation() {
                public boolean execute() throws KeeperException, InterruptedException {
                	// 建立鎖目錄
                    Stat stat = zookeeper.exists(path, false);
                    // 節點如果存在  直接返回
                    if (stat != null) {
                        return true;
                    }
                    // 建立節點
                    // data為null
                    // flags為持久化節點
                    zookeeper.create(path, data, acl, flags);
                    return true;
                }
            });
        } catch (KeeperException e) {
            LOG.warn("Caught: " + e, e);
        } catch (InterruptedException e) {
            LOG.warn("Caught: " + e, e);
        }
    }

建立鎖節點,獲得鎖目錄下的所有節點, 如果為最小節點 獲得鎖成功
        /**
         * the command that is run and retried for actually 
         * obtaining the lock
         * @return if the command was successful or not
         */
        public boolean execute() throws KeeperException, InterruptedException {
            do {
                if (id == null) {
                    long sessionId = zookeeper.getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    // lets try look up the current ID if we failed 
                    // in the middle of creating the znode
                    findPrefixInChildren(prefix, zookeeper, dir);
                    idName = new ZNodeName(id);
                }
                if (id != null) {
                    List<String> names = zookeeper.getChildren(dir, false);
                    if (names.isEmpty()) {
                        LOG.warn("No children in: " + dir + " when we've just " +
                        "created one! Lets recreate it...");
                        // lets force the recreation of the id
                        id = null;
                    } else {
                        // lets sort them explicitly (though they do seem to come back in order ususally :)
                        SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
                        for (String name : names) {
                            sortedNames.add(new ZNodeName(dir + "/" + name));
                        }
                        // 獲得最小節點
                        ownerId = sortedNames.first().getName();
                        // lock_1, lock_2, lock_3  傳入引數lock_2  返回lock_1
                        SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            ZNodeName lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("watching less than me node: " + lastChildId);
                            }
                            // 次小節點設定watch 
                            Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
                            if (stat != null) {
                                return Boolean.FALSE;
                            } else {
                                LOG.warn("Could not find the" +
                                		" stats for less than me: " + lastChildName.getName());
                            }
                        } else {
                        	// 鎖目錄下的最小節點  與當前客戶端建立相同
                            if (isOwner()) {
                                if (callback != null) {
                                    callback.lockAcquired();
                                }
                                // 獲得鎖
                                return Boolean.TRUE;
                            }
                        }
                    }
                }
            }
            while (id == null);
            return Boolean.FALSE;
        }
    };

private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) 
            throws KeeperException, InterruptedException {
        	// 獲取鎖目錄下的所有子節點
            List<String> names = zookeeper.getChildren(dir, false);
            for (String name : names) {
            	//x-sessionId-
                if (name.startsWith(prefix)) {
                    id = name;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Found id created last time: " + id);
                    }
                    break;
                }
            }
            // 當前鎖目錄下   沒有與當前會話對應的子節點    建立子節點  節點型別為臨時順序節點
            if (id == null) {
            	// dir/x-sessionId-i
                id = zookeeper.create(dir + "/" + prefix, data, 
                        getAcl(), EPHEMERAL_SEQUENTIAL);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Created id: " + id);
                }
            }

釋放鎖:

釋放鎖非常簡單,刪除步驟1中建立的有序臨時節點。另外,如果客戶端程序死亡或連線失效,對應的節點也會被刪除。

 public synchronized void unlock() throws RuntimeException {
        
        if (!isClosed() && id != null) {
            // we don't need to retry this operation in the case of failure
            // as ZK will remove ephemeral files and we don't wanna hang
            // this process when closing if we cannot reconnect to ZK
            try {
                
                ZooKeeperOperation zopdel = new ZooKeeperOperation() {
                    public boolean execute() throws KeeperException,
                        InterruptedException {
                    	// 刪除節點  忽略版本
                        zookeeper.delete(id, -1);   
                        return Boolean.TRUE;
                    }
                };
                zopdel.execute();
            } catch (InterruptedException e) {
                LOG.warn("Caught: " + e, e);
                //set that we have been interrupted.
               Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } catch (KeeperException e) {
                LOG.warn("Caught: " + e, e);
                throw (RuntimeException) new RuntimeException(e.getMessage()).
                    initCause(e);
            }
            finally {
                if (callback != null) {
                    callback.lockReleased();
                }
                id = null;
            }
        }
    }