分散式鎖原始碼剖析(2) Redisson實現公平分散式鎖
Redisson分散式鎖原始碼剖析(公平鎖)
maven配置檔案:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.1</version>
</dependency>
程式碼示例:
Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://192.168.31.114:7001") .addNodeAddress("redis://192.168.31.184:7002"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getFairLock("anyLock"); lock.lock(); lock.unlock();
lock()方法原始碼剖析
基本邏輯和可重入鎖類似,最大區別是加鎖的邏輯。
核心原始碼:
lock()->RedisssonLock.lock()->lockInterruptibly()->tryAcquire()->tryAcquireAsync()->tryLockInnerAsync()(RedissonFairLock類中的方法)
if (command == RedisCommands.EVAL_LONG) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, // remove stale threads "while true do " + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end; " + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + "if timeout <= tonumber(ARGV[4]) then " + "redis.call('zrem', KEYS[3], firstThreadId2); " + "redis.call('lpop', KEYS[2]); " + "else " + "break;" + "end; " + "end;" + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) " + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " + "redis.call('lpop', KEYS[2]); " + "redis.call('zrem', KEYS[3], ARGV[2]); " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "local firstThreadId = redis.call('lindex', KEYS[2], 0); " + "local ttl; " + "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + "else " + "ttl = redis.call('pttl', KEYS[1]);" + "end; " + "local timeout = ttl + tonumber(ARGV[3]);" + "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + "redis.call('rpush', KEYS[2], ARGV[2]);" + "end; " + "return ttl;", Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime); }
第一段邏輯:死迴圈,彈出佇列中的第一個元素,如果佇列是空,直接跳出迴圈。timeout = set集合裡面執行緒對應的過期時間。如果timeout <= 當前時間,清空集合,清空佇列。
第二段邏輯:如果anyLock鎖不存在並且佇列為空或者佇列的第一個元素是當前執行緒,彈出佇列的第一個元素,從有序set集合中刪除當前執行緒對應的元素,然後給當前執行緒加鎖,設定這個key的有效時間是30秒。判斷anyLock中,是否有當前執行緒:1,如果有,那累加1,即當前執行緒:2,。(重入鎖)
第三段邏輯:得到佇列中的第一個元素,如果佇列中的第一個元素存在並且執行緒id不等於當前執行緒,ttl = 佇列第一個元素過期時間 - 當前時間。如果佇列是空,ttl = anyLock的存活時間。
第四段邏輯:timeout= ttl + 當前時間 + 50秒(等待時間),往set集合插入一個元素,過期時間為timeout,並把它放入佇列中。最終返回ttl。
unlock()方法原始碼剖析
unlock()->RedisssonLock.unlock()->unlockAsync()->unlockInnerAsync()(RedissonFairLock類中的方法)
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
"redis.call('del', KEYS[1]); " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ",
Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
第一段邏輯:獲取佇列中的第一個元素,如果佇列為空,跳出迴圈。timeout = set集合中執行緒對應的元素存活時間,如果timeout <=當前時間,清空set集合和佇列。
第二段邏輯:如果鎖不存在,取出佇列中的第一個元素,釋出一個解鎖訊息事件,如果anylock的map結構中沒有任何執行緒,則直接返回null。遞減anyLock的threadId:1。(因為鎖的可重入性)
第三段邏輯:刪除anyLock這個key,佇列中的第一個元素通過訊息事件,嘗試去獲取鎖。