基於redis的分散式鎖實現
隨著業務越來越複雜,應用服務都會朝著分散式、叢集方向部署,而分散式CAP原則告訴我們,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分割槽容錯性),三者不可得兼。
很多場景中,需要使用分散式事務、分散式鎖等技術來保證資料最終一致性。有的時候,我們需要保證某一方法同一時刻只能被一個執行緒執行。
在單機(單程序)環境中,JAVA提供了很多併發相關API,但在多機(多程序)環境中就無能為力了。
對於分散式鎖,最好能夠滿足以下幾點
可以保證在分散式部署的應用叢集中,同一個方法在同一時間只能被一臺機器上的一個執行緒執行
這把鎖要是一把可重入鎖(避免死鎖)
這把鎖最好是一把阻塞鎖
有高可用的獲取鎖和釋放鎖功能
獲取鎖和釋放鎖的效能要好
針對分散式鎖,目前有以下幾種實現方案
基於資料庫實現分散式鎖
基於快取實現分散式鎖
基於zookeeper實現分散式鎖
分散式同步鎖實現
實現思路
鎖的實現主要基於redis的SETNX
命令(SETNX詳細解釋參考這裡),我們來看SETNX
的解釋
SETNX key value
將 key 的值設為 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。返回值:
設定成功,返回 1 。
設定失敗,返回 0 。
使用SETNX
- 使用
SETNX
命令獲取鎖,若返回0(key已存在,鎖已存在)則獲取失敗,反之獲取成功 - 為了防止獲取鎖後程序出現異常,導致其他執行緒/程序呼叫
SETNX
命令總是返回0而進入死鎖狀態,需要為該key設定一個“合理”的過期時間 - 釋放鎖,使用
DEL
命令將鎖資料刪除
實現過程
建立同步鎖實現類
/** * 同步鎖 * * @property key Redis key * @property stringRedisTemplate RedisTemplate * @property expire Redis TTL/秒 * @property safetyTime 安全時間/秒 */ class SyncLock( private val key: String, private val stringRedisTemplate: StringRedisTemplate, private val expire: Long, private val safetyTime: Long )
key
reids中的key,對應java api synchronized的物件
expire
reids中key的過期時間
safetyTime
下文介紹其作用
實現鎖的獲取功能
private val value: String get() = Thread.currentThread().name
/**
* 嘗試獲取鎖(立即返回)
*
* @return 是否獲取成功
*/
fun tryLock(): Boolean {
val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false
if (locked) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
}
return locked
}
這裡使用setIfAbsent
函式(對應SETNX
命令)嘗試設定key的值為value(當前執行緒id+執行緒名),若成功則同時設定key的過期時間並返回true,否則返回false
實現帶超時時間的鎖獲取功能
private val waitMillisPer: Long = 10
/**
* 嘗試獲取鎖,並至多等待timeout時長
*
* @param timeout 超時時長
* @param unit 時間單位
*
* @return 是否獲取成功
*/
fun tryLock(timeout: Long, unit: TimeUnit): Boolean {
val waitMax = unit.toMillis(timeout)
var waitAlready: Long = 0
while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
Thread.sleep(waitMillisPer)
waitAlready += waitMillisPer
}
if (waitAlready < waitMax) {
stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
return true
}
return false
}
這裡使用while迴圈不斷嘗試鎖的獲取,並至多嘗試timeout時長,在timeout時間內若成功則同時設定key的過期時間並返回true,否則返回false
其實以上兩種tryLock
函式還是有一種可能便是,在呼叫setIfAbsent
後、呼叫expire
之前若服務出現異常,也將導致該鎖(key)無法釋放(過期或刪除),使得其他執行緒/程序再無法獲取鎖而進入死迴圈,為了避免此問題的產生,我們引入了safetyTime
該引數的作用為,從獲取鎖開始直到safetyTime時長,若仍未獲取成功則認為某一執行緒/程序出現異常導致資料不正確,此時強制獲取,其實現如下
實現帶保護功能的鎖獲取功能
/**
* 獲取鎖
*/
fun lock() {
val waitMax = TimeUnit.SECONDS.toMillis(safetyTime)
var waitAlready: Long = 0
while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
Thread.sleep(waitMillisPer)
waitAlready += waitMillisPer
}
// stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS)
}
這裡同樣使用while迴圈不斷嘗試鎖的獲取,但至多等待safetyTime時長,最終不論是否成功,均使用SETEX
命令將key設定為當前先執行緒對應的value,並同時設定該key的過期時間
實現鎖的釋放功能
/**
* 釋放鎖
*/
fun unLock() {
stringRedisTemplate.opsForValue()[key]?.let {
if (it == value) {
stringRedisTemplate.delete(key)
}
}
}
鎖的釋放使用DEL
命令刪除key,但需要注意的是,釋放鎖時只能釋放本執行緒持有的鎖
若expire設定不合理,如expire設定為10秒,結果在獲取鎖後執行緒運行了20秒,該鎖有可能已經被其他執行緒強制獲取,即該key代表的鎖已經不是當前執行緒所持有的鎖,此時便不能冒然刪除該key,而只能釋放本執行緒持有的鎖。
整合spring boot
為了更好的與spring整合,我們建立一個工廠類來輔助建立同步鎖例項
/**
* SyncLock同步鎖工廠類
*/
@Component
class SyncLockFactory {
@Autowired
private lateinit var stringRedisTemplate: StringRedisTemplate
private val syncLockMap = mutableMapOf<String, SyncLock>()
/**
* 建立SyncLock
*
* @param key Redis key
* @param expire Redis TTL/秒,預設10秒
* @param safetyTime 安全時間/秒,為了防止程式異常導致死鎖,在此時間後強制拿鎖,預設 expire * 5 秒
*/
@Synchronized
fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock {
if (!syncLockMap.containsKey(key)) {
syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime)
}
return syncLockMap[key]!!
}
}
在spring框架下可以更方便的使用
@Component
class SomeLogic: InitializingBean {
@Autowired
lateinit var syncLockFactory: SyncLockFactory
lateinit var syncLock
override fun afterPropertiesSet() {
syncLock = syncLockFactory.build("lock:some:name", 10)
}
fun someFun() {
syncLock.lock()
try {
// some logic
} finally {
syncLock.unlock()
}
}
}
註解的實現
藉助spring aop框架,我們可以將SyncLock的使用進一步簡化
建立註解類
/**
* 同步鎖註解
*
* @property key Redis key
* @property expire Redis TTL/秒,預設10秒
*/
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class SyncLockable(
val key: String,
val expire: Long = 10
)
實現AOP
/**
* 同步鎖註解處理
*/
@Aspect
@Component
class SyncLockHandle {
@Autowired
private lateinit var syncLockFactory: SyncLockFactory
/**
* 在方法上執行同步鎖
*/
@Around("@annotation(syncLockable)")
fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? {
val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire)
try {
lock.lock()
return jp.proceed()
} finally {
lock.unLock()
}
}
}
如此一來,我們便可以按照如下方式使用SyncLock
@Component
class SomeLogic {
@SyncLockable("lock:some:name", 10)
fun someFun() {
// some logic
}
}
是不是