一種輕量級工作流引擎的設計與參考實現
工作中,基於實際情況的需要,自研了一款工作流引擎,期間有不少收穫,願與同學們分享,聽我娓娓道來......
1. 什麼是工作流引擎
簡而言之,工作流引擎就是驅動工作流執行的一套程式碼。
至於什麼是工作流、為什麼要有工作流、工作流的應用場景,同學們可以看一看網上的資料,在此處不在展開。
2. 為什麼要重複造輪子
開源的工作流引擎很多,比如 activiti、flowable、Camunda 等,那麼,為什麼沒有選它們呢?基於以下幾點考慮:
- 最重要的,滿足不了業務需求,一些特殊的場景無法實現。
- 有些需求實現起來比較繞,更有甚者,需要直接修改引擎資料庫,這對於引擎的穩定執行帶來了巨大的隱患,也對以後引擎的版本升級製造了一些困難。
- 資料、程式碼量、API繁多,學習成本較高,維護性較差。
- 經過分析與評估,我們的業務場景需要的BPMN元素較少,開發實現的代價不大。
因此,重複造了輪子,其實,還有一個更深層次的戰略上的考慮,即:作為科技公司,我們一定要有我們自己的核心底層技術!這樣,才能不受制於人(參考最近的晶片問題)。
3. 怎麼造的輪子
對於一次學習型分享來講,過程比結果更重要,那些只說結果,不細說過程甚至不說的分享,我認為是秀肌肉,而不是真正意義上的分享。因此,接下來,本文將重點描述造輪子的主要過程。
一個成熟的工作流引擎的構建是很複雜的,如何應對這種複雜性呢?一般來講,有以下三種方法
- 確定性交付:弄清楚需求是什麼,驗收標準是什麼,最好能夠寫出測試用例,這一步是為了明確目標。
- 迭代式開發:先從小的問題集的解決開始,逐步過渡到解決大的問題集上來,羅馬不是一天建成的,人也不是一天就能成熟的,是需要個過程的。
- 分而治之:把大的問題拆成小的問題,小問題的解決會推動大問題的解決(這個思想適用場景比較多,同學們可以用心體會和理解哈)。
如果按照上述方法,一步一步的詳細展開,那麼可能需要一本書。為了縮減篇幅而又不失乾貨,本文會描述重點幾個迭代,進而闡述輕量級工作流引擎的設計與主要實現。
那麼,輕量級又是指什麼呢?這裡,主要是指以下幾點
- 少依賴:程式碼的java實現上,除了jdk8以外,不依賴與其他第三方jar包,從而可以更好的減少依賴帶來的問題。
- 核心化:設計上,採用了微核心架構模式,核心小巧,實用,同時提供了一定的擴充套件性。從而可以更好地理解與應用本引擎。
- 輕規範:並沒有完全實現BPMN規範,也沒有完全按照BPMN規範進行設計,而只是參考了該規範,且只實現以一小部分必須實現的元素。從而降低了學習成本,可以按照需求自由發揮。
- 工具化:程式碼上,只是一個工具(UTIL),不是一個應用程式。從而你可以簡單的執行它,擴充套件你自己的資料層、節點層,更加方便的整合到其他應用中去。
好,廢話說完了,開始第一個迭代......
4. Hello ProcessEngine
按照國際慣例,第一個迭代用來實現 hello world 。
需求
作為一個流程管理員,我希望流程引擎可以執行如下圖所示的流程,以便我能夠配置流程來列印不同的字串。
分析
- 第一個流程,可以列印Hello ProcessEngine,第二個流程可以列印ProcessEngine Hello,這兩個流程的區別是隻有順序不同,藍色的節點與紅色的節點的本身功能沒有發生變化
- 藍色的節點與紅色的節點都是節點,它們的功能是不一樣的,即:紅色的節點列印Hello,藍色的節點列印ProcessEngine
- 開始與結束節點是兩個特殊的節點,一個開始流程,一個結束流程
- 節點與節點之間是通過線來連線的,一個節點執行完畢後,是通過箭頭來確定下一個要執行的節點
- 需要一種表示流程的方式,或是XML、或是JSON、或是其他,而不是圖片
設計
流程的表示
相較於JSON,XML的語義更豐富,可以表達更多的資訊,因此這裡使用XML來對流程進行表示,如下所示
<definitions>
<process id="process_1" name="hello">
<startEvent id="startEvent_1">
<outgoing>flow_1</outgoing>
</startEvent>
<sequenceFlow id="flow_1" sourceRef="startEvent_1" targetRef="printHello_1" />
<printHello id="printHello_1" name="hello">
<incoming>flow_1</incoming>
<outgoing>flow_2</outgoing>
</printHello>
<sequenceFlow id="flow_2" sourceRef="printHello_1" targetRef="printProcessEngine_1" />
<printProcessEngine id="printProcessEngine_1" name="processEngine">
<incoming>flow_2</incoming>
<outgoing>flow_3</outgoing>
</printProcessEngine>
<sequenceFlow id="flow_3" sourceRef="printProcessEngine_1" targetRef="endEvent_1"/>
<endEvent id="endEvent_1">
<incoming>flow_3</incoming>
</endEvent>
</process>
</definitions>
- process表示一個流程
- startEvent表示開始節點,endEvent表示結束節點
- printHello表示列印hello節點,就是需求中的藍色節點
- processEngine表示列印processEngine節點,就是需求中的紅色節點
- sequenceFlow表示連線,從sourceRef開始,指向targetRef,例如:flow_3,表示一條從printProcessEngine_1到endEvent_1的連線。
節點的表示
- outgoing表示出邊,即節點執行完畢後,應該從那個邊出去。
- incoming表示入邊,即從哪個邊進入到本節點。
- 一個節點只有outgoing而沒有incoming,如:startEvent,也可以
只有入邊而沒有出邊,如:endEvent,也可以既有入邊也有出邊,如:printHello、processEngine。
流程引擎的邏輯
基於上述XML,流程引擎的執行邏輯如下
- 找到開始節點(startEvent)
- 找到startEvent的outgoing邊(sequenceFlow)
- 找到該邊(sequenceFlow)指向的節點(targetRef)
- 執行節點自身的邏輯
- 找到該節點的outgoing邊(sequenceFlow)
- 重複3-5,直到遇到結束節點(endEvent),流程結束
實現
首先要進行資料結構的設計,即:要把問題域中的資訊對映到計算機中的資料。
可以看到,一個流程(PeProcess)由多個節點(PeNode)與邊(PeEdge)組成,節點有出邊(out)、入邊(in),邊有流入節點(from)、流出節點(to)。
具體的定義如下:
public class PeProcess {
public String id;
public PeNode start;
public PeProcess(String id, PeNode start) {
this.id = id;
this.start = start;
}
}
public class PeEdge {
private String id;
public PeNode from;
public PeNode to;
public PeEdge(String id) {
this.id = id;
}
}
public class PeNode {
private String id;
public String type;
public PeEdge in;
public PeEdge out;
public PeNode(String id) {
this.id=id;
}
}
PS : 為了表述主要思想,在程式碼上比較“奔放自由”,生產中不可直接複製貼上!
接下來,構建流程圖,程式碼如下:
public class XmlPeProcessBuilder {
private String xmlStr;
private final Map<String, PeNode> id2PeNode = new HashMap<>();
private final Map<String, PeEdge> id2PeEdge = new HashMap<>();
public XmlPeProcessBuilder(String xmlStr) {
this.xmlStr = xmlStr;
}
public PeProcess build() throws Exception {
//strToNode : 把一段xml轉換為org.w3c.dom.Node
Node definations = XmlUtil.strToNode(xmlStr);
//childByName : 找到definations子節點中nodeName為process的那個Node
Node process = XmlUtil.childByName(definations, "process");
NodeList childNodes = process.getChildNodes();
for (int j = 0; j < childNodes.getLength(); j++) {
Node node = childNodes.item(j);
//#text node should be skip
if (node.getNodeType() == Node.TEXT_NODE) continue;
if ("sequenceFlow".equals(node.getNodeName()))
buildPeEdge(node);
else
buildPeNode(node);
}
Map.Entry<String, PeNode> startEventEntry = id2PeNode.entrySet().stream().filter(entry -> "startEvent".equals(entry.getValue().type)).findFirst().get();
return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue());
}
private void buildPeEdge(Node node) {
//attributeValue : 找到node節點上屬性為id的值
PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id));
peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id));
peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id));
}
private void buildPeNode(Node node) {
PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id));
peNode.type = node.getNodeName();
Node inPeEdgeNode = XmlUtil.childByName(node, "incoming");
if (inPeEdgeNode != null)
//text : 得到inPeEdgeNode的nodeValue
peNode.in = id2PeEdge.computeIfAbsent(XmlUtil.text(inPeEdgeNode), id -> new PeEdge(id));
Node outPeEdgeNode = XmlUtil.childByName(node, "outgoing");
if (outPeEdgeNode != null)
peNode.out = id2PeEdge.computeIfAbsent(XmlUtil.text(outPeEdgeNode), id -> new PeEdge(id));
}
}
接下來,實現流程引擎主邏輯,程式碼如下:
public class ProcessEngine {
private String xmlStr;
public ProcessEngine(String xmlStr) {
this.xmlStr = xmlStr;
}
public void run() throws Exception {
PeProcess peProcess = new XmlPeProcessBuilder(xmlStr).build();
PeNode node = peProcess.start;
while (!node.type.equals("endEvent")) {
if ("printHello".equals(node.type))
System.out.print("Hello ");
if ("printProcessEngine".equals(node.type))
System.out.print("ProcessEngine ");
node = node.out.to;
}
}
}
就這?工作流引擎就這?同學們可千萬不要這樣簡單理解啊,畢竟這還只是hello world而已,各種程式碼量就已經不少了。
另外,這裡面還有很多可以改進的空間,比如異常控制、泛化、設計模式等,但畢竟只是一個hello world而已,其目的是方便同學理解,讓同學入門。
那麼,接下來呢,就要稍微貼近一些具體的實際應用場景了,我們繼續第二個迭代。
5. 簡單審批
一般來講工作流引擎屬於底層技術,在它之上可以構建審批流、業務流、資料流等型別的應用,那麼接下啦就以實際中的簡單審批場景為例,繼續深入工作流引擎的設計,好,我們開始。
需求
作為一個流程管理員,我希望流程引擎可以執行如下圖所示的流程,以便我能夠配置流程來實現簡單的審批流。
例如:小張提交了一個申請單,然後經過經理審批,審批結束後,不管通過還是不通過,都會經過第三步把結果傳送給小張。
分析
- 總體上來講,這個流程還是線性順序類的,基本上可以沿用上次迭代的部分設計
- 審批節點的耗時可能會比較長,甚至會達到幾天時間,工作流引擎主動式的調取下一個節點的邏輯並不適合此場景
- 隨著節點型別的增多,工作流引擎裡寫死的那部分節點型別自由邏輯也不合適
- 審批時需要申請單資訊、審批人,結果郵件通知還需要審批結果等資訊,這些資訊如何傳遞也是一個要考慮的問題
設計
- 採用註冊機制,把節點型別及其自有邏輯註冊進工作流引擎,以便能夠擴充套件更多節點,使得工作流引擎與節點解耦
- 工作流引擎增加被動式驅動邏輯,使得能夠通過外部來使工作流引擎執行下一個節點
- 增加上下文語義,作為全域性變數來使用,使得資料能夠流經各個節點
實現
新的XML定義如下:
<definitions>
<process id="process_2" name="簡單審批例子">
<startEvent id="startEvent_1">
<outgoing>flow_1</outgoing>
</startEvent>
<sequenceFlow id="flow_1" sourceRef="startEvent_1" targetRef="approvalApply_1" />
<approvalApply id="approvalApply_1" name="提交申請單">
<incoming>flow_1</incoming>
<outgoing>flow_2</outgoing>
</approvalApply>
<sequenceFlow id="flow_2" sourceRef="approvalApply_1" targetRef="approval_1" />
<approval id="approval_1" name="審批">
<incoming>flow_2</incoming>
<outgoing>flow_3</outgoing>
</approval>
<sequenceFlow id="flow_3" sourceRef="approval_1" targetRef="notify_1"/>
<notify id="notify_1" name="結果郵件通知">
<incoming>flow_3</incoming>
<outgoing>flow_4</outgoing>
</notify>
<sequenceFlow id="flow_4" sourceRef="notify_1" targetRef="endEvent_1"/>
<endEvent id="endEvent_1">
<incoming>flow_4</incoming>
</endEvent>
</process>
</definitions>
首先要有一個上下文物件類,用於傳遞變數的,定義如下:
public class PeContext {
private Map<String, Object> info = new ConcurrentHashMap<>();
public Object getValue(String key) {
return info.get(key);
}
public void putValue(String key, Object value) {
info.put(key, value);
}
}
每個節點的處理邏輯是不一樣的,此處應該進行一定的抽象,為了強調流程中節點的作用是邏輯處理,引入了一種新的型別--運算元(Operator),定義如下:
public interface IOperator {
//引擎可以據此來找到本運算元
String getType();
//引擎排程本運算元
void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext);
}
對於引擎來講,當遇到一個節點時,需要排程之,但怎麼排程呢?首先需要各個節點運算元註冊(registNodeProcessor())進來,這樣才能找到要排程的那個運算元。
其次,引擎怎麼知道節點運算元自有邏輯處理完了呢?一般來講,引擎是不知道的,只能是由運算元告訴引擎,所以引擎要提供一個功能(nodeFinished()),這個功能由運算元呼叫。
最後,把運算元任務的排程和引擎的驅動解耦開來,放入不同的執行緒中。
修改後的ProcessEngine程式碼如下:
public class ProcessEngine {
private String xmlStr;
//儲存運算元
private Map<String, IOperator> type2Operator = new ConcurrentHashMap<>();
private PeProcess peProcess = null;
private PeContext peContext = null;
//任務資料暫存
public final BlockingQueue<PeNode> arrayBlockingQueue = new LinkedBlockingQueue();
//任務排程執行緒
public final Thread dispatchThread = new Thread(() -> {
while (true) {
try {
PeNode node = arrayBlockingQueue.take();
type2Operator.get(node.type).doTask(this, node, peContext);
} catch (Exception e) {
}
}
});
public ProcessEngine(String xmlStr) {
this.xmlStr = xmlStr;
}
//運算元註冊到引擎中,便於引擎呼叫之
public void registNodeProcessor(IOperator operator) {
type2Operator.put(operator.getType(), operator);
}
public void start() throws Exception {
peProcess = new XmlPeProcessBuilder(xmlStr).build();
peContext = new PeContext();
dispatchThread.setDaemon(true);
dispatchThread.start();
executeNode(peProcess.start.out.to);
}
private void executeNode(PeNode node) {
if (!node.type.equals("endEvent"))
arrayBlockingQueue.add(node);
else
System.out.println("process finished!");
}
public void nodeFinished(String peNodeID) {
PeNode node = peProcess.peNodeWithID(peNodeID);
executeNode(node.out.to);
}
}
接下來,簡單(簡陋)實現本示例所需的三個運算元,程式碼如下:
/**
* 提交申請單
*/
public class OperatorOfApprovalApply implements IOperator {
@Override
public String getType() {
return "approvalApply";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
peContext.putValue("form", "formInfo");
peContext.putValue("applicant", "小張");
processEngine.nodeFinished(node.id);
}
}
/**
* 審批
*/
public class OperatorOfApproval implements IOperator {
@Override
public String getType() {
return "approval";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
peContext.putValue("approver", "經理");
peContext.putValue("message", "審批通過");
processEngine.nodeFinished(node.id);
}
}
/**
* 結果郵件通知
*/
public class OperatorOfNotify implements IOperator {
@Override
public String getType() {
return "notify";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
System.out.println(String.format("%s 提交的申請單 %s 被 %s 審批,結果為 %s",
peContext.getValue("applicant"),
peContext.getValue("form"),
peContext.getValue("approver"),
peContext.getValue("message")));
processEngine.nodeFinished(node.id);
}
}
執行一下,看看結果如何,程式碼如下:
public class ProcessEngineTest {
@Test
public void testRun() throws Exception {
//讀取檔案內容到字串
String modelStr = Tools.readResoucesFile("model/two/hello.xml");
ProcessEngine processEngine = new ProcessEngine(modelStr);
processEngine.registNodeProcessor(new OperatorOfApproval());
processEngine.registNodeProcessor(new OperatorOfApprovalApply());
processEngine.registNodeProcessor(new OperatorOfNotify());
processEngine.start();
Thread.sleep(1000 * 1);
}
}
小張 提交的申請單 formInfo 被 經理 審批,結果為 審批通過
process finished!
到此,輕量級工作流引擎的核心邏輯介紹的差不多了,然而,只支援順序結構是太單薄的,我們知道,程式流程的三種基本結構為順序、分支、迴圈,有了這三種結構,基本上就可以表示絕大多數流程邏輯。迴圈可以看做一種組合結構,即:迴圈可以由順序與分支推匯出來,我們已經實現了順序,那麼接下來只要實現分支即可,而分支有很多型別,如:二選一、N選一、N選M(1<=M<=N),其中N選一可以由二選一的組合推匯出來,N選M也可以由二選一的組合推匯出來,只是比較囉嗦,不那麼直觀,所以,我們只要實現二選一分支,即可滿足絕大多數流程邏輯場景,好,第三個迭代開始。
6. 一般審批
作為一個流程管理員,我希望流程引擎可以執行如下圖所示的流程,以便我能夠配置流程來實現一般的審批流。
例如:小張提交了一個申請單,然後經過經理審批,審批結束後,如果通過,發郵件通知,不通過,則打回重寫填寫申請單,直到通過為止。
分析
- 需要引入一種分支節點,可以進行簡單的二選一流轉
- 節點的入邊、出邊不只一條
- 需要一種邏輯表示式語義,可以配置分支節點
設計
- 節點要支援多入邊、多出邊
- 節點運算元來決定從哪個出邊出
- 使用一種簡單的規則引擎,支援簡單的邏輯表示式的解析
- 簡單分支節點的XML定義
實現
新的XML定義如下:
<definitions>
<process id="process_2" name="簡單審批例子">
<startEvent id="startEvent_1">
<outgoing>flow_1</outgoing>
</startEvent>
<sequenceFlow id="flow_1" sourceRef="startEvent_1" targetRef="approvalApply_1"/>
<approvalApply id="approvalApply_1" name="提交申請單">
<incoming>flow_1</incoming>
<incoming>flow_5</incoming>
<outgoing>flow_2</outgoing>
</approvalApply>
<sequenceFlow id="flow_2" sourceRef="approvalApply_1" targetRef="approval_1"/>
<approval id="approval_1" name="審批">
<incoming>flow_2</incoming>
<outgoing>flow_3</outgoing>
</approval>
<sequenceFlow id="flow_3" sourceRef="approval_1" targetRef="simpleGateway_1"/>
<simpleGateway id="simpleGateway_1" name="簡單是非判斷">
<trueOutGoing>flow_4</trueOutGoing>
<expr>approvalResult</expr>
<incoming>flow_3</incoming>
<outgoing>flow_4</outgoing>
<outgoing>flow_5</outgoing>
</simpleGateway>
<sequenceFlow id="flow_5" sourceRef="simpleGateway_1" targetRef="approvalApply_1"/>
<sequenceFlow id="flow_4" sourceRef="simpleGateway_1" targetRef="notify_1"/>
<notify id="notify_1" name="結果郵件通知">
<incoming>flow_4</incoming>
<outgoing>flow_6</outgoing>
</notify>
<sequenceFlow id="flow_6" sourceRef="notify_1" targetRef="endEvent_1"/>
<endEvent id="endEvent_1">
<incoming>flow_6</incoming>
</endEvent>
</process>
</definitions>
其中,加入了simpleGateway這個簡單分支節點,用於表示簡單的二選一分支,當expr中的表示式為真時,走trueOutGoing中的出邊,否則走另一個出邊。
節點支援多入邊、多出邊,修改後的PeNode如下:
public class PeNode {
public String id;
public String type;
public List<PeEdge> in = new ArrayList<>();
public List<PeEdge> out = new ArrayList<>();
public Node xmlNode;
public PeNode(String id) {
this.id = id;
}
public PeEdge onlyOneOut() {
return out.get(0);
}
public PeEdge outWithID(String nextPeEdgeID) {
return out.stream().filter(e -> e.id.equals(nextPeEdgeID)).findFirst().get();
}
public PeEdge outWithOutID(String nextPeEdgeID) {
return out.stream().filter(e -> !e.id.equals(nextPeEdgeID)).findFirst().get();
}
}
以前只有一個出邊時,是由當前節點來決定下一節點的,現在多出邊了,該由邊來決定下一個節點是什麼,修改後的流程引擎程式碼如下:
public class ProcessEngine {
private String xmlStr;
//儲存運算元
private Map<String, IOperator> type2Operator = new ConcurrentHashMap<>();
private PeProcess peProcess = null;
private PeContext peContext = null;
//任務資料暫存
public final BlockingQueue<PeNode> arrayBlockingQueue = new LinkedBlockingQueue();
//任務排程執行緒
public final Thread dispatchThread = new Thread(() -> {
while (true) {
try {
PeNode node = arrayBlockingQueue.take();
type2Operator.get(node.type).doTask(this, node, peContext);
} catch (Exception e) {
e.printStackTrace();
}
}
});
public ProcessEngine(String xmlStr) {
this.xmlStr = xmlStr;
}
//運算元註冊到引擎中,便於引擎呼叫之
public void registNodeProcessor(IOperator operator) {
type2Operator.put(operator.getType(), operator);
}
public void start() throws Exception {
peProcess = new XmlPeProcessBuilder(xmlStr).build();
peContext = new PeContext();
dispatchThread.setDaemon(true);
dispatchThread.start();
executeNode(peProcess.start.onlyOneOut().to);
}
private void executeNode(PeNode node) {
if (!node.type.equals("endEvent"))
arrayBlockingQueue.add(node);
else
System.out.println("process finished!");
}
public void nodeFinished(PeEdge nextPeEdgeID) {
executeNode(nextPeEdgeID.to);
}
}
新加入的simpleGateway節點運算元如下:
/**
* 簡單是非判斷
*/
public class OperatorOfSimpleGateway implements IOperator {
@Override
public String getType() {
return "simpleGateway";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
ScriptEngineManager manager = new ScriptEngineManager();
ScriptEngine engine = manager.getEngineByName("js");
engine.put("approvalResult", peContext.getValue("approvalResult"));
String expression = XmlUtil.childTextByName(node.xmlNode, "expr");
String trueOutGoingEdgeID = XmlUtil.childTextByName(node.xmlNode, "trueOutGoing");
PeEdge outPeEdge = null;
try {
outPeEdge = (Boolean) engine.eval(expression) ?
node.outWithID(trueOutGoingEdgeID) : node.outWithOutID(trueOutGoingEdgeID);
} catch (ScriptException e) {
e.printStackTrace();
}
processEngine.nodeFinished(outPeEdge);
}
}
其中簡單使用了js指令碼作為表示式,當然其中的弊端這裡就不展開了。
為了方便同學們CC+CV,其他發生相應變化的程式碼如下:
/**
* 審批
*/
public class OperatorOfApproval implements IOperator {
@Override
public String getType() {
return "approval";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
peContext.putValue("approver", "經理");
Integer price = (Integer) peContext.getValue("price");
//價格<=200審批才通過,即:approvalResult=true
boolean approvalResult = price <= 200;
peContext.putValue("approvalResult", approvalResult);
System.out.println("approvalResult : " + approvalResult + ",price : " + price);
processEngine.nodeFinished(node.onlyOneOut());
}
}
/**
* 提交申請單
*/
public class OperatorOfApprovalApply implements IOperator {
public static int price = 500;
@Override
public String getType() {
return "approvalApply";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
//price每次減100
peContext.putValue("price", price -= 100);
peContext.putValue("applicant", "小張");
processEngine.nodeFinished(node.onlyOneOut());
}
}
/**
* 結果郵件通知
*/
public class OperatorOfNotify implements IOperator {
@Override
public String getType() {
return "notify";
}
@Override
public void doTask(ProcessEngine processEngine, PeNode node, PeContext peContext) {
System.out.println(String.format("%s 提交的申請單 %s 被 %s 審批,結果為 %s",
peContext.getValue("applicant"),
peContext.getValue("price"),
peContext.getValue("approver"),
peContext.getValue("approvalResult")));
processEngine.nodeFinished(node.onlyOneOut());
}
}
public class XmlPeProcessBuilder {
private String xmlStr;
private final Map<String, PeNode> id2PeNode = new HashMap<>();
private final Map<String, PeEdge> id2PeEdge = new HashMap<>();
public XmlPeProcessBuilder(String xmlStr) {
this.xmlStr = xmlStr;
}
public PeProcess build() throws Exception {
//strToNode : 把一段xml轉換為org.w3c.dom.Node
Node definations = XmlUtil.strToNode(xmlStr);
//childByName : 找到definations子節點中nodeName為process的那個Node
Node process = XmlUtil.childByName(definations, "process");
NodeList childNodes = process.getChildNodes();
for (int j = 0; j < childNodes.getLength(); j++) {
Node node = childNodes.item(j);
//#text node should be skip
if (node.getNodeType() == Node.TEXT_NODE) continue;
if ("sequenceFlow".equals(node.getNodeName()))
buildPeEdge(node);
else
buildPeNode(node);
}
Map.Entry<String, PeNode> startEventEntry = id2PeNode.entrySet().stream().filter(entry -> "startEvent".equals(entry.getValue().type)).findFirst().get();
return new PeProcess(startEventEntry.getKey(), startEventEntry.getValue());
}
private void buildPeEdge(Node node) {
//attributeValue : 找到node節點上屬性為id的值
PeEdge peEdge = id2PeEdge.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeEdge(id));
peEdge.from = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "sourceRef"), id -> new PeNode(id));
peEdge.to = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "targetRef"), id -> new PeNode(id));
}
private void buildPeNode(Node node) {
PeNode peNode = id2PeNode.computeIfAbsent(XmlUtil.attributeValue(node, "id"), id -> new PeNode(id));
peNode.type = node.getNodeName();
peNode.xmlNode = node;
List<Node> inPeEdgeNodes = XmlUtil.childsByName(node, "incoming");
inPeEdgeNodes.stream().forEach(n -> peNode.in.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id))));
List<Node> outPeEdgeNodes = XmlUtil.childsByName(node, "outgoing");
outPeEdgeNodes.stream().forEach(n -> peNode.out.add(id2PeEdge.computeIfAbsent(XmlUtil.text(n), id -> new PeEdge(id))));
}
}
執行一下,看看結果如何,程式碼如下:
public class ProcessEngineTest {
@Test
public void testRun() throws Exception {
//讀取檔案內容到字串
String modelStr = Tools.readResoucesFile("model/third/hello.xml");
ProcessEngine processEngine = new ProcessEngine(modelStr);
processEngine.registNodeProcessor(new OperatorOfApproval());
processEngine.registNodeProcessor(new OperatorOfApprovalApply());
processEngine.registNodeProcessor(new OperatorOfNotify());
processEngine.registNodeProcessor(new OperatorOfSimpleGateway());
processEngine.start();
Thread.sleep(1000 * 1);
}
}
approvalResult : false,price : 400
approvalResult : false,price : 300
approvalResult : true,price : 200
小張 提交的申請單 200 被 經理 審批,結果為 true
process finished!
至此,本需求實現完畢,除了直接實現了分支語義外,我們看到,這裡還間接實現了迴圈語義。
作為一個輕量級的工作流引擎,到此就基本講完了,接下來,我們做一下總結與展望。
7. 總結與展望
經過以上三個迭代,我們可以得到一個相對穩定的工作流引擎的結構,如下圖所示:
通過此圖我們可知,這裡有一個相對穩定的引擎層,同時為了提供擴充套件性,提供了一個節點運算元層,所有的節點運算元的新增都在此處中。
此外,進行了一定程度的控制反轉,即:由運算元決定下一步走哪裡,而不是引擎。這樣,極大地提高了引擎的靈活性,更好的進行了封裝。
最後,使用了上下文,提供了一種全域性變數的機制,便於節點之間的資料流動。
當然,以上的三個迭代距離實際的線上應用場景相距甚遠,還需實現與展望以下幾點才可,如下:
- 一些異常情況的考慮與設計
- 應把節點抽象成一個函式,要有入參、出參,資料型別等
- 關鍵的地方加入埋點,用以控制引擎或吐出事件
- 圖的語義合法性檢查,xsd、自定義檢查技術等
- 圖的dag演算法檢測
- 流程的流程歷史記錄,及回滾到任意節點
- 流程圖的動態修改,即:可以在流程開始後,對流程圖進行修改
- 併發修改情況下的考慮
- 效率上的考慮
- 防止重啟後流轉資訊丟失,需要持久化機制的加入
- 流程的取消、重置、變數傳入等
- 更合適的規則引擎及多種規則引擎的實現、配置
- 前端的畫布、前後端流程資料結構定義及轉換
路漫漫其修遠兮 吾將上下而求索
我相信,讀到這裡,肯定很累,如果你的髮量有問題,安利一下以下產品。