Activiti7非同步定時作業原始碼解析篇
非同步作業的配置
1. 初始化時鐘
public void initClock() {
if (this.clock == null) {
this.clock = new DefaultClockImpl();
}
}
2. 初始化作業處理器
Activiti7採用HashMap : <type -> JobHandler>
維護全域性的作業處理器集合jobHandlers
,作業處理器會實現JobHandler
介面。 在initJobHandlers
中新增6
個預設的作業處理器,如下:
public interface JobHandler {
String getType();
void execute(JobEntity var1, String var2, ExecutionEntity var3, CommandContext var4);
}
public void initJobHandlers() {
this.jobHandlers = new HashMap();
AsyncContinuationJobHandler asyncContinuationJobHandler = new AsyncContinuationJobHandler();
this.jobHandlers.put(asyncContinuationJobHandler.getType(), asyncContinuationJobHandler);
TriggerTimerEventJobHandler triggerTimerEventJobHandler = new TriggerTimerEventJobHandler();
this.jobHandlers.put(triggerTimerEventJobHandler.getType( ), triggerTimerEventJobHandler);
TimerStartEventJobHandler timerStartEvent = new TimerStartEventJobHandler();
this.jobHandlers.put(timerStartEvent.getType(), timerStartEvent);
TimerSuspendProcessDefinitionHandler suspendProcessDefinitionHandler = new TimerSuspendProcessDefinitionHandler();
this.jobHandlers.put(suspendProcessDefinitionHandler.getType(), suspendProcessDefinitionHandler);
TimerActivateProcessDefinitionHandler activateProcessDefinitionHandler = new TimerActivateProcessDefinitionHandler();
this.jobHandlers.put(activateProcessDefinitionHandler.getType(), activateProcessDefinitionHandler);
ProcessEventJobHandler processEventJobHandler = new ProcessEventJobHandler();
this.jobHandlers.put(processEventJobHandler.getType(), processEventJobHandler);
if (this.getCustomJobHandlers() != null) {
Iterator var7 = this.getCustomJobHandlers().iterator();
while(var7.hasNext()) {
JobHandler customJobHandler = (JobHandler)var7.next();
this.jobHandlers.put(customJobHandler.getType(), customJobHandler);
}
}
}
- 預設作業處理器 :
AsyncContinuationJobHandler
: 非同步節點作業處理器TriggerTimerEventJobHandler
: 觸發時間事件作業處理器TimerStartEventJobHandler
: 定時啟動流程例項作業處理器TimerSuspendProcessDefinitionHandler
: 定時掛起流程定義處理器TimerActivateProcessDefinitionHandler
: 定時啟用流程定義處理器ProcessEventJobHandler
: 流程事件作業處理器
- 自定義作業處理器
customJobHandlers
: 這是引擎配置留的擴充套件點,可以新增自定義作業處理器,如果型別和預設的作業處理器相同,則預設作業處理器將被替換。
3. 初始化作業管理器
public void initJobManager() {
if (this.jobManager == null) {
this.jobManager = new DefaultJobManager(this);
}
this.jobManager.setProcessEngineConfiguration(this);
}
DefaultJobManager
持有引擎配置例項ProcessEngineConfigurationImpl
,它可以很方便獲取配置中的非同步作業執行器AsyncExecutor
, 執行流管理器ExecutionEntityManager
, 其他的就是對JobManager
介面的實現.
public interface JobManager {
void execute(Job var1);
void unacquire(Job var1);
JobEntity createAsyncJob(ExecutionEntity var1, boolean var2);
void scheduleAsyncJob(JobEntity var1);
TimerJobEntity createTimerJob(TimerEventDefinition var1, boolean var2, ExecutionEntity var3, String var4, String var5);
void scheduleTimerJob(TimerJobEntity var1);
JobEntity moveTimerJobToExecutableJob(TimerJobEntity var1);
TimerJobEntity moveJobToTimerJob(AbstractJobEntity var1);
SuspendedJobEntity moveJobToSuspendedJob(AbstractJobEntity var1);
AbstractJobEntity activateSuspendedJob(SuspendedJobEntity var1);
DeadLetterJobEntity moveJobToDeadLetterJob(AbstractJobEntity var1);
JobEntity moveDeadLetterJobToExecutableJob(DeadLetterJobEntity var1, int var2);
void setProcessEngineConfiguration(ProcessEngineConfigurationImpl var1);
}
public class DefaultJobManager implements JobManager {
private static Logger logger = LoggerFactory.getLogger(DefaultJobManager.class);
protected ProcessEngineConfigurationImpl processEngineConfiguration;
//for simplicity, some method codes are omitted here.
......
}
4. 初始化非同步作業執行器
-
這裡設為初始化為預設的非同步作業執行器
DefaultAsyncJobExecutor
,可以看到initAsyncExecutor
中很多setXooo()
, 在DefaultAsyncJobExecutor
中的確很多屬性需要初始化。 -
我們可以看到在引擎配置類中有很多屬性和
DefaultAsyncJobExecutor
中的屬性相對應,比如ProcessEngineConfigurationImpl
中的asyncExecutorThreadPoolQueue
與DefaultAsyncJobExecutor
中的threadPoolQueue
對應,前者可以看做引擎留給開發者的開關屬性
, 如果在引擎配置中配置了asyncExecutorThreadPoolQueue
,即不為空,就會設定到非同步作業執行器中。public void initAsyncExecutor() { if (this.asyncExecutor == null) { DefaultAsyncJobExecutor defaultAsyncExecutor = new DefaultAsyncJobExecutor(); defaultAsyncExecutor.setMessageQueueMode(this.asyncExecutorMessageQueueMode); defaultAsyncExecutor.setCorePoolSize(this.asyncExecutorCorePoolSize); defaultAsyncExecutor.setMaxPoolSize(this.asyncExecutorMaxPoolSize); defaultAsyncExecutor.setKeepAliveTime(this.asyncExecutorThreadKeepAliveTime); if (this.asyncExecutorThreadPoolQueue != null) { defaultAsyncExecutor.setThreadPoolQueue(this.asyncExecutorThreadPoolQueue); } defaultAsyncExecutor.setQueueSize(this.asyncExecutorThreadPoolQueueSize); defaultAsyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(this.asyncExecutorDefaultTimerJobAcquireWaitTime); defaultAsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(this.asyncExecutorDefaultAsyncJobAcquireWaitTime); defaultAsyncExecutor.setDefaultQueueSizeFullWaitTimeInMillis(this.asyncExecutorDefaultQueueSizeFullWaitTime); defaultAsyncExecutor.setTimerLockTimeInMillis(this.asyncExecutorTimerLockTimeInMillis); defaultAsyncExecutor.setAsyncJobLockTimeInMillis(this.asyncExecutorAsyncJobLockTimeInMillis); if (this.asyncExecutorLockOwner != null) { defaultAsyncExecutor.setLockOwner(this.asyncExecutorLockOwner); } defaultAsyncExecutor.setResetExpiredJobsInterval(this.asyncExecutorResetExpiredJobsInterval); defaultAsyncExecutor.setResetExpiredJobsPageSize(this.asyncExecutorResetExpiredJobsPageSize); defaultAsyncExecutor.setSecondsToWaitOnShutdown(this.asyncExecutorSecondsToWaitOnShutdown); this.asyncExecutor = defaultAsyncExecutor; } this.asyncExecutor.setProcessEngineConfiguration(this); this.asyncExecutor.setAutoActivate(this.asyncExecutorActivate); }
解釋下 重要的屬性含義:
補充下
BlockingQueue
的介紹 :補充下
執行緒池
的介紹 :public class DefaultAsyncJobExecutor implements AsyncExecutor { private static Logger log = LoggerFactory.getLogger(DefaultAsyncJobExecutor.class); // corePoolSize : the recommended size of the number of threads in the thread pool. // If the actual number of threads in the thread pool is less than `corePoolSize`, create a new core thread first protected int corePoolSize = 2; // If all queues of non-running or working state in the thread pool are full, // but the current number of threads is less than the maximum number of allowed `maxPoolSize` threads, continue creating non-core threads to perform tasks. protected int maxPoolSize = 10; // Idle timeout for non-core threads: thread will be recycled when timeout. protected long keepAliveTime = 5000L; protected int queueSize = 100; // task queue : by default, use `ArrayBlockingQueue` for saving tasks waiting to be performed. // passed to `ThreadPoolExecutor` as parameters. protected BlockingQueue<Runnable> threadPoolQueue; // Thread pool protected ExecutorService executorService; protected long secondsToWaitOnShutdown = 60L; protected Thread timerJobAcquisitionThread; protected Thread asyncJobAcquisitionThread; protected Thread resetExpiredJobThread; protected AcquireTimerJobsRunnable timerJobRunnable; protected AcquireAsyncJobsDueRunnable asyncJobsDueRunnable; protected ResetExpiredJobsRunnable resetExpiredJobsRunnable; protected ExecuteAsyncRunnableFactory executeAsyncRunnableFactory; protected boolean isAutoActivate; //Indicates whether the job executor is activated protected boolean isActive; protected boolean isMessageQueueMode; //The maximum number of timer jobs acquired from database each time protected int maxTimerJobsPerAcquisition = 1; //The maximum number of asynchronous jobs acquired from database each time protected int maxAsyncJobsDuePerAcquisition = 1; protected int defaultTimerJobAcquireWaitTimeInMillis = 10000; protected int defaultAsyncJobAcquireWaitTimeInMillis = 10000; protected int defaultQueueSizeFullWaitTime = 0; protected String lockOwner = UUID.randomUUID().toString(); protected int timerLockTimeInMillis = 300000; protected int asyncJobLockTimeInMillis = 300000; protected int retryWaitTimeInMillis = 500; protected int resetExpiredJobsInterval = 60000; protected int resetExpiredJobsPageSize = 3; protected LinkedList<Job> temporaryJobQueue = new LinkedList(); protected ProcessEngineConfigurationImpl processEngineConfiguration; protected void initAsyncJobExecutionThreadPool() { if (this.threadPoolQueue == null) { log.info("Creating thread pool queue of size {}", this.queueSize); // if `threadPoolQueue` is not specified in `ProcessEngineCofiguration` , the `ArrayBlockingQueue` is as its default value. this.threadPoolQueue = new ArrayBlockingQueue(this.queueSize); } if (this.executorService == null) { log.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", new Object[]{this.corePoolSize, this.maxPoolSize, this.keepAliveTime}); // Factory for creating theads and add them to thread pool; BasicThreadFactory threadFactory = (new Builder()).namingPattern("activiti-async-job-executor-thread-%d").build(); // Thread pool : it implements the `Executor` interface. this.executorService = new ThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveTime, TimeUnit.MILLISECONDS, this.threadPoolQueue, threadFactory); } } // skip some methods ..... }
從預設的非同步任務執行器可知,它維護了一個執行緒池,並且負責一些
Runnable
任務執行,
流程定時啟動作業的排程時機
Activiti7中多個流程定義以捆綁式自動部署到流程引擎執行,向外提供
Runtime Bundle Service
,這裡幫大家扒扒流程定時啟動事件是怎麼以作業的形式新增到資料庫中,以待執行。
public void deploy(DeploymentEntity deployment, Map<String, Object> deploymentSettings) {
log.debug("Processing deployment {}", deployment.getName());
// Parse the process document element and convert it to BpmnModel
ParsedDeployment parsedDeployment = this.parsedDeploymentBuilderFactory.getBuilderForDeploymentAndSettings(deployment, deploymentSettings).build();
// Irrelevant codes ...
// If the deployed process documentation is updated,...
if (deployment.isNew()) {
Map<ProcessDefinitionEntity, ProcessDefinitionEntity> mapOfNewProcessDefinitionToPreviousVersion =TimerJobEntityManager this.getPreviousVersionsOfProcessDefinitions(parsedDeployment);
this.setProcessDefinitionVersionsAndIds(parsedDeployment, mapOfNewProcessDefinitionToPreviousVersion);
this.persistProcessDefinitionsAndAuthorizations(parsedDeployment);
// The scheduled jobs and event subscriptions is updated here, if necessary
this.updateTimersAndEvents(parsedDeployment, mapOfNewProcessDefinitionToPreviousVersion);
}
// Irrelevant codes ...
}
- 框架Bpmn文件處理的分兩階段:
文件解析
+物件解析
。通過文件解析器BpmnParse
將文件解析成BpmnModel
,並持久化,在ParsedDeploymentBuilder.build()
中會呼叫BpmnParse
的execute()
, 這裡就是文件解析的入口。 - 注意
updateTimersAndEvents
的呼叫,這裡就會檢查並更新事件訂閱和定時任務。
public void updateTimersAndEvents(ProcessDefinitionEntity processDefinition, ProcessDefinitionEntity previousProcessDefinition, ParsedDeployment parsedDeployment) {
Process process = parsedDeployment.getProcessModelForProcessDefinition(processDefinition);
BpmnModel bpmnModel = parsedDeployment.getBpmnModelForProcessDefinition(processDefinition);
this.eventSubscriptionManager.removeObsoleteMessageEventSubscriptions(previousProcessDefinition);
this.eventSubscriptionManager.addMessageEventSubscriptions(processDefinition, process, bpmnModel);
this.eventSubscriptionManager.removeObsoleteSignalEventSubScription(previousProcessDefinition);
this.eventSubscriptionManager.addSignalEventSubscriptions(Context.getCommandContext(), processDefinition, process, bpmnModel);
this.timerManager.removeObsoleteTimers(processDefinition);
this.timerManager.scheduleTimers(processDefinition, process);
}
- 這裡
BpmnDeployer
的helper類來幫忙了,在框架中Helper
很常見,可以看做現實世界中的助理吧,在這裡應該算是委託類了。這裡會依賴事件訂閱管理器EventSubscriptionManager
和TimerManager
定時作業管理器來更新事件訂閱或者定時作業。
protected void scheduleTimers(ProcessDefinitionEntity processDefinition, Process process) {
JobManager jobManager = Context.getCommandContext().getJobManager();
// Acquire the timer jobs declared on the start element
List<TimerJobEntity> timers = this.getTimerDeclarations(processDefinition, process);
Iterator var5 = timers.iterator();
while(var5.hasNext()) {
TimerJobEntity timer = (TimerJobEntity)var5.next();
// The default job manager schedules scheduled jobs to the database
jobManager.scheduleTimerJob(timer);
}
}
protected List<TimerJobEntity> getTimerDeclarations(ProcessDefinitionEntity processDefinition, Process process) {
JobManager jobManager = Context.getCommandContext().getJobManager();
List<TimerJobEntity> timers = new ArrayList();
if (CollectionUtil.isNotEmpty(process.getFlowElements())) {
Iterator var5 = process.getFlowElements().iterator();
while(var5.hasNext()) {
FlowElement element = (FlowElement)var5.next();
// If the current element is the start node, ...
if (element instanceof StartEvent) {
StartEvent startEvent = (StartEvent)element;
// irrelevant codes...
// add timer to timer jobs.
timers.add(timerJob);
}
}
}
新增作業到資料庫
return timers;
}
}
- 在流程部署的過程中,會將宣告在開始節點的定時事件,封裝為定時作業,然後排程作業到資料庫。【注意這裡只是對流程啟動相關的定時啟動事件做排程】
邊界定時作業On UserTask
的排程時機
前面介紹的
元素解析
階段可能需要建立的定時作業,接下來介紹物件解析
階段可能需要建立的的定時作業,這裡以加在UserTask 上的邊界定時事件為例。
- 在啟動流程的過程中,會進行物件解析,這裡我們關注定時事件對應的解析器
TimerEventDefinitionParseHandler
的核心方法executePasrse
方法。
protected void executeParse(BpmnParse bpmnParse, TimerEventDefinition timerEventDefinition) {
if (bpmnParse.getCurrentFlowElement() instanceof IntermediateCatchEvent) {
IntermediateCatchEvent intermediateCatchEvent = (IntermediateCatchEvent)bpmnParse.getCurrentFlowElement();
intermediateCatchEvent.setBehavior(bpmnParse.getActivityBehaviorFactory().createIntermediateCatchTimerEventActivityBehavior(intermediateCatchEvent, timerEventDefinition));
} else if (bpmnParse.getCurrentFlowElement() instanceof BoundaryEvent) {
BoundaryEvent boundaryEvent = (BoundaryEvent)bpmnParse.getCurrentFlowElement();
boundaryEvent.setBehavior(bpmnParse.getActivityBehaviorFactory().createBoundaryTimerEventActivityBehavior(boundaryEvent, timerEventDefinition, boundaryEvent.isCancelActivity()));
}
}
- 大致邏輯給邊界事件設定
行為物件
,在流程運轉過程中執行到UserTask
節點,會執行Boundary Event
的行為物件
的execute
方法。
public void execute(DelegateExecution execution) {
ExecutionEntity executionEntity = (ExecutionEntity)execution;
if (
相關推薦
Activiti7非同步定時作業原始碼解析篇
非同步作業的配置
1. 初始化時鐘
DefaultClockImpl.java
public void initClock() {
if (this.clock == null) {
this.clock = new
spring原始碼閱讀(2)-aop之原始碼解析篇
經過一個aop術語介紹和動態代理的深入講解,我們終於可以來看aop的原始碼了,下面跟著博主一點點剖析spring aop原始碼的實現吧
我們知道spring使用中我們只要做好相關的配置,spring自動幫我們做好了代理的相關工作。
我們從三個方面入手吧
1、配置
2、
Vuejs中nextTick()非同步更新佇列原始碼解析
vue官網關於此解釋說明如下:
vue2.0裡面的深入響應式原理的非同步更新佇列
官網說明如下:
只要觀察到資料變化,Vue 將開啟一個佇列,並緩衝在同一事件迴圈中發生的所有資料改變。如果同一個 watcher 被多次觸發,只會一次推入到佇列中。這種在緩衝
Glide原始碼解析篇之框架主體結構(一)
Gide作為Android最受歡迎的圖片載入庫之一,一直深受Android開發者的喜愛,很大原因就是它的功能極其強大,而使用卻異常簡單。無論是網路、快取、特效,佔位圖,Glide團隊都為開發者考慮的非常細緻,也正因為這個原因,Glide框架的原始碼變得極其複雜,
[email protected]定時任務原始碼解析
本文以springboot中cron表示式配置的定時任務為例子。
在springboot中的啟動類中新增@EnableScheduling註解,在beanFactory中新增ScheduledAnnotationBeanPostProcessor作為bean初始化完畢後的
Mybati原始碼解析篇之六劍客!!!
### 目錄
- 前言
- 環境版本
- Mybatis的六劍客
- SqlSession
- 有何方法
- 語句執行方法
- 立即批量更新方法
- 事務控制方法
- 本地快取方法
- 獲取對映方法
- 有何實現類?
- Executor
Elastic-Job原始碼解析(三)之分片定時任務執行
通過本篇的閱讀你將學會了解Elastic-Job的定時時機,及如何通過分片方式做一個分散式的定時任務框架。瞭解常用的三種分片策略,及如何自定義分散式分片策略
目錄
Elastic-Job如何通過SpringJobScheduler啟動定時
Ela
Elastic-Job原始碼解析(二)之定時核心實現quartz
Elastic-Job是一個分散式定時任務框架,其內部的定時主要是利用quartz來實現,而Elastic-Job核心是對quartz進行了封裝,並提供了分散式任務的功能。具體是怎麼實現呢? 怎麼實現分散式呢? 主要是通過Zookeeper通訊,獲取任務伺服器ip地址,並通
一篇文章徹底讀懂HashMap之HashMap原始碼解析(下)
put函式原始碼解析
//put函式入口,兩個引數:key和value
public V put(K key, V value) {
/*下面分析這個函式,注意前3個引數,後面
2個引數這裡不太重要,因為所有的put
操作後面的2個引數預設值都一樣 */
一篇文章徹底讀懂HashMap之HashMap原始碼解析(上)
就身邊同學的經歷來看,HashMap是求職面試中名副其實的“明星”,基本上每一加公司的面試多多少少都有問到HashMap的底層實現原理、原始碼等相關問題。
在秋招面試準備過程中,博主閱讀過很多關於HashMap原始碼分析的文章,漫長的拼湊式閱讀之後,博主沒有看到過
.NET Core實戰專案之CMS 第三章 入門篇-原始碼解析配置檔案及依賴注入
作者:依樂祝 原文連結:https://www.cnblogs.com/yilezhu/p/9998021.html
寫在前面
上篇文章我給大家講解了ASP.NET Core的概念及為什麼使用它,接著帶著你一步一步的配置了.NET Core的開發環境並建立了一個ASP.NET Core的mvc專
Dubbo原始碼解析之consumer呼叫篇
閱讀須知
dubbo版本:2.6.0
spring版本:4.3.8
文章中使用/* */註釋的方法會做深入分析
正文
在分析consumer初始化時,我們看到了關聯服務引用建立代理的過程,最終會呼叫JavassistProxyFactory的getP
rxJava和rxAndroid原始碼解析系列四之subscribeOn和observeOn的理解(學習終結篇)
本篇文章主要解決subscribeOn和observeOn這兩個方法為什麼subscribeOn只有一次有效果,observeOn切換多次回撥的都有效果。
不知道朋友有沒有看過rxandroid的原始碼,如果看過的話,就會迎刃而解,沒什麼疑慮啦。沒看過原始碼的朋友,可以看看我這個系列的前幾篇文章
Dubbo原始碼解析之provider呼叫篇
閱讀須知
dubbo版本:2.6.0
spring版本:4.3.8
文章中使用/* */註釋的方法會做深入分析
正文
在之前的原始碼分析文章中,我們看到了dubbo用netty作為底層的網路通訊框架,熟悉netty的同學應該知道,使用netty時我們會使用它
dubbo原始碼解析——概要篇
這次原始碼解析借鑑《肥朝》前輩的dubbo原始碼解析,進行原始碼學習。總結起來就是先總體,後區域性.也就是先把需要注意的概念先丟擲來,把整體架構圖先畫出來.讓讀者拿著"地圖"跟著我的腳步,並且每一步我都
JDK8 HashMap原始碼解析,一篇文章徹底讀懂HashMap
在秋招面試準備中博主找過很多關於HashMap的部落格,但是秋招結束後回過頭來看,感覺沒有一篇全面、通俗易懂的講解HashMap文章(可能是博主沒有找到),所以在秋招結束後,寫下了這篇文章,盡最大的努力把HashMap原始碼講解的通俗易懂,並且儘量涵蓋面試中HashM
Android 網路程式設計(7): 原始碼解析OkHttp前篇[請求網路]
前言
學會了OkHttp3的用法後,我們當然有必要來了解下OkHttp3的原始碼,當然現在網上的文章很多,我仍舊希望我這一系列文章篇是最簡潔易懂的。
1.從請求處理開始分析
首先OKHttp3如何使用這裡就不在贅述了,不明白的同學可以檢視Android網路程式設計(5):Ok
ss-libev 原始碼解析local篇(2):ss_local和socks5客戶端握手
上一篇說到ss-libev建立listen_ctx_t物件用於監聽客戶端連線,呼叫accept_cb處理來自客戶端的新連線,建立server_t物件用於處理和客戶端之間的互動。本篇分析來自客戶端的SOCK5連線的建立以及傳輸資料的過程。
首先,回憶一下使用ne
JAVA常用集合框架原始碼解析(基於1.8)開題篇
倪升武的部落格中有一個小專題,讀完之後,發現博主的分析基本是基於JAVA1.7的,這裡我基於JAVA1.8給出一些新的解讀。但是對於JAVA1.8新增的一些新特性可能不太會作過多的分析(畢竟本人目前水平有限,且本部落格的寫作初衷也是以基礎學習為主),在徹底淺讀完
ElasticSearch原始碼解析(一):轉一篇介紹中文分詞的文章
轉自:http://www.cnblogs.com/flish/archive/2011/08/08/2131031.html
基於CRF(Conditional Random Field)分詞演算法 論文連結:http://nlp.stanford.edu/pubs/