Redis實現分散式鎖(設計模式應用實戰)
阿新 • • 發佈:2020-05-10
筆者看過網路上各種各樣使用redis實現分散式鎖的程式碼,要麼錯誤,要麼片段化,沒有一個完整的例子,借這個週末給大家總結一下redis實現分散式鎖的兩種機制
自旋鎖和排他鎖
鑑於實現鎖的方式不同,那麼這裡使用策略模式來組織程式碼
一、自旋鎖
分散式鎖抽象策略介面
package com.srr.lock; /** * @Description 分散式鎖的介面 */ abstract public interface DistributedLock { /** * 獲取鎖 */ boolean lock(); /** * 解鎖 */ void unlock(); }
自旋鎖策略抽象類,使用模板方法模式構建
package com.srr.lock; /** * 自旋鎖策略模板 */ public abstract class SpinRedisLockStrategy implements DistributedLock { private static final Integer retry = 50; //預設重試5次 private static final Long sleeptime = 100L; protected String lockKey; protected String requestId; protected int expireTime; private SpinRedisLockStrategy(){} public SpinRedisLockStrategy(String lockKey, String requestId, int expireTime){ this.lockKey=lockKey; this.requestId=requestId; this.expireTime=expireTime; } /** * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現 */ @Override public boolean lock() { Boolean flag = false; try { for (int i=0;i<retry;i++){ flag = tryLock(); if(flag){ System.out.println(Thread.currentThread().getName()+"獲取鎖成功"); break; } Thread.sleep(sleeptime); } }catch (Exception e){ e.printStackTrace(); } return flag; } /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryLock() ; /** * 解鎖:刪除key */ @Override public abstract void unlock(); }
自旋鎖實現子類
package com.srr.lock; import redis.clients.jedis.Jedis; import java.util.Collections; /** * 自旋鎖 */ public class SpinRedisLock extends SpinRedisLockStrategy{ private static final Long RELEASE_SUCCESS = 1L; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; public SpinRedisLock(String lockKey, String requestId, int expireTime) { super(lockKey,requestId, expireTime); } @Override protected boolean tryLock() { Jedis jedis = new Jedis("localhost", 6379); //建立客戶端,1p和埠號 String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } @Override public void unlock() { Jedis jedis = new Jedis("localhost", 6379); //建立客戶端,1p和埠號 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { System.out.println("lock is unlock"); } } }
至此,自旋鎖方式實現分散式鎖就完成了,下面來看排他鎖阻塞的方式實現
二、排他鎖
在實現之前需要大家搞懂一個概念,也就是redis的事件通知:
/**
* 鍵空間通知,所有通知以 keyspace@ 為字首
* 鍵事件通知,所有通知以 keyevent@ 為字首
* 所有命令都只在鍵真的被改動了之後,才會產生通知,比如刪除foo會產生
* 鍵空間通知
* “pmessage”,"__ key*__ : * “,”__ keyspace@0__:foo",“set”
* 和鍵事件通知
* “pmessage”,"__ key*__ : *","__ keyevent@0__:set",“foo”
*/
搞懂概念之後,需要在redis的配置檔案redis.conf中將其 notify-keyspace-events "KEA",預設為notify-keyspace-events "",這樣才能啟動redis的事件監聽機制。
排它鎖策略抽象類
package com.srr.lock; import redis.clients.jedis.Jedis; /** * @Description 阻塞獲取鎖,模板類 */ public abstract class BlockingRedisLockStrategy implements DistributedLock { protected String lockKey; protected String requestId; protected int expireTime; private BlockingRedisLockStrategy(){} public BlockingRedisLockStrategy(String lockKey, String requestId,int expireTime){ this.lockKey=lockKey; this.requestId=requestId; this.expireTime=expireTime; } /** * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現 * @throws Exception */ @Override public final boolean lock() { //獲取鎖成功 if (tryLock()){ System.out.println(Thread.currentThread().getName()+"獲取鎖成功"); return true; }else{ //獲取鎖失敗 //阻塞一直等待 waitLock(); //遞迴,再次獲取鎖 return lock(); } } /** * 嘗試獲取鎖,子類實現 */ protected abstract boolean tryLock() ; /** * 等待獲取鎖,子類實現 */ protected abstract void waitLock(); /** * 解鎖:刪除key */ @Override public abstract void unlock(); }
排他鎖實現子類
package com.srr.lock; import redis.clients.jedis.Jedis; import java.util.Collections; /** * 排他鎖,阻塞 */ public class BlockingRedisLock extends BlockingRedisLockStrategy { private static final Long RELEASE_SUCCESS = 1L; private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; public BlockingRedisLock(String lockKey, String requestId, int expireTime) { super(lockKey,requestId, expireTime); } /** * 嘗試獲取分散式鎖 * @return 是否獲取成功 */ @Override public boolean tryLock() { Jedis jedis = new Jedis("localhost", 6379); //建立客戶端,1p和埠號 String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } @Override public void waitLock() { //判斷key是否存在 Jedis jedis = new Jedis("localhost", 6379); //建立客戶端,1p和埠號 KeyExpiredListener keyExpiredListener = new KeyExpiredListener(); /** * 鍵空間通知,所有通知以 keyspace@ 為字首 * 鍵事件通知,所有通知以 keyevent@ 為字首 * 所有命令都只在鍵真的被改動了之後,才會產生通知,比如刪除foo會產生 * 鍵空間通知 * “pmessage”,"__ key*__ : * “,”__ keyspace@0__:foo",“set” * 和鍵事件通知 * “pmessage”,"__ key*__ : *","__ keyevent@0__:set",“foo” */ //如果要監聽某個key的執行了什麼操作,就訂閱__ keyspace@0__,監聽某種操作動了哪些key,就訂閱__ keyevent@0__ //這裡我們需要監聽分散式鎖的鍵被刪除了,所以要監聽刪除動作"__keyspace@0__:"+key jedis.psubscribe(keyExpiredListener, "__keyspace@0__:"+lockKey); System.out.println("over"); } /** * 釋放分散式鎖 * @return 是否釋放成功 */ @Override public void unlock() { Jedis jedis = new Jedis("localhost", 6379); //建立客戶端,1p和埠號 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { System.out.println("lock is unlock"); } } }
redis事件監聽類
package com.srr.lock; import redis.clients.jedis.JedisPubSub; /** * redis 事件監聽器 */ public class KeyDelListener extends JedisPubSub { public KeyDelListener(){ } // 初始化訂閱時候的處理 @Override public void onPSubscribe(String pattern, int subscribedChannels) { } // 取得訂閱的訊息後的處理 @Override public void onPMessage(String pattern, String channel, String message) { System.out.println("message == "+message); this.punsubscribe(); System.out.println("unsubscribe == "+message); } }
到這裡排他鎖的完整程式碼就寫完了,其實對比一下,兩者的區別在於lock的實現方式不同,筆者為了確保程式碼完整性就全部貼上了。
程式碼寫完了那麼給一個場景測試一下我們的程式碼有沒有問題,請看下面的測試程式碼:
這裡我們構建一個Lock工具類:
package com.srr.lock; /** * 鎖工具類 */ public class Lock { /** * 獲取鎖 */ boolean lock(DistributedLock lock) { return lock.lock(); }; /** * 釋放鎖 */ void unlock(DistributedLock lock) { lock.unlock(); }; }
測試類:
package com.srr.lock; import redis.clients.jedis.Jedis; /** * 測試場景 * count從1加到101 * 使用redis分散式鎖在分散式環境下保證結果正確 */ public class T { volatile int count = 1; public void inc(){ for(int i = 0;i<100;i++){ try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } count++; System.out.println("count == "+count); } } public int getCount(){ return count; } public static void main(String[] args) { final T t = new T(); final Lock lock = new Lock(); //final RedisLock redisLock = new BlockingRedisLock("","1",100000,jedis); final DistributedLock distributedLock = new SpinRedisLock("test","1",100000); Thread t1 = new Thread(new Runnable() { @Override public void run() { if(lock.lock(distributedLock)){ t.inc(); System.out.println("t1 running"); System.out.println("t1 == count == "+ t.getCount()); lock.unlock(distributedLock); } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { if(lock.lock(distributedLock)) { t.inc(); System.out.println("t2 running"); System.out.println("t2 == count == " + t.getCount()); lock.unlock(distributedLock); } } }); t1.start(); t2.start(); } }
測試結果:
到這裡,全部程式碼就完成了,如果想使用zookeeper實現分散式鎖只需要抽象出一個策略類實現DistributedLock介面即可。是不是很方便呢。
原創不易,多多關注!
&n