activiti部署流程deployment實現原始碼實現過程
以下是流程部署的實現過程,看了一天原始碼瞭解了activiti原始碼是如何實現的,包括資料資料,瞭解activiti的架構
//根據bpmn檔案部署流程
Deployment deployment = repositoryService.createDeployment().addClasspathResource("diagrams/demo2.bpmn").deploy();
呼叫順序
//RepositoryServiceImpl類的方法 public Deployment deploy(DeploymentBuilderImpl deploymentBuilder) { return (Deployment)this.commandExecutor.execute(new DeployCmd(deploymentBuilder)); } //CommandExecutorImpl類的方法 public <T> T execute(Command<T> command) { return this.execute(this.defaultConfig, command); } public <T> T execute(CommandConfig config, Command<T> command) { return this.first.execute(config, command); }
這裡的命令執行器CommandExecutorImpl屬性first是介面CommandInterceptor,執行順序是LogInterceptor、SpringTransactionInterceptor、XX、CommandInvoker。
LogInterceptor只是輸出日誌
SpringTransactionInterceptor是控制事務的,因為部署的時候會往表ACT_RE_PROCDEF、ACT_RE_DEPLOYMENT、ACT_GE_BYTEARRAY同時插入資料,保證事務完整性。
CommandContextInterceptor
CommandInvoker是執行傳入的command的excute方法,這裡傳入的是DeployCmd,所以去看DeployCmd的excute方法
public Deployment execute(CommandContext commandContext) { DeploymentEntity deployment = deploymentBuilder.getDeployment(); deployment.setDeploymentTime(Context.getProcessEngineConfiguration().getClock().getCurrentTime()); if ( deploymentBuilder.isDuplicateFilterEnabled() ) { DeploymentEntity existingDeployment = Context .getCommandContext() .getDeploymentEntityManager() .findLatestDeploymentByName(deployment.getName()); if ( (existingDeployment!=null) && !deploymentsDiffer(deployment, existingDeployment)) { return existingDeployment; } } deployment.setNew(true); // Save the data Context .getCommandContext() .getDeploymentEntityManager() .insertDeployment(deployment); if(Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) { Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent( ActivitiEventBuilder.createEntityEvent(ActivitiEventType.ENTITY_CREATED, deployment)); } // Deployment settings Map<String, Object> deploymentSettings = new HashMap<String, Object>(); deploymentSettings.put(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED, deploymentBuilder.isBpmn20XsdValidationEnabled()); deploymentSettings.put(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED, deploymentBuilder.isProcessValidationEnabled()); // Actually deploy Context .getProcessEngineConfiguration() .getDeploymentManager() .deploy(deployment, deploymentSettings); if (deploymentBuilder.getProcessDefinitionsActivationDate() != null) { scheduleProcessDefinitionActivation(commandContext, deployment); } if(Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) { Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent( ActivitiEventBuilder.createEntityEvent(ActivitiEventType.ENTITY_INITIALIZED, deployment)); } return deployment; }
這裡儲存DeploymentEntity
Context.getCommandContext().getDeploymentEntityManager().insertDeployment(deployment);
再來看DbSqlSession裡的insert方法,這裡會把實體存入Map insertedObjects裡,之後會用到
public void insert(PersistentObject persistentObject) {
if (persistentObject.getId() == null) {
String id = this.dbSqlSessionFactory.getIdGenerator().getNextId();
persistentObject.setId(id);
}
Class<? extends PersistentObject> clazz = persistentObject.getClass();
if (!this.insertedObjects.containsKey(clazz)) {
this.insertedObjects.put(clazz, new ArrayList());
}
((List)this.insertedObjects.get(clazz)).add(persistentObject);
this.cachePut(persistentObject, false);
}
再回到DeployCmd接著往下走,實際部署。
Context.getProcessEngineConfiguration().getDeploymentManager().deploy(deployment, deploymentSettings);
執行的是BpmnDeployer裡的deploy方法
public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
LOG.debug("Processing deployment {}", deployment.getName());
List<ProcessDefinitionEntity> processDefinitions = new ArrayList<ProcessDefinitionEntity>();
Map<String, ResourceEntity> resources = deployment.getResources();
for (String resourceName : resources.keySet()) {
LOG.info("Processing resource {}", resourceName);
if (isBpmnResource(resourceName)) {
ResourceEntity resource = resources.get(resourceName);
byte[] bytes = resource.getBytes();
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
BpmnParse bpmnParse = bpmnParser
.createParse()
.sourceInputStream(inputStream)
.deployment(deployment)
.name(resourceName);
if (deploymentSettings != null) {
// Schema validation if needed
if (deploymentSettings.containsKey(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED)) {
bpmnParse.setValidateSchema((Boolean) deploymentSettings.get(DeploymentSettings.IS_BPMN20_XSD_VALIDATION_ENABLED));
}
// Process validation if needed
if (deploymentSettings.containsKey(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED)) {
bpmnParse.setValidateProcess((Boolean) deploymentSettings.get(DeploymentSettings.IS_PROCESS_VALIDATION_ENABLED));
}
}
bpmnParse.execute();
for (ProcessDefinitionEntity processDefinition: bpmnParse.getProcessDefinitions()) {
processDefinition.setResourceName(resourceName);
if (deployment.getTenantId() != null) {
processDefinition.setTenantId(deployment.getTenantId()); // process definition inherits the tenant id
}
String diagramResourceName = getDiagramResourceForProcess(resourceName, processDefinition.getKey(), resources);
// Only generate the resource when deployment is new to prevent modification of deployment resources
// after the process-definition is actually deployed. Also to prevent resource-generation failure every
// time the process definition is added to the deployment-cache when diagram-generation has failed the first time.
if(deployment.isNew()) {
if (Context.getProcessEngineConfiguration().isCreateDiagramOnDeploy() &&
diagramResourceName==null && processDefinition.isGraphicalNotationDefined()) {
try {
byte[] diagramBytes = IoUtil.readInputStream(ProcessDiagramGenerator.generatePngDiagram(bpmnParse.getBpmnModel()), null);
diagramResourceName = getProcessImageResourceName(resourceName, processDefinition.getKey(), "png");
createResource(diagramResourceName, diagramBytes, deployment);
} catch (Throwable t) { // if anything goes wrong, we don't store the image (the process will still be executable).
LOG.warn("Error while generating process diagram, image will not be stored in repository", t);
}
}
}
processDefinition.setDiagramResourceName(diagramResourceName);
processDefinitions.add(processDefinition);
}
}
}
// check if there are process definitions with the same process key to prevent database unique index violation
List<String> keyList = new ArrayList<String>();
for (ProcessDefinitionEntity processDefinition : processDefinitions) {
if (keyList.contains(processDefinition.getKey())) {
throw new ActivitiException("The deployment contains process definitions with the same key (process id atrribute), this is not allowed");
}
keyList.add(processDefinition.getKey());
}
CommandContext commandContext = Context.getCommandContext();
ProcessDefinitionEntityManager processDefinitionManager = commandContext.getProcessDefinitionEntityManager();
DbSqlSession dbSqlSession = commandContext.getSession(DbSqlSession.class);
for (ProcessDefinitionEntity processDefinition : processDefinitions) {
List<TimerEntity> timers = new ArrayList<TimerEntity>();
if (deployment.isNew()) {
int processDefinitionVersion;
ProcessDefinitionEntity latestProcessDefinition = null;
if (processDefinition.getTenantId() != null && !ProcessEngineConfiguration.NO_TENANT_ID.equals(processDefinition.getTenantId())) {
latestProcessDefinition = processDefinitionManager
.findLatestProcessDefinitionByKeyAndTenantId(processDefinition.getKey(), processDefinition.getTenantId());
} else {
latestProcessDefinition = processDefinitionManager
.findLatestProcessDefinitionByKey(processDefinition.getKey());
}
if (latestProcessDefinition != null) {
processDefinitionVersion = latestProcessDefinition.getVersion() + 1;
} else {
processDefinitionVersion = 1;
}
processDefinition.setVersion(processDefinitionVersion);
processDefinition.setDeploymentId(deployment.getId());
String nextId = idGenerator.getNextId();
String processDefinitionId = processDefinition.getKey()
+ ":" + processDefinition.getVersion()
+ ":" + nextId; // ACT-505
// ACT-115: maximum id length is 64 charcaters
if (processDefinitionId.length() > 64) {
processDefinitionId = nextId;
}
processDefinition.setId(processDefinitionId);
if(commandContext.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
commandContext.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.ENTITY_CREATED, processDefinition));
}
removeObsoleteTimers(processDefinition);
addTimerDeclarations(processDefinition, timers);
removeObsoleteMessageEventSubscriptions(processDefinition, latestProcessDefinition);
addMessageEventSubscriptions(processDefinition);
removeObsoleteSignalEventSubScription(processDefinition, latestProcessDefinition);
addSignalEventSubscriptions(processDefinition);
dbSqlSession.insert(processDefinition);
addAuthorizations(processDefinition);
if(commandContext.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
commandContext.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityEvent(ActivitiEventType.ENTITY_INITIALIZED, processDefinition));
}
scheduleTimers(timers);
} else {
String deploymentId = deployment.getId();
processDefinition.setDeploymentId(deploymentId);
ProcessDefinitionEntity persistedProcessDefinition = null;
if (processDefinition.getTenantId() == null || ProcessEngineConfiguration.NO_TENANT_ID.equals(processDefinition.getTenantId())) {
persistedProcessDefinition = processDefinitionManager.findProcessDefinitionByDeploymentAndKey(deploymentId, processDefinition.getKey());
} else {
persistedProcessDefinition = processDefinitionManager.findProcessDefinitionByDeploymentAndKeyAndTenantId(deploymentId, processDefinition.getKey(), processDefinition.getTenantId());
}
if (persistedProcessDefinition != null) {
processDefinition.setId(persistedProcessDefinition.getId());
processDefinition.setVersion(persistedProcessDefinition.getVersion());
processDefinition.setSuspensionState(persistedProcessDefinition.getSuspensionState());
}
}
// Add to cache
Context
.getProcessEngineConfiguration()
.getDeploymentManager()
.getProcessDefinitionCache()
.add(processDefinition.getId(), processDefinition);
// Add to deployment for further usage
deployment.addDeployedArtifact(processDefinition);
}
}
這裡主要看bpmnParse.execute();
這裡相當於把xml的內容封裝成物件bpmnModel
bpmnModel = converter.convertToBpmnModel(streamSource, validateSchema, enableSafeBpmnXml, encoding);
然後看方法transformProcessDefinitions(),xml裡的每一個<process>標籤就是一個流程定義。
protected void transformProcessDefinitions() {
sequenceFlows = new HashMap<String, TransitionImpl>();
for (Process process : bpmnModel.getProcesses()) {
if (process.isExecutable()) {
bpmnParserHandlers.parseElement(this, process);
}
}
if (processDefinitions.size() > 0) {
processDI();
}
}
再來看bpmnParserHandlers.parseElement(this, process);
ProcessParseHandler 這樣流程定義就存入bpmnParse的ProcessDefinitions集合。
protected void executeParse(BpmnParse bpmnParse, Process process) {
if (process.isExecutable() == false) {
LOGGER.info("Ignoring non-executable process with id='" + process.getId() + "'. Set the attribute isExecutable=\"true\" to deploy this process.");
} else {
bpmnParse.getProcessDefinitions().add(transformProcess(bpmnParse, process));
}
}
ProcessHistoryParseHandler,監聽流程結束的時候更新流程例項。
protected void executeParse(BpmnParse bpmnParse, Process element) {
bpmnParse.getCurrentProcessDefinition().addExecutionListener(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_END, PROCESS_INSTANCE_END_HANDLER);
}
再回到transformProcessDefinitions方法往下走看processDI方法,這裡是根據xml回話png圖片
public void processDI() {
if (bpmnModel.getLocationMap().size() > 0) {
// Verify if all referenced elements exist
for (String bpmnReference : bpmnModel.getLocationMap().keySet()) {
if (bpmnModel.getFlowElement(bpmnReference) == null) {
// ACT-1625: don't warn when artifacts are referenced from DI
if(bpmnModel.getArtifact(bpmnReference) == null) {
LOGGER.warn("Invalid reference in diagram interchange definition: could not find " + bpmnReference);
}
} else if (! (bpmnModel.getFlowElement(bpmnReference) instanceof FlowNode)) {
LOGGER.warn("Invalid reference in diagram interchange definition: " + bpmnReference + " does not reference a flow node");
}
}
for (String bpmnReference : bpmnModel.getFlowLocationMap().keySet()) {
if (bpmnModel.getFlowElement(bpmnReference) == null) {
// ACT-1625: don't warn when artifacts are referenced from DI
if(bpmnModel.getArtifact(bpmnReference) == null) {
LOGGER.warn("Invalid reference in diagram interchange definition: could not find " + bpmnReference);
}
} else if (! (bpmnModel.getFlowElement(bpmnReference) instanceof SequenceFlow)) {
if (bpmnModel.getFlowLocationMap().get(bpmnReference).size() > 0) {
LOGGER.warn("Invalid reference in diagram interchange definition: " + bpmnReference + " does not reference a sequence flow");
} else {
LOGGER.warn("Invalid reference in diagram interchange definition: " + bpmnReference + " does not reference a sequence flow");
}
}
}
for (Process process : bpmnModel.getProcesses()) {
if (!process.isExecutable()) {
continue;
}
// Parse diagram interchange information
ProcessDefinitionEntity processDefinition = getProcessDefinition(process.getId());
if (processDefinition != null) {
processDefinition.setGraphicalNotationDefined(true);
for (String shapeId : bpmnModel.getLocationMap().keySet()) {
if (processDefinition.findActivity(shapeId) != null) {
createBPMNShape(shapeId, bpmnModel.getGraphicInfo(shapeId), processDefinition);
}
}
for (String edgeId : bpmnModel.getFlowLocationMap().keySet()) {
if (bpmnModel.getFlowElement(edgeId) != null) {
createBPMNEdge(edgeId, bpmnModel.getFlowLocationGraphicInfo(edgeId));
}
}
}
}
}
}
這裡看bpmModel的locationMap,裡面存的是每個任務節點的座標。這樣就可以取出locationMap的任務節點座標存入
processDefinition.namedActivities裡對應key的任務節點對應ActivityImpl座標
flowLocationMap裡面存的是箭頭的起點和終點座標。從bpmnModel.process.flowElementList根據key取出對應的FlowElement。然後取出bmpnParse.sequenceFlows根據key取出對應的sequenceFlow。之後從flowLocationMap取出的座標存入list然後存入sequenceFlow.waypoints。
再回到BpmnDeploy.deploy方法往下走。繪製成png的流程圖,拼接png的路徑,生成ResourceEntity存入DbSqlSession。最後把生成的processDefinition新增入集合processDefinitions。之後迴圈processDefinitions,set基本引數,然後存入DbSqlSession。此時就deploy結束,這時的DbSqlSession的insertedObjects有兩個ResourceEntity,一個DeploymentEntity,一個ProcessDefinitionEntity。
結束deploy後回到CommandContextInterceptor 裡的context.close方法,主要是執行方法flushSessions()方法
最後是執行DbSqlSession裡的flush方法
之前在原始碼里根據mybatis的ProcessDefinition.xml根據插入語句的insertProcessDefinition搜尋,沒有搜尋到,原來是”insertProcessDefinition“這個statement是拼出來的。之後再用mybatis插入這四條資料。
四條資料執行完成後再提交事務,最後流程的部署就完成了。