1. 程式人生 > >redis分布式鎖&隊列應用

redis分布式鎖&隊列應用

pri dem range 指令 註意 app class exist iter

分布式鎖

  1. setnx(set if not exists)

如果設值成功則證明上鎖成功,然後再調用del指令釋放。

// 這裏的冒號:就是一個普通的字符,沒特別含義,它可以是任意其它字符,不要誤解
> setnx lock:codehole true
OK
... do something critical ...
> del lock:codehole
(integer) 1

但是有個問題,如果邏輯執行到中間出現異常了,可能會導致 del 指令沒有被調用,這樣就會陷入死鎖,鎖永遠得不到釋放。

  1. setnx(set if not exists) 加上過期時間
> setnx lock:codehole true
OK
> expire lock:codehole 5
... do something critical ...
> del lock:codehole
(integer) 1

如果在 setnx 和 expire 之間服務器進程突然掛掉了,可能是因為機器掉電或者是被人為殺掉的,就會導致 expire 得不到執行,也會造成死鎖。

  1. 使用ex nx命令一起執行
> set lock:codehole true ex 5 nx
OK
... do something critical ...
> del lock:codehole
  1. 刪除鎖的線程必須是上鎖的線程

為 set 指令的 value 參數設置為一個隨機數,釋放鎖時先匹配隨機數是否一致,然後再刪除 key,這是為了確保當前線程占有的鎖不會被其它線程釋放,除非這個鎖是過期了被服務器自動釋放的。
但是匹配 value 和刪除 key 不是一個原子操作,Redis 也沒有提供類似於delifequals這樣的指令,這就需要使用 Lua 腳本來處理了,因為 Lua 腳本可以保證連續多個指令的原子性執行。

上鎖
tag = random.nextint()  # 隨機數
if redis.set(key, tag, nx=True, ex=5):
    do_something()
    redis.delifequals(key, tag)  # 假想的 delifequals 指令
    

# delifequals 解鎖
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

延時隊列

消息隊列

註意:
Redis 的消息隊列不是專業的消息隊列,它沒有非常多的高級特性,沒有 ack 保證,如果對消息的可靠性有著極致的追求,那麽它就不適合使用。

Redis 的 list(列表) 數據結構常用來作為異步消息隊列使用,使用rpush/lpush操作入隊列,使用lpop 和 rpop來出隊列。

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)

阻塞隊列

如果隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接著再 pop,又沒有數據。這就是浪費生命的空輪詢。
通常我們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鐘就可以了。但是有個小問題,那就是睡眠會導致消息的延遲增大。
我們可以使用 blpop/brpop,阻塞讀。
阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。

鎖沖突處理

上面我們講了分布式鎖的問題,但是加鎖失敗沒有講。一般我們有3種策略來處理加鎖失敗:

  1. 直接拋出異常,通知用戶稍後重試
    這種方式比較適合由用戶直接發起的請求,用戶看到錯誤對話框後,會先閱讀對話框的內容,再點擊重試,這樣就可以起到人工延時的效果。
  2. sleep 一會再重試
    sleep 會阻塞當前的消息處理線程,會導致隊列的後續消息處理出現延遲。如果碰撞的比較頻繁或者隊列裏消息比較多,sleep 可能並不合適。
  3. 將請求轉移至延時隊列,過一會再試
    這種方式比較適合異步消息處理,將當前沖突的請求扔到另一個隊列延後處理以避開沖突。

延時隊列

延時隊列可以通過 Redis 的 zset(有序列表) 來實現。我們將消息序列化成一個字符串作為 zset 的value,這個消息的到期處理時間作為score,然後用多個線程輪詢 zset 獲取到期的任務進行處理,多個線程是為了保障可用性,萬一掛了一個線程還有其它線程可以繼續處理。

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 後再試
  }

  public void loop() {
    while (!Thread.interrupted()) {
      // 只取一條
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // 歇會繼續
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
      if (jedis.zrem(queueKey, s) > 0) { // 搶到了
        TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
        this.handleMsg(task.msg);
      }
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    Thread producer = new Thread() {

      public void run() {
        for (int i = 0; i < 10; i++) {
          queue.delay("codehole" + i);
        }
      }

    };
    Thread consumer = new Thread() {

      public void run() {
        queue.loop();
      }

    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000);
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {
    }
  }
}

redis分布式鎖&隊列應用