1. 程式人生 > 其它 >加鎖了還有併發問題?Redis分散式鎖,真的用對了?

加鎖了還有併發問題?Redis分散式鎖,真的用對了?

  新接手的專案,偶爾會出現賬不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之後太忙就沒再解決,可能是框架的原因……

  既然專案交付到手中,這樣的問題是必須要解決的。梳理了所有賬務處理邏輯,最終找到了原因:資料庫併發操作熱點賬戶導致。就這這個問題,來聊一聊分散式系統下基於Redis的分散式鎖。順便也分解一下問題形成原因及解決方案。

  原因分析

  系統併發量並不高,存在熱點賬戶,但也不至於那麼嚴重。問題的根源在於系統架構設計,人為的製造了併發。場景是這樣的:商戶批量匯入一批資料,系統會進行前置處理,並對賬戶餘額進行增減。

  此時,另外一個定時任務,也會對賬戶進行掃描更新。而且對同一賬戶的操作分佈到各個系統當中,熱點賬戶也就出現了。

  針對此問題的解決方案,從架構層面可以考慮將賬務系統進行抽離,集中在一個系統中進行處理,所有的資料庫事務及執行順序由賬務系統來統籌處理。從技術方面來講,則可以通過鎖機制來對熱點賬戶進行加鎖。

  本篇文章就針對熱點賬戶基於分散式鎖的實現方式進行詳細的講解。

  鎖的分析

  在Java的多執行緒環境下,通常有幾類鎖可以使用:

  JVM記憶體模型級別的鎖,常用的有:synchronized、Lock等;資料庫鎖,比如樂觀鎖,悲觀鎖等;分散式鎖;

  JVM記憶體級別的鎖,可以保證單體服務下執行緒的安全性,比如多個執行緒訪問/修改一個全域性變數。但當系統進行叢集部署時,JVM級別的本地鎖就無能為力了。

  悲觀鎖與樂觀鎖

  像上述案例中,熱點賬戶就屬於分散式系統中的共享資源,我們通常會採用資料庫鎖或分散式鎖來進行解決。

  資料庫鎖,又分為樂觀鎖和悲觀鎖。

  悲觀鎖是基於資料庫(Mysql的InnoDB)提供的排他鎖來實現的。在進行事務操作時,通過select ... for update語句,MySQL會對查詢結果集中每行資料都新增排他鎖,其他執行緒對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執行(修改);

  樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設資料一般情況不會造成衝突,所以在資料進行提交更新的時候,才會正式對資料的衝突與否進行檢測。如果衝突則返回給使用者異常資訊,讓使用者決定如何去做。樂觀鎖適用於讀多寫少的場景,這樣可以提高程式的吞吐量。在樂觀鎖實現時通常會基於記錄狀態或新增version版本來進行實現。

  悲觀鎖失效場景

  專案中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區,下面來分析一下。

  正常使用悲觀鎖的流程:

  通過select ... for update鎖定記錄;計算新餘額,修改金額並存儲;執行完成釋放鎖;

  經常犯錯的處理流程:

  查詢賬戶餘額,計算新餘額;通過select ... for update鎖定記錄;修改金額並存儲;執行完成釋放鎖;

  錯誤的流程中,比如A和B服務查詢到的餘額都是100,A扣減50,B扣減40,然後A鎖定記錄,更新資料庫為50;A釋放鎖之後,B鎖定記錄,更新資料庫為60。顯然,後者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的範圍,將鎖提前到計算新餘額之前。

  通常悲觀鎖對資料庫的壓力是非常大的,在實踐中通常會根據場景使用樂觀鎖或分散式鎖等方式來實現。

  下面進入正題,講講基於Redis的分散式鎖實現。

  Redis分散式鎖實戰演習

  這裡以Spring Boot、Redis、Lua指令碼為例來演示分散式鎖的實現。為了簡化處理,示例中Redis既承擔了分散式鎖的功能,也承擔了資料庫的功能。

  場景構建

  叢集環境下,對同一個賬戶的金額進行操作,基本步驟:

  從資料庫讀取使用者金額;程式修改金額;再將最新金額儲存到資料庫;

  下面從最初不加鎖,不同步處理,逐步推演出最終的分散式鎖。

  基礎整合及類構建

  準備一個不加鎖處理的基礎業務環境。

  首先在Spring Boot專案中引入相關依賴:

  

  org.springframework.boot

  spring-boot-starter-data-redis

  

  

  org.springframework.boot

  spring-boot-starter-web

  

  賬戶對應實體類UserAccount:

  public class UserAccount {

  //使用者ID

  private String userId;

  //賬戶內金額

  private int amount;

  //新增賬戶金額

  public void addAmount(int amount) {

  this.amount=this.amount + amount;

  }

  // 省略構造方法和getter/setter

  }

  建立一個執行緒實現類AccountOperationThread:

  public class AccountOperationThread implements Runnable {

  private final static Logger logger=LoggerFactory.getLogger(AccountOperationThread.class);

  private static final Long RELEASE_SUCCESS=1L;

  private String userId;

  private RedisTemplate<Object, Object> redisTemplate;

  public AccountOperationThread(String userId, RedisTemplate<Object, Object> redisTemplate) {

  this.userId=userId;

  thisisTemplate=redisTemplate;

  }

  @Override

  public void run() {

  noLock();

  }

  /**

  * 不加鎖

  */

  private void noLock() {

  try {

  Random random=new Random();

  // 模擬執行緒進行業務處理

  TimeUnit.MILLISECONDS.sleep(random.nextInt(100) + 1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  //模擬資料庫中獲取使用者賬號

  UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get(userId);

  // 金額+1

  userAccount.addAmount(1);

  logger(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());

  //模擬存回資料庫

  redisTemplate.opsForValue().set(userId, userAccount);

  }

  }

  其中RedisTemplate的例項化交給了Spring Boot:

  @Configuration

  public class RedisConfig {

  @Bean

  public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

  RedisTemplate<Object, Object> redisTemplate=new RedisTemplate<>();

  redisTemplate.setConnectionFactory(redisConnectionFactory);

  Jackson2JsonRedisSerializer

  ObjectMapper objectMapper=new ObjectMapper();

  objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

  objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

  jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

  // 設定value的序列化規則和 key的序列化規則

  redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

  redisTemplate.setKeySerializer(new StringRedisSerializer());

  redisTemplate.afterPropertiesSet();

  return redisTemplate;

  }

  }

  最後,再準備一個TestController來進行觸發多執行緒的執行:

  @RestController

  public class TestController {

  private final static Logger logger=LoggerFactory.getLogger(TestController.class);

  private static ExecutorService executorService=Executors.newFixedThreadPool(10);

  @Autowired

  private RedisTemplate<Object, Object> redisTemplate;

  @GetMapping("/test")

  public String test() throws InterruptedException {

  // 初始化使用者user_001到Redis,賬戶金額為0

  redisTemplate.opsForValue().set("user_001", new UserAccount("user_001", 0));

  // 開啟10個執行緒進行同步測試,每個執行緒為賬戶增加1元

  for (int i=0; i < 10; i++) {

  logger("建立執行緒i=" + i);

  executorService.execute(new AccountOperationThread("user_001", redisTemplate));

  }

  // 主執行緒休眠1秒等待執行緒跑完

  TimeUnit.MILLISECONDS.sleep(1000);

  // 查詢Redis中的user_001賬戶

  UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get("user_001");

  logger("user id : " + userAccount.getUserId() + " amount : " + userAccount.getAmount());

  return "success";

  }

  }

  執行上述程式,正常來說10個執行緒,每個執行緒加1,結果應該是10。但多執行幾次,會發現,結果變化很大,基本上都要比10小。

  [pool-1-thread-5] c.sis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 1

  [pool-1-thread-4] c.sis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 1

  [pool-1-thread-3] c.sis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 1

  [pool-1-thread-1] c.sis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 1

  [pool-1-thread-1] c.sis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 2

  [pool-1-thread-2] c.sis.thread.AccountOperationThread : pool-1-thread-2 : user id : user_001 amount : 2

  [pool-1-thread-5] c.sis.thread.AccountOperationThread : pool-1-thread-5 : user id : user_001 amount : 2

  [pool-1-thread-4] c.sis.thread.AccountOperationThread : pool-1-thread-4 : user id : user_001 amount : 3

  [pool-1-thread-1] c.sis.thread.AccountOperationThread : pool-1-thread-1 : user id : user_001 amount : 4

  [pool-1-thread-3] c.sis.thread.AccountOperationThread : pool-1-thread-3 : user id : user_001 amount : 5

  [nio-8080-exec-1] c.sis.controller.TestController : user id : user_001 amount : 5

  以上述日誌為例,前四個執行緒都將值改為1,也就是後面三個執行緒都將前面的修改進行了覆蓋,導致最終結果不是10,只有5。這顯然是有問題的。

  Redis同步鎖實現

  針對上面的情況,在同一個JVM當中,我們可以通過執行緒加鎖來完成。但在分散式環境下,JVM級別的鎖是沒辦法實現的,這裡可以採用Redis同步鎖實現。

  基本思路:第一個執行緒進入時,在Redis中進記錄,當後續執行緒過來請求時,判斷Redis是否存在該記錄,如果存在則說明處於鎖定狀態,進行等待或返回。如果不存在,則進行後續業務處理。

  /**

  * 1.搶佔資源時判斷是否被鎖。

  * 2.如未鎖則搶佔成功且加鎖,否則等待鎖釋放。

  * 3.業務完成後釋放鎖,讓給其它執行緒。

  *

  * 該方案並未解決同步問題,原因:執行緒獲得鎖和加鎖的過程,並非原子性操作,可能會導致執行緒A獲得鎖,還未加鎖時,執行緒B也獲得了鎖。

  */

  private void redisLock() {

  Random random=new Random();

  try {

  TimeUnit.MILLISECONDS.sleep(random.nextInt(1000) + 1);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  while (true) {

  Object lock=redisTemplate.opsForValue().get(userId + ":syn");

  if (lock==null) {

  // 獲得鎖 -> 加鎖 -> 跳出迴圈

  logger(Thread.currentThread().getName() + ":獲得鎖");

  redisTemplate.opsForValue().set(userId + ":syn", "lock");

  break;

  }

  try {

  // 等待500毫秒重試獲得鎖

  TimeUnit.MILLISECONDS.sleep(500);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  }

  try {

  //模擬資料庫中獲取使用者賬號

  UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get(userId);

  if (userAccount !=null) {

  //設定金額

  userAccount.addAmount(1);

  logger(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());

  //模擬存回資料庫

  redisTemplate.opsForValue().set(userId, userAccount);

  }

  } finally {

  //釋放鎖

  redisTemplate.delete(userId + ":syn");

  logger(Thread.currentThread().getName() + ":釋放鎖");

  }

  }

  在while程式碼塊中,先判斷對應使用者ID是否在Redis中存在,如果不存在,則進行set加鎖,如果存在,則跳出迴圈繼續等待。

  上述程式碼,看起來實現了加鎖的功能,但當執行程式時,會發現與未加鎖一樣,依舊存在併發問題。原因是:獲取鎖和加鎖的操作並不是原子的。比如兩個執行緒發現lock都是null,都進行了加鎖,此時併發問題依舊存在。

  Redis原子性同步鎖

  針對上述問題,可將獲取鎖和加鎖的過程原子化處理。基於spring-boot-data-redis提供的原子化API可以實現:

  // 該方法使用了redis的指令:SETNX key value

  // 1.key不存在,設定成功返回value,setIfAbsent返回true;

  // 2.key存在,則設定失敗返回null,setIfAbsent返回false;

  // 3.原子性操作;

  Boolean setIfAbsent(K var1, V var2);

  上述方法的原子化操作是對Redis的setnx命令的封裝,在Redis中setnx的使用如下例項:

  redis> SETNX mykey "Hello"

  (integer) 1

  redis> SETNX mykey "World"

  (integer) 0

  redis> GET mykey

  "Hello"

  第一次,設定mykey時,並不存在,則返回1,表示設定成功;第二次設定mykey時,已經存在,則返回0,表示設定失敗。再次查詢mykey對應的值,會發現依舊是第一次設定的值。也就是說redis的setnx保證了唯一的key只能被一個服務設定成功。

  理解了上述API及底層原理,來看看執行緒中的實現方法程式碼如下:

  /**

  * 1.原子操作加鎖

  * 2.競爭執行緒迴圈重試獲得鎖

  * 3.業務完成釋放鎖

  */

  private void atomicityRedisLock() {

  //Spring data redis 支援的原子性操作

  while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", "lock")) {

  try {

  // 等待100毫秒重試獲得鎖

  TimeUnit.MILLISECONDS.sleep(100);

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  }

  logger(Thread.currentThread().getName() + ":獲得鎖");

  try {

  //模擬資料庫中獲取使用者賬號

  UserAccount userAccount=(UserAccount) redisTemplate.opsForValue().get(userId);

  if (userAccount !=null) {

  //設定金額

  userAccount.addAmount(1);

  logger(Thread.currentThread().getName() + " : user id : " + userId + " amount : " + userAccount.getAmount());

  //模擬存回資料庫

  redisTemplate.opsForValue().set(userId, userAccount);

  }

  } finally {

  //釋放鎖

  redisTemplate.delete(userId + ":syn");

  logger(Thread.currentThread().getName() + ":釋放鎖");

  }

  }

  再次執行程式碼,會發現結果正確了,也就是說可以成功的對分散式執行緒進行了加鎖。

  Redis分散式鎖的死鎖

  雖然上述程式碼執行結果沒問題,但如果應用異常宕機,沒來得及執行finally中釋放鎖的方法,那麼其他執行緒則永遠無法獲得這個鎖。

  此時可採用setIfAbsent的過載方法:

  Boolean setIfAbsent(K var1, V var2, long var3, TimeUnit var5);

  基於該方法,可以設定鎖的過期時間。這樣即便獲得鎖的執行緒宕機,在Redis中資料過期之後,其他執行緒可正常獲得該鎖。

  示例程式碼如下:

  private void atomicityAndExRedisLock() {

  try {

  //Spring data redis 支援的原子性操作,並設定5秒過期時間

  while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn",

  System.currentTimeMillis() + 5000, 5000, TimeUnit.MILLISECONDS)) {

  // 等待100毫秒重試獲得鎖

  logger(Thread.currentThread().getName() + ":嘗試迴圈獲取鎖");

  TimeUnit.MILLISECONDS.sleep(1000);

  }

  logger(Thread.currentThread().getName() + ":獲得鎖--------");

  // 應用在這裡宕機,程序退出,無法執行 finally;

  Thread.currentThread().interrupt();

  // 業務邏輯...

  } catch (InterruptedException e) {

  e.printStackTrace();

  } finally {

  //釋放鎖

  if (!Thread.currentThread().isInterrupted()) {

  redisTemplate.delete(userId + ":syn");

  logger(Thread.currentThread().getName() + ":釋放鎖");

  }

  }

  }

  業務超時及守護執行緒

  上面添加了Redis所的超時時間,看似解決了問題,但又引入了新的問題。

  比如,正常情況下執行緒A在5秒內可正常處理完業務,但偶發會出現超過5秒的情況。如果將超時時間設定為5秒,執行緒A獲得了鎖,但業務邏輯處理需要6秒。此時,執行緒A還在正常業務邏輯,執行緒B已經獲得了鎖。當執行緒A處理完時,有可能將執行緒B的鎖給釋放掉。

  在上述場景中有兩個問題點:

  第一,執行緒A和執行緒B可能會同時在執行,存在併發問題。第二,執行緒A可能會把執行緒B的鎖給釋放掉,導致一系列的惡性迴圈。

  當然,可以通過在Redis中設定value值來判斷鎖是屬於執行緒A還是執行緒B。但仔細分析會發現,這個問題的本質是因為執行緒A執行業務邏輯耗時超出了鎖超時的時間。

  那麼就有兩個解決方案了:

  第一,將超時時間設定的足夠長,確保業務程式碼能夠在鎖釋放之前執行完成;第二,為鎖新增守護執行緒,為將要過期釋放但未釋放的鎖增加時間;

  第一種方式需要全行大多數情況下業務邏輯的耗時,進行超時時間的設定。

  第二種方式,可通過如下守護執行緒的方式來動態增加鎖超時時間。

  public class DaemonThread implements Runnable {

  private final static Logger logger=LoggerFactory.getLogger(DaemonThread.class);

  // 是否需要守護 主執行緒關閉則結束守護執行緒

  private volatile boolean daemon=true;

  // 守護鎖

  private String lockKey;

  private RedisTemplate<Object, Object> redisTemplate;

  public DaemonThread(String lockKey, RedisTemplate<Object, Object> redisTemplate) {

  this.lockKey=lockKey;

  thisisTemplate=redisTemplate;

  }

  @Override

  public void run() {

  try {

  while (daemon) {

  long time=redisTemplate.getExpire(lockKey, TimeUnit.MILLISECONDS);

  // 剩餘有效期小於1秒則續命

  if (time < 1000) {

  logger("守護程序: " + Thread.currentThread().getName() + " 延長鎖時間 5000 毫秒");

  redisTemplate.expire(lockKey, 5000, TimeUnit.MILLISECONDS);

  }

  TimeUnit.MILLISECONDS.sleep(300);

  }

  logger(" 守護程序: " + Thread.currentThread().getName() + "關閉 ");

  } catch (InterruptedException e) {

  e.printStackTrace();

  }

  }

  // 主執行緒主動呼叫結束

  public void stop() {

  daemon=false;

  }

  }

  上述執行緒每隔300毫秒獲取一下Redis中鎖的超時時間,如果小於1秒,則延長5秒。當主執行緒呼叫關閉時,守護執行緒也隨之關閉。

  主執行緒中相關程式碼實現:

  private void deamonRedisLock() {

  //守護執行緒

  DaemonThread daemonThread=null;

  //Spring data redis 支援的原子性操作,並設定5秒過期時間

  String uuid=UUID.randomUUID().toString();

  String value=Thread.currentThread().getId() + ":" + uuid;

  try {

  while (!redisTemplate.opsForValue().setIfAbsent(userId + ":syn", value, 5000, TimeUnit.MILLISECONDS)) {

  // 等待100毫秒重試獲得鎖

  logger(Thread.currentThread().getName() + ":嘗試迴圈獲取鎖");

  TimeUnit.MILLISECONDS.sleep(1000);

  }

  logger(Thread.currentThread().getName() + ":獲得鎖----");

  // 開啟守護執行緒

  daemonThread=new DaemonThread(userId + ":syn", redisTemplate);

  Thread thread=new Thread(daemonThread);

  thread.start();

  // 業務邏輯執行10秒...

  TimeUnit.MILLISECONDS.sleep(10000);

  } catch (InterruptedException e) {

  e.printStackTrace();

  } finally {

  //釋放鎖 這裡也需要原子操作,今後通過 Redis + Lua 講

  String result=(String) redisTemplate.opsForValue().get(userId + ":syn");

  if (value.equals(result)) {

  redisTemplate.delete(userId + ":syn");

  logger(Thread.currentThread().getName() + ":釋放鎖-----");

  }

  //關閉守護執行緒

  if (daemonThread !=null) {

  daemonThread.stop();

  }

  }

  }

  其中在獲得鎖之後,開啟守護執行緒,在finally中將守護執行緒關閉。

  基於Lua指令碼的實現

  在上述邏輯中,我們是基於spring-boot-data-redis提供的原子化操作來保證鎖判斷和執行的原子化的。在非Spring Boot專案中,則可以基於Lua指令碼來實現。

  首先定義加鎖和解鎖的Lua指令碼及對應的DefaultRedisScript物件,在RedisConfig配置類中新增如下例項化程式碼:

  @Configuration

  public class RedisConfig {

  //lock script

  private static final String LOCK_SCRIPT=" if redis.call('setnx',KEYS[1],ARGV[1])==1 " +

  " then redis.call('expire',KEYS[1],ARGV[2]) " +

  " return 1 " +

  " else return 0 end ";

  private static final String UNLOCK_SCRIPT="if redis.call('get', KEYS[1])==ARGV[1] then return redis.call" +

  "('del', KEYS[1]) else return 0 end";

  // ... 省略部分程式碼

  @Bean

  public DefaultRedisScript lockRedisScript() {

  DefaultRedisScript defaultRedisScript=new DefaultRedisScript<>();

  defaultRedisScript.setResultType(Boolean.class);

  defaultRedisScript.setScriptText(LOCK_SCRIPT);

  return defaultRedisScript;

  }

  @Bean

  public DefaultRedisScript unlockRedisScript() {

  DefaultRedisScript defaultRedisScript=new DefaultRedisScript<>();

  defaultRedisScript.setResultType(Long.class);

  defaultRedisScript.setScriptText(UNLOCK_SCRIPT);

  return defaultRedisScript;

  }

  }

  再通過在AccountOperationThread類中新建構造方法,將上述兩個物件傳入類中(省略此部分演示)。然後,就可以基於RedisTemplate來呼叫了,改造之後的程式碼實現如下:

  private void deamonRedisLockWithLua() {

  //守護執行緒

  DaemonThread daemonThread=null;

  //Spring data redis 支援的原子性操作,並設定5秒過期時間

  String uuid=UUID.randomUUID().toString();

  String value=Thread.currentThread().getId() + ":" + uuid;

  try {

  while (!redisTemplate.execute(lockRedisScript, Collections.singletonList(userId + ":syn"), value, 5)) {

  // 等待1000毫秒重試獲得鎖

  logger(Thread.currentThread().getName() + ":嘗試迴圈獲取鎖");

  TimeUnit.MILLISECONDS.sleep(1000);

  }

  logger(Thread.currentThread().getName() + ":獲得鎖----");

  // 開啟守護執行緒

  daemonThread=new DaemonThread(userId + ":syn", redisTemplate);

  Thread thread=new Thread(daemonThread);

  thread.start();

  // 業務邏輯執行10秒...

  TimeUnit.MILLISECONDS.sleep(10000);

  } catch (InterruptedException e) {

  logger.error("異常", e);

  } finally {

  //使用Lua指令碼:先判斷是否是自己設定的鎖,再執行刪除

  // key存在,當前值=期望值時,刪除key;key存在,當前值!=期望值時,返回0;

  Long result=redisTemplate.execute(unlockRedisScript, Collections.singletonList(userId + ":syn"), value);

  logger("redis解鎖:{}", RELEASE_SUCCESS.equals(result));

  if (RELEASE_SUCCESS.equals(result)) {

  if (daemonThread !=null) {

  //關閉守護執行緒

  daemonThread.stop();

  logger(Thread.currentThread().getName() + ":釋放鎖---");

  }

  }

  }

  }

  其中while迴圈中加鎖和finally中的釋放鎖都是基於Lua指令碼來實現了。

  Redis鎖的其他因素

  除了上述例項,在使用Redis分散式鎖時,還可以考慮以下情況及方案。

  Redis鎖的不可重入

  當執行緒在持有鎖的情況下再次請求加鎖,如果一個鎖支援一個執行緒多次加鎖,那麼這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由於該鎖已經被持有,再次加鎖會失敗。Redis可通過對鎖進行重入計數,加鎖時加 1,解鎖時減 1,當計數歸 0時釋放鎖。

  可重入鎖雖然高效但會增加程式碼的複雜性,這裡就不舉例說明了。

  等待鎖釋放

  有的業務場景,發現被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然後去搶鎖。上述示例就屬於後者。針對等待鎖釋放也有兩種方案:

  客戶端輪訓:當未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基於這種方式實現的。這種方式的缺點也很明顯,比較耗費伺服器資源,當併發量大時會影響伺服器的效率。使用Redis的訂閱釋出功能:當獲取鎖失敗時,訂閱鎖釋放訊息,獲取鎖成功後釋放時,傳送釋放訊息。叢集中的主備切換和腦裂

  在Redis包含主從同步的叢集部署方式中,如果主節點掛掉,從節點提升為主節點。如果客戶端A在主節點加鎖成功,指令還未同步到從節點,此時主節點掛掉,從節點升為主節點,新的主節點中沒有鎖的資料。這種情況下,客戶端B就可能加鎖成功,從而出現併發的場景。

  當叢集發生腦裂時,Redis master節點跟slave 節點和 sentinel 叢集處於不同的網路分割槽。sentinel叢集無法感知到master的存在,會將 slave 節點提升為 master 節點,此時就會存在兩個不同的 master 節點。從而也會導致併發問題的出現。Redis Cluster叢集部署方式同理。

  小結

  通過生產環境中的一個問題,排查原因,尋找解決方案,到最終對基於Redis分散式的深入研究,這便是學習的過程。

  同時,每當面試或被問題如何解決分散式共享資源時,我們會脫口而出”基於Redis實現分散式鎖“,但通過本文的學習會發現,Redis分散式鎖並不是萬能的,而且在使用的過程中還需要注意超時、死鎖、誤解鎖、叢集選主/腦裂等問題。

  Redis以高效能著稱,但在實現分散式鎖的過程中還是存在一些問題。因此,基於Redis的分散式鎖可以極大的緩解併發問題,但要完全防止併發,還是得從資料庫層面入手。