1. 程式人生 > 程式設計 >Etcd分散式鎖:cp分散式鎖的最佳實現

Etcd分散式鎖:cp分散式鎖的最佳實現

為什麼需要cp分散式鎖

分散式鎖的功能和訴求,我們已經在Redis分散式鎖:基於AOP和Redis實現的簡易版分散式鎖簡單的介紹過了。

目前自研的Redis分散式鎖,已可滿足大部分場景(非公平+可自動續期+可重入的分散式鎖),可投入生產環境的單機環境中使用。但是因為是基於Redis單機的環境,只能用於併發量並不高的場景。隨著接入的業務場景擴大,Redis單機已經變得不可靠了,那麼接下來給我們的選擇只有兩種: 1、Redis單機改為叢集。 2、改用其他基於一致性演演算法的實現方式。

方案1有先天性的缺陷,redis叢集無法保證一致性問題,在master節點宕機的瞬間,master和slave節點之間的資料可能是不一致的。這將會導致服務a從master節點拿到了鎖a,然後master節點宕機,在slave節點尚未完全同步完master的資料之前,服務b將從slave節點上成功拿到同樣的鎖a。

而在其他基於一致性演演算法的實現方式上,zk和ectd是不錯的選擇。然後考慮到zk已廉頗老矣,我們選擇了ectd這個後起之秀。

由於在分散式鎖的場景內,我們更關注的是鎖的一致性,而非鎖的可用性,所以cp鎖比ap鎖更可靠。

設計思路

etcd引入了租約的概念,我們首先需要授予一個租約,然後同時設定租約的有效時間。租約的有效時間我們可以用來作為鎖的有效時間。

然後我們可以直接呼叫etcd的lock功能,在指定的租約上對指定的lockName進行加鎖操作。如果當前沒有其他執行緒持有該鎖,則該執行緒能直接持有鎖。否則需要等待。這裡我們可以將timeout的時間設定為鎖的等待時間來實現競爭鎖失敗等待獲取的過程。當然由於網路波動等問題,我建議timeout的時間最少設定為500ms(或你們認為合理的數值)。

然後解鎖的過程,我們放棄了etcd的unlock操作,而直接使用了etcd的revoke操作。之所以沒採用unlock操作,一是因為unlock所需要的引數是上一步lock操作返回的lockKey,我們並不希望多維護一個欄位,二是因為我們最終會執行revoke操作,而revoke操作會將該租約下的所有key都失效,因為我們目前目前設計的是一個租約對應一個鎖,不存在會釋放其它業務場景中的鎖的情況。

此外,為了保證執行緒在等待獲取鎖的過程中租約不會過期,所以我們得為這個執行緒設定一個守護執行緒,在該執行緒授予租約後就開啟守護執行緒,定期去判斷是否需要續期。

和redis分散式鎖不一樣的是,redis分散式鎖的有效時間是快取的有效時間,所以可以在獲取鎖成功後再開啟用於續期的守護執行緒,而etcd分散式鎖的有效時間是租約的有效時間,在等待獲取鎖的過程中可能租約會過期,所以得在獲取租約後就得開啟守護執行緒。這樣就增加了很多的複雜度。

##具體實現 原生的etcd是通過Go語言來寫的,直接在java程式中應用會有一點困難,所以我們直接採用jetcd來作為etcd的客戶端,這樣在java程式中就可以使用程式碼方式和etcd服務端通訊。

jetcd提供了LeaseClient,我們可以直接使用grant功能完成授予租約的操作。

public LockLeaseData getLeaseData(String lockName,Long lockTime) {
    try {
        LockLeaseData lockLeaseData = new LockLeaseData();
        CompletableFuture<LeaseGrantResponse> leaseGrantResponseCompletableFuture = client.getLeaseClient().grant(lockTime);
        Long leaseId = leaseGrantResponseCompletableFuture.get(1,TimeUnit.SECONDS).getID();
        lockLeaseData.setLeaseId(leaseId);
        CpSurvivalClam cpSurvivalClam = new CpSurvivalClam(Thread.currentThread(),leaseId,lockName,lockTime,this);
        Thread survivalThread = threadFactoryManager.getThreadFactory().newThread(cpSurvivalClam);
        survivalThread.start();
        lockLeaseData.setCpSurvivalClam(cpSurvivalClam);
        lockLeaseData.setSurvivalThread(survivalThread);
        return lockLeaseData;
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        return null;
    }
}
複製程式碼

此外如上所述,我們在獲取租約後,開啟了CpSurvivalClam的守護執行緒來定期續期。CpSurvivalClam的實現和我們在redis分散式鎖的時候實現大體一致,差別只是將其中的expandLockTime操作改為了etcd中的keepAliveOnce。expandLockTime方法具體如下所示:

/**
 * 重置鎖的有效時間
 *
 * @param leaseId 鎖的租約id
 * @return 是否成功重置
 */
public Boolean expandLockTime(Long leaseId) {
    try {
        CompletableFuture<LeaseKeepAliveResponse> leaseKeepAliveResponseCompletableFuture = client.getLeaseClient().keepAliveOnce(leaseId);
        leaseKeepAliveResponseCompletableFuture.get();
        return Boolean.TRUE;
    } catch (InterruptedException | ExecutionException e) {
        return Boolean.FALSE;
    }
}
複製程式碼

然後jetcd提供了LockClient,我們直接可以用lock功能,將leaseId和lockName傳入,我們會得到一個在該租約下的lockKey。此外為了保證加鎖成功後,租約未過期。我們加了一步timeToLive的操作,用於判斷租約在獲取鎖成功後的是否還存活。如果ttl未大於0,則判斷為加鎖失敗。

/**
 * 在指定的租約上加鎖,如果租約過期,則算加鎖失敗。
 *
 * @param leaseId  鎖的租約Id
 * @param lockName 鎖的名稱
 * @param waitTime 加鎖過程中的的等待時間,單位ms
 * @return 是否加鎖成功
 */
public Boolean tryLock(Long leaseId,String lockName,Long waitTime) {
    try {
        CompletableFuture<LockResponse> lockResponseCompletableFuture = client.getLockClient().lock(ByteSequence.from(lockName,Charset.defaultCharset()),leaseId);
        long timeout = Math.max(500,waitTime);
        lockResponseCompletableFuture.get(timeout,TimeUnit.MILLISECONDS).getKey();
        CompletableFuture<LeaseTimeToLiveResponse> leaseTimeToLiveResponseCompletableFuture = client.getLeaseClient().timeToLive(leaseId,LeaseOption.DEFAULT);
        long ttl = leaseTimeToLiveResponseCompletableFuture.get(1,TimeUnit.SECONDS).getTTl();
        if (ttl > 0) {
            return Boolean.TRUE;
        } else {
            return Boolean.FALSE;
        }
    } catch (TimeoutException | InterruptedException | ExecutionException e) {
        return Boolean.FALSE;
    }
}
複製程式碼

解鎖過程,我們可以直接使用LeaseClient下的revoke操作,在撤銷租約的同時將該租約下的lock釋放。

/**
 * 取消租約,並釋放鎖
 *
 * @param leaseId 租約id
 * @return 是否成功釋放
 */
public Boolean unLock(Long leaseId) {
    try {
        CompletableFuture<LeaseRevokeResponse> revokeResponseCompletableFuture = client.getLeaseClient().revoke(leaseId);
        revokeResponseCompletableFuture.get(1,TimeUnit.SECONDS);
        return Boolean.TRUE;
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        return Boolean.FALSE;
    }
}

複製程式碼

然後是統一的CpLock物件,封裝了加解鎖的過程,對外只暴露execute方法,避免使用者忘記解鎖步驟。

public class CpLock {

    private String lockName;

    private LockEtcdClient lockEtcdClient;

    /**
     * 分散式鎖的鎖持有數
     */
    private volatile int state;

    private volatile transient Thread lockOwnerThread;

    /**
     * 當前執行緒擁有的lease物件
     */
    private FastThreadLocal<LockLeaseData> lockLeaseDataFastThreadLocal = new FastThreadLocal<>();
    /**
     * 鎖自動釋放時間,單位s,預設為30
     */
    private static Long LOCK_TIME = 30L;

    /**
     * 獲取鎖失敗單次等待時間,單位ms,預設為300
     */
    private static Integer SLEEP_TIME_ONCE = 300;

    CpLock(String lockName,LockEtcdClient lockEtcdClient) {
        this.lockName = lockName;
        this.lockEtcdClient = lockEtcdClient;
    }

    private LockLeaseData getLockLeaseData(String lockName,long lockTime) {
        if (lockLeaseDataFastThreadLocal.get() != null) {
            return lockLeaseDataFastThreadLocal.get();
        } else {
            LockLeaseData lockLeaseData = lockEtcdClient.getLeaseData(lockName,lockTime);
            lockLeaseDataFastThreadLocal.set(lockLeaseData);
            return lockLeaseData;
        }
    }

    final Boolean tryLock(long waitTime) {
        final long startTime = System.currentTimeMillis();
        final long endTime = startTime + waitTime * 1000;
        final long lockTime = LOCK_TIME;
        final Thread current = Thread.currentThread();
        try {
            do {
                int c = this.getState();
                if (c == 0) {
                    LockLeaseData lockLeaseData = this.getLockLeaseData(lockName,lockTime);
                    if (Objects.isNull(lockLeaseData)) {
                        return Boolean.FALSE;
                    }
                    Long leaseId = lockLeaseData.getLeaseId();
                    if (lockEtcdClient.tryLock(leaseId,endTime - System.currentTimeMillis())) {
                        log.info("執行緒獲取重入鎖成功,cp鎖的名稱為{}",lockName);
                        this.setLockOwnerThread(current);
                        this.setState(c + 1);
                        return Boolean.TRUE;
                    }
                } else if (lockOwnerThread == Thread.currentThread()) {
                    if (c + 1 <= 0) {
                        throw new Error("Maximum lock count exceeded");
                    }
                    this.setState(c + 1);
                    log.info("執行緒重入鎖成功,cp鎖的名稱為{},當前LockCount為{}",state);
                    return Boolean.TRUE;
                }
                int sleepTime = SLEEP_TIME_ONCE;
                if (waitTime > 0) {
                    log.info("執行緒暫時無法獲得cp鎖,當前已等待{}ms,本次將再等待{}ms,System.currentTimeMillis() - startTime,sleepTime,lockName);
                    try {
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e) {
                        log.info("執行緒等待過程中被中斷,e);
                    }
                }
            } while (System.currentTimeMillis() <= endTime);
            if (waitTime == 0) {
                log.info("執行緒獲得cp鎖失敗,將放棄獲取,lockName);
            } else {
                log.info("執行緒獲得cp鎖失敗,之前共等待{}ms,將放棄等待獲取,lockName);
            }
            this.stopKeepAlive();
            return Boolean.FALSE;
        } catch (Exception e) {
            log.error("execute error",e);
            this.stopKeepAlive();
            return Boolean.FALSE;
        }
    }

    /**
     * 停止續約,並將租約物件從執行緒中移除
     */
    private void stopKeepAlive() {
        LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
        if (Objects.nonNull(lockLeaseData)) {
            lockLeaseData.getCpSurvivalClam().stop();
            lockLeaseData.setCpSurvivalClam(null);
            lockLeaseData.getSurvivalThread().interrupt();
            lockLeaseData.setSurvivalThread(null);
        }
        lockLeaseDataFastThreadLocal.remove();
    }

    final void unLock() {
        if (lockOwnerThread == Thread.currentThread()) {
            int c = this.getState() - 1;
            if (c == 0) {
                this.setLockOwnerThread(null);
                this.setState(c);
                LockLeaseData lockLeaseData = lockLeaseDataFastThreadLocal.get();
                this.stopKeepAlive();
                //unLock操作必須在最後執行,避免其他執行緒獲取到鎖時的state等資料不正確
                lockEtcdClient.unLock(lockLeaseData.getLeaseId());
                log.info("重入鎖LockCount-1,執行緒已成功釋放鎖,lockName);
            } else {
                this.setState(c);
                log.info("重入鎖LockCount-1,cp鎖的名稱為{},剩餘LockCount為{}",c);
            }
        }
    }

    public <T> T execute(Supplier<T> supplier,int waitTime) {
        Boolean holdLock = Boolean.FALSE;
        Preconditions.checkArgument(waitTime >= 0,"waitTime必須為自然數");
        try {
            if (holdLock = this.tryLock(waitTime)) {
                return supplier.get();
            }
            return null;
        } catch (Exception e) {
            log.error("cpLock execute error",e);
            return null;
        } finally {
            if (holdLock) {
                this.unLock();
            }
        }
    }

    public <T> T execute(Supplier<T> supplier) {
        return this.execute(supplier,0);
    }
}

複製程式碼

CpLock和之前Redis分散式鎖中的ApLock實現大體一致。區別主要有:

1、因為我們是在授予租約的操作中開啟了守護執行緒,所以在競爭鎖失敗、出現異常和釋放鎖這些場景下,我們必須得停止守護執行緒續期。又因為是可重入的場景,我們又只希望在state為0的情況下再去生成租約去競爭鎖。所以避免多種情況判斷,我們引入了FastThreadLocal lockLeaseDataFastThreadLocal來儲存當前執行緒的Lease物件。

2、redis分散式鎖在任何場景下,等待獲取鎖都是通過休眠輪詢的方式實現的,而在etcd場景下,我們在state為0時通過etcd自身的等待邏輯來完成等待,在state非0場景下,依然通過休眠輪詢的方式來實現等待。因為可能會存在state從非0轉為0的情況,所以我們的waitTime值是endTime - System.currentTimeMillis(),而非原本傳入的waitTime。這樣能夠讓等待時間更接近我們期望值。

更新說明

本次更新,我們實現了基於etcd的cp分散式鎖,同時也修復了redis分散式鎖中的一個隱藏問題。

之前的setState操作在unLock之後,這樣在併發場景下會導致一個問題發生。執行緒a和執行緒b在競爭獲取鎖a,此時各自的區域性變數c和state都為0,然後執行緒a在獲取到了鎖之後立刻釋放了鎖,此時先執行了unLock,state還是1,執行緒b成功獲得鎖,將state重置為c+1,依然是1,然後執行緒a執行setState,將stete改為0。此時執行緒b如果去釋放鎖,執行stete-1操作,變為了-1。這個問題主要是因為獲取state值和state值修改操作是非同步的,而在多執行緒場景下,分散式鎖是通過lock控制的,我們只需要將unLock操作挪到所有賦值之後即可解決這個問題。

後續計劃

目前實現的cp分散式鎖的版本,已可滿足分散式鎖的絕大部分場景(非公平+可自動續期+可重入+強一致性的分散式鎖),已可投入生產環境的叢集中使用。後續的計劃中,ap鎖和cp鎖將會分別更新,會優化一些使用場景。也會嘗試去解決公平鎖的問題,以及迴圈獲取鎖需要等待休眠的問題。

以上計劃已完成,如何實現公平鎖可詳見Etcd分散式鎖(二):支援公平鎖,避免某些場景下執行緒長期無法獲取鎖

本次cp分散式鎖需要考慮大量的使用場景,目前只進行了小規模的測試,如有考慮不周的地方,還望大家海涵。

推薦閱讀

1、Redis分散式鎖:基於AOP和Redis實現的簡易版分散式鎖
2、Redis分散式鎖(二):支援鎖的續期,避免鎖超時後導致多個執行緒獲得鎖
3、Redis分散式鎖(三):支援鎖可重入,避免鎖遞迴呼叫時死鎖

好了,我們下一期再見,歡迎大家一起留言討論。同時也歡迎點贊~