簡單的 Spring Boot + Flowable(Activiti)+ RocketMQ 流程,包括:申請、稽核、定期提醒、RocketMQ 訊息傳送
阿新 • • 發佈:2019-01-22
一、配置
1、application.yml配置
# NOTE(review): this file was reconstructed from a single flattened line; the
# nesting below follows the usual Spring Boot layout. Confirm in particular
# that `flowable`, `swagger` and `rocketmq` are top-level keys.
server:
  port: 18090

spring:
  application:
    name: service-flowable
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/flowable?characterEncoding=utf8&useSSL=false
    username: admin
    password: admin

# Automatic deployment check of process definitions: true = on (default), false = off
flowable:
  check-process-definitions: false

swagger:
  basePackage: com.example.flowable.controller

# Address of the message service (RocketMQ name server)
rocketmq:
  namesrvaddr: 127.0.0.1:9876
2、重寫flowable預設配置
@Configuration @Slf4j public class ProcessConfig { @Autowired private DataSourceProperties dataSourceProperties; @Bean @Primary protected ProcessEngineConfiguration configuration() { List<FlowableEventListener> list = new ArrayList<>(); //增加job監聽 JobListener()類後面詳細分析 list.add(new JobListener()); //配置監聽型別(與JobListener配合) Map<String,List<FlowableEventListener>> map = new HashMap<>(); map.put("JOB_EXECUTION_SUCCESS",list); map.put("JOB_EXECUTION_FAILURE",list); ProcessEngineConfiguration processEngineConfiguration = StandaloneProcessEngineConfiguration .createStandaloneProcessEngineConfiguration() .setJdbcUrl(dataSourceProperties.getUrl()) .setJdbcDriver(dataSourceProperties.getDriverClassName()) .setJdbcUsername(dataSourceProperties.getUsername()) .setJdbcPassword(dataSourceProperties.getPassword()) //如果表不存在,自動建立表 .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE) //屬性asyncExecutorActivate定義為true,工作流引擎在啟動時就建立啟動async executor執行緒池 .setAsyncExecutorActivate(false) //流程釋出的時候是否生成流程圖 .setCreateDiagramOnDeploy(true) //生成流程圖引數 .setProcessDiagramGenerator(new DefaultProcessDiagramGenerator()) .setActivityFontName("幼圓") .setAnnotationFontName("幼圓") .setLabelFontName("幼圓"); //配置監聽 processEngineConfiguration.setTypedEventListeners(map); return processEngineConfiguration; } @Bean protected ProcessEngine engine() { //建立流程引擎 服務啟動是初始化一次即可 return ProcessEngines.getDefaultProcessEngine(); } }
3、rocketmq配置類
/**
 * Holder for the {@code rocketmq.*} settings of application.yml; for example
 * {@code rocketmq: namesrvaddr: 127.0.0.1:9876} is bound to {@link #namesrvaddr}.
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMqProperties {

    /** Collection of message-service (name server) addresses. */
    private List<String> namesrvaddr;
}
4、自定義註解
/**
 * Marks a type as a RocketMQ consumer and declares what it subscribes to.
 * Retained at runtime so it can be read reflectively.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RocketConsume {

    /**
     * Topic to subscribe to.
     *
     * @return the topic name
     */
    String topic();

    /**
     * Tags to subscribe to.
     *
     * @return the tags; defaults to the single wildcard tag {@code "*"}
     */
    String[] tags() default {"*"};
}
5、初始化所有消費類
/**
 * Starts one RocketMQ push consumer for every {@code RocketMQListener} bean that
 * carries a {@link RocketConsume} annotation.
 */
@Component
public class RocketMQConfig extends WebApplicationObjectSupport {

    /** Separator RocketMQ uses between several name server addresses. */
    private static final String NAME_SERVER_SEPARATOR = ";";

    /** Separator RocketMQ uses between several tags in a subscription expression. */
    private static final String TAG_SEPARATOR = "||";

    @Autowired
    private RocketMqProperties properties;

    /**
     * Initializes the consumers.
     *
     * <p>All configured name server addresses are passed to each consumer (previously
     * only the first one was used). Listener beans without {@link RocketConsume} are
     * skipped.
     */
    @PostConstruct
    private void init() {
        String nameServers = String.join(NAME_SERVER_SEPARATOR, properties.getNamesrvaddr());
        Map<String, RocketMQListener> listenersByBeanName =
                getApplicationContext().getBeansOfType(RocketMQListener.class);

        for (RocketMQListener listener : listenersByBeanName.values()) {
            if (listener == null) {
                continue;
            }
            RocketConsume subscription = listener.getClass().getAnnotation(RocketConsume.class);
            if (subscription != null) {
                startConsumer(nameServers, subscription, listener);
            }
        }
    }

    /**
     * Creates, subscribes and starts a single clustering-mode consumer.
     *
     * <p>A failure is reported and affects only this consumer; the remaining listeners
     * are still started. A consumer whose subscription failed is not started, since it
     * would be running without any subscription.
     */
    private void startConsumer(String nameServers, RocketConsume subscription, RocketMQListener listener) {
        String[] tags = subscription.tags();
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(nameServers);
        try {
            // Subscribe to every declared tag, not just the first one.
            consumer.subscribe(subscription.topic(), String.join(TAG_SEPARATOR, tags));
        } catch (MQClientException e) {
            e.printStackTrace();
            return;
        }
        // The consumer group is named after the first tag (unchanged behaviour).
        // NOTE(review): with the default tag "*" this yields the group name "*";
        // confirm that is acceptable to the broker.
        consumer.setConsumerGroup(tags[0]);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageListener(listener);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}
6、訊息監聽類
/**
 * Base class for concurrent message listeners that receive their payload as a
 * JSON-decoded object of type {@code T}.
 *
 * @param <T> payload type; read reflectively from the generic superclass declaration
 */
public abstract class RocketMQListener<T> implements MessageListenerConcurrently {

    /**
     * Handles one decoded payload.
     *
     * @param t the payload parsed from the message body
     * @return {@code true} when handled; {@code false} makes the batch report
     *         {@code RECONSUME_LATER}
     */
    public abstract boolean consume(T t);

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList,
            ConsumeConcurrentlyContext context) {
        return consume(messageExtList);
    }

    /** Processes a batch in order, stopping at the first message that is not consumed. */
    private ConsumeConcurrentlyStatus consume(List<MessageExt> messageExtList) {
        if (messageExtList == null || messageExtList.isEmpty()) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt messageExt : messageExtList) {
            boolean handled = consume(messageExt);
            if (!handled) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Decodes one message and hands it to the subclass; messages without content count as consumed. */
    @SuppressWarnings("unchecked")
    private boolean consume(MessageExt messageExt) {
        if (messageExt == null) {
            return true;
        }
        byte[] body = messageExt.getBody();
        if (body == null) {
            return true;
        }
        // NOTE(review): decoded with the platform default charset; confirm it matches
        // the charset the producer used to encode the body.
        String json = new String(body);
        if (StringUtils.isNotEmpty(json)) {
            // The payload class is the first type argument of the generic superclass.
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            Class<T> payloadType = (Class<T>) superType.getActualTypeArguments()[0];
            T payload = JSON.parseObject(json, payloadType);
            return consume(payload);
        }
        return true;
    }
}
7、訊息生產工具類
@Component
public class RocketMQProducer {
private static DefaultMQProducer sender;
@Value("${spring.application.name}")
private String groupName;
@Autowired
private RocketMqProperties properties;
@PostConstruct
public void init() {
sender = new DefaultMQProducer(groupName);
sender.setNamesrvAddr(properties.getNamesrvaddr().get(0));
try {
sender.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static<T> void send(String topic,String tags,String keys,T body) {
String json = JSON.toJSONString(body);
Message message = new Message(topic,tags,keys,json.getBytes());
send(message);
}
private static void send(Message message) {
try {
SendResult result = sender.send(message);
SendStatus status = result.getSendStatus();
System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
} catch (Exception e) {
e.printStackTrace();
}
}
}
8、到期提醒歸還監聽類
/**
 * Execution listener for the due-date reminder: when the {@code isReturn} variable
 * is present and false, it publishes a reminder message through the producer utility.
 */
public class EndTimerExecutorListener implements ExecutionListener {

    @Override
    public void notify(DelegateExecution delegateExecution) {
        Object returned = delegateExecution.getVariable("isReturn");
        // Nothing to send when the flag is absent or the item was already returned.
        if (returned == null || (boolean) returned) {
            return;
        }

        HashMap<String, String> reminder = new HashMap<>();
        reminder.put("content", "請儘快歸還");
        // Publish via the producer utility, keyed by the execution id.
        RocketMQProducer.send("FLOWABLE", "RETURN", delegateExecution.getId(), reminder);
    }
}
9、訊息接收類
/**
 * Consumer for topic {@code FLOWABLE}, tag {@code RETURN}: prints the
 * {@code content} entry of each received payload.
 */
@Slf4j
@Component
@RocketConsume(topic = "FLOWABLE", tags = "RETURN")
public class FlowableConsumer extends RocketMQListener<HashMap> {

    /**
     * Prints the received content.
     *
     * @param message decoded payload map
     * @return always {@code true}
     */
    @Override
    public boolean consume(HashMap message) {
        Object content = message.get("content");
        System.out.println("接收成功" + content);
        return true;
    }
}
二、流程部署
1、部署流程方式
/**
 * Deploys the process definition and registers the job event listener.
 */
public class ActivitiService {

    @Autowired
    private RepositoryService repositoryService;

    @Autowired
    private RuntimeService runtimeService;

    /**
     * Deploys {@code processes/flowable.bpmn20.xml} from the classpath under the
     * deployment name {@code flowable}, then adds a {@code JobListener} to the
     * runtime service.
     */
    public void deploymentProcess() {
        repositoryService.createDeployment()
                // Location of the BPMN XML on the classpath.
                .addClasspathResource("processes/flowable.bpmn20.xml")
                // Deployment name.
                .name("flowable")
                // Publish.
                .deploy();

        // NOTE(review): a new listener instance is added on every call; confirm this
        // method runs only once per application.
        runtimeService.addEventListener(new JobListener());
    }
}
2、流程圖
三、釋出工作流例項
1、啟動流程
/**
 * Starts a process instance from the latest deployed process definition, with
 * "Initiator" set as the authenticated user while the instance is started.
 *
 * @throws IllegalStateException if no process definition is found
 */
public void startProcess() {
    // Look up the process definition.
    ProcessDefinition processDefinition = repositoryService
            .createProcessDefinitionQuery()
            .latestVersion()
            .singleResult();
    if (processDefinition == null) {
        // Fail with a clear message instead of a NullPointerException further down.
        throw new IllegalStateException("No process definition found; deploy a process before starting it");
    }

    Map<String, Object> variables = new HashMap<>();
    variables.put("delegateExpression", new ApproveTaskAssignmentListener());
    variables.put("expression", new ApproveTaskDeleteListener());

    // Set the instance initiator.
    identityService.setAuthenticatedUserId("Initiator");
    try {
        // Start the process.
        processEngine.getRuntimeService().startProcessInstanceById(processDefinition.getId(), variables);
    } finally {
        // Always clear the authenticated user, even when starting the instance fails.
        identityService.setAuthenticatedUserId(null);
    }
}
2、提交申請
/**
 * Submits the borrow application by completing every task returned by an
 * unfiltered task query.
 *
 * @param duration borrow period handed to the process as the {@code duration}
 *                 variable, e.g. {@code PT10S} so the job runs after 10 seconds
 * @return always {@code true}
 */
public boolean commitProcess(String duration) {
    for (Task task : taskService.createTaskQuery().list()) {
        Map<String, Object> variables = task.getProcessVariables();
        // Prepared for the due-date reminder.
        variables.put("duration", duration);
        taskService.complete(task.getId(), variables);
    }
    return true;
}
3、稽核
/**
 * Reviews the application by completing every task returned by an unfiltered
 * task query with the review outcome.
 *
 * @param approved {@code true} to approve, {@code false} to refuse
 * @return always {@code true}
 */
public boolean approveProcess(boolean approved) {
    for (Task task : taskService.createTaskQuery().list()) {
        String taskId = task.getId();
        Map<String, Object> variables = taskService.getVariables(taskId);

        // Approved -> "Task_return", refused -> "Task_refuse".
        variables.put("Task_review", approved ? "Task_return" : "Task_refuse");
        if (approved) {
            // Not returned yet.
            variables.put("isReturn", false);
        }
        taskService.complete(taskId, variables);
    }
    return true;
}
4、歸還
/**
 * Marks the item as returned: completes every task returned by an unfiltered
 * task query with {@code isReturn} set to {@code true}.
 *
 * @return always {@code true}
 */
public boolean returnProcess() {
    for (Task task : taskService.createTaskQuery().list()) {
        String taskId = task.getId();
        Map<String, Object> variables = taskService.getVariables(taskId);
        variables.put("isReturn", true);
        taskService.complete(taskId, variables);
    }
    return true;
}