1. 程式人生 > 實用技巧 >Redisson分散式鎖

Redisson分散式鎖

  實現Redis的分散式鎖,除了自己基於redis client原生api來實現之外,還可以使用開源框架:Redission,Redisson是一個企業級的開源Redis Client,也提供了分散式鎖的支援。

一、Redisson原理分析

 (1)加鎖機制  

  執行緒去獲取鎖,獲取成功: 執行lua指令碼,儲存資料到redis資料庫。

  執行緒去獲取鎖,獲取失敗: 一直通過while迴圈嘗試獲取鎖,獲取成功後,執行lua指令碼,儲存資料到redis資料庫。

  如果該客戶端面對的是一個redis cluster叢集,他首先會根據hash節點選擇一臺機器,傳送一段lua指令碼到redis上.

  lua指令碼

  

  Redisson原始碼中,執行redis命令的是lua指令碼,其中主要用到如下幾個概念。

    • redis.call() 是執行redis命令.
    • KEYS[1] 是指指令碼中第1個引數
    • ARGV[1] 是指指令碼中第一個引數的值
    • 返回值中nil與false同一個意思。

需要注意的是,在redis執行lua指令碼時,相當於一個redis級別的鎖,不能執行其他操作,類似於原子操作,也是redisson實現的一個關鍵點。

另外,如果lua指令碼執行過程中出現了異常或者redis伺服器直接宕掉了,執行redis的根據日誌回覆的命令,會將指令碼中已經執行的命令在日誌中刪除。

(2)鎖互斥機制

  如果客戶端2來嘗試加鎖,執行了同樣的一段lua指令碼,會咋樣呢?很簡單,第一個if判斷會執行“exists myLock”,發現myLock這個鎖key已經存在了。接著第二個if判斷,判斷一下,myLock鎖key的hash資料結構中,是否包含客戶端2的ID,但是明顯不是的,因為那裡包含的是客戶端1的ID。

  所以,客戶端2會獲取到pttl myLock返回的一個數字,這個數字代表了myLock這個鎖key的**剩餘生存時間。**比如還剩15000毫秒的生存時間。此時客戶端2會進入一個while迴圈,不停的嘗試加鎖。

(3)watch dog自動延期機制

  在一個分散式環境下,假如一個執行緒獲得鎖後,突然伺服器宕機了,那麼這個時候在一定時間後這個鎖會自動釋放,你也可以設定鎖的有效時間(不設定預設30秒),這樣的目的主要是防止死鎖的發生。

  只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個後臺執行緒,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那麼就會不斷的延長鎖key的生存時間。

(4)可重入加鎖機制

Redisson可以實現可重入加鎖機制的原因,我覺得跟兩點有關:

1、Redis儲存鎖的資料型別是 Hash型別
2、Hash資料型別的key值包含了當前執行緒資訊。

下面是redis儲存的資料

這裡表面資料型別是Hash型別,Hash型別相當於我們java的<key,<key1,value>>型別,這裡key是指 'redisson'

它的有效期還有9秒,我們再來看裡們的key1值為078e44a3-5f95-4e24-b6aa-80684655a15a:45它的組成是:

guid + 當前執行緒的ID。後面的value是就和可重入加鎖有關。

(5)鎖釋放機制  

如果執行lock.unlock(),就可以釋放分散式鎖,就是每次都對myLock資料結構中的那個加鎖次數減1。如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:“del myLock”命令,從redis裡刪除這個key。然後呢,另外的客戶端2就可以嘗試完成加鎖了。這就是所謂的分散式鎖的開源Redisson框架的實現機制。

一般我們在生產系統中,可以用Redisson框架提供的這個類庫來基於redis進行分散式鎖的加鎖與釋放鎖。

  Redis分散式鎖的缺點

  redis cluster,或者是redis master-slave架構的主從非同步複製導致的redis分散式鎖的最大缺陷:在redis master例項宕機的時候,可能導致多個客戶端同時完成加鎖。導致各種髒資料的產生。

二、基於redission的分散式鎖實現

  在分散式環境中,很多場景,如:秒殺、ID生成… 都需要分散式鎖。分散式鎖的實現,可以基於redis的setnx,zk的臨時節點,redission框架  

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.2.3</version>
</dependency>

   工具類RedissionUtils

public class RedissionUtils {
	private static Logger logger = LoggerFactory.getLogger(RedissionUtils.class);
 
	private static RedissionUtils redisUtils;
 
	private RedissionUtils() {
	}
 
	/**
	 * 提供單例模式
	 * 
	 * @return
	 */
	public static RedissionUtils getInstance() {
		if (redisUtils == null)
			synchronized (RedisUtils.class) {
				if (redisUtils == null)
					redisUtils = new RedissionUtils();
			}
		return redisUtils;
	}
 
	/**
	 * 使用config建立Redisson Redisson是用於連線Redis Server的基礎類
	 * 
	 * @param config
	 * @return
	 */
	public RedissonClient getRedisson(Config config) {
		RedissonClient redisson = Redisson.create(config);
		logger.info("成功連線Redis Server");
		return redisson;
	}
 
	/**
	 * 使用ip地址和埠建立Redisson
	 * 
	 * @param ip
	 * @param port
	 * @return
	 */
	public RedissonClient getRedisson(String ip, String port) {
		Config config = new Config();
		config.useSingleServer().setAddress(ip + ":" + port);
		RedissonClient redisson = Redisson.create(config);
		logger.info("成功連線Redis Server" + "\t" + "連線" + ip + ":" + port + "伺服器");
		return redisson;
	}
 
	/**
	 * 關閉Redisson客戶端連線
	 * 
	 * @param redisson
	 */
	public void closeRedisson(RedissonClient redisson) {
		redisson.shutdown();
		logger.info("成功關閉Redis Client連線");
	}
 
	/**
	 * 獲取字串物件
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <T> RBucket<T> getRBucket(RedissonClient redisson, String objectName) {
		RBucket<T> bucket = redisson.getBucket(objectName);
		return bucket;
	}
 
	/**
	 * 獲取Map物件
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <K, V> RMap<K, V> getRMap(RedissonClient redisson, String objectName) {
		RMap<K, V> map = redisson.getMap(objectName);
		return map;
	}
 
	/**
	 * 獲取有序集合
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <V> RSortedSet<V> getRSortedSet(RedissonClient redisson,
			String objectName) {
		RSortedSet<V> sortedSet = redisson.getSortedSet(objectName);
		return sortedSet;
	}
 
	/**
	 * 獲取集合
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <V> RSet<V> getRSet(RedissonClient redisson, String objectName) {
		RSet<V> rSet = redisson.getSet(objectName);
		return rSet;
	}
 
	/**
	 * 獲取列表
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <V> RList<V> getRList(RedissonClient redisson, String objectName) {
		RList<V> rList = redisson.getList(objectName);
		return rList;
	}
 
	/**
	 * 獲取佇列
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <V> RQueue<V> getRQueue(RedissonClient redisson, String objectName) {
		RQueue<V> rQueue = redisson.getQueue(objectName);
		return rQueue;
	}
 
	/**
	 * 獲取雙端佇列
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <V> RDeque<V> getRDeque(RedissonClient redisson, String objectName) {
		RDeque<V> rDeque = redisson.getDeque(objectName);
		return rDeque;
	}
 
	/**
	 * 此方法不可用在Redisson 1.2 中 在1.2.2版本中 可用
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	/**
	 * public <V> RBlockingQueue<V> getRBlockingQueue(RedissonClient
	 * redisson,String objectName){ RBlockingQueue
	 * rb=redisson.getBlockingQueue(objectName); return rb; }
	 */
 
	/**
	 * 獲取鎖
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public RLock getRLock(RedissonClient redisson, String objectName) {
		RLock rLock = redisson.getLock(objectName);
		return rLock;
	}
 
	/**
	 * 獲取原子數
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public RAtomicLong getRAtomicLong(RedissonClient redisson, String objectName) {
		RAtomicLong rAtomicLong = redisson.getAtomicLong(objectName);
		return rAtomicLong;
	}
 
	/**
	 * 獲取記數鎖
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public RCountDownLatch getRCountDownLatch(RedissonClient redisson,
			String objectName) {
		RCountDownLatch rCountDownLatch = redisson
				.getCountDownLatch(objectName);
		return rCountDownLatch;
	}
 
	/**
	 * 獲取訊息的Topic
	 * 
	 * @param redisson
	 * @param objectName
	 * @return
	 */
	public <M> RTopic<M> getRTopic(RedissonClient redisson, String objectName) {
		RTopic<M> rTopic = redisson.getTopic(objectName);
		return rTopic;
	}
 
}

redission連線模式

//單機
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("myredisserver:6379");
RedissonClient redisson = Redisson.create(config);
 
 
//主從
Config config = new Config();
config.useMasterSlaveServers()
    .setMasterAddress("127.0.0.1:6379")
    .addSlaveAddress("127.0.0.1:6389", "127.0.0.1:6332", "127.0.0.1:6419")
    .addSlaveAddress("127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
 
 
//哨兵
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    .addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
    .addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
 
 
//叢集
Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // cluster state scan interval in milliseconds
    .addNodeAddress("127.0.0.1:7000", "127.0.0.1:7001")
    .addNodeAddress("127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

三、redission的各種分散式鎖

1)可重入鎖:

Redisson的分散式可重入鎖RLock,實現了java.util.concurrent.locks.Lock介面,以及支援自動過期解鎖。同時還提供了非同步(Async)、反射式(Reactive)和RxJava2標準的介面。

// 最常見的使用方法
RLock lock = redisson.getLock("anyLock");
lock.lock();
//...
lock.unlock();
 
//另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
 
// 加鎖以後10秒鐘自動解鎖
// 無需呼叫unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);
 
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

大家都知道,如果負責儲存這個分散式鎖的Redisson節點宕機以後,而且這個鎖正好處於鎖住的狀態時,這個鎖會出現鎖死的狀態。為了避免這種情況的發生,Redisson內部提供了一個監控鎖的看門狗,它的作用是在Redisson例項被關閉前,不斷的延長鎖的有效期。預設情況下,看門狗的檢查鎖的超時時間是30秒鐘,也可以通過修改Config.lockWatchdogTimeout來另行指定。

Redisson同時還為分散式鎖提供了非同步執行的相關方法:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

RLock物件完全符合Java的Lock規範。也就是說只有擁有鎖的程序才能解鎖,其他程序解鎖則會丟擲IllegalMonitorStateException錯誤。

2)公平鎖(Fair Lock):

它保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒。所有請求執行緒會在一個佇列中排隊,當某個執行緒出現宕機時,Redisson會等待5秒後繼續下一個執行緒,也就是說如果前面有5個執行緒都處於等待狀態,那麼後面的執行緒會等待至少25秒。使用方式同上,獲取的時候使用如下方法:

RLock fairLock = redisson.getFairLock("anyLock");

3)聯鎖(MultiLock):

基於Redis的Redisson分散式聯鎖RedissonMultiLock物件可以將多個RLock物件關聯為一個聯鎖,每個RLock物件例項可以來自於不同的Redisson例項。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
 
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 所有的鎖都上鎖成功才算成功。
lock.lock();
...
lock.unlock();
 
//另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
 
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開
lock.lock(10, TimeUnit.SECONDS);
 
// 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

4)紅鎖(RedLock):

基於Redis的Redisson紅鎖RedissonRedLock物件實現了Redlock介紹的加鎖演算法。該物件也可以用來將多個RLock物件關聯為一個紅鎖,每個RLock物件例項可以來自於不同的Redisson例項。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
 
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
...
lock.unlock();
 
//另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
 
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開
lock.lock(10, TimeUnit.SECONDS);
 
// 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

5)讀寫鎖(ReadWriteLock):

基於Redis的Redisson分散式可重入讀寫鎖RReadWriteLock Java物件實現了java.util.concurrent.locks.ReadWriteLock介面。其中讀鎖和寫鎖都繼承了RLock介面。分散式可重入讀寫鎖允許同時有多個讀鎖和一個寫鎖處於加鎖狀態。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
 
//另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
 
// 10秒鐘以後自動解鎖
// 無需呼叫unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
 
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

6)訊號量(Semaphore):

基於Redis的Redisson的分散式訊號量(Semaphore)Java物件RSemaphore採用了與java.util.concurrent.Semaphore相似的介面和用法。同時還提供了非同步(Async)、反射式(Reactive)和RxJava2標準的介面。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

7)可過期性訊號量(PermitExpirableSemaphore):

基於Redis的Redisson可過期性訊號量(PermitExpirableSemaphore)是在RSemaphore物件的基礎上,為每個訊號增加了一個過期時間。每個訊號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。它提供了非同步(Async)、反射式(Reactive)和RxJava2標準的介面。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個訊號,有效期只有2秒鐘。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

8)門閂:

基於Redisson的Redisson分散式閉鎖(CountDownLatch)Java物件RCountDownLatch採用了與java.util.concurrent.CountDownLatch相似的介面和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
 
// 在其他執行緒或其他JVM裡
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

9)分散式AtomicLong:

RAtomicLong atomicLong = redisson.getAtomicLong("myAtomicLong");
atomicLong.set(3);
atomicLong.incrementAndGet();
atomicLong.get()

10)分散式BitSet:

RBitSet set = redisson.getBitSet("simpleBitset");
set.set(0, true);
set.set(1812, false);
set.clear(0);
set.addAsync("e");
set.xor("anotherBitset");

11)分散式Object:

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

12)分散式Set:

RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());

13)分散式List:

RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());

14)分散式Blocking Queue:

RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);

15)分散式Map:

RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");
map.fastPut("321", new SomeObject());
map.fastRemove("321");
Future<SomeObject> putAsyncFuture = map.putAsync("321");
Future<Void> fastPutAsyncFuture = map.fastPutAsync("321");
map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");

除此之外,還支援Multimap。

16)Map eviction:

現在Redis沒有過期清空Map中的某個entry的功能,只能是清空Map所有的entry。Redission提供了這種功能。

RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// ttl = 10 minutes, 
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// ttl = 10 minutes, maxIdleTime = 10 seconds
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
// ttl = 3 seconds
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// ttl = 40 seconds, maxIdleTime = 10 seconds
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);