1. 程式人生 > >死磕 java同步系列之zookeeper分散式鎖

死磕 java同步系列之zookeeper分散式鎖

(2)zookeeper分散式鎖有哪些優點?

(3)zookeeper分散式鎖有哪些缺點?

簡介

zooKeeper是一個分散式的,開放原始碼的分散式應用程式協調服務,它可以為分散式應用提供一致性服務,它是Hadoop和Hbase的重要元件,同時也可以作為配置中心、註冊中心運用在微服務體系中。

本章我們將介紹zookeeper如何實現分散式鎖運用在分散式系統中。

基礎知識

什麼是znode?

zooKeeper操作和維護的為一個個資料節點,稱為 znode,採用類似檔案系統的層級樹狀結構進行管理,如果 znode 節點包含資料則儲存為位元組陣列(byte array)。

而且,同一個節點多個客戶同時建立【本篇文章由公眾號“彤哥讀原始碼”原創】,只有一個客戶端會成功,其它客戶端建立時將失敗。

zooKeeper

節點型別

znode 共有四種類型:

  • 持久(無序)

  • 持久有序

  • 臨時(無序)

  • 臨時有序

其中,持久節點如果不手動刪除會一直存在,臨時節點當客戶端session失效就會自動刪除節點。

什麼是watcher?

watcher(事件監聽器),是zookeeper中的一個很重要的特性。

zookeeper允許使用者在指定節點上註冊一些watcher,並且在一些特定事件觸發的時候,zooKeeper服務端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現分散式協調服務的重要特性

KeeperState EventType 觸發條件 說明 操作
SyncConnected(3) None(-1) 客戶端與服務端成功建立連線 此時客戶端和伺服器處於連線狀態 -
同上 NodeCreated(1) Watcher監聽的對應資料節點被建立 同上 Create
同上 NodeDeleted(2) Watcher監聽的對應資料節點被刪除 同上 Delete/znode
同上 NodeDataChanged(3) Watcher監聽的對應資料節點的資料內容發生變更 同上 setDate/znode
同上 NodeChildChanged(4) Wather監聽的對應資料節點的子節點列表發生變更 同上 Create/child
Disconnected(0) None(-1) 客戶端與ZooKeeper伺服器斷開連線 此時客戶端和伺服器處於斷開連線狀態 -
Expired(-112) None(-1) 會話超時 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 -
AuthFailed(4) None(-1) 通常有兩種情況,1:使用錯誤的schema進行許可權檢查 2:SASL許可權檢查失敗 通常同時也會收到AuthFailedException異常 -

原理解析

方案一

既然,同一個節點只能建立一次,那麼,加鎖時檢測節點是否存在,不存在則建立之,存在或者建立失敗則監聽這個節點的刪除事件,這樣,當釋放鎖的時候監聽的客戶端再次競爭去建立這個節點,成功的則獲取到鎖,不成功的則再次監聽該節點。

zooKeeper

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟執行:

(1)三者同時嘗試建立/locker/user_1節點;

(2)client1建立成功,它獲取到鎖;

(3)client2和client3建立失敗,它們監聽/locker/user_1的刪除事件;

(4)client1執行鎖內業務邏輯;

(5)client1釋放鎖,刪除節點/locker/user_1;

(6)client2和client3都捕獲到節點/locker/user_1被刪除的事件,二者皆被喚醒;

(7)client2和client3同時去建立/locker/user_1節點;

(8)回到第二步,依次類推【本篇文章由公眾號“彤哥讀原始碼”原創】;

不過,這種方案有個很嚴重的弊端——驚群效應。

如果併發量很高,多個客戶端同時監聽同一個節點,釋放鎖時同時喚醒這麼多個客戶端,然後再競爭,最後還是隻有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統資源,那麼有沒有更好的方案呢?答案是當然有,請看方案二。

方案二

為了解決方案一中的驚群效應,我們可以使用有序子節點的形式來實現分散式鎖,而且為了規避客戶端獲取鎖後突然斷線的風險,我們有必要使用臨時有序節點。

zooKeeper

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟執行:

(1)三者同時在/locker/user_1/下面建立臨時有序子節點;

(2)三者皆建立成功,分別為/locker/user_1/0000000001、/locker/user_1/0000000003、/locker/user_1/0000000002;

(3)檢查自己建立的節點是不是子節點中最小的;

(4)client1發現自己是最小的節點,它獲取到鎖;

(5)client2和client3發現自己不是最小的節點,它們無法獲取到鎖;

(6)client2建立的節點為/locker/user_1/0000000003,它監聽其上一個節點/locker/user_1/0000000002的刪除事件;

(7)client3建立的節點為/locker/user_1/0000000002,它監聽其上一個節點/locker/user_1/0000000001的刪除事件;

(8)client1執行鎖內業務邏輯;

(9)client1釋放鎖,刪除節點/locker/user_1/0000000001;

(10)client3監聽到節點/locker/user_1/0000000001的刪除事件,被喚醒;

(11)client3再次檢查自己是不是最小的節點,發現是,則獲取到鎖;

(12)client3執行鎖內業務邏輯【本篇文章由公眾號“彤哥讀原始碼”原創】;

(13)client3釋放鎖,刪除節點/locker/user_1/0000000002;

(14)client2監聽到節點/locker/user_1/0000000002的刪除事件,被喚醒;

(15)client2執行鎖內業務邏輯;

(16)client2釋放鎖,刪除節點/locker/user_1/0000000003;

(17)client2檢查/locker/user_1/下是否還有子節點,沒有了則刪除/locker/user_1節點;

(18)流程結束;

這種方案相對於方案一來說,每次釋放鎖時只喚醒一個客戶端,減少了執行緒喚醒的代價,提高了效率。

zookeeper原生API實現

pom檔案

pom中引入以下jar包:

<dependency>
    <groupid>org.apache.zookeeper</groupid>
    <artifactid>zookeeper</artifactid>
    <version>3.5.5</version>
</dependency>

Locker介面

定義一個Locker介面,與上一章mysql分散式鎖使用同一個介面。

public interface Locker {
    void lock(String key, Runnable command);
}

zookeeper分散式鎖實現

這裡通過內部類ZkLockerWatcher處理zookeeper的相關操作,需要注意以下幾點:

(1)zk連線建立完畢之前不要進行相關操作,否則會報ConnectionLoss異常,這裡通過LockSupport.park();阻塞連線執行緒並在監聽執行緒中喚醒處理;

(2)客戶端執行緒與監聽執行緒不是同一個執行緒,所以可以通過LockSupport.park();及LockSupport.unpark(thread);來處理;

(3)中間很多步驟不是原子的(坑),所以需要再次檢測,詳見程式碼中註釋;

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // 阻塞等待連線建立完畢
                LockSupport.park();
                // 根節點如果不存在,就建立一個(併發問題,如果兩個執行緒同時檢測不存在,兩個同時去建立必須有一個會失敗)
                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節點已存在,則建立失敗,這裡捕獲異常,並不阻擋程式正常執行
                        log.info("建立節點 {} 失敗", LOCKER_ROOT);
                    }
                }
                // 當前加鎖的節點是否存在
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節點已存在,則建立失敗,這裡捕獲異常,並不阻擋程式正常執行
                        log.info("建立節點 {} 失敗", watcher.parentLockPath);
                    }
                }

            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // 建立子節點【本篇文章由公眾號“彤哥讀原始碼”原創】
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // 檢查自己是不是最小的節點,是則獲取成功,不是則監聽上一個節點
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
//                System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // 釋放鎖,刪除節點
                    zooKeeper.delete(childLockPath, -1);
                }
                // 最後一個釋放的刪除鎖節點
                List<string> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // 如果刪除之前又新加了一個子節點,會刪除失敗
                        log.info("刪除節點 {} 失敗", parentLockPath);
                    }
                }
                // 關閉zk連線
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
//                System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<string> children = zooKeeper.getChildren(parentLockPath, false);
            // 必須要排序一下,這裡取出來的順序可能是亂的
            Collections.sort(children);
            // 如果當前節點是第一個子節點,則獲取鎖成功
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // 如果不是第一個子節點,就監聽前一個節點
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }

            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // 阻塞當前執行緒
                LockSupport.park();
                // 喚醒之後重新檢測自己是不是最小的節點,因為有可能上一個節點斷線了
                return getLockOrWatchLast();
            } else {
                // 如果上一個節點不存在,說明還沒來得及監聽就釋放了,重新檢查一次
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // 喚醒阻塞的執行緒(這是在監聽執行緒,跟獲取鎖的執行緒不是同一個執行緒)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}

測試程式碼

我們這裡起兩批執行緒,一批獲取user_1這個鎖,一批獲取user_2這個鎖。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i &lt; 1000; i++) {
            new Thread(()-&gt;{
                locker.lock("user_1", ()-&gt; {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }
        for (int i = 1000; i &lt; 2000; i++) {
            new Thread(()-&gt;{
                locker.lock("user_2", ()-&gt; {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

執行結果:

可以看到穩定在500ms左右列印兩個鎖的結果。

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621

curator實現

上面的原生API實現更易於理解zookeeper實現分散式鎖的邏輯,但是難免保證沒有什麼問題,比如不是重入鎖,不支援讀寫鎖等。

下面我們一起看看現有的輪子curator是怎麼實現的。

pom檔案

pom檔案中引入以下jar包:

<dependency>
    <groupid>org.apache.curator</groupid>
    <artifactid>curator-recipes</artifactid>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupid>org.apache.curator</groupid>
    <artifactid>curator-framework</artifactid>
    <version>4.0.0</version>
</dependency>

程式碼實現

下面是互斥鎖的一種實現方案:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            // 【本篇文章由公眾號“彤哥讀原始碼”原創】
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}

除了互斥鎖,curator還提供了讀寫鎖、多重鎖、訊號量等實現方式,而且他們是可重入的鎖。

總結

(1)zookeeper中的節點有四種類型:持久、持久有序、臨時、臨時有序;

(2)zookeeper提供了一種非常重要的特性——監聽機制,它可以用來監聽節點的變化;

(3)zookeeper分散式鎖是基於 臨時有序節點 + 監聽機制 實現的;

(4)zookeeper分散式鎖加鎖時在鎖路徑下建立臨時有序節點;

(5)如果自己是第一個節點,則獲得鎖;

(6)如果自己不是第一個節點,則監聽前一個節點,並阻塞當前執行緒;

(7)當監聽到前一個節點的刪除事件時,喚醒當前節點的執行緒,並再次檢查自己是不是第一個節點;

(8)使用臨時有序節點而不是持久有序節點是為了讓客戶端無故斷線時能夠自動釋放鎖;

彩蛋

zookeeper分散式鎖有哪些優點?

答:1)zookeeper本身可以叢集部署,相對於mysql的單點更可靠;

2)不會佔用mysql的連線數,不會增加mysql的壓力;

3)使用監聽機制,減少執行緒上下文切換的次數;

4)客戶端斷線能夠自動釋放鎖,非常安全;

5)有現有的輪子curator可以使用;

6)curator實現方式是可重入的,對現有程式碼改造成本小;

zookeeper分散式鎖有哪些缺點?

答:1)加鎖會頻繁地“寫”zookeeper,增加zookeeper的壓力;

2)寫zookeeper的時候會在叢集進行同步,節點數越多,同步越慢,獲取鎖的過程越慢;

3)需要另外依賴zookeeper,而大部分服務是不會使用zookeeper的,增加了系統的複雜性;

4)相對於redis分散式鎖,效能要稍微略差一些;

推薦閱讀

1、死磕 java同步系列之開篇

2、死磕 java魔法類之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己動手寫一個鎖Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock原始碼解析(一)——公平鎖、非公平鎖

9、死磕 java同步系列之ReentrantLock原始碼解析(二)——條件鎖

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock原始碼解析

12、死磕 java同步系列之Semaphore原始碼解析

13、死磕 java同步系列之CountDownLatch原始碼解析

14、死磕 java同步系列之AQS終篇

15、死磕 java同步系列之StampedLock原始碼解析

16、死磕 java同步系列之CyclicBarrier原始碼解析

17、死磕 java同步系列之Phaser原始碼解析

18、死磕 java同步系列之mysql分散式鎖


歡迎關注我的公眾號“彤哥讀原始碼”,檢視更多原始碼系列文章, 與彤哥一起暢遊原始碼的海洋。

qrcode </stri