基於redis實現的分散式鎖
阿新 • • 發佈:2019-01-03
/** * Created by BingZhong on 2017/7/29. * * 基於Redis實現的分散式鎖 */ public final class RedisLockHelper { private static Logger logger = LoggerFactory.getLogger(RedisLockHelper.class); /** * redis操作幫助類,可以是其他封裝了redis操作的類 */ private RedisHelper redisHelper; public static final long DEFAULT_TIMEOUT = 30 * 1000; public static final long DEFAULT_SLEEP_TIME = 100; private RedisLockHelper(RedisHelper redisHelper) { this.redisHelper = redisHelper; } public static RedisLockHelper getInstance(RedisHelper redisHelper) { return new RedisLockHelper(redisHelper); } /** * 建立鎖 * * @param mutex 互斥量 * @param timeout 鎖的超時時間 * @param sleepTime 執行緒自旋嘗試獲取鎖時的休眠時間 * @param timeUnit 時間單位 */ public RedisLock newLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) { logger.info("建立分散式鎖,互斥量為{}", mutex); return new RedisLock(mutex, timeout, sleepTime, timeUnit); } public RedisLock newLock(String mutex, long timeout, TimeUnit timeUnit) { return newLock(mutex, timeout, DEFAULT_SLEEP_TIME, timeUnit); } public RedisLock newLock(String mutex) { return newLock(mutex, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); } public class RedisLock { /** * 用於建立redis健值對的鍵,相當於互斥量 */ private final String mutex; /** * 鎖過期的絕對時間 */ private volatile long lockExpiresTime = 0; /** * 鎖的超時時間 */ private final long timeout; /** * 每次迴圈獲取鎖的休眠時間 */ private final long sleepTime; /** * 鎖的執行緒持有者 */ private volatile Thread lockHolder = null; private final ReentrantLock threadLock = new ReentrantLock(); public RedisLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) { this.mutex = mutex; this.timeout = timeUnit.toMillis(timeout); this.sleepTime = timeUnit.toMillis(sleepTime); } /** * 加鎖,將會一直嘗試獲取鎖,直到超時 */ public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException { acquireTimeout = timeUnit.toMillis(acquireTimeout); long acquireTime = acquireTimeout + System.currentTimeMillis(); threadLock.tryLock(acquireTimeout, timeUnit); try { while (true) { boolean hasLock = tryLock(); if (hasLock) { //獲取鎖成功 return true; } else if (acquireTime < System.currentTimeMillis()) { break; } Thread.sleep(sleepTime); } } finally { if (threadLock.isHeldByCurrentThread()) { threadLock.unlock(); } } return false; } /** * 嘗試獲取鎖,無論是否獲取到鎖都將直接返回而不會阻塞 * 不支援重入鎖 */ public boolean tryLock() { if (lockHolder == Thread.currentThread()) { throw new IllegalMonitorStateException("不支援重入鎖"); } long currentTime = System.currentTimeMillis(); String expires = String.valueOf(timeout + currentTime); //嘗試設定互斥量 if (redisHelper.setNx(mutex, expires) > 0) { setLockStatus(expires); return true; } else { String currentLockTime = redisHelper.get(mutex); //檢查鎖是否超時 if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) { //獲取舊的鎖時間並設定互斥量 String oldLockTime = redisHelper.getSet(mutex, expires); //判斷獲取到的舊值是否一致,不一致證明已經有另外的程序(執行緒)成功獲取到了鎖 if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) { setLockStatus(expires); return true; } } return false; } } /** * 該鎖是否被鎖住 */ public boolean isLock() { String currentLockTime = redisHelper.get(mutex); //存在互斥量且鎖還為過時即鎖住 return Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) > System.currentTimeMillis(); } public String getMutex() { return mutex; } /** * 解鎖 */ public boolean unlock() { //只有鎖的持有執行緒才能解鎖 if (lockHolder == Thread.currentThread()) { //判斷鎖是否超時,沒有超時才將互斥量刪除 if (lockExpiresTime > System.currentTimeMillis()) { redisHelper.del(mutex); logger.info("刪除互斥量[{}]", mutex); } lockHolder = null; logger.info("釋放[{}]鎖成功", mutex); return true; } else { throw new IllegalMonitorStateException("沒有獲取到鎖的執行緒無法執行解鎖操作"); } } private void setLockStatus(String expires) { lockExpiresTime = Long.parseLong(expires); lockHolder = Thread.currentThread(); logger.info("獲取[{}]鎖成功", mutex); } } }