關於springboot2開機啟動CommandLineRunner阻塞執行導致無法正常訪問url
阿新 • • 發佈:2019-02-20
1、問題描述
現有類GroupExtTimeOutQueueRunner implements CommandLineRunner,
run方法如下:
public void run(String... args) { RPriorityBlockingQueue<CallCdr> blockingQueue = redissonClient.getPriorityBlockingQueue(RedisKeyPrefix.GROUP_EXTENSION_TIMEOUT_QUEUE); while (true) { CallCdr callCdr = null; //從redis的阻塞佇列中獲取元素,若無法獲取到元素 則執行緒掛起 // 一直阻塞在這裡 下面的業務方法不執行 callCdr = blockingQueue.take(); //執行業務方法 dosomething(callCdr ); } }
使用springboot2在開發環境上是完全ok的,但是部署到外部tomcat後,出現以下問題:
1、程式正常啟動,無任何報錯資訊
2、該開機啟動可以正常執行,但是當無法從佇列中獲取到元素的時候,執行緒掛起
原因分析:可能是由於獲取不到元素,一直阻塞著 導致的
解決方案:使用執行緒池
虛擬碼如下
執行緒類,run方法真正執行業務邏輯
package com.ps.uzkefu.apps.ctilink.handler; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.ps.uzkefu.apps.callcenter.entity.Router; import com.ps.uzkefu.apps.callcenter.service.ExtensionStatusService; import com.ps.uzkefu.apps.ctilink.ommodel.CallCdr; import com.ps.uzkefu.apps.ctilink.ommodel.CallCdrComparator; import com.ps.uzkefu.apps.ctilink.rediskey.CdrType; import com.ps.uzkefu.apps.ctilink.rediskey.EventType; import com.ps.uzkefu.apps.ctilink.rediskey.RedisKeyPrefix; import com.ps.uzkefu.apps.ctilink.routerHandler.BaseRouterHandler; import com.ps.uzkefu.apps.ctilink.routerHandler.ExtensionGroupRouterHandler; import com.ps.uzkefu.apps.ctilink.service.CallInQueueService; import com.ps.uzkefu.apps.oms.account.entity.User; import com.ps.uzkefu.apps.oms.account.service.UserService; import com.ps.uzkefu.common.ExecutionContext; import com.ps.uzkefu.common.UkConstant; import com.ps.uzkefu.utils.ExecuteOMAPI; import lombok.Getter; import lombok.Setter; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.redisson.api.RBucket; import org.redisson.api.RPriorityBlockingQueue; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; import java.util.Map; import java.util.Objects; /** * Author:ZhuShangJin * Date:2018/6/26 * */ @Getter @Setter public class QueueHandler implements Runnable{ public Logger logger = LoggerFactory.getLogger(this.getClass()); RedissonClient redissonClient; ExtensionStatusService extensionStatusService; UserService userService; CallInQueueService callInQueueService; @Override public synchronized void run() { logger.error("2222222222222222222222222222222"); try { RPriorityBlockingQueue<CallCdr> blockingQueue = redissonClient.getPriorityBlockingQueue(RedisKeyPrefix.GROUP_EXTENSION_TIMEOUT_QUEUE); blockingQueue.trySetComparator(new CallCdrComparator()); boolean jj = (blockingQueue==null); while (true) { CallCdr callCdr = null; try { callCdr = blockingQueue.take(); String to = callCdr.getToNum(); String from = callCdr.getFromNum(); String visitorId = callCdr.getVistorId(); String key = from + "_" + to + "_" + visitorId; RBucket<CallCdr> bucket = redissonClient.getBucket(RedisKeyPrefix.CALL_VISITOR_CDR + key); CallCdr getByKeyCallCdr = bucket.get(); RBucket<Router> routerRBucket = redissonClient.getBucket(RedisKeyPrefix.ROUTER + to); Router router = routerRBucket.get(); // 正常進入佇列 可以接聽的 if (getByKeyCallCdr != null && !getByKeyCallCdr.isHangup() && Objects.equals(CdrType.NO_TIME_OUT, callCdr.getTimeOutType())) { //查詢空閒坐席 String extNum = extensionStatusService.getFreeExtensionNum(getByKeyCallCdr.getGroupNum()+"", getByKeyCallCdr.getStrategy()); // extNum = null; // extNum = getByKeyCallCdr.getExtensionNum(); // if (Objects.equals(getByKeyCallCdr.getLastTimeOutExt(), "2004")) { // extNum = "2003"; // } else { // extNum = "2004"; // } //無空閒坐席 logger.debug("是否正在播放排隊===ivr:"+getByKeyCallCdr.isHasPlayQueueVoice()); if (StringUtils.isBlank(extNum)) { if (!getByKeyCallCdr.isHasPlayQueueVoice()) { getByKeyCallCdr.setHasPlayQueueVoice(true); getByKeyCallCdr.setMenuId(UkConstant.EXTENSION_GROUP_TIP_MENU_ID); String voiceId = router.getExtensionGroups().get(0).getVoices().get(0).getVoiceName(); logger.debug("播放排隊音樂"+voiceId); // 播放排隊音樂 ExecuteOMAPI.turnToIvr(getByKeyCallCdr.getPbxUrl(), getByKeyCallCdr.getVistorId(), UkConstant.EXTENSION_GROUP_TIP_MENU_ID, voiceId); // todo 真正的排隊 corpcode groupnum 在排隊超時出隊的時候刪除相應的元素 在來掛機的時候刪除相應元素 RPriorityBlockingQueue<CallCdr> blockingQueueByCorp = redissonClient.getPriorityBlockingQueue(RedisKeyPrefix.CALL_VISITOR_QUEUE+getByKeyCallCdr.getCorpCode()+":"+getByKeyCallCdr.getGroupNum()); CallCdr inQueueCall = new CallCdr(); inQueueCall.setCorpCode(getByKeyCallCdr.getCorpCode()); inQueueCall.setGroupNum(getByKeyCallCdr.getGroupNum()); inQueueCall.setRedisKey(getByKeyCallCdr.getRedisKey()); inQueueCall.setFromNum(getByKeyCallCdr.getFromNum()); inQueueCall.setToNum(getByKeyCallCdr.getToNum()); inQueueCall.setVistorId(getByKeyCallCdr.getVistorId()); blockingQueueByCorp.put(inQueueCall); } // todo 統計排隊資訊 入隊 組內分機全忙碌 真正的排隊 重回佇列 不做任何改變 logger.debug("是否正在播放排隊ivr:"+getByKeyCallCdr.isHasPlayQueueVoice()); bucket.set(getByKeyCallCdr); blockingQueue.put(getByKeyCallCdr); if (getByKeyCallCdr.getGroupNum() > 0) { callInQueueService.inQueue(getByKeyCallCdr.getCorpCode(), getByKeyCallCdr.getGroupNum() + "", from, getByKeyCallCdr.getQueueTime()); } } else { //查找出最先進入佇列中的呼入 和getByKeyCallCdr 進行對比 是否為同一個呼入 若是 則接聽getByKeyCallCdr //若不是 則把getByKeyCallCdr 放回佇列 直到 找到最先進入佇列中的呼入 todo RPriorityBlockingQueue<CallCdr> blockingQueueByCorp = redissonClient.getPriorityBlockingQueue(RedisKeyPrefix.CALL_VISITOR_QUEUE+getByKeyCallCdr.getCorpCode()+":"+getByKeyCallCdr.getGroupNum()); CallCdr realToAnswer = blockingQueueByCorp.take(); if (!Objects.equals(getByKeyCallCdr.getRedisKey(),realToAnswer.getRedisKey())){ blockingQueue.put(getByKeyCallCdr); // 不是同一個呼入 String realTo = realToAnswer.getToNum(); String realFrom = realToAnswer.getFromNum(); String realVisitorId = realToAnswer.getVistorId(); String realKey = realFrom + "_" + realTo + "_" + realVisitorId; RBucket<CallCdr> realBucket = redissonClient.getBucket(RedisKeyPrefix.CALL_VISITOR_CDR + realKey); getByKeyCallCdr = realBucket.get(); if (getByKeyCallCdr == null){ continue; } } //有空閒坐席 撥打電話到分機 //查詢真正應該接的 corpcode groupnum todo 注意出隊 User user = userService.selectOne(new EntityWrapper<User>().eq("extension", extNum).eq(ExecutionContext.CORP_CODE, getByKeyCallCdr.getCorpCode())); // 設定下一個振鈴的坐席的姓名 getByKeyCallCdr.setUserName(user.getUserName()); // 設定下一個振鈴的坐席id getByKeyCallCdr.setCreater(user.getId()); //如果已有 分機振鈴超時未接 設定 下一個要振鈴的分機 if (getByKeyCallCdr.isHasRingExt()) { getByKeyCallCdr.setNextExt(extNum); } // 設定要在分機振鈴時 加入到振鈴超時佇列 getByKeyCallCdr.setNeedPutToExtTimeOutQueue(true); //向分機呼叫 String result = ExecuteOMAPI.turnToExt(getByKeyCallCdr.getPbxUrl(), visitorId, extNum); logger.debug("呼叫分機========" + extNum); //修改狀態 為正在呼叫 分機中 getByKeyCallCdr.setCalling(true); // 出隊 計算排隊的時間 int queueLength =(int) ((new Date().getTime() - getByKeyCallCdr.getManyInQueueTime().getTime())/1000); int alredayQueueLength = getByKeyCallCdr.getQueueTimeLength(); getByKeyCallCdr.setQueueTimeLength(queueLength+alredayQueueLength); // 設定出佇列的時間 也就是分機振鈴的時間 getByKeyCallCdr.setManyOutQueueTime(new Date()); bucket.set(getByKeyCallCdr); } } // 分機振鈴超時 if (Objects.equals(CdrType.EXT_TIME_OUT, callCdr.getTimeOutType()) && getByKeyCallCdr != null && !getByKeyCallCdr.isHangup() && Objects.equals(EventType.RING, getByKeyCallCdr.getLastEvent())) { //分機 振鈴超時 重新進入佇列 BaseRouterHandler routerHandler = new ExtensionGroupRouterHandler(router, getByKeyCallCdr); routerHandler.extRingTimeOut(); } //排隊超時 if (Objects.equals(CdrType.GROUP_TIME_OUT, callCdr.getTimeOutType()) && getByKeyCallCdr != null && (callCdr.getInQueueCnt() == getByKeyCallCdr.getInQueueCnt()) && !Objects.equals(EventType.ANSWER, getByKeyCallCdr.getLastEvent())) { //排隊超時 執行掛機 todo 超時的選單id 和語音檔名 // todo 統計排隊資訊 出隊 logger.debug("from:"+from+"to:"+to+"第"+getByKeyCallCdr.getInQueueCnt()+"次排隊超時,掛機"); String result = ExecuteOMAPI.turnToIvr(getByKeyCallCdr.getPbxUrl(), visitorId, UkConstant.HANG_MENU_ID, getByKeyCallCdr.getQueueTimeOutVoiceName()); getByKeyCallCdr.setHangup(true); getByKeyCallCdr.setNeedSaveCdr(true); getByKeyCallCdr.setMenuId(UkConstant.HANG_MENU_ID); bucket.set(getByKeyCallCdr); //排隊超時 移除 corpcode+groupNum佇列中真正排隊的來電 RPriorityBlockingQueue<CallCdr> blockingQueueByCorp = redissonClient.getPriorityBlockingQueue(RedisKeyPrefix.CALL_VISITOR_QUEUE+getByKeyCallCdr.getCorpCode()+":"+getByKeyCallCdr.getGroupNum()); CallCdr inQueueCall = new CallCdr(); inQueueCall.setCorpCode(getByKeyCallCdr.getCorpCode()); inQueueCall.setGroupNum(getByKeyCallCdr.getGroupNum()); inQueueCall.setRedisKey(getByKeyCallCdr.getRedisKey()); inQueueCall.setFromNum(getByKeyCallCdr.getFromNum()); inQueueCall.setToNum(getByKeyCallCdr.getToNum()); inQueueCall.setVistorId(getByKeyCallCdr.getVistorId()); if (blockingQueueByCorp.contains(inQueueCall)){ blockingQueueByCorp.remove(inQueueCall); } callInQueueService.outQueue(getByKeyCallCdr.getCorpCode(), getByKeyCallCdr.getGroupNum() + "", from); } if (Objects.equals(CdrType.IVR_KEY_TIME_OUT, callCdr.getTimeOutType()) && getByKeyCallCdr != null && !getByKeyCallCdr.isPressKey()) { // 按鍵超時 Map<String, String> keyResourceTypeIdMap = getByKeyCallCdr.getKeyResourceTypeIdMap(); String jumpType = null; String jumpResourceId = null; if (MapUtils.isEmpty(keyResourceTypeIdMap)) { //如果IRV沒有設定按鍵選單,直接走無按鍵選單超時跳轉 jumpType = getByKeyCallCdr.getKeyTimeoutJumpType(); jumpResourceId = getByKeyCallCdr.getKeyTimeoutJumpId(); getByKeyCallCdr.setIvrPlayCnt(0); } else { //按鍵超時,播放當前ivr jumpType = getByKeyCallCdr.getJumpType(); jumpResourceId = getByKeyCallCdr.getVoiceId(); } getByKeyCallCdr.setJumpType(jumpType); getByKeyCallCdr.setJumpResourceId(jumpResourceId); BaseRouterHandler routerHandler = null; routerHandler = new EventHandler().getHandler(getByKeyCallCdr, jumpType, jumpResourceId, router, routerHandler); getByKeyCallCdr = routerHandler.routerHandler(); bucket.set(getByKeyCallCdr); routerRBucket.set(router); } } catch (InterruptedException e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } }
開機啟動類,通過執行緒池來執行業務邏輯
package com.ps.uzkefu.apps.ctilink.init; /** * Author:ZhuShangJin * Date:2018/9/13 */ import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.ps.uzkefu.apps.callcenter.entity.Router; import com.ps.uzkefu.apps.callcenter.service.ExtensionStatusService; import com.ps.uzkefu.apps.ctilink.handler.EventHandler; import com.ps.uzkefu.apps.ctilink.handler.QueueHandler; import com.ps.uzkefu.apps.ctilink.ommodel.CallCdr; import com.ps.uzkefu.apps.ctilink.ommodel.CallCdrComparator; import com.ps.uzkefu.apps.ctilink.rediskey.CdrType; import com.ps.uzkefu.apps.ctilink.rediskey.EventType; import com.ps.uzkefu.apps.ctilink.rediskey.RedisKeyPrefix; import com.ps.uzkefu.apps.ctilink.routerHandler.BaseRouterHandler; import com.ps.uzkefu.apps.ctilink.routerHandler.ExtensionGroupRouterHandler; import com.ps.uzkefu.apps.ctilink.service.CallInQueueService; import com.ps.uzkefu.apps.oms.account.entity.User; import com.ps.uzkefu.apps.oms.account.service.UserService; import com.ps.uzkefu.common.ExecutionContext; import com.ps.uzkefu.common.UkConstant; import com.ps.uzkefu.utils.ExecuteOMAPI; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.redisson.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.Date; import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by pangkunkun on 2017/9/3. * 超時 振鈴超時 排隊超時 按鍵超時 */ @Component @Order(2) public class GroupExtTimeOutQueueRunner implements CommandLineRunner { @Autowired RedissonClient redissonClient; @Autowired ExtensionStatusService extensionStatusService; @Autowired UserService userService; @Autowired CallInQueueService callInQueueService; private static ExecutorService queueThreadPool = Executors.newFixedThreadPool(50); @Override public void run(String... args) { QueueHandler queueHandler = new QueueHandler(); queueHandler.setCallInQueueService(callInQueueService); queueHandler.setRedissonClient(redissonClient); queueHandler.setExtensionStatusService(extensionStatusService); queueHandler.setUserService(userService); queueThreadPool.execute(queueHandler); } }