1. 程式人生 > >Redis學習筆記之延時隊列

Redis學習筆記之延時隊列

println itl bre type 列表 ring hist mat lib

目錄

  • 一、業務場景
  • 二、Redis延時隊列

一、業務場景

所謂延時隊列就是延時的消息隊列,下面說一下一些業務場景比較好理解

1.1 實踐場景

  • 訂單支付失敗,每隔一段時間提醒用戶
  • 用戶並發量的情況,可以延時2分鐘給用戶發短信
  • ...

1.2 實現方式

這些情況都可以使用延時隊列來做,實現延時隊列比較場景的有使用消息隊列MQ來實現,比如RocketMQ等等,也可以使用Redis來實現,本博客主要介紹一下Redis實現延時隊列

二、Redis延時隊列

2.1 Redis列表實現

Redis實現延時隊列可以通過其數據結構列表(list)來實現,順便復習一下Redis的列表,實現列表,Redis可以通過隊列和棧來實現:

/* 隊列:First in first out */

//加兩個value
>rpush keynames key1 key2
2

//計算
>llen keynames
2

>lpop keynames
key1

>lpop keynames
key2

//rpush會自動過期的
>rpop keynames
NULL

/* 棧:First in last out */

//同樣,加兩個元素
>rpush keynames key1 key2
2

>rpop keynames
key2

>rpop keynames
key1

對於Redis的基本數據結構,可以參考我之前的博客:https://blog.csdn.net/u014427391/article/details/82860694

然後怎麽實現延時?Thread睡眠或者線程join?這種方法是可以實現,不過假如用戶一多?10個請求就要延時10N了,這種情況系統性能不好的話就會出現線程阻塞了的情況。

隊列空了的情況?就會出現pop 的死循環,這種情況很可怕,很吃系統CPU,雖然可以通過線程睡眠方法來緩解,但不是最好的方法

這時候就要介紹一下Redis的blpop/brpop來替換lpop/rpop,blpop/brpop阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零

2.2 Redis集合實現

Redis的有序集合(zset)也可以用於實現延時隊列,消息作為value,時間作為score,這裏順便復習一下Redis的有序集合

//9.0是score也就是權重
>zadd keyname 9.0 math
1

>zadd keyname 9.2 history
1

//順序
>zrange keyname 0 -1
1) history
2) math

//逆序
>zrevrange keyname 0 -1
1) math
2) history

//相當於count()
>zcard keyname
2

獲取指定key的score
>zscore keyname math
9

然後多個線程的環境怎麽保證任務不被多個線程搶了?這裏可以使用Redis的zrem命令來實現

Redis Zrem 命令用於移除有序集中的一個或多個成員,不存在的成員將被忽略。

當 key 存在但不是有序集類型時,返回一個錯誤。

註意: 在 Redis 2.4 版本以前, ZREM 每次只能刪除一個元素。

下面給出來自《Redis 深度歷險:核心原理與應用實踐》小冊的例子:例子就是用有序集合和zrem來實現的

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

public class RedisDelayingQueue<T> {

  static class TaskItem<T> {
    public String id;
    public T msg;
  }

  // fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference
  private Type TaskType = new TypeReference<TaskItem<T>>() {
  }.getType();

  private Jedis jedis;
  private String queueKey;

  public RedisDelayingQueue(Jedis jedis, String queueKey) {
    this.jedis = jedis;
    this.queueKey = queueKey;
  }

  public void delay(T msg) {
    TaskItem<T> task = new TaskItem<T>();
    task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
    task.msg = msg;
    String s = JSON.toJSONString(task); // fastjson 序列化
    jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 後再試
  }

  public void loop() {
    while (!Thread.interrupted()) {
      // 只取一條
      Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
      if (values.isEmpty()) {
        try {
          Thread.sleep(500); // 歇會繼續
        } catch (InterruptedException e) {
          break;
        }
        continue;
      }
      String s = values.iterator().next();
      if (jedis.zrem(queueKey, s) > 0) { // 搶到了
        TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
        this.handleMsg(task.msg);
      }
    }
  }

  public void handleMsg(T msg) {
    System.out.println(msg);
  }

  public static void main(String[] args) {
    Jedis jedis = new Jedis();
    RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
    Thread producer = new Thread() {

      public void run() {
        for (int i = 0; i < 10; i++) {
          queue.delay("codehole" + i);
        }
      }

    };
    Thread consumer = new Thread() {

      public void run() {
        queue.loop();
      }

    };
    producer.start();
    consumer.start();
    try {
      producer.join();
      Thread.sleep(6000);
      consumer.interrupt();
      consumer.join();
    } catch (InterruptedException e) {
    }
  }
}

不過在多線程環境,是很難做控制的,上面例子也有缺陷,下面引用小冊的說法:

上面的算法中同一個任務可能會被多個進程取到之後再使用 zrem 進行爭搶,那些沒搶到的進程都是白取了一次任務,這是浪費。可以考慮使用 lua scripting 來優化一下這個邏輯,將 zrangebyscore 和 zrem 一同挪到服務器端進行原子化操作,這樣多個進程之間爭搶任務時就不會出現這種浪費了。

Redis學習筆記之延時隊列