併發程式設計經歷:同步加鎖之業務鎖
分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow
也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!
業務鎖
在處理併發問題時,很多情況下需要用到業務鎖來達到按照某個維度同步執行業務塊。
例子:
@Override@Transactional(rollbackFor = Exception.class, noRollbackFor = TerminateException.class)public ApplyDO submitApply(ApplyDOapplyDO) { LockResultEnum lockResultEnum =null; String lockName = new StringBuffer().append(applyDO.getSite()).append("_").append(applyDO.getSiteMemId()).toString(); try { //加鎖 lockResultEnum = lockManager.getLock(lockName, LockTypeEnum.APPLY_LOCK.getCode()); if (LockResultEnum.沒有獲取到鎖.equals(lockResultEnum)){ throw new BizException(ErrorCode.LOCK_FAIL); } … returnapplyDO; } catch (TerminateExceptione) { throwe; } catch (BizExceptione) { throw new BizException(e.getErrorCode(),e); } catch (Exceptione) { throw new BizException(ErrorCode.GENERIC_ERROR,e); } finally { //釋放鎖 lockManager.releaseLock(lockName, LockTypeEnum.APPLY_LOCK.getCode(),lockResultEnum); }}
LockManager的getLock方法實現如下:
@Overridepublic LockResultEnum getLock(StringlockName,StringlockType){ if(StringUtil.isEmpty(lockName)){ LOG.error("getLock()引數為空,param:" +lockName); throw new BizException(ErrorCode.ILLEGAL_ARGUMENT,"引數為空!"); } //只是生成一個數據庫鎖名,純粹的字串拼接過程 String lockName_ = getDBLockName(lockName,lockType); booleanisGetDbLocked =lockDao.getDbLock(lockName_); if (isGetDbLocked) { LockDO lock = lockDao.getRowLockByName(lockName); if (lock !=null){ return LockResultEnum.獲取鎖成功; } else { return LockResultEnum.僅資料庫鎖; } } else { LOG.warn("獲取鎖【" +lockName_+"】失敗"); return LockResultEnum.沒有獲取到鎖; }}
LockManager的releaseLock方法實現如下:
@Overridepublic void releaseLock(StringlockName,StringlockType,LockResultEnumlockResultEnum) { String lockName_ = getDBLockName(lockName,lockType); if (StringUtil.isEmpty(lockName)) { LOG.error("releaseLock()引數為空,lockName:{}",lockName); throw new BizException(ErrorCode.ILLEGAL_ARGUMENT,"引數為空!"); } if (LockResultEnum.獲取鎖成功.equals(lockResultEnum)|| LockResultEnum.僅資料庫鎖.equals(lockResultEnum)) { booleanisReleased =lockDao.releaseDbLock(lockName_); if (!isReleased) { LOG.warn("釋放鎖【" +lockName_+"】失敗"); } } else { LOG.debug("不需要釋放鎖【" +lockName_+"】"); } }
LockDao的實現如下:
@Overridepublic boolean getDbLock(String lockCode){ Long lock = (Long)super.getSqlMapClientTemplate().queryForObject("LockDO.getLockDbByCode",lockCode); booleanresult = (lock !=null&&lock.longValue()== 1) ? true:false; returnresult;} @Override public boolean releaseDbLock(String lockCode) { Long lock = (Long)super.getSqlMapClientTemplate().queryForObject("LockDO.releaseLockDbByCode",lockCode); booleanresult = (lock !=null&&lock.longValue()== 1) ? true:false; returnresult; } @Override public LockDO getRowLockByName(Stringname) { return (LockDO)super.getSqlMapClientTemplate().queryForObject("LockDO.selectForUpdateByLockName",name);}
LockDao對應sqlMap檔案裡的執行sql如下:
<selectid="selectForUpdateByLockName"resultMap="jobLockMap" parameterClass="java.lang.String" > select ID, NAME, REMARK, IS_ENABLED from VENUS_LOCK where NAME = #value# and IS_ENABLED = 'y' FOR UPDATE </select> <!-- 通過指定的程式碼取得操作資料鎖--> <selectid="getLockDbByCode"resultClass="java.lang.Long"parameterClass="string"> <![CDATA[ select get_lock(#value#, 0) as tolock; ]]> </select> <!-- 通過指定的程式碼釋放操作資料鎖--> <selectid="releaseLockDbByCode"resultClass="java.lang.Long"parameterClass="string"> <![CDATA[ select release_lock(#value#) as torelease; ]]> </select>
通過以上程式碼可以很清楚的看出原理了。貸款申請提交時,為了防止一個人同時提交多筆,要按照以人維度進行業務鎖的加鎖處理。加鎖邏輯就是鎖名和人直接掛鉤(就是鎖名裡有可以直接區分人的欄位),通過執行sql:select get_lock(#鎖名#, 0) as tolock;來獲取資料庫鎖,如果獲取成功,返回1。這裡還去獲取了一下行鎖,獲取的行鎖它鎖住的是venus_lock表的符合where條件的那些行,執行sql: select ID, NAME, REMARK,IS_ENABLED from VENUS_LOCK where NAME = #鎖名#and IS_ENABLED = 'y' FOR UPDATE;這裡行鎖是否獲取成功其實都沒有關係。獲取到鎖之後就可以執行業務邏輯了,執行完一定要釋放鎖,執行sql:select release_lock(#鎖名#) as torelease;為了保證釋放鎖操作一定執行,一般在finally子句中執行它即可。通過以上的步驟,當一個人同時申請多筆時,鎖名是一樣的,所以獲取到鎖後返回值就是1、2、3…具體看你是第幾個獲取的了,只有第一個獲取的返回值是1,從lockDao .getDbLock裡的booleanresult = (lock !=null&&lock.longValue()== 1) ? true:false;就可以看出,只有第一個可以執行業務邏輯,其他就認為是沒有獲取到鎖而丟擲異常終止執行:if (LockResultEnum.沒有獲取到鎖.equals(lockResultEnum)){thrownewBizException(ErrorCode.LOCK_FAIL); }
還有一個例子:
下面的是任務分發器,它實現了Runnable介面,在任務分發器執行時會去獲取各種非同步任務型別的待執行任務列表,這裡也用到了業務鎖,呼叫的和上面的一樣都是lockManager.getLock(...)方法。
public class JobDispatcher implements Runnable { private static final Logger LOG = LoggerFactory.getLogger("applyCenterJobLog"); /** 守護執行緒名稱 */ private String name; /** 一天秒數 */ private static final long ONE_DAY_SEC = 24 * 60 * 60; /** 執行緒池佇列長度 */ private int queueSize = 5; /** 初始處理執行緒數 */ private int coreSize = 5; /** 最大處理執行緒數 */ private int maxSize = 5; /** 空閒執行緒最大閒置時間 */ private long keepAliveTime = ONE_DAY_SEC; /** 執行緒池接收新任務閥值 */ private int hungrySize = 2; /** 分發器執行狀態標記 */ private boolean isRunning = true; /** 無命令處理時休息時常(毫秒) */ private long noCmdSleepMillis = 1000; /** 出現系統異常時休息時常(毫秒),防止把系統拖垮 */ private long errorCmdSleepMillis = 10000; private JobManager jobManager; /** handler產生工廠類 */ private JobHandlerFactory jobHandlerFactory; private List<String> jobTypeList; /** * spring init */ public void init() { LOG.info("分發器【" + name + "】init!!!!!"); jobTypeList = jobHandlerFactory.getJobTypeList(); } /** * spring destroy */ public void destroy() { LOG.warn("收到分發器【" + name + "】停止通知!!!!!"); isRunning = false; } @Override public void run() { LOG.info("分發器【" + name + "】啟動ing..."); BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueSize); ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, keepAliveTime, TimeUnit.SECONDS, queue); while (isRunning) { try { int i = 0; if (queue.size() < hungrySize) { for (String jobType : jobTypeList) { List<JobDO> jobDOList = jobManager.assignJob(jobType, queueSize - queue.size()); for (JobDO jobDO : jobDOList) { i++; JobHandler<JobDO> tmpJobHandler = jobHandlerFactory.getHandler(jobDO); ExecuteJobThread<JobDO> executeCmdThread = new ExecuteJobThread<JobDO>(jobDO, tmpJobHandler); executor.execute(executeCmdThread); } } } else { ThreadUtil.sleep(noCmdSleepMillis, LOG); } if (i == 0) { ThreadUtil.sleep(noCmdSleepMillis, LOG); } else { i = 0; } } catch (Exception e) { LOG.error("dispacher 排程異常" + e.getMessage(), e); ThreadUtil.sleep(errorCmdSleepMillis, LOG); } } executor.shutdown(); } /** * 執行分發 */ public void dispatcher() { Thread thread = new Thread(this); isRunning = true; thread.start(); }...//一些set方法}
jobManager的assignJob方法如下:
public List<JobDO> assignJob(String jobType, int jobNum) { if (StringUtil.isBlank(jobType) || jobNum <= 0) { LOG.error("assignJob()引數非法jobType:{},jobNum:{}", jobType, jobNum); throw new BizException(ErrorCode.ILLEGAL_ARGUMENT, "引數非法!"); } LockResultEnum lockResultEnum = null; try { /** 1、獲取業務鎖 */ //這裡呼叫的lockManager.getLock(...)就是之前例子裡的 lockResultEnum = lockManager.getLock(jobType, LockTypeEnum.JOB_LOCK.getCode()); if (!LockResultEnum.獲取鎖成功.equals(lockResultEnum)) {//返回emptylist,dispatcher會sleep一定時間,可配置 return new ArrayList<JobDO>(0); } return doAssignJob(jobType, jobNum); } catch (Exception e) { LOG.warn("獲取鎖失敗", e); } finally { lockManager.releaseLock(jobType, LockTypeEnum.JOB_LOCK.getCode(), lockResultEnum); } return new ArrayList<JobDO>(0); }
從上可見,這次是要獲取資料庫鎖和行鎖都成功才行: if (!LockResultEnum.獲取鎖成功.equals(lockResultEnum)) {return new ArrayList<JobDO>(0);}
所以需要在venus_lock表中有對應任務型別的資料,才能使sql:select ID, NAME, REMARK,IS_ENABLED from VENUS_LOCK where NAME = #鎖名#and IS_ENABLED = 'y' FOR UPDATE;執行成功,獲取到行鎖。