加鎖了還有併發問題?Redis分散式鎖,真的用對了?
新接手的專案,偶爾會出現賬不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之後太忙就沒再解決,可能是框架的原因……
既然專案交付到手中,這樣的問題是必須要解決的。梳理了所有賬務處理邏輯,最終找到了原因:資料庫併發操作熱點賬戶導致。就這這個問題,來聊一聊分散式系統下基於Redis的分散式鎖。順便也分解一下問題形成原因及解決方案。
原因分析
系統併發量並不高,存在熱點賬戶,但也不至於那麼嚴重。問題的根源在於系統架構設計,人為的製造了併發。場景是這樣的:商戶批量匯入一批資料,系統會進行前置處理,並對賬戶餘額進行增減。
此時,另外一個定時任務,也會對賬戶進行掃描更新。而且對同一賬戶的操作分佈到各個系統當中,熱點賬戶也就出現了。
針對此問題的解決方案,從架構層面可以考慮將賬務系統進行抽離,集中在一個系統中進行處理,所有的資料庫事務及執行順序由賬務系統來統籌處理。從技術方面來講,則可以通過鎖機制來對熱點賬戶進行加鎖。
本篇文章就針對熱點賬戶基於分散式鎖的實現方式進行詳細的講解。
鎖的分析
在Java的多執行緒環境下,通常有幾類鎖可以使用:
JVM記憶體模型級別的鎖,常用的有:synchronized、Lock等;資料庫鎖,比如樂觀鎖,悲觀鎖等;分散式鎖;
JVM記憶體級別的鎖,可以保證單體服務下執行緒的安全性,比如多個執行緒訪問/修改一個全域性變數。但當系統進行叢集部署時,JVM級別的本地鎖就無能為力了。
悲觀鎖與樂觀鎖
像上述案例中,熱點賬戶就屬於分散式系統中的共享資源,我們通常會採用資料庫鎖或分散式鎖來進行解決。
資料庫鎖,又分為樂觀鎖和悲觀鎖。
悲觀鎖是基於資料庫(Mysql的InnoDB)提供的排他鎖來實現的。在進行事務操作時,通過select ... for update語句,MySQL會對查詢結果集中每行資料都新增排他鎖,其他執行緒對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執行(修改);
樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設資料一般情況不會造成衝突,所以在資料進行提交更新的時候,才會正式對資料的衝突與否進行檢測。如果衝突則返回給使用者異常資訊,讓使用者決定如何去做。樂觀鎖適用於讀多寫少的場景,這樣可以提高程式的吞吐量。在樂觀鎖實現時通常會基於記錄狀態或新增version版本來進行實現。
悲觀鎖失效場景
專案中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區,下面來分析一下。
正常使用悲觀鎖的流程:
通過select ... for update鎖定記錄;計算新餘額,修改金額並存儲;執行完成釋放鎖;
經常犯錯的處理流程:
查詢賬戶餘額,計算新餘額;通過select ... for update鎖定記錄;修改金額並存儲;執行完成釋放鎖;
錯誤的流程中,比如A和B服務查詢到的餘額都是100,A扣減50,B扣減40,然後A鎖定記錄,更新資料庫為50;A釋放鎖之後,B鎖定記錄,更新資料庫為60。顯然,後者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的範圍,將鎖提前到計算新餘額之前。
通常悲觀鎖對資料庫的壓力是非常大的,在實踐中通常會根據場景使用樂觀鎖或分散式鎖等方式來實現。
下面進入正題,講講基於Redis的分散式鎖實現。
Redis分散式鎖實戰演習
這裡以Spring Boot、Redis、Lua指令碼為例來演示分散式鎖的實現。為了簡化處理,示例中Redis既承擔了分散式鎖的功能,也承擔了資料庫的功能。
場景構建
叢集環境下,對同一個賬戶的金額進行操作,基本步驟:
從資料庫讀取使用者金額;程式修改金額;再將最新金額儲存到資料庫;
下面從最初不加鎖,不同步處理,逐步推演出最終的分散式鎖。
基礎整合及類構建
準備一個不加鎖處理的基礎業務環境。
首先在Spring Boot專案中引入相關依賴:
org.springframework.boot
spring-boot-starter-data-redis
org.springframework.boot
spring-boot-starter-web
賬戶對應實體類UserAccount:
public class UserAccount {
//使用者ID
private String userId;
//賬戶內金額
private int amount;
//新增賬戶金額
public void addAmount(int amount) {
this.amount=this.amount + amount;
}
// 省略構造方法和getter/setter
}
建立一個執行緒實現類AccountOperationThread:
public class AccountOperationThread implements Runnable {
private final static Logger logger=LoggerFactory.getLogger(AccountOperationThread.class);
private static final Long RELEASE_SUCCESS=1L;
private String userId;
private RedisTemplate<Object, Object> redisTemplate;
public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {
this.userId=userId;
thisisTemplate=redisTemplate;
}
@Override
public void run() {
noLock();
}
/**
* 不加鎖
*/
private void noLock() {
try {
Random random=new Random();
// 模擬執行緒進行業務處理
TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//模擬資料庫中獲取使用者賬號
UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get(userId);
// 金額+1
userAccount.addAmount(1);
logger(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模擬存回資料庫
redisTemplate.opsForValue().set(userId, userAccount);
}
}
其中RedisTemplate的例項化交給了Spring Boot:
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate=new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializer
ObjectMapper objectMapper=new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 設定value的序列化規則和 key的序列化規則
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
最後,再準備一個TestController來進行觸發多執行緒的執行:
@RestController
public class TestController {
private final static Logger logger=LoggerFactory.getLogger(TestController.class);
private static ExecutorService executorService=Executors.newFixedThreadPool(10);
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
@GetMapping("/test")
public String test() throws InterruptedException {
// 初始化使用者user_001到Redis,賬戶金額為0
redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));
// 開啟10個執行緒進行同步測試,每個執行緒為賬戶增加1元
for (int i=0; i < 10; i++) {
logger("建立執行緒i=" + i);
executorService.execute(new AccountOperationThread("user_001", redisTemplate));
}
// 主執行緒休眠1秒等待執行緒跑完
TimeUnit.MILLISECONDS.sleep(1000);
// 查詢Redis中的user_001賬戶
UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get("user_001");
logger("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());
return "success";
}
}
執行上述程式,正常來說10個執行緒,每個執行緒加1,結果應該是10。但多執行幾次,會發現,結果變化很大,基本上都要比10小。
[pool-1-thread-5] c.sis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1
[pool-1-thread-4] c.sis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1
[pool-1-thread-3] c.sis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1
[pool-1-thread-1] c.sis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1
[pool-1-thread-1] c.sis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2
[pool-1-thread-2] c.sis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2
[pool-1-thread-5] c.sis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2
[pool-1-thread-4] c.sis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3
[pool-1-thread-1] c.sis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4
[pool-1-thread-3] c.sis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5
[nio-8080-exec-1] c.sis.controller.TestController : user id : user_001 amount : 5
以上述日誌為例,前四個執行緒都將值改為1,也就是後面三個執行緒都將前面的修改進行了覆蓋,導致最終結果不是10,只有5。這顯然是有問題的。
Redis同步鎖實現
針對上面的情況,在同一個JVM當中,我們可以通過執行緒加鎖來完成。但在分散式環境下,JVM級別的鎖是沒辦法實現的,這裡可以採用Redis同步鎖實現。
基本思路:第一個執行緒進入時,在Redis中進記錄,當後續執行緒過來請求時,判斷Redis是否存在該記錄,如果存在則說明處於鎖定狀態,進行等待或返回。如果不存在,則進行後續業務處理。
/**
* 1.搶佔資源時判斷是否被鎖。
* 2.如未鎖則搶佔成功且加鎖,否則等待鎖釋放。
* 3.業務完成後釋放鎖,讓給其它執行緒。
*
* 該方案並未解決同步問題,原因:執行緒獲得鎖和加鎖的過程,並非原子性操作,可能會導致執行緒A獲得鎖,還未加鎖時,執行緒B也獲得了鎖。
*/
private void redisLock() {
Random random=new Random();
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);
} catch (InterruptedException e) {
e.printStackTrace();
}
while (true) {
Object lock=redisTemplate.opsForValue().get(userId + ":syn");
if (lock==null) {
// 獲得鎖 -> 加鎖 -> 跳出迴圈
logger(Thread.currentThread().getName() + ":獲得鎖");
redisTemplate.opsForValue().set(userId + ":syn", "lock");
break;
}
try {
// 等待500毫秒重試獲得鎖
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//模擬資料庫中獲取使用者賬號
UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get(userId);
if (userAccount !=null) {
//設定金額
userAccount.addAmount(1);
logger(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模擬存回資料庫
redisTemplate.opsForValue().set(userId, userAccount);
}
} finally {
//釋放鎖
redisTemplate.delete(userId + ":syn");
logger(Thread.currentThread().getName() + ":釋放鎖");
}
}
在while程式碼塊中,先判斷對應使用者ID是否在Redis中存在,如果不存在,則進行set加鎖,如果存在,則跳出迴圈繼續等待。
上述程式碼,看起來實現了加鎖的功能,但當執行程式時,會發現與未加鎖一樣,依舊存在併發問題。原因是:獲取鎖和加鎖的操作並不是原子的。比如兩個執行緒發現lock都是null,都進行了加鎖,此時併發問題依舊存在。
Redis原子性同步鎖
針對上述問題,可將獲取鎖和加鎖的過程原子化處理。基於spring-boot-data-redis提供的原子化API可以實現:
// 該方法使用了redis的指令:SETNX key value
// 1.key不存在,設定成功返回value,setIfAbsent返回true;
// 2.key存在,則設定失敗返回null,setIfAbsent返回false;
// 3.原子性操作;
Boolean setIfAbsent(K var1, V var2);
上述方法的原子化操作是對Redis的setnx命令的封裝,在Redis中setnx的使用如下例項:
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
第一次,設定mykey時,並不存在,則返回1,表示設定成功;第二次設定mykey時,已經存在,則返回0,表示設定失敗。再次查詢mykey對應的值,會發現依舊是第一次設定的值。也就是說redis的setnx保證了唯一的key只能被一個服務設定成功。
理解了上述API及底層原理,來看看執行緒中的實現方法程式碼如下:
/**
* 1.原子操作加鎖
* 2.競爭執行緒迴圈重試獲得鎖
* 3.業務完成釋放鎖
*/
private void atomicityRedisLock() {
//Spring data redis 支援的原子性操作
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {
try {
// 等待100毫秒重試獲得鎖
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger(Thread.currentThread().getName() + ":獲得鎖");
try {
//模擬資料庫中獲取使用者賬號
UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get(userId);
if (userAccount !=null) {
//設定金額
userAccount.addAmount(1);
logger(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());
//模擬存回資料庫
redisTemplate.opsForValue().set(userId, userAccount);
}
} finally {
//釋放鎖
redisTemplate.delete(userId + ":syn");
logger(Thread.currentThread().getName() + ":釋放鎖");
}
}
再次執行程式碼,會發現結果正確了,也就是說可以成功的對分散式執行緒進行了加鎖。
Redis分散式鎖的死鎖
雖然上述程式碼執行結果沒問題,但如果應用異常宕機,沒來得及執行finally中釋放鎖的方法,那麼其他執行緒則永遠無法獲得這個鎖。
此時可採用setIfAbsent的過載方法:
Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);
基於該方法,可以設定鎖的過期時間。這樣即便獲得鎖的執行緒宕機,在Redis中資料過期之後,其他執行緒可正常獲得該鎖。
示例程式碼如下:
private void atomicityAndExRedisLock() {
try {
//Spring data redis 支援的原子性操作,並設定5秒過期時間
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",
System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {
// 等待100毫秒重試獲得鎖
logger(Thread.currentThread().getName() + ":嘗試迴圈獲取鎖");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger(Thread.currentThread().getName() + ":獲得鎖--------");
// 應用在這裡宕機,程序退出,無法執行 finally;
Thread.currentThread().interrupt();
// 業務邏輯...
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放鎖
if (!Thread.currentThread().isInterrupted()) {
redisTemplate.delete(userId + ":syn");
logger(Thread.currentThread().getName() + ":釋放鎖");
}
}
}
業務超時及守護執行緒
上面添加了Redis所的超時時間,看似解決了問題,但又引入了新的問題。
比如,正常情況下執行緒A在5秒內可正常處理完業務,但偶發會出現超過5秒的情況。如果將超時時間設定為5秒,執行緒A獲得了鎖,但業務邏輯處理需要6秒。此時,執行緒A還在正常業務邏輯,執行緒B已經獲得了鎖。當執行緒A處理完時,有可能將執行緒B的鎖給釋放掉。
在上述場景中有兩個問題點:
第一,執行緒A和執行緒B可能會同時在執行,存在併發問題。第二,執行緒A可能會把執行緒B的鎖給釋放掉,導致一系列的惡性迴圈。
當然,可以通過在Redis中設定value值來判斷鎖是屬於執行緒A還是執行緒B。但仔細分析會發現,這個問題的本質是因為執行緒A執行業務邏輯耗時超出了鎖超時的時間。
那麼就有兩個解決方案了:
第一,將超時時間設定的足夠長,確保業務程式碼能夠在鎖釋放之前執行完成;第二,為鎖新增守護執行緒,為將要過期釋放但未釋放的鎖增加時間;
第一種方式需要全行大多數情況下業務邏輯的耗時,進行超時時間的設定。
第二種方式,可通過如下守護執行緒的方式來動態增加鎖超時時間。
public class DaemonThread implements Runnable {
private final static Logger logger=LoggerFactory.getLogger(DaemonThread.class);
// 是否需要守護 主執行緒關閉則結束守護執行緒
private volatile boolean daemon=true;
// 守護鎖
private String lockKey;
private RedisTemplate<Object, Object> redisTemplate;
public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {
this.lockKey=lockKey;
thisisTemplate=redisTemplate;
}
@Override
public void run() {
try {
while (daemon) {
long time=redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);
// 剩餘有效期小於1秒則續命
if (time < 1000) {
logger("守護程序: " + Thread.currentThread().getName() + " 延長鎖時間 5000 毫秒");
redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);
}
TimeUnit.MILLISECONDS.sleep(300);
}
logger(" 守護程序: " + Thread.currentThread().getName() + "關閉 ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 主執行緒主動呼叫結束
public void stop() {
daemon=false;
}
}
上述執行緒每隔300毫秒獲取一下Redis中鎖的超時時間,如果小於1秒,則延長5秒。當主執行緒呼叫關閉時,守護執行緒也隨之關閉。
主執行緒中相關程式碼實現:
private void deamonRedisLock() {
//守護執行緒
DaemonThread daemonThread=null;
//Spring data redis 支援的原子性操作,並設定5秒過期時間
String uuid=UUID.randomUUID().toString();
String value=Thread.currentThread().getId() + ":" + uuid;
try {
while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {
// 等待100毫秒重試獲得鎖
logger(Thread.currentThread().getName() + ":嘗試迴圈獲取鎖");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger(Thread.currentThread().getName() + ":獲得鎖----");
// 開啟守護執行緒
daemonThread=new DaemonThread(userId + ":syn", redisTemplate);
Thread thread=new Thread(daemonThread);
thread.start();
// 業務邏輯執行10秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放鎖 這裡也需要原子操作,今後通過 Redis + Lua 講
String result=(String) redisTemplate.opsForValue().get(userId + ":syn");
if (value.equals(result)) {
redisTemplate.delete(userId + ":syn");
logger(Thread.currentThread().getName() + ":釋放鎖-----");
}
//關閉守護執行緒
if (daemonThread !=null) {
daemonThread.stop();
}
}
}
其中在獲得鎖之後,開啟守護執行緒,在finally中將守護執行緒關閉。
基於Lua指令碼的實現
在上述邏輯中,我們是基於spring-boot-data-redis提供的原子化操作來保證鎖判斷和執行的原子化的。在非Spring Boot專案中,則可以基於Lua指令碼來實現。
首先定義加鎖和解鎖的Lua指令碼及對應的DefaultRedisScript物件,在RedisConfig配置類中新增如下例項化程式碼:
@Configuration
public class RedisConfig {
//lock script
private static final String LOCK_SCRIPT=" if redis.call('setnx',KEYS[1],ARGV[1])==1 " +
" then redis.call('expire',KEYS[1],ARGV[2]) " +
" return 1 " +
" else return 0 end ";
private static final String UNLOCK_SCRIPT="if redis.call('get', KEYS[1])==ARGV[1] then return redis.call" +
"('del', KEYS[1]) else return 0 end";
// ... 省略部分程式碼
@Bean
public DefaultRedisScript lockRedisScript() {
DefaultRedisScript defaultRedisScript=new DefaultRedisScript<>();
defaultRedisScript.setResultType(Boolean.class);
defaultRedisScript.setScriptText(LOCK_SCRIPT);
return defaultRedisScript;
}
@Bean
public DefaultRedisScript unlockRedisScript() {
DefaultRedisScript defaultRedisScript=new DefaultRedisScript<>();
defaultRedisScript.setResultType(Long.class);
defaultRedisScript.setScriptText(UNLOCK_SCRIPT);
return defaultRedisScript;
}
}
再通過在AccountOperationThread類中新建構造方法,將上述兩個物件傳入類中(省略此部分演示)。然後,就可以基於RedisTemplate來呼叫了,改造之後的程式碼實現如下:
private void deamonRedisLockWithLua() {
//守護執行緒
DaemonThread daemonThread=null;
//Spring data redis 支援的原子性操作,並設定5秒過期時間
String uuid=UUID.randomUUID().toString();
String value=Thread.currentThread().getId() + ":" + uuid;
try {
while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {
// 等待1000毫秒重試獲得鎖
logger(Thread.currentThread().getName() + ":嘗試迴圈獲取鎖");
TimeUnit.MILLISECONDS.sleep(1000);
}
logger(Thread.currentThread().getName() + ":獲得鎖----");
// 開啟守護執行緒
daemonThread=new DaemonThread(userId + ":syn", redisTemplate);
Thread thread=new Thread(daemonThread);
thread.start();
// 業務邏輯執行10秒...
TimeUnit.MILLISECONDS.sleep(10000);
} catch (InterruptedException e) {
logger.error("異常", e);
} finally {
//使用Lua指令碼:先判斷是否是自己設定的鎖,再執行刪除
// key存在,當前值=期望值時,刪除key;key存在,當前值!=期望值時,返回0;
Long result=redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);
logger("redis解鎖:{}", RELEASE_SUCCESS.equals(result));
if (RELEASE_SUCCESS.equals(result)) {
if (daemonThread !=null) {
//關閉守護執行緒
daemonThread.stop();
logger(Thread.currentThread().getName() + ":釋放鎖---");
}
}
}
}
其中while迴圈中加鎖和finally中的釋放鎖都是基於Lua指令碼來實現了。
Redis鎖的其他因素
除了上述例項,在使用Redis分散式鎖時,還可以考慮以下情況及方案。
Redis鎖的不可重入
當執行緒在持有鎖的情況下再次請求加鎖,如果一個鎖支援一個執行緒多次加鎖,那麼這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由於該鎖已經被持有,再次加鎖會失敗。Redis可通過對鎖進行重入計數,加鎖時加 1,解鎖時減 1,當計數歸 0時釋放鎖。
可重入鎖雖然高效但會增加程式碼的複雜性,這裡就不舉例說明了。
等待鎖釋放
有的業務場景,發現被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然後去搶鎖。上述示例就屬於後者。針對等待鎖釋放也有兩種方案:
客戶端輪訓:當未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基於這種方式實現的。這種方式的缺點也很明顯,比較耗費伺服器資源,當併發量大時會影響伺服器的效率。使用Redis的訂閱釋出功能:當獲取鎖失敗時,訂閱鎖釋放訊息,獲取鎖成功後釋放時,傳送釋放訊息。叢集中的主備切換和腦裂
在Redis包含主從同步的叢集部署方式中,如果主節點掛掉,從節點提升為主節點。如果客戶端A在主節點加鎖成功,指令還未同步到從節點,此時主節點掛掉,從節點升為主節點,新的主節點中沒有鎖的資料。這種情況下,客戶端B就可能加鎖成功,從而出現併發的場景。
當叢集發生腦裂時,Redis master節點跟slave 節點和 sentinel 叢集處於不同的網路分割槽。sentinel叢集無法感知到master的存在,會將 slave 節點提升為 master 節點,此時就會存在兩個不同的 master 節點。從而也會導致併發問題的出現。Redis Cluster叢集部署方式同理。
小結
通過生產環境中的一個問題,排查原因,尋找解決方案,到最終對基於Redis分散式的深入研究,這便是學習的過程。
同時,每當面試或被問題如何解決分散式共享資源時,我們會脫口而出”基於Redis實現分散式鎖“,但通過本文的學習會發現,Redis分散式鎖並不是萬能的,而且在使用的過程中還需要注意超時、死鎖、誤解鎖、叢集選主/腦裂等問題。
Redis以高效能著稱,但在實現分散式鎖的過程中還是存在一些問題。因此,基於Redis的分散式鎖可以極大的緩解併發問題,但要完全防止併發,還是得從資料庫層面入手。