Activiti原始碼分析(框架、核心類。。。)
http://jiangwenfeng762.iteye.com/blog/1338553
Activiti是業界很流行的java工作流引擎,關於Activiti與JBPM5的關係和如何選擇不是本文要討論的話題,相關內容可以baidu一下。Activiti從架構角度看是比較優秀的,是很面向物件的,是我所閱讀過的程式碼結構很棒的開源軟體,個人認為比Spring,Hibernate的要好。
Activiti的基礎程式設計框架
Activiti基於Spring,ibatis等開源中介軟體作為軟體平臺,在此之上構建了非常清晰的開發框架。上圖列出了Activiti的核心元件。
1.ProcessEngine:流程引擎的抽象,對於開發者來說,它是我們使用Activiti的facade,通過它可以獲得我們需要的一切服務。
2.XXService(TaskService,RuntimeService,RepositoryService...):Activiti按照流程的生命週期(定義,部署,執行)把不同階段的服務封裝在不同的Service中,使用者可以非常清晰地使用特定階段的介面。通過ProcessEngine能夠獲得這些Service例項。TaskService,RuntimeService,RepositoryService是非常重要的三個Service:
TaskService:流程執行過程中,與每個任務節點相關的介面,比如complete,delete,delegate等等
RepositoryService:流程定義和部署相關的儲存服務。
RuntimeService:流程執行時相關服務,如startProcessInstanceByKey.
關於ProcessEngine和XXService的關係,可以看下面這張圖:
3.CommandContextIntercepter(CommandExecutor):Activiti使用命令模式作為基礎開發模式,上面Service中定義的各個方法都對應相應的命令物件(xxCmd), Service把各種請求委託給xxCmd,xxCmd來決定命令的接收者,接收者執行後返回結果。而CommandContextIntercepter顧名思義,它是一個攔截器,攔截所有命令,在命令執行前後執行一些公共性操作。比如CommandContextIntercepter的核心方法:
- public <T> T execute(Command<T> command) {
- CommandContext context = commandContextFactory.createCommandContext(command);
- try {
- //執行前儲存上下文
- Context.setCommandContext(context);
- Context.setProcessEngineConfiguration(processEngineConfiguration);
- return next.execute(command);//執行命令
- } catch (Exception e) {
- context.exception(e);
- } finally {
- try {
- //關閉上下文,內部會flush session,把資料持久化到db等
- context.close();
- } finally {
- //釋放上下文
- Context.removeCommandContext();
- Context.removeProcessEngineConfiguration();
- }
- }
- return null;
- }
關於命令模式的細節說明,網上有很多資料,這裡不展開。我只是想說一下我看到Activiti的這種設計之後的兩點感受:
1)一個產品或者一個專案,從技術上必須有一個明確的、唯一的開發模型或者叫開發樣式(真不知道怎麼說恰當),我們常說希望一個團隊的所有人寫出的程式碼都有統一的風格,都像是一個人寫出來的,很理想化,但做到很難,往往我們都是通過“規範”去約束大家這樣做,而規範畢竟是程式之外的東西,主觀性很強,不遵守規範的情況屢屢發生。而如果架構師給出了明確的開發模型,並使用一些基礎元件加以強化,把程式設計師要走的路規定清楚,那你想不遵守規範都會很難,因為那意味著你寫的東西沒發工作。就像Activiti做的這樣,明確以Command作為基本開發模型,輔之以Event-Listener,這樣程式設計風格的整體性得到了保證。
2)使用命令模式的好處,我這裡體會最深的就是 職責分離,解耦。有了Command,各個Service從角色上說只是一些協調者或者控制者,他不需要知道具體怎麼做,他只是把任務交給了各個命令。直接的好處是臃腫的、萬能的大類沒有了。而這往往是我們平時開發中最深惡痛絕的地方。
4.核心業務物件(Task,ProcessInstance,Execution...):org.activiti.engine.impl.persistence.entity包下的類是Activiti的核心業務物件。它們是真正的物件,而不是隻有資料沒有行為的假物件,搞java企業級開發的人也許已經習慣了下面的層次劃分:controller->service->dao->entity, entity只是ORMapping中資料表的java物件體現,沒有任何行為(getter/setter不能算行為),對於面向物件來說,這當然是有問題的,記得曾聽人說過這樣的話“使用面嚮物件語言進行設計和開發 與 面向物件的設計和開發 是兩回事”,面向物件講究的是“封裝”,“多型”,追求的是滿足“開-閉”原則的、職責單一的物件社會。如果你認同上述觀點,那麼相信Activiti會讓你感覺舒服一些,以TaskEntity為例,其UML類圖如下:
(圖2:TaskEntity類)
TaskEntity實現了3個介面:Task,DelegateTask和PersistentObject。其中PersistentObject是一個宣告介面,表明TaskEntity需要持久化。介面是一種角色的宣告,是一份職責的描述而TaskEntity就是這個角色的具體扮演者,因此TaskEntity必須承擔如complete,delegate等職責。
但是這裡有些遺憾的是像complete這麼重要的行為居然沒有在3個介面中描述(難道是因為工期緊張?^_^),因此“面向抽象”程式設計對於TaskEntity來說還沒有完全做到。但至少Activiti告訴我們:
1)牢記面向抽象程式設計,把職責拆分為不同的介面,讓介面來體現物件的職責,而不用去關心這份職責具體由哪個物件實現;
2)entity其實可以也應該是真正的物件。
5.Activiti的上下文元件(Context)
上下文(Context)用來儲存生命週期很長的、全域性性的資訊。Activiti的Context類(在org.activiti.engine.impl.context包下)保持如下三類資訊:
(圖3:Context類)
CommandContext:命令上下文,保持每個命令需要的必要資源,如持久化需要的session。
ProcessEngineConfigurationImpl:流程引擎相關的配置資訊。它是整個引擎的全域性配置資訊,mailServerHost,DataSource等。單例。該例項在流程引擎建立時被例項化,其呼叫stack如下圖:
(圖4:ProcessEngineConfiguration的初始化)
ExecutionContext:剛看到這個類感覺有些奇怪,不明白其作用是什麼。看其程式碼持有ExecutionEntity這個例項。而ExecutionEntity是Activiti中非常重要的一個類,//TODO
6.Activiti的持久化框架(元件)
Activiti使用ibatis作為ORMapping工具。在此基礎之上Activiti設計了自己的持久化框架,看一張圖:
(圖5:Activiti持久化框架)
頂層介面是Session和SessionFactory,這都非常好理解了。
Session有兩個實現類:
DbSqlSession:簡單點說,DbSqlSession負責sql表示式的執行。
AbstractManager:簡單點說,AbstractManager及其子類負責面向物件的持久化操作
同理DbSqlSessionFactory與GenericManagerFactory的區別就很好理解了。
持久化框架也是在流程引擎建立時初始化的,具體見圖4.
7.Event-Listener 元件
Activiti允許客戶端程式碼介入流程的執行。為此提供了一個基礎元件,看圖:
(圖6:使用者程式碼介入流程的基礎元件)
使用者可以介入的程式碼型別包括:TaskListener,JavaDelegate,Expression,ExecutionListener。
ProcessEngineConfigurationImpl持有DelegateInterceptor的某個例項,這樣就可以隨時非常方便地呼叫handleInvocation
8.Cache 元件
對Activiti的cache實現很感興趣,但現在我瞭解到的情況(也許還沒有了解清楚)其cache的實現還是很簡單的,在DbSqlSession中有cache實現:
Java程式碼- protected List<PersistentObject> insertedObjects = new ArrayList<PersistentObject>();
- protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>();
- protected List<DeleteOperation> deletedObjects = new ArrayList<DeleteOperation>();
- protected List<DeserializedObject> deserializedObjects = new ArrayList<DeserializedObject>();
也就是說Activiti就是基於記憶體的List和Map來做快取的。具體怎麼用的呢?以DbSqlSession.selectOne方法為例:
Java程式碼- public Object selectOne(String statement, Object parameter) {
- statement = dbSqlSessionFactory.mapStatement(statement);
- Object result = sqlSession.selectOne(statement, parameter);
- if (result instanceof PersistentObject) {
- PersistentObject loadedObject = (PersistentObject) result;
- 快取處理
- result = cacheFilter(loadedObject);
- }
- return result;
- }
- protected PersistentObject cacheFilter(PersistentObject persistentObject) {
- PersistentObject cachedPersistentObject = cacheGet(persistentObject.getClass(), persistentObject.getId());
- if (cachedPersistentObject!=null) {
- //如果快取中有就直接返回
- return cachedPersistentObject;
- }
- //否則,就先放入快取
- cachePut(persistentObject, true);
- return persistentObject;
- }
- protected CachedObject cachePut(PersistentObject persistentObject, boolean storeState) {
- Map<String, CachedObject> classCache = cachedObjects.get(persistentObject.getClass());
- if (classCache==null) {
- classCache = new HashMap<String, CachedObject>();
- cachedObjects.put(persistentObject.getClass(), classCache);
- }
- //這裡是關鍵:一個CachedObject包含被快取的物件本身:persistentObject和快取的狀態:storeState
- //Activiti正是根據storeState來判別快取中的資料是否被更新是否與db保持一致的。
- CachedObject cachedObject = new CachedObject(persistentObject, storeState);
- classCache.put(persistentObject.getId(), cachedObject);
- return cachedObject;
- }
看了Activiti的快取設計,我現在最大的疑問是Activiti貌似不支援cluster,因為其快取設計是基於單機記憶體的,這個問題還需要進一步調查。
9.非同步執行元件
Activiti可以非同步執行job(具體例子可以看一下ProcessInstance startProcessInstanceByKey(String processDefinitionKey);),下面簡單分析一下其實現過程,還是先看圖:
(圖7:非同步執行元件核心類)
JobExecutor是非同步執行元件的核心類,其包含三個主要屬性:
1)JobAcquisitionThread jobAccquisitionThread:執行任務的執行緒 extends java.lang.Thread
2)BlockingQueue<Runnable> threadPoolQueue
3)ThreadPoolExecutor threadPoolExecutor:執行緒池
方法ProcessEngines在引擎啟動時呼叫JobExecutor.start,JobAcquisitionThread 執行緒即開始工作,其run方法不斷迴圈執行AcquiredJobs中的job,執行一次後執行緒等待一定時間直到超時或者JobExecutor.jobWasAdded方法因為有新任務而被呼叫。
這裡發現有一處設計的不夠好:JobAcquisitionThread 與JobExecutor之間的關係是如此緊密(你中有我,我中有你),那麼可以把JobAcquisitionThread 作為JobExecutor的內部類來實現,同時把ThreadPoolExecutor threadPoolExecutor交給JobAcquisitionThread 來管理,JobExecutor只負責接受任務以及啟動、停止等更高階的工作,具體細節委託給JobAcquisitionThread ,責任分解,便於維護,JobExecutor的程式碼也會看起來更清晰。
10.PVM
PVM:Process Virtal Machine,流程虛擬機器API暴露了流程虛擬機器的POJO核心,流程虛擬機器API描述了一個工作流流程必備的元件,這些元件包括:
PvmProcessDefinition:流程的定義,形象點說就是使用者畫的那個圖。靜態含義。
PvmProcessInstance:流程例項,使用者發起的某個PvmProcessDefinition的一個例項,動態含義。
PvmActivity:流程中的一個節點
PvmTransition:銜接各個節點之間的路徑,形象點說就是圖中各個節點之間的連線線。
PvmEvent:流程執行過程中觸發的事件
以上這些元件很好地對一個流程進行了建模和抽象。每個元件都有很清晰的角色和職責劃分。另外,有了這些API,我們可以通過程式設計的方式,用程式碼來“畫”一個流程圖並讓他run起來,例如:
Java程式碼- PvmProcessDefinition processDefinition = new ProcessDefinitionBuilder()
- .createActivity("a").initial().<strong style="background-color: #ff0000;">behavior</strong>(new WaitState())
- .transition("b").endActivity().createActivity("b")
- .behavior(new WaitState()).transition("c").endActivity()
- .createActivity("c").behavior(new WaitState()).endActivity()
- .buildProcessDefinition();
- PvmProcessInstance processInstance = processDefinition
- .createProcessInstance();
- processInstance.start();
- PvmExecution activityInstance = processInstance.findExecution("a");
- assertNotNull(activityInstance);
- activityInstance.signal(null, null);
- activityInstance = processInstance.findExecution("b");
- assertNotNull(activityInstance);
- activityInstance.signal(null, null);
- activityInstance = processInstance.findExecution("c");
- assertNotNull(activityInstance);
以上程式碼都很簡單,很好理解,只有一點需要說明一下,粗體紅色背景的behavior方法,為一個PvmActivity增加ActivityBehavior,這是幹什麼呢?ActivityBehavior是一個interface,其介面宣告很簡單:
Java程式碼- /**
- * @author Tom Baeyens
- */
- public interface ActivityBehavior {
- void execute(ActivityExecution execution) throws Exception;
- }
我的理解:Activiti把完成一個PvmActivity的行為單獨建模封裝在ActivityBehavior中。execute方法只有一個引數ActivityExecution,為啥這麼設計?
為了更好地理解ActivityBehavior的作用,我們以TaskEntity.complete方法為例,分析其執行過程,先看complete的程式碼:
Java程式碼- public void complete() {
- fireEvent(TaskListener.EVENTNAME_COMPLETE);
- Context
- .getCommandContext()
- .getTaskManager()
- .deleteTask(this, TaskEntity.DELETE_REASON_COMPLETED, false);
- if (executionId!=null) {
- getExecution().signal(null, null);
- }
- }
程式碼很簡單,也很好理解(可能出乎我們的意料,因為完成一個task,其實有很多事情要做的):
1.fireEvent:通知Listener,本任務完成了。
2.資料持久化相關的動作
3.getExecution().signal(null, null):發訊號,這裡面隱藏的東西就多了,總體來說,完成了當前任務流程怎麼走,怎麼生成新的任務都是在這裡完成的。
進去看看:
Java程式碼- public void signal(String signalName, Object signalData) {
- ensureActivityInitialized();
- SignallableActivityBehavior activityBehavior = (SignallableActivityBehavior) activity.getActivityBehavior();
- try {
- activityBehavior.signal(this, signalName, signalData);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new PvmException("couldn't process signal '"+signalName+"' on activity '"+activity.getId()+"': "+e.getMessage(), e);
- }
- }
ExecutionEntity.signal方法核心工作就是把發訊號的工作委託給PvmActivity的activityBehavior. 這裡的設計存在問題,很顯然其觸犯了一個程式碼的壞味道:訊息鏈。它讓ExceutionEntity沒有必要地與SignallableActivityBehavior 產生了耦合,更好的做法應該是PvmActivity提供signal方法,其內部呼叫ActivityBehavior完成發訊號工作。
其實看看PvmActivity的介面宣告,我不免也有疑問,本來屬於PvmActivity的很重要的職責在其介面宣告中都沒有體現,why??
Java程式碼- /**
- * @author Tom Baeyens
- */
- public interface PvmActivity extends PvmScope {
- boolean isAsync();
- PvmScope getParent();
- List<PvmTransition> getIncomingTransitions();
- List<PvmTransition> getOutgoingTransitions();
- PvmTransition findOutgoingTransition(String transitionId);
- }
把思路拉回來,我們繼續看activityBehavior.signal方法內部的具體實現。