1. 程式人生 > >基於ZooKeeper的三種分散式鎖實現

基於ZooKeeper的三種分散式鎖實現

【歡迎關注公眾號:程式猿講故事 (codestory),及時接收最新文章】

今天介紹基於ZooKeeper的分散式鎖的簡單實現,包括阻塞鎖和非阻塞鎖。同時增加了網上很少介紹的基於節點的非阻塞鎖實現,主要是為了加深對ZooKeeper的理解。

維基百科:分散式鎖,是控制分散式系統之間同步訪問共享資源的一種方式。在分散式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那麼訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分散式鎖。

1      阻塞鎖和非阻塞鎖

根據業務特點,普通分散式鎖有兩種需求:阻塞鎖和非阻塞鎖。

阻塞鎖:多個系統同時呼叫同一個資源,所有請求被排隊處理。已經得到分散式鎖的系統,進入執行狀態完成業務操作;沒有得到分散式鎖的執行緒進入阻塞狀態等待,當獲得相應的訊號並獲得分散式鎖後,進入執行狀態完成業務操作。

 

非阻塞鎖:多個系統同時呼叫同一個資源,當某一個系統最先獲取到鎖,進入執行狀態完成業務操作;其他沒有得到分散式鎖的系統,就直接返回,不做任何業務邏輯,可以給使用者提示進行其他操作。

 

2      鎖程式碼簡單設計

基於ZooKeeper實現鎖,一般都是建立EPHEMERAL_SEQUENTIAL子節點並比較序號實現的。參照Redis的分散式鎖實現,也可以使用EPHEMERAL節點實現。

 

3      分散式鎖程式碼

完整程式碼比較多,佔篇幅。在文中只保留了關鍵的程式碼。完整專案程式碼放到了github(https://github.com/SeemSilly/codestory/tree/master/research-zoo-keeper),感興趣的可以關注。

3.1   分散式鎖介面定義

ZooKeeperLock.java

public interface ZooKeeperLock {

  /**

   * 嘗試獲取鎖

   *

   * @param guidNodeName 用於加鎖的唯一節點名

   * @param clientGuid 用於唯一標識當前客戶端的ID

   * @return

   */

  boolean lock(String guidNodeName, String clientGuid);

 

  /**

   * 釋放鎖

   *

   * @param guidNodeName 用於加鎖的唯一節點名

   * @param clientGuid 用於唯一標識當前客戶端的ID

   * @return

   */

  boolean release(String guidNodeName, String clientGuid);

 

  /**

   * 鎖是否已經存在

   *

   * @param guidNodeName 用於加鎖的唯一節點名

   * @return

   */

  boolean exists(String guidNodeName);

}

 

3.2   基於節點實現的非阻塞鎖

NodeBlocklessLock.java

public class NodeBlocklessLock extends ZooKeeperBase implements ZooKeeperLock {

  /** 嘗試獲取鎖 */

  public boolean lock(String guidNodeName, String clientGuid) {

    boolean result = false;

    if (getZooKeeper().exists(guidNodeName, false) == null) {

      getZooKeeper().create(guidNodeName, clientGuid.getBytes(),

          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

      byte[] data = getZooKeeper().getData(guidNodeName, false, null);

      if (data != null && clientGuid.equals(new String(data))) {

        result = true;

      }

    }

    return result;

  }

 

  /** 釋放鎖 */

  public boolean release(String guidNodeName, String clientGuid) {

    boolean result = false;

    Stat stat = new Stat();

    byte[] data = getZooKeeper().getData(guidNodeName, false, stat);

    if (data != null && clientGuid.equals(new String(data))) {

      getZooKeeper().delete(guidNodeName, stat.getVersion());

      result = true;

    }

    return result;

  }

 

  /** 鎖是否已經存在 */

  public boolean exists(String guidNodeName) {

    boolean result = false;

    Stat stat = getZooKeeper().exists(guidNodeName, false);

    result = stat != null;

    return result;

  }

}

 

3.3   基於子節點實現的分散式鎖基類

ChildrenNodeLock.java

public abstract class ChildrenNodeLock extends ZooKeeperBase implements ZooKeeperLock {

  /** 獲取當前節點的前一個節點,如果為空表示自己是第一個 */

  protected String getPrevElementName() {

    List<String> elementNames = getZooKeeper().getChildren(this.guidNodeName, false);

    long curElementSerial = Long.valueOf(

        elementNodeFullName.substring((this.guidNodeName + "/" + childPrefix).length()));

    String prevElementName = null;

    long prevElementSerial = -1;

    for (String oneElementName : elementNames) {

      long oneElementSerial = Long.parseLong(oneElementName.substring(childPrefix.length()));

      if (oneElementSerial < curElementSerial) {

        // 比當前節點小

        if (oneElementSerial > prevElementSerial) {

          prevElementSerial = oneElementSerial;

          prevElementName = oneElementName;

        }

      }

    }

    return prevElementName;

  }

 

  /** 嘗試獲取鎖 */

  public boolean lock(String guidNodeName, String clientGuid) {

    boolean result = false;

    // 確保根節點存在,並且建立為容器節點

    super.createRootNode(this.guidNodeName, CreateMode.CONTAINER);

    // 建立子節點並返回帶序列號的節點名

    elementNodeFullName = getZooKeeper().create(this.guidNodeName + "/" + childPrefix,

        new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

    boolean lockSuccess = isLockSuccess();

    result = lockSuccess;

    return result;

  }

 

 

  /** 釋放鎖 */

  public boolean release(String guidNodeName, String clientGuid) {

    // 刪除子節點

    getZooKeeper().delete(elementNodeFullName, 0);

    return true;

  }

 

  /** 鎖是否已經存在,容器節點存在,並且有子節點,則說明鎖已經存在 */

  public boolean exists(String guidNodeName) {

    boolean exists = false;

    Stat stat = new Stat();

    try {

      getZooKeeper().getData(guidNodeName, false, stat);

      exists = stat.getNumChildren() > 0;

    } catch (KeeperException.NoNodeException e) {

      exists = false;

    }

    return exists;

  }

 

  /** 是否加鎖成功 , 由子類實現 */

  protected abstract boolean isLockSuccess();

}

 

3.4   基於子節點實現的非阻塞鎖

ChildrenBlocklessLock.java

public class ChildrenBlocklessLock extends ChildrenNodeLock {

  /** 是否加鎖成功 */

  protected boolean isLockSuccess() throws KeeperException, InterruptedException {

    boolean lockSuccess = false;

    String prevElementName = getPrevElementName();

    if (prevElementName != null) {

      // 有更小的節點,說明當前節點沒搶到鎖,刪掉自己並退出

      getZooKeeper().delete(elementNodeFullName, 0);

    } else {

      lockSuccess = true;

    }

    return lockSuccess;

  }

}

 

3.5   基於子節點實現的阻塞鎖

ChildrenBlockingLock.java

public class ChildrenBlockingLock extends ChildrenNodeLock {

  /** 前一個節點被刪除的訊號 */

  static Integer mutex = Integer.valueOf(-1);

 

  /** 監控的節點被刪除 */

  protected void processNodeDeleted(WatchedEvent event) {

    synchronized (mutex) {

      // 節點被刪除,通知退出執行緒

      mutex.notify();

    }

  }

 

  /** 是否加鎖成功 */

  protected boolean isLockSuccess() {

    boolean lockSuccess;

    while (true) {

      String prevElementName = getPrevElementName();

      if (prevElementName == null) {

        lockSuccess = true;

        break;

      } else {

        // 有更小的節點,說明當前節點沒搶到鎖,註冊前一個節點的監聽

        getZooKeeper().exists(this.guidNodeName + "/" + prevElementName, true);

        synchronized (mutex) {

          mutex.wait();

          log.info("{} 被刪除,看看是不是輪到自己了", prevElementName);

        }

      }

    }

    return lockSuccess;

  }

}

 

4      測試用例

4.1   測試程式碼

LockClientThread.java 獲取分散式鎖和釋放鎖

public class LockClientThread extends Thread {

  /** 模擬獲取分散式鎖,成功後執行業務 */

  public void run() {

    boolean locked = zooKeeperLock.lock(guidNodeName, clientGuid);

    if (locked) {

      log.info("{} lock() success,拿到鎖了,假裝忙2秒", clientGuid);

      Thread.sleep(2000);

      boolean released = zooKeeperLock.release(guidNodeName, clientGuid);

      log.info("{} release() result : {}", clientGuid, released);

    } else {

      log.info("{} lock() fail", clientGuid);

    }

  }

}

模擬多個客戶端併發執行

public void testChildrenBlocklessMultiThread() throws IOException {

  String guidNodeName = "/multi-" + System.currentTimeMillis();

  int threadCount = 5;

 

  LockClientThread[] threads = new LockClientThread[threadCount];

  for (int i = 0; i < threadCount; i++) {

    ChildrenBlocklessLock nodeBlocklessLock = new ChildrenBlocklessLock(address);

    threads[i] = new LockClientThread(nodeBlocklessLock, guidNodeName, "client-" + (i + 1));

  }

  for (int i = 0; i < threadCount; i++) {

    threads[i].start();

  }

}

4.2   非阻塞鎖的測試結果

可以看到,只有一個執行緒能搶到鎖並執行業務,其他執行緒都直接退出。

55:43.929 [INFO] LockClientThread.run(33) client-1 lock() ...

55:43.942 [INFO] LockClientThread.run(33) client-3 lock() ...

55:43.947 [INFO] LockClientThread.run(33) client-2 lock() ...

55:43.948 [INFO] LockClientThread.run(33) client-4 lock() ...

55:43.949 [INFO] LockClientThread.run(33) client-5 lock() ...

55:44.052 [INFO] LockClientThread.run(36) client-1 lock() success,拿到鎖了,假裝忙2秒

55:44.072 [INFO] LockClientThread.run(47) client-5 lock() fail

55:44.085 [INFO] LockClientThread.run(47) client-4 lock() fail

55:44.091 [INFO] LockClientThread.run(47) client-2 lock() fail

55:44.096 [INFO] LockClientThread.run(47) client-3 lock() fail

55:46.053 [INFO] LockClientThread.run(42) client-1 release() ...

55:46.057 [INFO] LockClientThread.run(44) client-1 release() result : true

 

4.3   阻塞鎖的測試結果

可以看到,搶到分散式鎖的執行緒執行業務,沒搶到鎖的執行緒會等到直到鎖被釋放重新獲取到鎖後再執行業務。

59:32.802 [INFO] LockClientThread.run(33) client-1 lock() ...

59:32.811 [INFO] LockClientThread.run(33) client-3 lock() ...

59:32.812 [INFO] LockClientThread.run(33) client-4 lock() ...

59:32.813 [INFO] LockClientThread.run(33) client-2 lock() ...

59:32.813 [INFO] LockClientThread.run(33) client-5 lock() ...

59:32.836 [INFO] LockClientThread.run(36) client-1 lock() success,拿到鎖了,假裝忙2秒

59:34.836 [INFO] LockClientThread.run(42) client-1 release() ...

59:34.844 [INFO] LockClientThread.run(44) client-1 release() result : true

59:34.846 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000000 被刪除,看看是不是輪到自己了

59:34.848 [INFO] LockClientThread.run(36) client-5 lock() success,拿到鎖了,假裝忙2秒

59:36.848 [INFO] LockClientThread.run(42) client-5 release() ...

59:36.852 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000001 被刪除,看看是不是輪到自己了

59:36.852 [INFO] LockClientThread.run(44) client-5 release() result : true

59:36.855 [INFO] LockClientThread.run(36) client-2 lock() success,拿到鎖了,假裝忙2秒

59:38.855 [INFO] LockClientThread.run(42) client-2 release() ...

59:38.869 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000002 被刪除,看看是不是輪到自己了

59:38.870 [INFO] LockClientThread.run(44) client-2 release() result : true

59:38.876 [INFO] LockClientThread.run(36) client-4 lock() success,拿到鎖了,假裝忙2秒

59:40.877 [INFO] LockClientThread.run(42) client-4 release() ...

59:40.881 [INFO] ChildrenBlockingLock.isLockSuccess(55) element0000000003 被刪除,看看是不是輪到自己了

59:40.882 [INFO] LockClientThread.run(44) client-4 release() result : true

59:40.884 [INFO] LockClientThread.run(36) client-3 lock() success,拿到鎖了,假裝忙2秒

59:42.884 [INFO] LockClientThread.run(42) client-3 release() ...

59:42.887 [INFO] LockClientThread.run(44) client-3 release() result : true