1. 程式人生 > 實用技巧 >Flink基礎(三十五):FLINK SQL(十一)USE 語句

Flink基礎(三十五):FLINK SQL(十一)USE 語句

前言:在分散式環境中,我們經常使用鎖來進行併發控制,鎖可分為樂觀鎖和悲觀鎖,基於資料庫版本戳的實現是樂觀鎖,基於redis或zookeeper的實現可認為是悲觀鎖了。樂觀鎖和悲觀鎖最根本的區別在於執行緒之間是否相互阻塞。

那麼,本文主要來討論基於redis的分散式鎖演算法問題。

從2.6.12版本開始,redis為SET命令增加了一系列選項(set [key] NX/XX EX/PX [expiration]):

  • EX seconds – 設定鍵key的過期時間,單位時秒

  • PX milliseconds – 設定鍵key的過期時間,單位時毫秒

  • NX – 只有鍵key不存在的時候才會設定key的值

  • XX – 只有鍵key存在的時候才會設定key的值

原文地址:https://redis.io/commands/set
中文地址:http://redis.cn/commands/set.html

注意: 由於SET命令加上選項已經可以完全取代SETNX, SETEX, PSETEX的功能,所以在將來的版本中,redis可能會不推薦使用並且最終拋棄這幾個命令。

這裡簡單提一下,在舊版本的redis中(指2.6.12版本之前),使用redis實現分散式鎖一般需要setNX、expire、getSet、del等命令。而且會發現這種實現有很多邏輯判斷的原子操作以及本地時間等並沒有控制好。

而在舊版本的redis中,redis的超時時間很難控制,使用者迫切需要把setNX和expiration結合為一體的命令,把他們作為一個原子操作,這樣新版本的多選項set命令誕生了。然而這並沒有完全解決複雜的超時控制帶來的問題。

接下來,我們的一切討論都基於新版redis。

在這裡,我先提出幾個在實現redis分散式鎖中需要考慮的關鍵問題

1、死鎖問題;

1.1、為了防止死鎖,redis至少需要設定一個超時時間;

1.2、由1.1引申出來,當鎖自動釋放了,但是程式並沒有執行完畢,這時候其他執行緒又獲取到鎖執行同樣的程式,可能會造成併發問題,這個問題我們需要考慮一下是否歸屬於分散式鎖帶來問題的範疇。

2、鎖釋放問題,這裡會有兩個問題;

2.1、每個獲取redis鎖的執行緒應該釋放自己獲取到的鎖,而不是其他執行緒的,所以我們需要在每個執行緒獲取鎖的時候給鎖做上不同的標記以示區分;

2.2、由2.1帶來的問題是執行緒在釋放鎖的時候需要判斷當前鎖是否屬於自己,如果屬於自己才釋放,這裡涉及到邏輯判斷語句,至少是兩個操作在進行,那麼我們需要考慮這兩個操作要在一個原子內執行,否者在兩個行為之間可能會有其他執行緒插入執行,導致程式紊亂。

3、更可靠的鎖;

單例項的redis(這裡指只有一個master節點)往往是不可靠的,雖然實現起來相對簡單一些,但是會面臨著宕機等不可用的場景,即使在主從複製的時候也顯得並不可靠(因為redis的主從複製往往是非同步的)。

關於Martin Kleppmann的Redlock的分析

原文地址:https://redis.io/topics/distlock
中文地址:http://redis.cn/topics/distlock.html

文章分析得出,這種演算法只需具備3個特性就可以實現一個最低保障的分散式鎖。

  • 安全屬性(Safety property): 獨享(相互排斥)。在任意一個時刻,只有一個客戶端持有鎖。

  • 活性A(Liveness property A): 無死鎖。即便持有鎖的客戶端崩潰(crashed)或者網路被分裂(gets partitioned),鎖仍然可以被獲取。

  • 活性B(Liveness property B): 容錯。 只要大部分Redis節點都活著,客戶端就可以獲取和釋放鎖.

我們來分析一下:

第一點安全屬性意味著悲觀鎖(互斥鎖)是我們做redis分散式鎖的前提,否者將可能造成併發;

第二點表明為了避免死鎖,我們需要設定鎖超時時間,保證在一定的時間過後,鎖可以重新被利用;

第三點是說對於客戶端來說,獲取鎖和手動釋放鎖可以有更高的可靠性。

更進一步分析,結合上文提到的關鍵問題,這裡可以引申出另外的兩個問題:
  • 怎麼才能合理判斷程式真正處理的有效時間範圍?(這裡有個時間偏移的問題)

  • redis Master節點宕機後恢復(可能還沒有持久化到磁碟)、主從節點切換,(N/2)+1這裡的N應該怎麼動態計算更合理?

接下來再看,redis之父antirez對Redlock的評價

原文地址:http://antirez.com/news/101

文中主要提到了網路延遲和本地時鐘的修改(不管是時間伺服器或人為修改)對這種演算法可能造成的影響。

最後,來點實踐吧

I、傳統的單例項redis分散式鎖實現(關鍵步驟)

獲取鎖(含自動釋放鎖):

SET resource_name my_random_value NX PX 30000
 手動刪除鎖(Lua指令碼):

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

II、分散式環境的redis(多master節點)的分散式鎖實現

為了保證在儘可能短的時間內獲取到(N/2)+1個節點的鎖,可以並行去獲取各個節點的鎖(當然,並行可能需要消耗更多的資源,因為序列只需要count到足夠數量的鎖就可以停止獲取了);

另外,怎麼動態實時統一獲取redis master nodes需要更進一步去思考了。

QA,補充一下說明(以下為我與朋友溝通的情況,以說明文中大家可能不夠明白的地方):

1、在關鍵問題2.1中,刪除就刪除了,會造成什麼問題?

執行緒A超時,準備刪除鎖;但此時的鎖屬於執行緒B;執行緒B還沒執行完,執行緒A把鎖刪除了,這時執行緒C獲取到鎖,同時執行程式;所以不能亂刪。

2、在關鍵問題2.2中,只要在key生成時,跟執行緒相關就不用考慮這個問題了嗎?

不同的執行緒執行程式,執行緒之間肯雖然有差異呀,然後在redis鎖的value設定有執行緒資訊,比如執行緒id或執行緒名稱,是分散式環境的話加個機器id字首咯(類似於twitter的snowflake演算法!),但是在del命令只會涉及到key,不會再次檢查value,所以還是需要lua指令碼控制if(condition){xxx}的原子性。

3、那要不要考慮鎖的重入性?

不需要重入;try…finally 沒得重入的場景;對於單個執行緒來說,執行是序列的,獲取鎖之後必定會釋放,因為finally的程式碼必定會執行啊(只要進入了try塊,finally必定會執行)。

4、為什麼兩個執行緒都會去刪除鎖?(貌似重複的問題。不管怎樣,還是耐心解答吧)

每個執行緒只能管理自己的鎖,不能管理別人執行緒的鎖啊。這裡可以聯想一下ThreadLocal。

5、如果加鎖的執行緒掛了怎麼辦?只能等待自動超時?

看你怎麼寫程式的了,一種是問題3的回答;另外,那就自動超時咯。這種情況也適用於網路over了。

6、時間太長,程式異常就會蛋疼,時間太短,就會出現程式還沒有處理完就超時了,這豈不是很尷尬?

是呀,所以需要更好的衡量這個超時時間的設定。

實踐部分主要程式碼:

RedisLock工具類:

package com.caiya.cms.web.component;

import com.caiya.cache.CacheException;
import com.caiya.cache.redis.JedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * redis實現分散式鎖
 * 可實現特性:
 * 1、使多執行緒無序排隊獲取和釋放鎖;
 * 2、丟棄未成功獲得鎖的執行緒處理;
 * 3、只釋放執行緒本身加持的鎖;
 * 4、避免死鎖
 *
 * @author wangnan
 * @since 1.0
 */
public final class RedisLock {

    private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);

    /**
     * 嘗試加鎖(僅一次)
     *
     * @param lockKey       鎖key
     * @param lockValue     鎖value
     * @param expireSeconds 鎖超時時間(秒)
     * @return 是否加鎖成功
     * @throws CacheException
     */
    public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {
        JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
        try {
            String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);
            return Objects.equals(response, "OK");
        } finally {
            jedisCache.close();
        }
    }

    /**
     * 加鎖(指定最大嘗試次數範圍內)
     *
     * @param lockKey       鎖key
     * @param lockValue     鎖value
     * @param expireSeconds 鎖超時時間(秒)
     * @param tryTimes      最大嘗試次數
     * @param sleepMillis   每兩次嘗試之間休眠時間(毫秒)
     * @return 是否加鎖成功
     * @throws CacheException
     */
    public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {
        boolean result;
        int count = 0;
        do {
            count++;
            result = tryLock(lockKey, lockValue, expireSeconds);
            try {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        } while (!result && count <= tryTimes);
        return result;
    }

    /**
     * 釋放鎖
     *
     * @param lockKey   鎖key
     * @param lockValue 鎖value
     */
    public static void unlock(String lockKey, String lockValue) {
        JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
        try {
            String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
            Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);
//            Objects.equals(result, 1L);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            jedisCache.close();
        }
//        return false;
    }


    private RedisLock() {
    }

}

使用工具類的程式碼片段1:

        ...
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟業務相關的唯一拼接鍵
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成叢集環境中的唯一值
        boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只嘗試一次,在本次處理過程中直接拒絕其他執行緒的請求
        if (!locked) {
            throw new IllegalAccessException("您的操作太頻繁了,休息一下再來吧~");
        }
        try {
            // 開始處理核心業務邏輯
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
            ...
            ...
        } finally {
            RedisLock.unlock(lockKey, lockValue);// 在finally塊中釋放鎖
        }

使用工具類的程式碼片段2:

        ...
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":機器編號:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
        boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平鎖,無序競爭(這裡需要合理根據業務處理情況設定最大嘗試次數和每次休眠時間)
        if (!locked) {
            throw new IllegalAccessException("系統太忙,本次操作失敗");// 一般來說,不會走到這一步;如果真的有這種情況,並且在合理設定鎖嘗試次數和等待響應時間之後仍然處理不過來,可能需要考慮優化程式響應時間或者用訊息佇列排隊執行了
        }

        try {
            // 開始處理核心業務邏輯
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
            ...
            ...
        } finally {
            RedisLock.unlock(lockKey, lockValue);
        }
        ...

附加:

基於redis的分散式鎖實現客戶端Redisson:

基於zookeeper的分散式鎖實現:

 

 

        ...
        String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
        String lockValue = Constant.DEFAULT_CACHE_NAME + ":機器編號:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
        boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平鎖,無序競爭(這裡需要合理根據業務處理情況設定最大嘗試次數和每次休眠時間)
        if (!locked) {
            throw new IllegalAccessException("系統太忙,本次操作失敗");// 一般來說,不會走到這一步;如果真的有這種情況,並且在合理設定鎖嘗試次數和等待響應時間之後仍然處理不過來,可能需要考慮優化程式響應時間或者用訊息佇列排隊執行了
        }

        try {
            // 開始處理核心業務邏輯
            Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
            ...
            ...
        } finally {
            RedisLock.unlock(lockKey, lockValue);
        }
        ...