Redisson分散式鎖
實現Redis的分散式鎖,除了自己基於redis client原生api來實現之外,還可以使用開源框架:Redission,Redisson是一個企業級的開源Redis Client,也提供了分散式鎖的支援。
一、Redisson原理分析
(1)加鎖機制
執行緒去獲取鎖,獲取成功: 執行lua指令碼,儲存資料到redis資料庫。
執行緒去獲取鎖,獲取失敗: 一直通過while迴圈嘗試獲取鎖,獲取成功後,執行lua指令碼,儲存資料到redis資料庫。
如果該客戶端面對的是一個redis cluster叢集,他首先會根據hash節點選擇一臺機器,傳送一段lua指令碼到redis上.
lua指令碼
Redisson原始碼中,執行redis命令的是lua指令碼,其中主要用到如下幾個概念。
-
- redis.call() 是執行redis命令.
- KEYS[1] 是指指令碼中第1個引數
- ARGV[1] 是指指令碼中第一個引數的值
- 返回值中nil與false同一個意思。
需要注意的是,在redis執行lua指令碼時,相當於一個redis級別的鎖,不能執行其他操作,類似於原子操作,也是redisson實現的一個關鍵點。
另外,如果lua指令碼執行過程中出現了異常或者redis伺服器直接宕掉了,執行redis的根據日誌回覆的命令,會將指令碼中已經執行的命令在日誌中刪除。
(2)鎖互斥機制
如果客戶端2來嘗試加鎖,執行了同樣的一段lua指令碼,會咋樣呢?很簡單,第一個if判斷會執行“exists myLock”,發現myLock這個鎖key已經存在了。接著第二個if判斷,判斷一下,myLock鎖key的hash資料結構中,是否包含客戶端2的ID,但是明顯不是的,因為那裡包含的是客戶端1的ID。
所以,客戶端2會獲取到pttl myLock返回的一個數字,這個數字代表了myLock這個鎖key的**剩餘生存時間。**比如還剩15000毫秒的生存時間。此時客戶端2會進入一個while迴圈,不停的嘗試加鎖。
(3)watch dog自動延期機制
在一個分散式環境下,假如一個執行緒獲得鎖後,突然伺服器宕機了,那麼這個時候在一定時間後這個鎖會自動釋放,你也可以設定鎖的有效時間(不設定預設30秒),這樣的目的主要是防止死鎖的發生。
只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個後臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。
(4)可重入加鎖機制
Redisson可以實現可重入加鎖機制的原因,我覺得跟兩點有關:
1、Redis儲存鎖的資料型別是 Hash型別
2、Hash資料型別的key值包含了當前執行緒資訊。
下面是redis儲存的資料
這裡表面資料型別是Hash型別,Hash型別相當於我們java的<key,<key1,value>>
型別,這裡key是指 'redisson'
它的有效期還有9秒,我們再來看裡們的key1值為078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的組成是:
guid + 當前執行緒的ID。後面的value是就和可重入加鎖有關。
(5)鎖釋放機制
如果執行lock.unlock(),就可以釋放分散式鎖,就是每次都對myLock資料結構中的那個加鎖次數減1。如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:“del myLock”命令,從redis裡刪除這個key。然後呢,另外的客戶端2就可以嘗試完成加鎖了。這就是所謂的分散式鎖的開源Redisson框架的實現機制。
一般我們在生產系統中,可以用Redisson框架提供的這個類庫來基於redis進行分散式鎖的加鎖與釋放鎖。
Redis分散式鎖的缺點
redis cluster,或者是redis master-slave架構的主從非同步複製導致的redis分散式鎖的最大缺陷:在redis master例項宕機的時候,可能導致多個客戶端同時完成加鎖。導致各種髒資料的產生。
二、基於redission的分散式鎖實現
在分散式環境中,很多場景,如:秒殺、ID生成… 都需要分散式鎖。分散式鎖的實現,可以基於redis的setnx,zk的臨時節點,redission框架
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.2.3</version> </dependency>
工具類RedissionUtils
public class RedissionUtils { private static Logger logger = LoggerFactory.getLogger(RedissionUtils.class); private static RedissionUtils redisUtils; private RedissionUtils() { } /** * 提供單例模式 * * @return */ public static RedissionUtils getInstance() { if (redisUtils == null) synchronized (RedisUtils.class) { if (redisUtils == null) redisUtils = new RedissionUtils(); } return redisUtils; } /** * 使用config建立Redisson Redisson是用於連線Redis Server的基礎類 * * @param config * @return */ public RedissonClient getRedisson(Config config) { RedissonClient redisson = Redisson.create(config); logger.info("成功連線Redis Server"); return redisson; } /** * 使用ip地址和埠建立Redisson * * @param ip * @param port * @return */ public RedissonClient getRedisson(String ip, String port) { Config config = new Config(); config.useSingleServer().setAddress(ip + ":" + port); RedissonClient redisson = Redisson.create(config); logger.info("成功連線Redis Server" + "\t" + "連線" + ip + ":" + port + "伺服器"); return redisson; } /** * 關閉Redisson客戶端連線 * * @param redisson */ public void closeRedisson(RedissonClient redisson) { redisson.shutdown(); logger.info("成功關閉Redis Client連線"); } /** * 獲取字串物件 * * @param redisson * @param objectName * @return */ public <T> RBucket<T> getRBucket(RedissonClient redisson, String objectName) { RBucket<T> bucket = redisson.getBucket(objectName); return bucket; } /** * 獲取Map物件 * * @param redisson * @param objectName * @return */ public <K, V> RMap<K, V> getRMap(RedissonClient redisson, String objectName) { RMap<K, V> map = redisson.getMap(objectName); return map; } /** * 獲取有序集合 * * @param redisson * @param objectName * @return */ public <V> RSortedSet<V> getRSortedSet(RedissonClient redisson, String objectName) { RSortedSet<V> sortedSet = redisson.getSortedSet(objectName); return sortedSet; } /** * 獲取集合 * * @param redisson * @param objectName * @return */ public <V> RSet<V> getRSet(RedissonClient redisson, String objectName) { RSet<V> rSet = redisson.getSet(objectName); return rSet; } /** * 獲取列表 * * @param redisson * @param objectName * @return */ public <V> RList<V> getRList(RedissonClient redisson, String objectName) { RList<V> rList = redisson.getList(objectName); return rList; } /** * 獲取佇列 * * @param redisson * @param objectName * @return */ public <V> RQueue<V> getRQueue(RedissonClient redisson, String objectName) { RQueue<V> rQueue = redisson.getQueue(objectName); return rQueue; } /** * 獲取雙端佇列 * * @param redisson * @param objectName * @return */ public <V> RDeque<V> getRDeque(RedissonClient redisson, String objectName) { RDeque<V> rDeque = redisson.getDeque(objectName); return rDeque; } /** * 此方法不可用在Redisson 1.2 中 在1.2.2版本中 可用 * * @param redisson * @param objectName * @return */ /** * public <V> RBlockingQueue<V> getRBlockingQueue(RedissonClient * redisson,String objectName){ RBlockingQueue * rb=redisson.getBlockingQueue(objectName); return rb; } */ /** * 獲取鎖 * * @param redisson * @param objectName * @return */ public RLock getRLock(RedissonClient redisson, String objectName) { RLock rLock = redisson.getLock(objectName); return rLock; } /** * 獲取原子數 * * @param redisson * @param objectName * @return */ public RAtomicLong getRAtomicLong(RedissonClient redisson, String objectName) { RAtomicLong rAtomicLong = redisson.getAtomicLong(objectName); return rAtomicLong; } /** * 獲取記數鎖 * * @param redisson * @param objectName * @return */ public RCountDownLatch getRCountDownLatch(RedissonClient redisson, String objectName) { RCountDownLatch rCountDownLatch = redisson .getCountDownLatch(objectName); return rCountDownLatch; } /** * 獲取訊息的Topic * * @param redisson * @param objectName * @return */ public <M> RTopic<M> getRTopic(RedissonClient redisson, String objectName) { RTopic<M> rTopic = redisson.getTopic(objectName); return rTopic; } }
redission連線模式
//單機 RedissonClient redisson = Redisson.create(); Config config = new Config(); config.useSingleServer().setAddress("myredisserver:6379"); RedissonClient redisson = Redisson.create(config); //主從 Config config = new Config(); config.useMasterSlaveServers() .setMasterAddress("127.0.0.1:6379") .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419") .addSlaveAddress("127.0.0.1:6399"); RedissonClient redisson = Redisson.create(config); //哨兵 Config config = new Config(); config.useSentinelServers() .setMasterName("mymaster") .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379") .addSentinelAddress("127.0.0.1:26319"); RedissonClient redisson = Redisson.create(config); //叢集 Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // cluster state scan interval in milliseconds .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001") .addNodeAddress("127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config);
三、redission的各種分散式鎖
1)可重入鎖:
Redisson的分散式可重入鎖RLock,實現了java.util.concurrent.locks.Lock介面,以及支援自動過期解鎖。同時還提供了非同步(Async)、反射式(Reactive)和RxJava2標準的介面。
// 最常見的使用方法 RLock lock = redisson.getLock("anyLock"); lock.lock(); //... lock.unlock(); //另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。 // 加鎖以後10秒鐘自動解鎖 // 無需呼叫unlock方法手動解鎖 lock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) { try { ... } finally { lock.unlock(); } }
大家都知道,如果負責儲存這個分散式鎖的Redisson節點宕機以後,而且這個鎖正好處於鎖住的狀態時,這個鎖會出現鎖死的狀態。為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson例項被關閉前,不斷的延長鎖的有效期。預設情況下,看門狗的檢查鎖的超時時間是30秒鐘,也可以通過修改Config.lockWatchdogTimeout來另行指定。
Redisson同時還為分散式鎖提供了非同步執行的相關方法:
RLock lock = redisson.getLock("anyLock"); lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
RLock物件完全符合Java的Lock規範。也就是說只有擁有鎖的程序才能解鎖,其他程序解鎖則會丟擲IllegalMonitorStateException錯誤。
2)公平鎖(Fair Lock):
它保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒。所有請求執行緒會在一個佇列中排隊,當某個執行緒出現宕機時,Redisson會等待5秒後繼續下一個執行緒,也就是說如果前面有5個執行緒都處於等待狀態,那麼後面的執行緒會等待至少25秒。使用方式同上,獲取的時候使用如下方法:
RLock fairLock = redisson.getFairLock("anyLock");
3)聯鎖(MultiLock):
基於Redis的Redisson分散式聯鎖RedissonMultiLock物件可以將多個RLock物件關聯為一個聯鎖,每個RLock物件例項可以來自於不同的Redisson例項。
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); // 同時加鎖:lock1 lock2 lock3 // 所有的鎖都上鎖成功才算成功。 lock.lock(); ... lock.unlock(); //另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開 lock.lock(10, TimeUnit.SECONDS); // 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
4)紅鎖(RedLock):
基於Redis的Redisson紅鎖RedissonRedLock物件實現了Redlock介紹的加鎖演算法。該物件也可以用來將多個RLock物件關聯為一個紅鎖,每個RLock物件例項可以來自於不同的Redisson例項。
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同時加鎖:lock1 lock2 lock3 // 紅鎖在大部分節點上加鎖成功就算成功。 lock.lock(); ... lock.unlock(); //另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開 lock.lock(10, TimeUnit.SECONDS); // 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
5)讀寫鎖(ReadWriteLock):
基於Redis的Redisson分散式可重入讀寫鎖RReadWriteLock Java物件實現了java.util.concurrent.locks.ReadWriteLock介面。其中讀鎖和寫鎖都繼承了RLock介面。分散式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處於加鎖狀態。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock"); // 最常見的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); //另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。 // 10秒鐘以後自動解鎖 // 無需呼叫unlock方法手動解鎖 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
6)訊號量(Semaphore):
基於Redis的Redisson的分散式訊號量(Semaphore)Java物件RSemaphore採用了與java.util.concurrent.Semaphore相似的介面和用法。同時還提供了非同步(Async)、反射式(Reactive)和RxJava2標準的介面。
RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();
7)可過期性訊號量(PermitExpirableSemaphore):
基於Redis的Redisson可過期性訊號量(PermitExpirableSemaphore)是在RSemaphore物件的基礎上,為每個訊號增加了一個過期時間。每個訊號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。它提供了非同步(Async)、反射式(Reactive)和RxJava2標準的介面。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 獲取一個訊號,有效期只有2秒鐘。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);
8)門閂:
基於Redisson的Redisson分散式閉鎖(CountDownLatch)Java物件RCountDownLatch採用了與java.util.concurrent.CountDownLatch相似的介面和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他執行緒或其他JVM裡 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
9)分散式AtomicLong:
RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong"); atomicLong.set(3); atomicLong.incrementAndGet(); atomicLong.get()
10)分散式BitSet:
RBitSet set = redisson.getBitSet("simpleBitset"); set.set(0, true); set.set(1812, false); set.clear(0); set.addAsync("e"); set.xor("anotherBitset");
11)分散式Object:
RBucket<AnyObject> bucket = redisson.getBucket("anyObject"); bucket.set(new AnyObject(1)); AnyObject obj = bucket.get(); bucket.trySet(new AnyObject(3)); bucket.compareAndSet(new AnyObject(4), new AnyObject(5)); bucket.getAndSet(new AnyObject(6));
12)分散式Set:
RSet<SomeObject> set = redisson.getSet("anySet"); set.add(new SomeObject()); set.remove(new SomeObject());
13)分散式List:
RList<SomeObject> list = redisson.getList("anyList"); list.add(new SomeObject()); list.get(0); list.remove(new SomeObject());
14)分散式Blocking Queue:
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue"); queue.offer(new SomeObject()); SomeObject obj = queue.peek(); SomeObject someObj = queue.poll(); SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
15)分散式Map:
RMap<String, SomeObject> map = redisson.getMap("anyMap"); SomeObject prevObject = map.put("123", new SomeObject()); SomeObject currentObject = map.putIfAbsent("323", new SomeObject()); SomeObject obj = map.remove("123"); map.fastPut("321", new SomeObject()); map.fastRemove("321"); Future<SomeObject> putAsyncFuture = map.putAsync("321"); Future<Void> fastPutAsyncFuture = map.fastPutAsync("321"); map.fastPutAsync("321", new SomeObject()); map.fastRemoveAsync("321");
除此之外,還支援Multimap。
16)Map eviction:
現在Redis沒有過期清空Map中的某個entry的功能,只能是清空Map所有的entry。Redission提供了這種功能。
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap"); // ttl = 10 minutes, map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES); // ttl = 10 minutes, maxIdleTime = 10 seconds map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS); // ttl = 3 seconds map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS); // ttl = 40 seconds, maxIdleTime = 10 seconds map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);