1. 程式人生 > >簡單的spring boot + flowable(activiti) + rocketmq 流程包括:申請、稽核、定期提醒、rocketmq訊息傳送

簡單的spring boot + flowable(activiti) + rocketmq 流程包括:申請、稽核、定期提醒、rocketmq訊息傳送

一、配置

1、application.yml配置

server:
  port: 18090
spring:
  application:
    name: service-flowable
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/flowable?characterEncoding=utf8&useSSL=false
    username: admin
    password: admin
#自動部署驗證設定:true-開啟(預設)、false-關閉
flowable:
  check-process-definitions: false
swagger:
    basePackage: com.example.flowable.controller
#訊息服務地址
rocketmq:
  namesrvaddr: 127.0.0.1:9876

2、重寫flowable預設配置

@Configuration
@Slf4j
public class ProcessConfig {

    @Autowired
    private DataSourceProperties dataSourceProperties;

    @Bean
    @Primary
    protected ProcessEngineConfiguration configuration() {

        List<FlowableEventListener> list = new ArrayList<>();
        //增加job監聽  JobListener()類後面詳細分析
        list.add(new JobListener());
        //配置監聽型別(與JobListener配合)
        Map<String,List<FlowableEventListener>> map = new HashMap<>();
        map.put("JOB_EXECUTION_SUCCESS",list);
        map.put("JOB_EXECUTION_FAILURE",list);

        ProcessEngineConfiguration processEngineConfiguration = StandaloneProcessEngineConfiguration
                .createStandaloneProcessEngineConfiguration()
                .setJdbcUrl(dataSourceProperties.getUrl())
                .setJdbcDriver(dataSourceProperties.getDriverClassName())
                .setJdbcUsername(dataSourceProperties.getUsername())
                .setJdbcPassword(dataSourceProperties.getPassword())
                //如果表不存在,自動建立表
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                //屬性asyncExecutorActivate定義為true,工作流引擎在啟動時就建立啟動async executor執行緒池
                .setAsyncExecutorActivate(false)
                //流程釋出的時候是否生成流程圖
                .setCreateDiagramOnDeploy(true)
                //生成流程圖引數
                .setProcessDiagramGenerator(new DefaultProcessDiagramGenerator())
                .setActivityFontName("幼圓")
                .setAnnotationFontName("幼圓")
                .setLabelFontName("幼圓");
        //配置監聽
        processEngineConfiguration.setTypedEventListeners(map);

        return processEngineConfiguration;
    }

    @Bean
    protected ProcessEngine engine() {
        //建立流程引擎  服務啟動是初始化一次即可
        return ProcessEngines.getDefaultProcessEngine();
    }
}

3、rocketmq配置類

//系統啟動時會載入application.yml中的rocketmq:namesrvaddr: 127.0.0.1:9876裝載到namesrvaddr變數
@Component
@Data
@ConfigurationProperties("rocketmq")
public class RocketMqProperties {
    //服務地址集合
    private List<String> namesrvaddr;
}

4、自定義註解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketConsume{

    /**
     * 訂閱主題
     *
     * @return
     */
    String topic();

    /**
     * 訂閱標籤
     *
     * @return
     */
    String[] tags() default {"*"};
}

5、初始化所有消費類

@Component
public class RocketMQConfig extends WebApplicationObjectSupport{

    private List<String> namesrvaddr;

    @Autowired
    private RocketMqProperties properties;

    /**
     * @Author: ZR
     * @param: 初始化消費者
     * @Description:
     * @date: Created in 13:26 2018/5/15
     */
    @PostConstruct
    private void init() {

        namesrvaddr = properties.getNamesrvaddr();
        ApplicationContext context = getApplicationContext();
        Map<String, RocketMQListener> mqListenerMap = context.getBeansOfType(RocketMQListener.class);
        if (mqListenerMap != null && !mqListenerMap.isEmpty()) {
            Collection<RocketMQListener> list = mqListenerMap.values();
            list.stream().filter(rocketMqListener -> null != rocketMqListener
                    && rocketMqListener.getClass().isAnnotationPresent(RocketConsume.class))
                    .collect(Collectors.toList())
                    .forEach(listener ->{
                        RocketConsume rocketConsume = listener.getClass().getAnnotation(RocketConsume.class);
                        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
                        consumer.setNamesrvAddr(namesrvaddr.get(0));
                        try {
                            consumer.subscribe(rocketConsume.topic(),rocketConsume.tags()[0]);
                        } catch (MQClientException e) {
                            e.printStackTrace();
                        }
                        consumer.setConsumerGroup(rocketConsume.tags()[0]);
                        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                        consumer.setMessageListener(listener);
                        consumer.setMessageModel(MessageModel.CLUSTERING);
                        try {
                            consumer.start();
                        } catch (MQClientException e) {
                            e.printStackTrace();
                        }

                    });
        }
    }

}

6、訊息監聽類

public abstract class RocketMQListener<T> implements MessageListenerConcurrently {

    public abstract boolean consume(T t);

    private boolean consume(MessageExt messageExt) {
        if (null != messageExt) {
            byte[] bytes = messageExt.getBody();
            if (null != bytes) {
                String json = new String(bytes);
                if (StringUtils.isNotEmpty(json)) {
                    Class <T> entityClass = (Class <T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
                    T t = JSON.parseObject(json,entityClass);
                    if (!consume(t)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    private ConsumeConcurrentlyStatus consume(List<MessageExt> messageExtList) {
        if (null != messageExtList && !messageExtList.isEmpty()) {
            for (MessageExt messageExt : messageExtList) {
                if (!consume(messageExt)) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList,
                                                    ConsumeConcurrentlyContext context) {
        return consume(messageExtList);
    }
}

7、訊息生產工具類

@Component
public class RocketMQProducer {

    private static DefaultMQProducer sender;

    @Value("${spring.application.name}")
    private String groupName;

    @Autowired
    private RocketMqProperties properties;

    @PostConstruct
    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(properties.getNamesrvaddr().get(0));
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static<T> void send(String topic,String tags,String keys,T body) {

        String json = JSON.toJSONString(body);
        Message message = new Message(topic,tags,keys,json.getBytes());
        send(message);
    }

    private static void send(Message message) {

        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

8、到期提醒歸還監聽類

public class EndTimerExecutorListener implements ExecutionListener {

    @Override
    public void notify(DelegateExecution delegateExecution) {

        if (delegateExecution.getVariable("isReturn") != null && !(boolean)delegateExecution.getVariable("isReturn")) {
            HashMap<String, String> map = new HashMap<>();
            map.put("content", "請儘快歸還");
            //生產工具類傳送訊息
            RocketMQProducer.send("FLOWABLE", "RETURN", delegateExecution.getId(),map);
        }
    }
}

9、訊息接收類

@Component
@Slf4j
@RocketConsume(topic = "FLOWABLE", tags = "RETURN")
public class FlowableConsumer extends RocketMQListener<HashMap> {

    @Override
    public boolean consume(HashMap s) {
        System.out.println("接收成功" +  s.get("content"));
        return true;
    }
}

二、流程部署

1、部署流程方式

public class ActivitiService {

    @Autowired
    private RepositoryService repositoryService;
    @Autowired
    private RuntimeService runtimeService;

    public void deploymentProcess() {

        //流程部署
        Deployment deployment = repositoryService.createDeployment()
                //xml地址
                .addClasspathResource("processes/flowable.bpmn20.xml")
                //設定流程名稱
                .name("flowable")
                //釋出
                .deploy();

        //增加事件監聽 JobListener()類後面詳細分析
        runtimeService.addEventListener(new JobListener());
    }

}

2、流程圖

這裡寫圖片描述

三、釋出工作流例項

1、啟動流程

/**
 * 啟動流程
 */
public void startProcess() {
    //獲取流程定義
    ProcessDefinition processDefinition = repositoryService
            .createProcessDefinitionQuery()
            .latestVersion()
            .singleResult();

    Map<String, Object> variables = new HashMap<>();
    variables.put("delegateExpression", new ApproveTaskAssignmentListener());
    variables.put("expression", new ApproveTaskDeleteListener());
    // 設定例項發起者
    identityService.setAuthenticatedUserId("Initiator");
    //發起流程
    processEngine.getRuntimeService().startProcessInstanceById(processDefinition.getId(),variables);
    identityService.setAuthenticatedUserId(null);

}

這裡寫圖片描述

2、提交申請

   public boolean commitProcess(String duration) {

    //提交申請借閱 duration借閱期限 例如10秒後job執行 duration = PT10S
    List<Task> tasks = taskService.createTaskQuery().list();
    tasks.forEach(task -> {
        Map<String, Object> variables = task.getProcessVariables();
        //為到期提醒做準備
        variables.put("duration", duration);
        taskService.complete(task.getId(), variables);
    });

    return true;
}

3、稽核

public boolean approveProcess(boolean approved) {

    List<Task> tasks = taskService.createTaskQuery().list();
    tasks.forEach(task -> {
        Map<String, Object> processVariables = taskService.getVariables(task.getId());
        if (approved) {
            //通過
            processVariables.put("Task_review", "Task_return");
            //是否歸還
            processVariables.put("isReturn", false);
        } else {
            //拒絕
            processVariables.put("Task_review", "Task_refuse");
        }

        taskService.complete(task.getId(), processVariables);
    });


    return true;
}

4、歸還

public boolean returnProcess() {

    List<Task> tasks = taskService.createTaskQuery().list();
    tasks.forEach(task -> {
        Map<String, Object> processVariables = taskService.getVariables(task.getId());
        processVariables.put("isReturn", true);
        taskService.complete(task.getId(),processVariables);
    });

    return true;
}