1. 程式人生 > 資料庫 >Redis實現分散式鎖和等待序列的方法示例

Redis實現分散式鎖和等待序列的方法示例

在叢集下,經常會因為同時處理髮生資源爭搶和併發問題,但是我們都知道同步鎖synchronized 、cas 、ReentrankLock 這些鎖的作用範圍都是JVM ,說白了在叢集下沒啥用。這時我們就需要能在多臺JVM 之間決定執行順序的鎖了,現在分散式鎖主要有redis 、Zookeeper 實現的,還有資料庫的方式,不過效能太差,也就是需要一個第三方的監管。

背景

最近在做一個消費Kafka 訊息的時候發現,由於線上的消費者過多,經常會遇到,多個機器同時處理一個主鍵型別的資料的情況發生,如果最後是執行更新操作的話,也就是一個更新順序的問題,但是如果恰好都需要插入資料的時候,會出現主鍵重複的問題。這是生產上不被允許的(因為公司有異常監管的機制,扣分啥的),這是就需要個分散式鎖了,斟酌後用了Redis 的實現方式(因為網上例子多)

分析

redis 實現的分散式鎖,實現原理是set 方法,因為多個執行緒同時請求的時候,只有一個執行緒可以成功並返回結果,還可以設定有效期,來避免死鎖的發生,一切都是這麼的完美,不過有個問題,在set 的時候,會直接返回結果,成功或者失敗,不具有阻塞效果,需要我們自己對失敗的執行緒程序處理,有兩種方式

  • 丟棄
  • 等待重試 由於我們的系統需要這些資料,那麼只能重新嘗試獲取。這裡使用redis 的List 型別實現等待序列的作用

程式碼

直接上程式碼 其實直接redis的工具類就可以解決了

package com.test
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc redis佇列實現方式
 * @anthor 
 * @date 
 **/
public class RedisUcUitl {

  private static final String LOCK_SUCCESS = "OK";
  private static final String SET_IF_NOT_EXIST = "NX";
  private static final String SET_WITH_EXPIRE_TIME = "PX";

  private static final Long RELEASE_SUCCESS = 1L;

  private RedisUcUitl() {

  }
  /**
   * logger
   **/

  /**
   * 儲存redis佇列順序儲存 在佇列首部存入
   *
   * @param key  位元組型別
   * @param value 位元組型別
   */
  public static Long lpush(Jedis jedis,final byte[] key,final byte[] value) {

    return jedis.lpush(key,value);
  
  }

  /**
   * 移除列表中最後一個元素 並將改元素新增入另一個列表中 ,當列表為空時 將阻塞連線 直到等待超時
   *
   * @param srckey
   * @param dstkey
   * @param timeout 0 表示永不超時
   * @return
   */
  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey,final byte[] dstkey,final int timeout) {

    return jedis.brpoplpush(srckey,dstkey,timeout);

  }

  /**
   * 返回制定的key,起始位置的redis資料
   * @param redisKey
   * @param start
   * @param end -1 表示到最後
   * @return
   */
  public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey,final long start,final long end) {
    
    return jedis.lrange(redisKey,start,end);
  }

  /**
   * 刪除key
   * @param redisKey
   */
  public static void delete(Jedis jedis,final byte[] redisKey) {
    
     return jedis.del(redisKey);
  }

  /**
   * 嘗試加鎖
   * @param lockKey key名稱
   * @param requestId 身份標識
   * @param expireTime 過期時間
   * @return
   */
  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey,final String requestId,final int expireTime) {
    String result = jedis.set(lockKey,requestId,SET_IF_NOT_EXIST,SET_WITH_EXPIRE_TIME,expireTime);
    return LOCK_SUCCESS.equals(result);

  }

  /**
   * 釋放鎖
   * @param lockKey key名稱
   * @param requestId 身份標識
   * @return
   */
  public static boolean releaseDistributedLock(Jedis jedis,final String requestId) {
    final String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
    jedis.eval(script,Collections.singletonList(lockKey),Collections.singletonList(requestId));

    return RELEASE_SUCCESS.equals(result);

  }
}

業務邏輯主要程式碼如下

1.先消耗佇列中的

while(true){
  // 消費佇列
  try{
    // 被放入redis佇列的資料 序列化後的
    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8),dstKeyStr.getBytes(UTF_8),1);
    if(bytes == null || bytes.isEmpty()){
      // 佇列中沒資料時退出
      break;
    }
    // 反序列化物件
    Map<String,Object> singleMap = (Map<String,Object>) ObjectSerialUtil.bytesToObject(bytes);
    // 塞入唯一的值 防止被其他執行緒誤解鎖
    String requestId = UUID.randomUUID().toString();
    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,100);
    if(lockGetFlag){
      // 成功獲取鎖 進行業務處理
      //TODO
      // 處理完畢釋放鎖 
      boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr,requestId);

    }else{
      // 未能獲得鎖放入等待佇列
     RedisUcUitl.lpush(keyStr.getBytes(UTF_8),ObjectSerialUtil.objectToBytes(param));
  
    }
    
  }catch(Exception e){
    break;
  }
  
}

2.處理最新接到的資料

同樣是走嘗試獲取鎖,獲取不到放入佇列的流程

一般序列化用fastJson 之列的就可以了,這裡用的是JDK 自帶的,工具類如下

public class ObjectSerialUtil {

  private ObjectSerialUtil() {
//    工具類
  }

  /**
   * 將Object物件序列化為byte[]
   *
   * @param obj 物件
   * @return byte陣列
   * @throws Exception
   */
  public static byte[] objectToBytes(Object obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(obj);
    byte[] bytes = bos.toByteArray();
    bos.close();
    oos.close();
    return bytes;
  }


  /**
   * 將bytes陣列還原為物件
   *
   * @param bytes
   * @return
   * @throws Exception
   */
  public static Object bytesToObject(byte[] bytes) {
    try {
      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
      ObjectInputStream ois = new ObjectInputStream(bin);
      return ois.readObject();
    } catch (Exception e) {
      throw new BaseException("反序列化出錯!",e);
    }
  }
}

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。