redis 分布式鎖
阿新 • • 發佈:2018-06-19
runnable dom @param set tostring 執行 時間 integer oid
在分布式系統中,之前單一的用synchronized或lock已經不適用了。分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分布式鎖;3. 基於ZooKeeper的分布式鎖。本博客討論為第二種
代碼實現
現象:模擬多個線程去運算同一個數據 可以發現數據計算是不規則的
package com.zhcx.dispatch.test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ThreadTest { privatestatic int max = 10; public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<15;i++){ threadPool.submit(new Runnable() { public void run() { int current = getMax();if(current>0){ max--; } } }); System.out.println(max); } } private static Integer getMax(){ return max; } }
輸出值為混亂的
解決方法:
創建redis鎖類
package com.zhcx.dispatch.test; importjava.util.Collections; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; @Component public class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; private static JedisPool pool = null; public static Jedis getSource(){ JedisPoolConfig config = new JedisPoolConfig(); // 設置最大連接數 config.setMaxTotal(200); // 設置最大空閑數 config.setMaxIdle(8); // 設置最大等待時間 config.setMaxWaitMillis(1000 * 100); // 在borrow一個jedis實例時,是否需要驗證,若為true,則所有jedis實例均是可用的 config.setTestOnBorrow(true); pool = new JedisPool(config, "127.0.0.0", 6379, 3000); return pool.getResource(); } /** * * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean getLock(Jedis jedis,String lockKey,String requestId, int expireTime){ String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if(LOCK_SUCCESS.equals(result)){ return true; } return false; } /** * 釋放分布式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call(‘get‘, KEYS[1]) == ARGV[1] then return redis.call(‘del‘, KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } }
可以看到,我們加鎖就一行代碼:jedis.set(String key, String value, String nxxx, String expx, int time)
,這個set()方法一共有五個形參:
-
第一個為key,我們使用key來當鎖,因為key是唯一的。
-
第二個為value,我們傳的是requestId,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用
UUID.randomUUID().toString()
方法生成。 -
第三個為nxxx,這個參數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
-
第四個為expx,這個參數我們傳的是PX,意思是我們要給這個key加一個過期的設置,具體時間由第五個參數決定。
-
第五個為time,與第四個參數相呼應,代表key的過期時間。
解鎖代碼,寫了簡單的Lua腳本代碼。我們將Lua代碼傳到jedis.eval()
方法裏,並使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命令。
我們加入到之前的線程中
package com.zhcx.dispatch.test; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class ThreadTest { private static int max = 10; synchronized public static void main(String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); for(int i=0;i<15;i++){ String requestId = UUID.randomUUID().toString(); String lockKey = "lock:"; RedisLock.getLock(RedisLock.getSource(), lockKey, requestId, 30); threadPool.submit(new Runnable() { public void run() { int current = getMax(); if(current>0){ max--; } } }); RedisLock.releaseDistributedLock(RedisLock.getSource(), lockKey, requestId); System.out.println(max); } } private static Integer getMax(){ return max; } }
查看輸出:
redis 分布式鎖