【請求去重】java基於分散式鎖解決重複請求問題
問題:開發app時,app快速連續點選會向伺服器連續發起請求,導致資料庫出現重複資料。
解決思路:
對使用者唯一標示+請求uri+請求引數進行去重。
1、利用jvm BlockingQueue堵塞佇列,來一條請求判斷是否存在佇列,不存在新增,存在去除。
優點:消耗資源較小。
缺點:在分散式下,請求會分發在不同伺服器上。
2、利用分散式鎖,redis、zokeeper等,進行加鎖。
優點:解決分散式下請求分發問題。
缺點:每次請求,需要操作一次分散式鎖的消耗。
考慮第一種在分散式下,將要改變nginx分發策略為根據ip進行分發請求,這樣將導致負載均衡作用降低。
採用第二種方式,寫起來比較簡單,並且redis這種高併發框架,這點消耗不算啥,缺點完全可以接受。
redis鎖工具類,需要spring的redisTemplate支援。
import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.TimeUnit; public class RedisLock { /** 使用說明 * * //使用方法,建立RedisLock物件 * RedisLock lock = new RedisLock(redisTemplate, "lock_" + product.getId()); * try { * if (lock.lock()) {} * } finally { * lock.unlock(); */ private RedisTemplate<String, String> redisTemplate; /** * 重試時間 */ private static final int DEFAULT_ACQUIRY_RETRY_MILLIS = 100; /** * 鎖的字尾 */ private static final String LOCK_SUFFIX = "_redis_lock"; /** * 鎖的字首 */ private static final String LOCK_PREFIX = "REDIS_LOCK:"; /** * 鎖的key */ private String lockKey; /** * 鎖超時時間,防止執行緒在入鎖以後,防止阻塞後面的執行緒無法獲取鎖 */ private int expireMsecs = 60 * 1000; /** * 執行緒獲取鎖的等待時間 */ private int timeoutMsecs = 10 * 1000; /** * 是否鎖定標誌 */ private volatile boolean locked = false; /** * 構造器 * @param redisTemplate * @param lockKey 鎖的key */ public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) { this.redisTemplate = redisTemplate; this.lockKey = LOCK_PREFIX + lockKey + LOCK_SUFFIX; } /** * 構造器 * @param redisTemplate * @param lockKey 鎖的key * @param expireMsecs 鎖的有效期 */ public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey, int expireMsecs) { this(redisTemplate, lockKey); this.expireMsecs = expireMsecs; } /** * 構造器 * @param redisTemplate * @param lockKey 鎖的key * @param timeoutMsecs 獲取鎖的超時時間 * @param expireMsecs 鎖的有效期 */ public RedisLock(RedisTemplate<String, String> redisTemplate, String lockKey, int expireMsecs , int timeoutMsecs) { this(redisTemplate, lockKey, expireMsecs); this.timeoutMsecs = timeoutMsecs; } public String getLockKey() { return lockKey; } /** * 封裝和jedis方法 * @param key * @return */ private String get(final String key) { Object obj = redisTemplate.opsForValue().get(key); return obj != null ? obj.toString() : null; } /** * 封裝和jedis方法 * @param key * @param value * @return */ private boolean setNX(final String key, final String value) { Boolean boo = redisTemplate.opsForValue().setIfAbsent(key, value); if(boo) redisTemplate.expire(key,expireMsecs + 1, TimeUnit.MILLISECONDS); return boo; } /** * 封裝和jedis方法 * @param key * @param value * @return */ private String getSet(final String key, final String value) { Object obj = redisTemplate.opsForValue().getAndSet(key,value); if(obj != null){ redisTemplate.expire(key,expireMsecs + 1,TimeUnit.MILLISECONDS); return (String) obj; } return null; } /** * 獲取鎖 * @return 獲取鎖成功返回ture,超時返回false * @throws InterruptedException */ public synchronized boolean lock() throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; String expiresStr = String.valueOf(expires); //鎖到期時間 if (this.setNX(lockKey, expiresStr)) { locked = true; return true; } //redis裡key的時間 String currentValue = this.get(lockKey); //判斷鎖是否已經過期,過期則重新設定並獲取 if (currentValue != null && Long.parseLong(currentValue) < System.currentTimeMillis()) { //設定鎖並返回舊值 String oldValue = this.getSet(lockKey, expiresStr); //比較鎖的時間,如果不一致則可能是其他鎖已經修改了值並獲取 if (oldValue != null && oldValue.equals(currentValue)) { locked = true; return true; } } timeout -= DEFAULT_ACQUIRY_RETRY_MILLIS; //延時 Thread.sleep(DEFAULT_ACQUIRY_RETRY_MILLIS); } return false; } /** * 釋放獲取到的鎖 */ public synchronized void unlock() { if (locked) { redisTemplate.delete(lockKey); locked = false; } } }
起初我擔心用redis進行加鎖不是原子操作。
後lock方法實際測試,為原子操作。
在攔截器中取出請求url、請求引數、使用者唯一標示 當做鎖 在 preHandle 方法中加鎖 ,在afterCompletion方法中解鎖
如果在perHandle中獲取不到鎖,return false,
@Component public class TouristInterceptor implements HandlerInterceptor {. @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { Enumeration<String> paramNames = request.getParameterNames(); while (paramNames.hasMoreElements()) { String paramName = (String) paramNames.nextElement(); String[] paramValues = request.getParameterValues(paramName); if (paramValues.length == 1) { String paramValue = paramValues[0]; if (paramValue.length() != 0) { logger.info(paramName + ":" + paramValue); } } } // 獲取訪問URL String url = request.getRequestURL().toString(); logger.info("訪問URL:{}", url); // 請求去重鎖 requestLock = restapiBaseInterceptor.getRequestLock(request); boolean lock = restapiBaseInterceptor.requestLock(requestLock); if ( lock) { logger.info("【請求鎖】獲取到鎖:{}",requestLock.getLockKey()); return true; } else { response.setCharacterEncoding("UTF-8"); response.setContentType("application/json; charset=utf-8"); if(!lock){ try { PrintWriter out = response.getWriter(); logger.error("【請求去重】重複請求:{}",requestLock.getLockKey()); out.append(Jsons.toJson(new MapleReturnBody(ReturnConstants.PLEASE_NOT_REPEAT))); } catch (IOException e) { e.printStackTrace(); } } return false; } } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { restapiBaseInterceptor.requestUnlock(requestLock); logger.info("【請求鎖】釋放鎖:{}",requestLock.getLockKey()); } }
我們認為在一個請求在3秒鐘沒有處理完,就當這個請求死了並釋放掉這個請求,我們把鎖的超時時間設定3秒,獲取所的等待時間設定為1秒。
這樣,當兩條同樣的請求,在第一條請求沒有返回資訊之前,第二條請求就會被鎖擋住。
當用戶登入時,我們可以取使用者唯一標示當做鎖,但是當用戶未登入時,怎麼防止遊客的重複請求呢?
思路:每個裝置在進入app時生成uuid,一直儲存到他離開app。uuid作為裝置唯一標示來限制遊客重複請求
問題:怎樣判斷請求是否是app發出的?這條請求是否已失效呢?
每個伺服器,再使用者登入的情況下,肯定有一套來判斷使用者身份的機制,但是如果未登入,怎麼防掉不是由app發出的請求呢?
app再發起請求時,帶上一個引數,內容為:固定字串-uuid-時間戳,舉例 :
csdn-51254720adc749fab81930c232c1f29f-1539410361628
然後通過AES演算法加密後傳給伺服器,伺服器拿到加密的aes後進行解密,分割 - 判斷第一段是不是csdn,最後一段與伺服器時間比較,是否超過時效時間範圍。
這裡有一個問題,app的時間戳與伺服器時間不一樣,解決方案是伺服器提供一個查詢時間戳介面,app初始化時查詢伺服器時間戳,用app時間減去伺服器時間得到時間差,把計算出的時間差存在本地,需要計算伺服器時間戳時,用app時間 減去 時間戳 得到伺服器時間
,給出公式:
app時間 - 伺服器時間 = 時間差
app時間 - 時間戳 = 伺服器時間