redis分布式鎖&隊列應用
分布式鎖
- setnx(set if not exists)
如果設值成功則證明上鎖成功,然後再調用del指令釋放。
// 這裏的冒號:就是一個普通的字符,沒特別含義,它可以是任意其它字符,不要誤解
> setnx lock:codehole true
OK
... do something critical ...
> del lock:codehole
(integer) 1
但是有個問題,如果邏輯執行到中間出現異常了,可能會導致 del 指令沒有被調用,這樣就會陷入死鎖,鎖永遠得不到釋放。
- setnx(set if not exists) 加上過期時間
> setnx lock:codehole true OK > expire lock:codehole 5 ... do something critical ... > del lock:codehole (integer) 1
如果在 setnx 和 expire 之間服務器進程突然掛掉了,可能是因為機器掉電或者是被人為殺掉的,就會導致 expire 得不到執行,也會造成死鎖。
- 使用ex nx命令一起執行
> set lock:codehole true ex 5 nx
OK
... do something critical ...
> del lock:codehole
- 刪除鎖的線程必須是上鎖的線程
為 set 指令的 value 參數設置為一個隨機數,釋放鎖時先匹配隨機數是否一致,然後再刪除 key,這是為了確保當前線程占有的鎖不會被其它線程釋放,除非這個鎖是過期了被服務器自動釋放的。
但是匹配 value 和刪除 key 不是一個原子操作,Redis 也沒有提供類似於delifequals這樣的指令,這就需要使用 Lua 腳本來處理了,因為 Lua 腳本可以保證連續多個指令的原子性執行。
上鎖
tag = random.nextint() # 隨機數
if redis.set(key, tag, nx=True, ex=5):
do_something()
redis.delifequals(key, tag) # 假想的 delifequals 指令
# delifequals 解鎖
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
延時隊列
消息隊列
註意:
Redis 的消息隊列不是專業的消息隊列,它沒有非常多的高級特性,沒有 ack 保證,如果對消息的可靠性有著極致的追求,那麽它就不適合使用。
Redis 的 list(列表) 數據結構常用來作為異步消息隊列使用,使用rpush/lpush操作入隊列,使用lpop 和 rpop來出隊列。
> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)
阻塞隊列
如果隊列空了,客戶端就會陷入 pop 的死循環,不停地 pop,沒有數據,接著再 pop,又沒有數據。這就是浪費生命的空輪詢。
通常我們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鐘就可以了。但是有個小問題,那就是睡眠會導致消息的延遲增大。
我們可以使用 blpop/brpop,阻塞讀。
阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消息的延遲幾乎為零。用blpop/brpop替代前面的lpop/rpop,就完美解決了上面的問題。
鎖沖突處理
上面我們講了分布式鎖的問題,但是加鎖失敗沒有講。一般我們有3種策略來處理加鎖失敗:
- 直接拋出異常,通知用戶稍後重試
這種方式比較適合由用戶直接發起的請求,用戶看到錯誤對話框後,會先閱讀對話框的內容,再點擊重試,這樣就可以起到人工延時的效果。 - sleep 一會再重試
sleep 會阻塞當前的消息處理線程,會導致隊列的後續消息處理出現延遲。如果碰撞的比較頻繁或者隊列裏消息比較多,sleep 可能並不合適。 - 將請求轉移至延時隊列,過一會再試
這種方式比較適合異步消息處理,將當前沖突的請求扔到另一個隊列延後處理以避開沖突。
延時隊列
延時隊列可以通過 Redis 的 zset(有序列表) 來實現。我們將消息序列化成一個字符串作為 zset 的value,這個消息的到期處理時間作為score,然後用多個線程輪詢 zset 獲取到期的任務進行處理,多個線程是為了保障可用性,萬一掛了一個線程還有其它線程可以繼續處理。
import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 後再試
}
public void loop() {
while (!Thread.interrupted()) {
// 只取一條
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇會繼續
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 搶到了
TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}
redis分布式鎖&隊列應用