分散式鎖(redis\ip\database)的一些實現方式及在實際中的應用
阿新 • • 發佈:2019-01-25
分散式鎖,在分散式架構中的鎖的實現。接觸到的使用情形主要分為兩種情況:
一:該資源只允許一次訪問,請注意,此種情況更像是一種獨佔,即併發情況下,只有一個執行緒能獲取該資源,其他執行緒則放棄該資源。
二:該資源每次只允許一個執行緒訪問,其他執行緒會不斷嘗試獲取這個鎖,直到所有執行緒都訪問了這個資源。
1。redis實現分散式鎖
在redis中設定一個鎖名,和value值,再為這個鎖名設定超時時間。
每次執行緒來訪問先判斷,這個鎖名是否在redis中已經存在。如果存在則直接返回false表示該執行緒並未獲取到鎖。
設定超時時間,是為了防止死鎖(某一執行緒獲取鎖後未釋放該鎖)。
程式碼:
/** * 加鎖 * * @param lockName 鎖名 * @param timeout 超時時間 秒 * @return */ @Override public boolean lock(String lockName, long timeout) { String value = UUIDUtil.getUUID(); String key = "key"+lockName; System.out.println(Thread.currentThread().getName()+"正在初步獲取鎖!"); if(redisUtil.setIfAbsent(key,value)) { redisUtil.expire(key,timeout); return true; } return false; }
/** * 釋放鎖 * * @param lockName */ @Override public void unLock(String lockName) { redisUtil.del(lockName); }
redisUtils中的方法:
/** * 當且僅當key不存在時,set一個key為val的字串,返回true;若key存在,則什麼都不做返回false * * @param key * @param value * @return */ public boolean setIfAbsent(String key,String value){ return redisTemplate.opsForValue().setIfAbsent(key,value); }
/** * 指定快取失效時間 * @param key 鍵 * @param time 時間(秒) * @return */ public boolean expire(String key,long time){ try { if(time>0){ redisTemplate.expire(key, time, TimeUnit.SECONDS); } return true; } catch (Exception e) { e.printStackTrace(); return false; } }
/** * 刪除快取 * @param key 可以傳一個值 或多個 */ @SuppressWarnings("unchecked") public void del(String ... key){ if(key!=null&&key.length>0){ if(key.length==1){ redisTemplate.delete(key[0]); }else{ redisTemplate.delete(CollectionUtils.arrayToList(key)); } } }
2.ip鎖,通過獲取伺服器的ip和配置的可執行的ip是否一致,一致即獲取了鎖。及指定每次獲取資源的伺服器(這種其實就是把分散式變成了單機版)
public class IpLock implements ConcurrentLock { private String hostAddress = IPUtil.getHostAddress(); @Override public boolean lock(String lockName, long timeout) { String server_ip = PropertiesUtils.getApplicationProperties().getProperty("server_Ip"); if(StringUtils.isNotBlank(hostAddress)&&StringUtils.isNotBlank(server_ip)){ if(server_ip.equals(hostAddress)||server_ip.equals("localhost")){ return true; } } return false; } @Override public void unLock(String lockName) { } }
3.資料庫鎖,依賴資料庫的行級鎖,每次只能有一個執行緒獲取到update的鎖。如果更新成功表示獲取到了鎖。未更新成功即沒有拿到鎖。
public class dbLock implements ConcurrentLock { private static Logger logger = LoggerFactory.getLogger(dbLock.class); private DataSource dataSource; private ThreadLocal<Connection> connectionCatch = new ThreadLocal<>(); @Override public boolean lock(String lockName, long timeout) { Connection connection = null; PreparedStatement ps = null; String sql = "update tb_dataBase_lock t set t.lock_version = t.lock_version + 1, t.update_date = sysdate where t.lock_name = ? "; try { connection = dataSource.getConnection(); if(connection!=null&& !connection.isClosed()){ connectionCatch.set(connection); connection.setAutoCommit(false); PreparedStatement statement = connection.prepareStatement(sql); statement.setString(1,lockName); int i = statement.executeUpdate(); connection.close(); return i==1; } return false; } catch (SQLException e) { connectionCatch.remove(); logger.error(e.getMessage(),e); try { if(connection!=null&&!connection.isClosed()){ connection.close(); } if(ps !=null && ps.isClosed()){ ps.close(); } } catch (SQLException e1) { logger.error(e.getMessage(),e); } } return false; } @Override public void unLock(String lockName) { Connection connection = connectionCatch.get(); try { if(connection == null|| connection.isClosed())return; connection.commit(); connection.close(); }catch (Exception e){ logger.error(e.getMessage()); } connectionCatch.remove(); } public void setDataSource(DataSource dataSource) { this.dataSource = dataSource; }
4.zookeeperLock,利用zookeeper來實現分散式鎖。
未完待續........
實際應用:通過AOP來統一寫鎖的邏輯。
自定義註解作為切入點(pointCut)。
定義鎖的執行邏輯類作為通知(advice),具體可以實現methodInterceptor.
public class ConcurrentLockInterceptor implements MethodInterceptor { private Logger logger = LoggerFactory.getLogger(ConcurrentLockInterceptor.class); private Map<String,ConcurrentLock> locks; public void setLocks(Map<String, ConcurrentLock> locks) { this.locks = locks; } @Override public Object invoke(MethodInvocation invocation) throws Throwable { LockRequired lockRequired = invocation.getMethod().getAnnotation(LockRequired.class); if(lockRequired == null){ return invocation.proceed(); } String lockName = lockRequired.lockName(); if(StringUtils.isBlank(lockName)){ lockName = invocation.getThis().getClass().getSimpleName()+"."+invocation.getMethod().getName(); } ConcurrentLock lock = locks.get("redis"); if(lockRequired.strategy().equals(LockRequired.LockStrategy.IP)){ lock= locks.get("ip"); }else if(lockRequired.strategy().equals(LockRequired.LockStrategy.DB)){ lock = locks.get("db"); } boolean locked =false; try { locked = lock.lock(lockName, lockRequired.timeout()); if (locked) { logger.info("伺服器【" + IPUtil.getHostAddress() + "】獲取了鎖【" + lockName + "】"); invocation.proceed(); lock.unLock(lockName); } else { logger.info("伺服器【" + IPUtil.getHostAddress() + "】沒獲取到鎖【" + lockName + "】跳過執行方法!"); } }catch (Exception e){ logger.error(e.getMessage()); if(locked){ lock.unLock(lockName); logger.debug("伺服器["+IPUtil.getHostAddress()+"]成功釋放鎖["+lockName+"]"); } } return null; } }