Zookeeper(七)分散式鎖
阿新 • • 發佈:2019-02-18
獲取鎖實現思路:
1. 首先建立一個作為鎖目錄(znode),通常用它來描述鎖定的實體,稱為:/lock_node
2. 希望獲得鎖的客戶端在鎖目錄下建立znode,作為鎖/lock_node的子節點,並且節點型別為有序臨時節點(EPHEMERAL_SEQUENTIAL);
例如:有兩個客戶端建立znode,分別為/lock_node/lock-1和/lock_node/lock-2
3. 當前客戶端呼叫getChildren(/lock_node)得到鎖目錄所有子節點,不設定watch,接著獲取小於自己(步驟2建立)的兄弟節點
4. 步驟3中獲取小於自己的節點不存在 && 最小節點與步驟2中建立的相同,說明當前客戶端順序號最小,獲得鎖,結束。
5. 客戶端監視(watch)相對自己次小的有序臨時節點狀態
6. 如果監視的次小節點狀態發生變化,則跳轉到步驟3,繼續後續操作,直到退出鎖競爭。
public synchronized boolean lock() throws KeeperException, InterruptedException { if (isClosed()) { return false; } // 如果鎖目錄不存在, 建立鎖目錄 節點型別為永久型別 ensurePathExists(dir); // 建立鎖節點,節點型別EPHEMERAL_SEQUENTIAL // 如果不存在小於自己的節點 並且最小節點 與當前建立的節點相同 獲得鎖 // 未獲得成功,對當前次小節點設定watch return (Boolean) retryOperation(zop); }
建立鎖目錄
protected void ensurePathExists(String path) { ensureExists(path, null, acl, CreateMode.PERSISTENT); }
protected void ensureExists(final String path, final byte[] data, final List<ACL> acl, final CreateMode flags) { try { retryOperation(new ZooKeeperOperation() { public boolean execute() throws KeeperException, InterruptedException { // 建立鎖目錄 Stat stat = zookeeper.exists(path, false); // 節點如果存在 直接返回 if (stat != null) { return true; } // 建立節點 // data為null // flags為持久化節點 zookeeper.create(path, data, acl, flags); return true; } }); } catch (KeeperException e) { LOG.warn("Caught: " + e, e); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); } }
建立鎖節點,獲得鎖目錄下的所有節點, 如果為最小節點 獲得鎖成功/** * the command that is run and retried for actually * obtaining the lock * @return if the command was successful or not */ public boolean execute() throws KeeperException, InterruptedException { do { if (id == null) { long sessionId = zookeeper.getSessionId(); String prefix = "x-" + sessionId + "-"; // lets try look up the current ID if we failed // in the middle of creating the znode findPrefixInChildren(prefix, zookeeper, dir); idName = new ZNodeName(id); } if (id != null) { List<String> names = zookeeper.getChildren(dir, false); if (names.isEmpty()) { LOG.warn("No children in: " + dir + " when we've just " + "created one! Lets recreate it..."); // lets force the recreation of the id id = null; } else { // lets sort them explicitly (though they do seem to come back in order ususally :) SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>(); for (String name : names) { sortedNames.add(new ZNodeName(dir + "/" + name)); } // 獲得最小節點 ownerId = sortedNames.first().getName(); // lock_1, lock_2, lock_3 傳入引數lock_2 返回lock_1 SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName); if (!lessThanMe.isEmpty()) { ZNodeName lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); if (LOG.isDebugEnabled()) { LOG.debug("watching less than me node: " + lastChildId); } // 次小節點設定watch Stat stat = zookeeper.exists(lastChildId, new LockWatcher()); if (stat != null) { return Boolean.FALSE; } else { LOG.warn("Could not find the" + " stats for less than me: " + lastChildName.getName()); } } else { // 鎖目錄下的最小節點 與當前客戶端建立相同 if (isOwner()) { if (callback != null) { callback.lockAcquired(); } // 獲得鎖 return Boolean.TRUE; } } } } } while (id == null); return Boolean.FALSE; } };
private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException { // 獲取鎖目錄下的所有子節點 List<String> names = zookeeper.getChildren(dir, false); for (String name : names) { //x-sessionId- if (name.startsWith(prefix)) { id = name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } // 當前鎖目錄下 沒有與當前會話對應的子節點 建立子節點 節點型別為臨時順序節點 if (id == null) { // dir/x-sessionId-i id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } }
釋放鎖:
釋放鎖非常簡單,刪除步驟1中建立的有序臨時節點。另外,如果客戶端程序死亡或連線失效,對應的節點也會被刪除。
public synchronized void unlock() throws RuntimeException { if (!isClosed() && id != null) { // we don't need to retry this operation in the case of failure // as ZK will remove ephemeral files and we don't wanna hang // this process when closing if we cannot reconnect to ZK try { ZooKeeperOperation zopdel = new ZooKeeperOperation() { public boolean execute() throws KeeperException, InterruptedException { // 刪除節點 忽略版本 zookeeper.delete(id, -1); return Boolean.TRUE; } }; zopdel.execute(); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); //set that we have been interrupted. Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } catch (KeeperException e) { LOG.warn("Caught: " + e, e); throw (RuntimeException) new RuntimeException(e.getMessage()). initCause(e); } finally { if (callback != null) { callback.lockReleased(); } id = null; } } }