1. 程式人生 > >【分散式鎖】04-使用Redisson實現ReadWriteLock原理

【分散式鎖】04-使用Redisson實現ReadWriteLock原理

前言

關於讀寫鎖,大家應該都瞭解JDK中的ReadWriteLock, 當然Redisson也有讀寫鎖的實現。

所謂讀寫鎖,就是多個客戶端同時加讀鎖,是不會互斥的,多個客戶端可以同時加這個讀鎖,讀鎖和讀鎖是不互斥的

Redisson中使用RedissonReadWriteLock來實現讀寫鎖,它是RReadWriteLock的子類,具體實現讀寫鎖的類分別是:RedissonReadLockRedissonWriteLock

Redisson讀寫鎖使用例子

還是從官方文件中找的使用案例:

RReadWriteLock rwlock = redisson.getReadWriteLock("tryLock");

RLock lock = rwlock.readLock();
// or
RLock lock = rwlock.writeLock();

// traditional lock method
lock.lock();

// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

Redisson加讀鎖邏輯原理

public class RedissonReadLock extends RedissonLock implements RLock {
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                                "if (mode == false) then " +
                                  "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  "redis.call('set', KEYS[2] .. ':1', 1); " +
                                  "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                                "end; " +
                                "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                                  "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                  "local key = KEYS[2] .. ':' .. ind;" +
                                  "redis.call('set', key, 1); " +
                                  "redis.call('pexpire', key, ARGV[1]); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getName(), getReadWriteTimeoutNamePrefix(threadId)), 
                        internalLockLeaseTime, getLockName(threadId), getWriteLockName(threadId));
    }
}

客戶端A(UUID_01:threadId_01)來加讀鎖

注:
以下文章中客戶端A用:UUID_01:threadId_01標識
客戶端B用:UUID_02:threadId_02標識

KEYS:

  • KEYS1: getName() = tryLock
  • KEYS[2]: getReadWriteTimeoutNamePrefix(threadId) = {anyLock}:UUID_01:threadId_01:rwlock_timeout

ARGV:

  • ARGV1: internalLockLeaseTime = 30000毫秒
  • ARGV[2]: getLockName(threadId) = UUID_01:threadId_01
  • ARGV[3]: getWriteLockName(threadId) = UUID_01:threadId_01:write

接著對程式碼中lua指令碼一行行解讀:

  1. hget anyLock mode 第一次加鎖時是空的
  2. mode = false,進入if邏輯
  3. hset anyLock UUID_01:threadId_01 1
    anyLock是hash結構,設定hash的key、value
  4. set {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1
    設定一個string型別的key value資料
  5. pexpire {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 30000
    設定key value的過期時間
  6. pexpire anyLock 30000
    設定anyLock的過期時間

此時redis中存在的資料結構為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1
客戶端A 第二次來加讀鎖

繼續分析,客戶端A已經加過讀鎖,此時如果繼續加讀鎖會怎樣處理呢?

  1. hget anyLock mode 此時mode=read,會進入第二個if判斷
  2. hincrby anyLock UUID_01:threadId_01 1 此時hash中的value會加1,變成2
  3. set {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 1
    ind 為hincrby結果,hincrby返回是2
  4. pexpire anyLock 30000
  5. pexpire {anyLock}:UUID_01:threadId_01:rwlock_timeout:2 30000

此時redis中存在的資料結構為:

anyLock: {
  “mode”: “read”,
  “UUID_01:threadId_01”: 2
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1
{anyLock}:UUID_01:threadId_01:rwlock_timeout:2  1

客戶端B (UUID_02:threadId_02)第一次來加讀鎖

基本步驟和上面一直,加鎖後redis中資料為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 2,
  "UUID_02:threadId_02": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1
{anyLock}:UUID_01:threadId_01:rwlock_timeout:2  1
{anyLock}:UUID_02:threadId_02:rwlock_timeout:1  1

這裡需要注意一下:
為雜湊表 key 中的域 field 的值加上增量 increment,如果 key 不存在,一個新的雜湊表被建立並執行 HINCRBY 命令。

Redisson加寫鎖邏輯原理

Redisson中由RedissonWriteLock 來實現寫鎖,我們看下寫鎖的核心邏輯:

public class RedissonWriteLock extends RedissonLock implements RLock {
    @Override
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +
                                  "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                              "if (mode == 'write') then " +
                                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                      "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                      "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                      "return nil; " +
                                  "end; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getName()), 
                        internalLockLeaseTime, getLockName(threadId));
    }
}

還是像上面一樣,一行行來分析每句lua指令碼執行語義。

客戶端A先加讀寫、再加寫鎖

KEYS和ARGV引數:

  • KEYS1 = anyLock
  • ARGV1 = 30000
  • ARGV[2] = UUID_01:threadId_01:write
  1. hget anyLock mode,此時沒人加鎖,mode=false
  2. hset anyLock mode write
  3. hset anyLock UUID_01:threadId_01:write 1
  4. pexpire anyLock 30000

此時redis中資料格式為:

anyLock: {
    "mode": "write",
    "UUID_01:threadId_01:write": 1
}

此時再次來加寫鎖,直接到另一個if語句中:

  1. hexists anyLock UUID_01:threadId_01:write
  2. hincrby anyLock UUID_01:threadId_01:write 1
  3. pexpire anyLock pttl + 30000

此時redis中資料格式為:

anyLock: {
    "mode": "write",
    "UUID_01:threadId_01:write": 2
}

客戶端A和客戶端B,先後加讀鎖,客戶端C來加寫鎖

讀鎖加完後,此時redis資料格式為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 1,
  "UUID_02:threadId_02": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1
{anyLock}:UUID_02:threadId_02:rwlock_timeout:1    1

客戶端C引數為:

  • KEYS1 = anyLock
  • ARGV1 = 30000
  • ARGV[2] = UUID_03:threadId_03:write

hget anyLock mode,mode = read,已經有人加了讀鎖,不是寫鎖,此時會直接執行:pttl
anyLock,返回一個anyLock的剩餘生存時間

  1. hget anyLock mode,mode = read,已經有人加了讀鎖,不是寫鎖,所以if語句不會成立
  2. pttl anyLock,返回一個anyLock的剩餘生存時間

客戶端C加鎖失敗,就會不斷的嘗試重試去加鎖

客戶端A先加寫鎖、客戶端B接著加讀鎖

加完寫鎖後此時Redis資料格式為:

anyLock: {
  "mode": "write",
  "UUID_01:threadId_01:write": 1
}

客戶端B執行讀鎖邏輯引數為:

  • KEYS1 = anyLock
  • KEYS[2] = {anyLock}:UUID_02:threadId_02:rwlock_timeout
  • ARGV1 = 30000毫秒
  • ARGV[2] = UUID_02:threadId_02
  • ARGV[3] = UUID_02:threadId_02:write

接著看下加鎖邏輯:

image.png

如上圖,客戶端B加讀鎖會走到紅框中的if邏輯:

  1. hget anyLock mode,mode = write
    客戶端A已經加了一個寫鎖
  2. hexists anyLock UUID_02:threadId_02:write,存在的話,如果客戶端B自己之前加過寫鎖的話,此時才能進入這個分支
  3. 返回pttl anyLock,導致加鎖失敗

客戶端A先加寫鎖、客戶端A接著加讀鎖

還是接著上面的邏輯,繼續分析:

  1. hget anyLock mode,mode = write
    客戶端A已經加了一個寫鎖
  2. hexists anyLock UUID_01:threadId_01:write,此時存在這個key,所以可以進入if分支
  3. hincrby anyLock UUID_01:threadId_01 1,也就是說此時,加了一個讀鎖
  4. set {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 1,
  5. pexpire anyLock 30000
  6. pexpire {anyLock}:UUID_01:threadId_01:rwlock_timeout:1 30000

此時redis中資料格式為:

anyLock: {
  "mode": "write",
  "UUID_01:threadId_01:write": 1,
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

客戶端A先加讀鎖、客戶端A接著加寫鎖

客戶端A加讀鎖後,redis中資料結構為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1  1

此時客戶端A再來加寫鎖,邏輯如下:

image.png

此時客戶端A先加的讀鎖,mode=read,所以再次加寫鎖是不能成功的

如果是同一個客戶端同一個執行緒,先加了一次寫鎖,然後加讀鎖,是可以加成功的,預設是在同一個執行緒寫鎖的期間,可以多次加讀鎖

而同一個客戶端同一個執行緒,先加了一次讀鎖,是不允許再被加寫鎖的

總結

顯然還有寫鎖與寫鎖互斥的邏輯就不分析了,通過上面一些場景的分析,我們可以知道:

  • 讀鎖與讀鎖非互斥
  • 讀鎖與寫鎖互斥
  • 寫鎖與寫鎖互斥
  • 讀讀、寫寫 同個客戶端同個執行緒都可重入
  • 先寫鎖再加讀鎖可重入
  • 先讀鎖再寫鎖不可重入

Redisson讀寫鎖釋放原理

Redission 讀鎖釋放原理

不同客戶端加了讀鎖 / 同一個客戶端+執行緒多次可重入加了讀鎖

例如客戶端A先加讀鎖,然後再次加讀鎖
最後客戶端B來加讀鎖

此時Redis中資料格式為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 2,
  "UUID_02:threadId_02": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1        1
{anyLock}:UUID_01:threadId_01:rwlock_timeout:2        1
{anyLock}:UUID_02:threadId_02:rwlock_timeout:1        1

接著我們看下釋放鎖的核心程式碼:

public class RedissonReadLock extends RedissonLock implements RLock {
    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
        String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
                "if (lockExists == 0) then " +
                    "return nil;" +
                "end; " +

                "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
                "if (counter == 0) then " +
                    "redis.call('hdel', KEYS[1], ARGV[2]); " + 
                "end;" +
                "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

                "if (redis.call('hlen', KEYS[1]) > 1) then " +
                    "local maxRemainTime = -3; " + 
                    "local keys = redis.call('hkeys', KEYS[1]); " + 
                    "for n, key in ipairs(keys) do " + 
                        "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                        "if type(counter) == 'number' then " + 
                            "for i=counter, 1, -1 do " + 
                                "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                                "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                            "end; " + 
                        "end; " + 
                    "end; " +

                    "if maxRemainTime > 0 then " +
                        "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                        "return 0; " +
                    "end;" + 

                    "if mode == 'write' then " + 
                        "return 0;" + 
                    "end; " +
                "end; " +

                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; ",
                Arrays.<Object>asList(getName(), getChannelName(), timeoutPrefix, keyPrefix), 
                LockPubSub.unlockMessage, getLockName(threadId));
    }
}

客戶端A來釋放鎖:
對應的KEYS和ARGV引數為:

  • KEYS1 = anyLock

  • KEYS[2] = redisson_rwlock:{anyLock}

  • KEYS[3] = {anyLock}:UUID_01:threadId_01:rwlock_timeout

  • KEYS[4] = {anyLock}

  • ARGV1 = 0

  • ARGV[2] = UUID_01:threadId_01

接下來開始執行操作:

  1. hget anyLock mode,mode = read
  2. hexists anyLock UUID_01:threadId_01,肯定是存在的,因為這個客戶端A加過讀鎖
  3. hincrby anyLock UUID_01:threadId_01 -1,將這個客戶端對應的加鎖次數遞減1,現在就是變成1,counter = 1
  4. del {anyLock}:UUID_01:threadId_01:rwlock_timeout:2,刪除了一個timeout key

此時Redis中的資料結構為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 1,
  "UUID_02:threadId_02": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1
{anyLock}:UUID_02:threadId_02:rwlock_timeout:1    1

此時繼續往下,具體邏輯如圖:

image.png

  1. hlen anyLock > 1,就是hash裡面的元素超過1個
  2. pttl {anyLock}:UUID_01:threadId_01:rwlock_timeout:1,此時獲取那個timeout key的剩餘生存時間還有多少毫秒,比如說此時這個key的剩餘生存時間是20000毫秒

這個for迴圈的含義是獲取到了所有的timeout key的最大的一個剩餘生存時間,假設最大的剩餘生存時間是25000毫秒

客戶端A繼續來釋放鎖:

此時客戶端A執行流程還會和上面一直,執行完成後Redis中資料結構為:

anyLock: {
  "mode": "read",
  "UUID_02:threadId_02": 1
}

{anyLock}:UUID_02:threadId_02:rwlock_timeout:1    1

因為這裡會走counter == 0的邏輯,所以會執行"redis.call('hdel', KEYS[1], ARGV[2]); "

客戶端B繼續來釋放鎖:

客戶端B流程也和上面一直,執行完後就會刪除anyLock這個key

同一個客戶端/執行緒先加寫鎖再加讀鎖

上面已經分析過這種情形,操作過後Redis中資料結構為:

anyLock: {
  "mode": "write",
  "UUID_01:threadId_01:write": 1,
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

此時客戶端A來釋放讀鎖:

  1. hincrby anyLock UUID_01:threadId_01 -1,將這個客戶端對應的加鎖次數遞減1,現在就是變成1,counter = 0
  2. hdel anyLock UUID_01:threadId_01,此時就是從hash資料結構中刪除客戶端A這個加鎖的記錄
  3. del {anyLock}:UUID_01:threadId_01:rwlock_timeout:1,刪除了一個timeout key

此時Redis中資料變成:

anyLock: {
  "mode": "write",
  "UUID_01:threadId_01:write": 1
}

Redisson寫鎖釋放原理

先看下寫鎖釋放的核心邏輯:

public class RedissonWriteLock extends RedissonLock implements RLock {
    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (mode == 'write') then " +
                    "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                    "if (lockExists == 0) then " +
                        "return nil;" +
                    "else " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('hdel', KEYS[1], ARGV[3]); " +
                            "if (redis.call('hlen', KEYS[1]) == 1) then " +
                                "redis.call('del', KEYS[1]); " +
                                "redis.call('publish', KEYS[2], ARGV[1]); " + 
                            "else " +
                                // has unlocked read-locks
                                "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                            "end; " +
                            "return 1; "+
                        "end; " +
                    "end; " +
                "end; "
                + "return nil;",
        Arrays.<Object>asList(getName(), getChannelName()), 
        LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    }
}
同一個客戶端多次可重入加寫鎖 / 同一個客戶端先加寫鎖再加讀鎖

客戶端A加兩次寫鎖釋放:

此時Redis中資料為:

anyLock: {
  "mode": "write",
  "UUID_01:threadId_01:write": 2,
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

客戶端A來釋放鎖KEYS和ARGV引數:

  • KEYS1 = anyLock

  • KEYS[2] = redisson_rwlock:{anyLock}

  • ARGV1 = 0

  • ARGV[2] = 30000

  • ARGV[3] = UUID_01:threadId_01:write

直接分析lua程式碼:

  1. 上面mode=write,後面使用hincrby進行-1操作,此時count=1
  2. 如果count>0,此時使用pexpire然後返回0
  3. 此時客戶端A再來釋放寫鎖,count=0
  4. hdel anyLock UUID_01:threadId_01:write

此時Redis中資料:

anyLock: {
  "mode": "write",
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

後續還會接著判斷,如果count=0,代表寫鎖都已經釋放完了,此時hlen如果>1,代表加的還有讀鎖,所以接著執行:hset anyLock mode read, 將寫鎖轉換為讀鎖

最終Redis資料為:

anyLock: {
  "mode": "read",
  "UUID_01:threadId_01": 1
}

{anyLock}:UUID_01:threadId_01:rwlock_timeout:1    1

總結

Redisson陸續也更新了好幾篇了,疫情期間宅在家裡一直學習Redisson相關內容,這篇文章寫了2天,從早到晚。

讀寫鎖這塊內容真的很多,本篇篇幅很長,如果學習本篇文章最好跟著原始碼一起讀,後續還會繼續更新Redisson相關內容,如有不正確的地方,歡迎指正!

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小夥伴可關注個人公眾號:壹枝花算不算浪漫

相關推薦

分散式04-使用Redisson實現ReadWriteLock原理

前言 關於讀寫鎖,大家應該都瞭解JDK中的ReadWriteLock, 當然Redisson也有讀寫鎖的實現。 所謂讀寫鎖,就是多個客戶端同時加讀鎖,是不會互斥的,多個客戶端可以同時加這個讀鎖,讀鎖和讀鎖是不互斥的 Redisson中使用RedissonReadWriteLock來實現讀寫鎖,它是RReadW

分散式06-Zookeeper實現分散式:可重入原始碼分析

前言 前面已經講解了Redis的客戶端Redission是怎麼實現分散式鎖的,大多都深入到原始碼級別。 在分散式系統中,常見的分散式鎖實現方案還有Zookeeper,接下來會深入研究Zookeeper是如何來實現分散式鎖的。 Zookeeper初識 檔案系統 Zookeeper維護一個類似檔案系統的資料結構

分散式01-使用Redisson實現可重入分散式原理

前言 主流的分散式鎖一般有三種實現方式: 資料庫樂觀鎖 基於Redis的分散式鎖 基於ZooKeeper的分散式鎖 之前我在部落格上寫過關於mysql和redis實現分散式鎖的具體方案:https://www.cnblogs.com/wang-meng/p/10226618.html裡面主要是從實現原理出

分散式02-使用Redisson實現公平原理

前言 前面分析了Redisson可重入鎖的原理,主要是通過lua指令碼加鎖及設定過期時間來保證鎖執行的原子性,然後每個執行緒獲取鎖會將獲取鎖的次數+1,釋放鎖會將當前鎖次數-1,如果為0則表示釋放鎖成功。 可重入原理和JDK中的可重入鎖都是一致的。 Redisson公平鎖原理 JDK中也有公平鎖和非公平鎖,所

分散式03-使用Redisson實現RedLock原理

前言 前面已經學習了Redission可重入鎖以及公平鎖的原理,接著看看Redission是如何來實現RedLock的。 RedLock原理 RedLock是基於redis實現的分散式鎖,它能夠保證以下特性: 互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖: 當客戶端拿到鎖後,即使發生了網路分割槽

分散式05-使用Redisson中Semaphore和CountDownLatch原理

前言 前面已經寫了Redisson大多的內容,我們再看看Redisson官網共有哪些元件: image.png 剩下還有Semaphore和CountDownLatch兩塊,我們就趁熱打鐵,趕緊看看Redisson是如何實現的吧。 我們在JDK中都知道Semaphore和CountDownLatch兩兄弟,

原創redis庫存操作,分散式的四種實現方式[連載二]--基於Redisson實現分散式

一、redisson介紹 redisson實現了分散式和可擴充套件的java資料結構,支援的資料結構有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。並且是執行緒安全的,底層使用N

Redis實現分散式Redis實現分散式

前言 分散式鎖一般有三種實現方式:1. 資料庫樂觀鎖;2. 基於Redis的分散式鎖;3. 基於ZooKeeper的分散式鎖。本篇部落格將介紹第二種方式,基於Redis實現分散式鎖。雖然網上已經有各種介紹Redis分散式鎖實現的部落格,然而他們的實現卻有著各種各樣的問題,為

原創redis庫存操作,分散式的四種實現方式[連載一]--基於zookeeper實現分散式

一、背景 在電商系統中,庫存的概念一定是有的,例如配一些商品的庫存,做商品秒殺活動等,而由於庫存操作頻繁且要求原子性操作,所以絕大多數電商系統都用Redis來實現庫存的加減,最近公司專案做架構升級,以微服務的形式做分散式部署,對庫存的操作也單獨封裝為一個微服務,這樣在高併發情況下,加減庫存時,就會出現超賣等

連載redis庫存操作,分散式的四種實現方式[三]--基於Redis watch機制實現分散式

一、redis的事務介紹 1、 Redis保證一個事務中的所有命令要麼都執行,要麼都不執行。如果在傳送EXEC命令前客戶端斷線了,則Redis會清空事務佇列,事務中的所有命令都不會執行。而一旦客戶端傳送了EXEC命令,所有的命令就都會被執行,即使此後客戶端斷線也沒關係,因為Redis中已經記錄了所有要執行的

基於redisson分散式的簡單註解實現

Redisson依賴: <!--redisson--><dependency>     <groupId>org.redisson</groupId>     <artifactId>redisson</ar

分散式快取——-基於redis分散式快取的實現

一:Redis 是什麼? Redis是基於記憶體、可持久化的日誌型、Key-Value資料庫 高效能儲存系統,並提供多種語言的API. 二:出現背景 資料結構(Data Structure)需求

redis併發讀寫,使用Redisson實現分散式

今天為大家帶來一篇有關Redisson實現分散式鎖的文章,好了,不多說了,直接進入主題。1. 可重入鎖(Reentrant Lock)Redisson的分散式可重入鎖RLock Java物件實現了java.util.concurrent.locks.Lock介面,同時還支援自

hibernate框架使用hibernate實現悲觀和樂觀

四種隔離機制不要忘記:(1,2,4,8) 1.read-uncommitted:能夠去讀那些沒有提交的資料(允許髒讀的存在) 2.read-committed:不會出現髒讀,因為只有另一個事務提交才會讀取來 結果,但仍然會出現不可重複讀和幻讀現象。 4.repeatable

分散式架構(10)---基於Redis元件的特性,實現一個分散式限流

分散式---基於Redis進行介面IP限流 場景 為了防止我們的介面被人惡意訪問,比如有人通過JMeter工具頻繁訪問我們的介面,導致介面響應變慢甚至崩潰,所以我們需要對一些特定的介面進行IP限流,即一定時間內同一IP訪問的次數是有限的。 實現原理 用Redis作為限流元件的核心的原理,將使用者的IP地址當

Redisson實現分散式(3)—專案落地實現

Redisson實現分散式鎖(3)—專案落地實現 有關Redisson實現分散式鎖前面寫了兩篇部落格作為該專案落地的鋪墊。 1、Redisson實現分散式鎖(1)---原理 2、Redisson實現分散式鎖(2)—RedissonLock 這篇講下通過Redisson實現分散式鎖的專案實現,我會把專案放到Gi

分散式的演化常用的種類以及解決方案

### 前言 上一篇分散式鎖的文章中,通過超市存放物品的例子和大家簡單分享了一下Java鎖。本篇文章我們就來深入探討一下Java鎖的種類,以及不同的鎖使用的場景,當然本篇只介紹我們常用的鎖。我們分為兩大類,分別是樂觀鎖和悲觀鎖,公平鎖和非公平鎖。 ### 樂觀鎖和悲觀鎖 #### 樂觀鎖 老貓相信,

分散式的演化電商“超賣”場景實戰

### 前言 從本篇開始,老貓會通過電商中的業務場景和大家分享鎖在實際應用場景下的演化過程。從Java單體鎖到分散式環境下鎖的實踐。 ### 超賣的第一種現象案例 其實在電商業務場景中,會有一個這樣讓人忌諱的現象,那就是“超賣”,那麼什麼是超賣呢?舉個例子,某商品的庫存數量只有10件,最終卻賣出了15件

分散式的演化“超賣場景”,MySQL分散式

### 前言 之前的文章中通過電商場景中秒殺的例子和大家分享了單體架構中鎖的使用方式,但是現在很多應用系統都是相當龐大的,很多應用系統都是微服務的架構體系,那麼在這種跨jvm的場景下,我們又該如何去解決併發。 ### 單體應用鎖的侷限性 在進入實戰之前簡單和大家粗略聊一下網際網路系統中的架構演進。 !

分散式的演化終章!手擼ZK分散式!

### 前言 這應該是分散式鎖演化的最後一個章節了,相信很多小夥伴們看完這個章節之後在應對高併發的情況下,如何保證執行緒安全心裡肯定也會有譜了。在實際的專案中也可以參考一下老貓的github上的例子,當然程式碼沒有經過特意的封裝,需要小夥伴們自己再好好封裝一下。那麼接下來,就和大家分享一下基於zookeep