1. 程式人生 > >flowable 集成mongodb

flowable 集成mongodb

string div .get rabbit new fin container ESS 適合

學無止境,活到老學到老,每天都問自己進步了嗎?

第一:背景

由於公司每天有至少1500個表單發起,處理待辦任務至少7000個,累計歷史任務數據已經達到200多w條,時間一長,通過數據庫查詢已辦的任何和我發起的流程巨慢

所以我們考慮到這些數據能不能放入ES或者是mongodb中

流程中心1.0版本集成的是ES,速度確實非常快,提升查詢性能近萬倍,但是由於ES是一個全文檢索的系統,對我們這些業務數據來說,不是很適合,比方說

我們的表單數據,他直接給分詞了,不符合業務的要求。

流程中心2.0版本我們就改成mongodb,速度一樣達到es的查詢效果,解決了以前分詞的問題,而且數據結構化查詢也非常方便。

第二:集成mongodb策略

集成mongodb的策略有兩種

1.利用flowable提供的mongodb的插件來集成,具體可以參考他們提供的demo

2.通過流程實例id和任務id查詢表數據,做加工處理,然後在通過消息隊列的方式同步到mongodb中

第一種方式就相當於把歷史表的數據全部搬到mongodb中,在關系型數據庫中不存放任何歷史數據,正是由於中原因,我擔心歷史數據的丟失,所以我采用的是

第二種策略,關系數據庫中有一份數據,在mongodb中我也有一份加工後的數據,以保證萬無一失。

第三:具體實現

3.1 同步數據

@Component
@RabbitListener(queues 
= FlowConstant.FLOWABLE_HISTORY_DATAS, containerFactory = "rabbitListenerContainerFactory") public class HisDataSyncReceiver { private static Logger logger = LoggerFactory.getLogger(HisDataSyncReceiver.class); @Autowired private ISyncHisService syncHisService; @RabbitHandler
public void process(@Payload String message) { if (StringUtils.isNotBlank(message)) { logger.info("接受到的數據為:" + message); try { Thread.sleep(100); SynHisDataVo data = (SynHisDataVo) JsonUtils.jsonToObj(message, SynHisDataVo.class); String type = data.getType(); String id = data.getId(); String processStaus = data.getProcessStatus(); if (type.equals(SynHisDataEnum.EXECUTION.getSn())) { syncHisService.syncExecution(id,processStaus); } else if (type.equals(SynHisDataEnum.TASK.getSn())) { syncHisService.syncTask(id,processStaus); } else { logger.error("歷史隊列中錯誤的數據" + message, message); } } catch (Exception e) { logger.error("歷史隊列中錯誤的數據"+ message, e); } } } }

3.2 查詢mongodb的任務數據

public PagerModel<SearchTaskVo> querySearchTaskVoPagerModel(QueryTaskVo params, Query query) {
        org.springframework.data.mongodb.core.query.Query queryParams = new org.springframework.data.mongodb.core.query.Query();
        queryParams.addCriteria(Criteria.where("endTime").exists(true));
        if (StringUtils.isNotBlank(params.getAssignee())) {
            queryParams.addCriteria(Criteria.where("assignee").is(params.getAssignee().trim()));
        }
        if (StringUtils.isNotBlank(params.getCreator())) {
            queryParams.addCriteria(Criteria.where("creator").is(params.getCreator().trim()));
        }
        if (StringUtils.isNotBlank(params.getCreateName())) {
            queryParams.addCriteria(Criteria.where("creatorName").is(params.getCreateName().trim()));
        }
        if (StringUtils.isNotBlank(params.getExecutionName())) {
            Pattern pattern = Pattern.compile("^.*" + params.getExecutionName().trim() + ".*$", Pattern.CASE_INSENSITIVE);
            //通過流程標題或流程編號復合查詢
            Criteria criteriaNameOrKey = new Criteria();
            criteriaNameOrKey.orOperator(Criteria.where("procInstName").regex(pattern),Criteria.where("businessKey").regex(pattern),Criteria.where("creatorName").regex(pattern));
            queryParams.addCriteria(criteriaNameOrKey);
        }
        if (params.getProcessStatus() != null) {
            queryParams.addCriteria(Criteria.where("processStatus").is(params.getProcessStatus().trim()));
        }
        if (StringUtils.isNotBlank(params.getStartTime()) && StringUtils.isNotBlank(params.getEndTime())) {
            queryParams.addCriteria(Criteria.where("startTime").gte(params.getStartTime().trim()).lte(params.getEndTime().trim()));
        }
        if (StringUtils.isNotBlank(params.getSystemSn())) {
            queryParams.addCriteria(Criteria.where("systemSn").is(params.getSystemSn().trim()));
        }
        queryParams.with(new Sort(Sort.Direction.DESC, "endTime"));
        int i = query.getInitPageIndex() < 1 ? 1 : query.getInitPageIndex();
        queryParams.skip((i - 1) * query.getPageSize()).limit(query.getPageSize());
        List<SearchTaskVo> datas = mongoTemplate.find(queryParams, SearchTaskVo.class, SearchTaskVo.TASK_TABLE);
        long count = mongoTemplate.count(queryParams, SearchTaskVo.TASK_TABLE);
        //去重
        datas = this.removal(datas);
        //翻譯轉閱
        this.turnReadTaskData(datas);
        //排序
        if (MapUtils.isNotEmpty(params.getOrderbyMap())){
            this.taskCommonSort(params.getOrderbyMap(),datas);
        }
        PagerModel<SearchTaskVo> pm = new PagerModel<>(count, datas);
        return pm;
    }

3.3.查詢mongodb的實例數據

public PagerModel<SearchExecutionVo> querySearchExecutionVoPagerModel(QueryTaskVo params, com.dragon.tools.pager.Query query) {
        Query queryParams = new Query();
        //創建者工號
        if (StringUtils.isNotBlank(params.getCreator())) {
            queryParams.addCriteria(Criteria.where("startUser").is(params.getCreator()));
        }
        //創建者姓名
        if (StringUtils.isNotBlank(params.getCreateName())) {
            queryParams.addCriteria(Criteria.where("creatorName").is(params.getCreateName()));
        }
        if (StringUtils.isNotBlank(params.getProcessDefinitionKey())) {
            queryParams.addCriteria(Criteria.where("proDefKey").is(params.getProcessDefinitionKey()));
        }else {
            //轉閱流程在已發不讓顯示
            queryParams.addCriteria(Criteria.where("proDefKey").ne(FlowConstant.TURN_READ));
        }
        if (StringUtils.isNotBlank(params.getExecutionName())) {
            Pattern pattern = Pattern.compile("^.*" + params.getExecutionName() + ".*$", Pattern.CASE_INSENSITIVE);
           // queryParams.addCriteria(Criteria.where("name").regex(pattern));
            // queryParams.addCriteria(Criteria.where("businessKey").regex(pattern));
            //通過流程標題或流程編號復合查詢
            Criteria criteriaNameOrKey= new Criteria();
            criteriaNameOrKey.orOperator(Criteria.where("name").regex(pattern),
                    Criteria.where("businessKey").regex(pattern));
            queryParams.addCriteria(criteriaNameOrKey);


        }
        if (StringUtils.isNotBlank(params.getProcessStatus())) {
            queryParams.addCriteria(Criteria.where("processStatus").is(params.getProcessStatus().trim()));
        }
        if (StringUtils.isNotBlank(params.getStartTime()) && StringUtils.isNotBlank(params.getEndTime())) {
            queryParams.addCriteria(Criteria.where("startTime").gte(params.getStartTime().trim()).lte(params.getEndTime().trim()));
        }
        if (StringUtils.isNotBlank(params.getSystemSn())) {
            queryParams.addCriteria(Criteria.where("systemSn").is(params.getSystemSn().trim()));
        }
        //默認開始時間降序
        queryParams.with(new Sort(Sort.Direction.DESC, "startTime"));
        queryParams.skip(((query.getInitPageIndex()==0?1:query.getInitPageIndex())-1)*query.getPageSize()).limit(query.getPageSize());
        List<SearchExecutionVo> datas = mongoTemplate.find(queryParams, SearchExecutionVo.class, SearchExecutionVo.EXECUTION_TABLE);
        //當前審批人查詢出來
        this.createApprover(datas);
        //轉閱流程特殊處理
        this.turnReadExecutionData(datas);
        if (MapUtils.isNotEmpty(params.getOrderbyMap())){
            this.excutionCommonSort(params.getOrderbyMap(),datas);
        }
        long count = mongoTemplate.count(queryParams, SearchExecutionVo.class, SearchExecutionVo.EXECUTION_TABLE);
        return new PagerModel<>(count, datas);
    }

flowable 集成mongodb