1. 程式人生 > >多執行緒併發問題解決之redis鎖

多執行緒併發問題解決之redis鎖

一 問題背景

        我們做的是醫療資訊化系統,在系統中一條患者資訊對醫院中當前科室中的所有診斷醫生是可見的,當有一個診斷醫生點選按鈕處理該資料時,資料的狀態發生了變化,其他的醫生就不可以再處理此患者的資料了。我們開始的做法是,在醫生點選按鈕時先去後臺數據庫獲取當前資料狀態,根據狀態判斷資料是否可以操作,如果可以操作,則修改資料狀態,進行業務邏輯處理,否則提示資料已被其他人處理,不能處理。

二 問題分析

        按照上邊的業務邏輯,我們畫個圖分析,如下圖

在上圖中,如果使用者A和B同時向資料庫發起請求獲取資料狀態,資料庫返回wait,A和B都拿到了相同的狀態,判斷是可以操作資料的,這時他們處理資料。A使用者處理完成後提交了資料,資料庫狀態變為done,記錄此資料的處理人為A。由於B使用者也可以處理資料,所以他也提交資料,這時資料的操作人記錄為了B。有人會說,在A和B提交資料修改狀態時再做一個狀態的判斷,這種也難以避免最開始的獲取狀態的問題,即使這一步狀態獲取到了,提示後邊的人不能修改,這又會產生系統不友好的問題(我操作了半天,到最後你告訴我不能處理,我白忙活了)。以上問題產生的主要原因就是在多執行緒情況下對共享資料的資源競爭處理不當,我們需要保證資料的唯一性,即在某一時刻,只能有一個執行緒獨享資料資源。

三 問題解決

        如何解決呢?分散式鎖,分散式鎖有多種實現方式,本文我們用redis實現。由於redis是單執行緒的,所以一次只能處理一個請求,並將資源分配給這個請求,我們稱加 鎖。如下圖

多執行緒情況下,redis只會處理其中的一個,其他的暫時等待。如上圖當A和B同時發出請求時,redis接受並處理A請求,此時B請求排隊等待,等到A請求處理完後再處理B請求。此時redis已經將資源(lock)分配給了A,A請求資料庫,B請求沒有獲取到資源直接返回不在請求資料庫。這樣就保證了資料庫共享資料被唯一資源使用。程式碼簡單實現

 1
public class RedisLock { 2 3 private static final String GET_RESULT = "OK"; 4 private static final String RELEASE_RESULT = "1"; 5 private static final String SET_IF_NOT_EXIST = "NX"; 6 private static final String SET_WITH_EXPIRE_TIME = "PX"; 7 8 /** 9 * 獲取redis鎖 10 *
@param jedis redis客戶端 11 * @param lockKey 鎖標識 key 12 * @param requestId 鎖的持有者,加鎖的請求 13 * @param expireTime 鎖過期時間 14 * @return 15 */ 16 public static boolean getLock(Jedis jedis, String lockKey, String requestId, int expireTime){ 17 //SET_IF_NOT_EXIST 當key不存在時 才處理 18 //SET_WITH_EXPIRE_TIME 設定過期時間 時間由expireTime決定 19 String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); 20 if (GET_RESULT.equals(result)) { 21 return true; 22 } 23 return false; 24 } 25 26 /** 27 * 釋放鎖 28 * @param jedis 29 * @param lockKey 30 * @param requestId 31 * @return 32 */ 33 public static boolean releaseLock(Jedis jedis, String lockKey, String requestId){ 34 // 方式1 35 // if (jedis.get(lockKey).equals(requestId)) {//校驗當前鎖的持有人與但概念請求是否相同 36 // 執行在這裡時,如果鎖被其它請求重新獲取到了,此時就不該刪除了 37 // jedis.del(lockKey); 38 // } 39 40 //方式2 41 // eval() 方法會交給redis服務端執行,減少了從服務端再到客戶端處理的過程 42 //賦值 KEYS[1] = lockKey ARGV[1] = requestId 43 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; 44 Object releaseResult = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); 45 if (RELEASE_RESULT.equals(releaseResult.toString())) { 46 return true; 47 } 48 return false; 49 } 50 }

四 測試鎖機制

  測試併發我們可以使用一些軟體,比如Jmeter,本文我們寫個方法測試

 1 public static void main(String[] args) {
 2         //要建立的執行緒的數量
 3         CountDownLatch looker = new CountDownLatch(1);
 4         CountDownLatch latch = new CountDownLatch(10);
 5         final String key = "lockKey";
 6         for(int i=0; i < latch.getCount(); i++){
 7             Jedis jedis = new Jedis();
 8             UUID uuid = UUID.randomUUID();
 9             Thread thread = new Thread(new Runnable() {
10                 @Override
11                 public void run() {
12                     try {
13                         looker.await();
14                         System.out.println(Thread.currentThread().getName()+"競爭資源,獲取鎖");
15                         boolean getResult = getLock(jedis, key, uuid.toString(), 5000);
16                         if(getResult){
17                             System.out.println(Thread.currentThread().getName()+"獲取到了鎖,處理業務,用時3秒");
18                             Thread.sleep(3000);
19                             boolean releaseResult = releaseLock(jedis, key, uuid.toString());
20                             if(releaseResult){
21                                 System.out.println(Thread.currentThread().getName()+"業務處理完畢,釋放鎖");
22                             }
23                         }else{
24                             System.out.println(Thread.currentThread().getName()+"競爭資源失敗,未獲取到鎖");
25                         }
26                         latch.countDown();
27                     } catch (InterruptedException e) {
28                         e.printStackTrace();
29                     }
30                 }
31             });
32             thread.start();
33         }
34 
35         try {
36             System.out.println("準備,5秒後開始");
37             Thread.sleep(5000);
38             looker.countDown(); //發令  let all threads proceed
39 
40             latch.await(); // // wait for all to finish
41             System.out.println("結束");
42         } catch (InterruptedException e) {
43             e.printStackTrace();
44         }
45 
46     }

可以看到控制檯上輸出的結果