1. 程式人生 > >curator分散式鎖

curator分散式鎖

curator是Netflix公司開源的一個ZooKeeper客戶端封裝。

ZooKeeper可以被用來實現分散式鎖,具體是使用“臨時順序節點”實現。

獲取鎖

一個分散式鎖對應ZooKeeper的一個資料夾,每個需要獲取這個分散式鎖的客戶端執行緒在這個資料夾下建立一個臨時順序節點,此時有兩種情況:

1)建立的臨時順序節點是資料夾下的第一個節點,則認為是獲取分散式鎖成功。

2)建立的臨時順序節點不是資料夾下的第一個節點,則認為當前鎖已經被另一個客戶端執行緒獲取,此時需要進入阻塞狀態,等待節點順序中的前一個節點釋放鎖的時候喚醒當前執行緒。

阻塞-喚醒邏輯:把資料夾下的節點順序排列,找到當前節點的前一個節點,在前一個節點新增Watch,當前一個節點被刪除時會觸發Watch事件,進而喚醒當前阻塞執行緒。

如果前一個節點對應的客戶端崩潰了,則節點對應的Watch事件也會觸發,也會喚醒後一個節點對應的客戶端執行緒,此時仍需要判斷當前節點是第一個節點之後才能獲取鎖,否則繼續進入阻塞並Watch前一個節點。

重入性

只考慮同一個客戶端、同一個執行緒獲取同一個分散式鎖的可重入性,第一次獲取鎖成功之後,在JVM記憶體中的一個ConcurrentMap中儲存當前執行緒對應的鎖路徑及重入次數,後面同一個執行緒再次獲取鎖時,先檢查該Map中當前鎖是否已被當前執行緒佔用即可,如果已佔用,則只需要遞增重入次數即可。

因為重入性只考慮同一個客戶端、同一個JVM、同一個執行緒,所以可以不用考慮判斷ConcurrentMap中的Owner執行緒的併發問題。

釋放鎖

釋放鎖時,對應可重入分散式鎖,首先重入次數減一,然後判斷重入次數是否已經為0:

1)如果重入次數為0,則刪除當前客戶端執行緒對應的臨時順序節點,刪除操作會觸發次節點的Watch事件,如果有別的客戶端執行緒正在阻塞等待,則會通過Watch機制喚醒。

2)如果重入次數非0,則說明還未完全釋放鎖,直接返回即可。

可重入分散式鎖測試程式碼:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorDistributeLockTest {
    public static void main(String[] args) {
        String zkAddr = "127.0.0.1:2181";
        String lockPath = "/distribute-lock";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(zkAddr)
                .sessionTimeoutMs(2000)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();

        InterProcessMutex lock = new InterProcessMutex(cf, lockPath);
        new Thread("thread-1"){
            @Override
            public void run() {
                process(lock);
            }
        }.start();
        new Thread("thread-2"){
            @Override
            public void run() {
                process(lock);
            }
        }.start();
    }

    private static void process(InterProcessLock lock) {
        System.out.println(Thread.currentThread().getName() + " acquire");
        try {
            lock.acquire();
            System.out.println(Thread.currentThread().getName() + " acquire success");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + " release");
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " release success");
    }
}



延伸閱讀:http://www.hollischuang.com/archives/1716