1. 程式人生 > >【原始碼分析】分散式鎖-RedisLockRegistry原始碼分析[轉]

【原始碼分析】分散式鎖-RedisLockRegistry原始碼分析[轉]

前言

官網的英文介紹大概如下:

Starting with version 4.0, the RedisLockRegistry is available. Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread is manipulating a group at a time. The DefaultLockRegistry performs this function within a single component; you can now configure an external lock registry on these components. When used with a shared MessageGroupStore, the RedisLockRegistry can be use to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
To avoid “hung” locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but this can be configured on the registry. Locks are normally held for a much smaller time.

上述大概意思是RedisLockRegistry可以確保在分散式環境中,只有一個thread在執行,也就是實現了分散式鎖,當一個本地執行緒釋放了鎖,其他本地現場會立即去搶佔鎖,如果鎖被佔用了,那麼會進行重試機制,100毫秒進行重試一次。同時也避免了”hung” locks 當伺服器fails的時候。同時也給鎖設定了預設60秒的過期時間

如何獲取鎖

鎖的獲取過程

詳細流程如上圖所示,這裡主要核心業務是這樣,首先Lock是java.util.concurrent.locks中的鎖,也就是本地鎖。然後自己用RedisLock實現了Lock介面而已,但是實際上RedisLock也使用了本地鎖。主要是通過redis鎖+本地鎖雙重鎖的方式實現的一個比較好的鎖。針對redis鎖來說只要能獲取到鎖,那麼就算是成功的。如果獲取不到鎖就等待100毫秒繼續重試,如果獲取到鎖那麼就採用本地鎖鎖住本地的執行緒。通過兩種方式很好的去實現了一個完善的分散式鎖機制。
下面程式碼主要是獲取鎖的一個流程,先從本地鎖裡面獲取,如果獲取到了那麼和redis裡面存放的RedisLock鎖做對比,判斷是否是同一個物件,如果不是那麼就刪除本地鎖然後重新建立一個鎖返回

@Override
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);

    //try to find the lock within hard references
    //從本地強引用裡面獲取鎖,
    RedisLock lock = findLock(this.hardThreadLocks.get(), lockKey);

    /*
     * If the lock is locked, check that it matches what's in the store.
     * If it doesn't, the lock must have expired.
     */
//這裡主要判斷了這個鎖是否是鎖住的,如果不是的那麼該鎖已經過期了 //如果強引用裡面有這個鎖,並且lock.thread!=null,說明這個鎖沒有被佔用 if (lock != null && lock.thread != null) { //從redis獲取鎖,若如果redis鎖為空或者跟當前強引用的鎖不一致,可以確定兩個問題 //1.redis裡面的鎖和本地的鎖不是一個了 //2.redis裡面沒有鎖 RedisLock lockInStore = this.redisTemplate.boundValueOps(this.registryKey + ":" + lockKey).get(); if (lockInStore == null || !lock.equals(lockInStore)) { //刪除強引用裡面鎖 getHardThreadLocks().remove(lock); lock = null; } } //如果鎖==null if (lock == null) { //try to find the lock within weak references //嘗試線從弱引用裡面去找鎖 lock = findLock(this.weakThreadLocks.get(), lockKey); //如果弱引用鎖==null 那麼新建一個鎖 if (lock == null) { lock = new RedisLock((String) lockKey); //判斷是否用弱引用,如果用那麼就加入到弱引用裡面 if (this.useWeakReferences) { getWeakThreadLocks().add(lock); } } } return lock; }

上面獲取到的是RedisLock,RedisLock是實現java原生Lock介面,並重寫了lock()方法。首先從localRegistry中獲取到鎖,這裡的鎖是java開發包裡面的ReentrantLock。首先把本地先鎖住,然後再去遠端obtainLock。每次sleep() 100毫秒直到獲取到遠端鎖為止,程式碼如下所示:

@Override
public void lock() {
    //這裡採用java開發包裡面的ReentrantLock 進行多執行緒的加鎖,單機多執行緒的情況下解決併發的問題
    Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
    localLock.lock();
    while (true) {
        try {
            while (!this.obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
                /*
                 * This method must be uninterruptible so catch and ignore
                 * interrupts and only break out of the while loop when
                 * we get the lock.
                 */
        }
        catch (Exception e) {
            localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}

核心遠端鎖還是在RedisLock中,這裡採用了redis事務+watch的方式,watch和事務都是redis裡面自帶的。使用watch時候如果key的值發生了任何變化。那麼exec()將不會執行,那麼如下程式碼返回的success就是false。從而來實現redis鎖的功能

private boolean obtainLock() {
    //判斷建立這個類的執行緒和當前是否是一個,如果是就直接獲取鎖
    Thread currentThread = Thread.currentThread();
    if (currentThread.equals(this.thread)) {
        this.reLock++;
        return true;
    }
    //把當前鎖存到集合種
    toHardThreadStorage(this);

    /*
     * Set these now so they will be persisted if successful.
     */
    this.lockedAt = System.currentTimeMillis();
    this.threadName = currentThread.getName();

    Boolean success = false;
    try {
        success = RedisLockRegistry.this.redisTemplate.execute(new SessionCallback<Boolean>() {

            @SuppressWarnings({"unchecked", "rawtypes"})
            @Override
            public Boolean execute(RedisOperations ops) throws DataAccessException {
                String key = constructLockKey();
                //監控key如果該key被改變了 那麼該事務是不能被實現的會進行回滾
                ops.watch(key); //monitor key
                //如果key存在了就停止監控,如果key已經存在了 那麼肯定是被別人佔用了
                if (ops.opsForValue().get(key) != null) {
                    ops.unwatch(); //key already exists, stop monitoring
                    return false;
                }

                ops.multi(); //transaction start
                //設定一個值並加上過期時間 m預設是一分鐘左右的時間
                //set the value and expire
                //把鎖放入到redis中
                ops.opsForValue()
                        .set(key, RedisLock.this, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);

                //exec will contain all operations result or null - if execution has been aborted due to 'watch'
                return ops.exec() != null;
            }

        });

    }
    finally {
      //如果不成功那麼把當前過期時間和鎖的名字設定成null
        if (!success) {
            this.lockedAt = 0;
            this.threadName = null;
            toWeakThreadStorage(this);
        }
        else {
        //如果成功把當前鎖的thread名稱設定成currentThread
            this.thread = currentThread;
            if (logger.isDebugEnabled()) {
                logger.debug("New lock; " + this.toString());
            }
        }

    }

    return success;
}

上面是整個加鎖的流程,基本流程比較簡單,看完加鎖應該自己都能解鎖,無非就是去除redis鎖和本地的鎖而已。

@Override
public void unlock() {
    //判斷當前執行的執行緒和鎖的執行緒做對比,如果兩個執行緒不一樣那麼丟擲異常
    if (!Thread.currentThread().equals(this.thread)) {
        if (this.thread == null) {
            throw new IllegalStateException("Lock is not locked; " + this.toString());
        }
        throw new IllegalStateException("Lock is owned by " + this.thread.getName() + "; " + this.toString());
    }

    try {
       //如果reLock--小於=0的話就刪除redis裡面的鎖
        if (this.reLock-- <= 0) {
            try {
                this.assertLockInRedisIsUnchanged();
                RedisLockRegistry.this.redisTemplate.delete(constructLockKey());
                if (logger.isDebugEnabled()) {
                    logger.debug("Released lock; " + this.toString());
                }
            }
            finally {
                this.thread = null;
                this.reLock = 0;
                toWeakThreadStorage(this);
            }
        }
    }
    finally {
    //拿到本地鎖,進行解鎖
        Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
        localLock.unlock();
    }
}

tryLock在原有的加鎖上面增加了一個超時機制,主要是先通過本地的超時機制

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    //拿到本地鎖
    Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
    //先本地鎖進行tryLock
    if (!localLock.tryLock(time, unit)) {
        return false;
    }
    try {
        long expire = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(time, unit);
        boolean acquired;
        //這裡添加了超時機制,跟之前的無限等待做了一個區分
        while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
            Thread.sleep(100); //NOSONAR
        }
         //超時後沒有獲取到鎖,那麼就把本地鎖進行解鎖
        if (!acquired) {
            localLock.unlock();
        }
        return acquired;
    }
    catch (Exception e) {
        localLock.unlock();
        rethrowAsLockException(e);
    }
    return false;
}