1. 程式人生 > 實用技巧 >[原始碼解析]Oozie來龍去脈之提交任務

[原始碼解析]Oozie來龍去脈之提交任務

[原始碼解析]Oozie來龍去脈之提交任務

0x00 摘要

Oozie是由Cloudera公司貢獻給Apache的基於工作流引擎的開源框架,是Hadoop平臺的開源的工作流排程引擎,用來管理Hadoop作業。本文是系列的第一篇,介紹Oozie的任務提交階段。

0x01 問題

我們從需求逆推實現,即考慮如果我們從無到有實現工作流引擎,我們需要實現哪些部分?從而我們可以提出一系列問題從而去Oozie中探尋。

作為工作流引擎需要實現哪些部分?大致想了想,覺得需要有:

  • 任務提交
  • 任務持久化
  • 任務委託給某一個執行器執行
  • 任務排程
  • 任務回撥,即任務被執行器完成後通知工作流引擎
  • 支援不同任務(同步,非同步)
  • 控制任務之間邏輯關係(跳轉,等待...)
  • 狀態監控,監控任務進度
  • ......

因為篇幅和精力所限,我們無法研究所有原始碼,回答所有問題,所以我們先整理出部分問題,在後面Oozie原始碼分析中一一解答:

  • Oozie分為幾個模組?
  • 每個模組功能是什麼?
  • Oozie如何提交任務?
  • 任務提交到什麼地方?如何持久化?
  • Oozie任務有同步非同步之分嗎?
  • Oozie如何處理同步任務?
  • Oozie如何處理非同步任務?
  • 任務的控制流節點(Control Flow Nodes)和動作節點(Action Nodes)之間如何跳轉?
  • Oozie都支援什麼型別的任務?Shell?Java? Hive?
  • Oozie如何同Yarn互動?
  • Oozie如何知道Yarn任務完成?

0x02 Oozie 基本概念

2.1 元件

Oozie由Oozie client和Oozie Server兩個元件構成,Oozie Server是執行於Java Servlet容器(Tomcat)中的web應用程式。Oozie client用於給Oozie Server提及任務,Oozie client 提交任務的途徑是HTTP請求。

實際上Oozie Server就相當於Hadoop的一個客戶端,當使用者需要執行多個關聯的MR任務時,只需要將MR執行順序寫入workflow.xml,然後使用Oozie Server提交本次任務,Oozie Server會託管此任務流。

Oozie Server 具體操作的是workflow,即Oozie主要維護workflow的執行 / workflow內部Action的串聯和跳轉。

具體Action的執行是由Yarn去執行,Yarn會把Action分配給有充足資源的節點執行。Action是非同步執行,所以Action結束時候會通過回撥方式通知Oozie執行結果,Oozie也會採用輪詢方式去獲取Action結果(為了提高可靠性)。

大致提交流程如下:

Oozie client ------> Oozie  Server -------> Yarn ------> Hadoop

2.2 特點

Oozie特點如下:

  • Oozie不是僅用來配置多個MR工作流的,它可以是各種程式夾雜在一起的工作流,比如執行一個MR1後,接著執行一個java指令碼,再執行一個shell指令碼,接著是Hive指令碼,然後又是Pig指令碼,最後又執行了一個MR2,使用Oozie可以輕鬆完成這種多樣的工作流。使用Oozie時,若前一個任務執行失敗,後一個任務將不會被排程。
  • Oozie定義了控制流節點(Control Flow Nodes)和動作節點(Action Nodes),其中控制流節點定義了流程的開始和結束,以及控制流程的執行路徑(Execution Path),如decision,fork,join等;而動作節點包括Haoop map-reduce hadoop檔案系統,Pig,SSH,HTTP,eMail和Oozie子流程。
  • Oozie以action為基本單位,可以將多個action構成一個DAG圖的模式執行。
  • Oozie工作流必須是一個有向無環圖,實際上Oozie就相當於Hadoop的一個客戶端,當使用者需要執行多個關聯的MR任務時,只需要將MR執行順序寫入workflow.xml,然後使用Oozie提交本次任務,Oozie會託管此任務流。

2.3 功能模組

Oozie主要由以下功能模組構成:

  • workflow(工作流):該元件用於定義和執行一個特定順序的mapreduce,hive和pig作業。由我們需要處理的每個工作組成,進行需求的流式處理。
  • Coordinator(協調器):可將多個工作流協調成一個工作流來進行處理。多個workflow可以組成一個coordinator,可以把前幾個workflow的輸出作為後 一個workflow的輸入,也可以定義workflow的觸發條件,來做定時觸發。
  • Bundle Job:繫結多個coordinator,一起提交或觸發所有coordinator,是對一堆coordinator的抽象。
  • Oozie SLA(伺服器等級協定):該元件支援workflow應用程式執行過程的記錄跟蹤。

我們就從無到有,看看一個Workflow從提交到最後是如何執行的,假設這個workflow開始後,進入一個hive action,這個hive本身配置的是由tez引擎執行 。下面是程式碼簡化版。

<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive-wf">

    <start to="hive-node"/>

    <action name="hive-node">
<hive xmlns="uri:oozie:hive-action:0.5">
<script>hive.sql</script>
</hive>
<ok to="end"/>
<error to="fail"/>
</action> <kill name="fail">
<message>Hive failed, error message</message>
</kill> <end name="end"/>
</workflow-app>

0x03 Oozie client

Oozie Client是使用者用來提交任務給Oozie Server的途徑,其可以啟動任務,停止任務,提交任務,開始任務,檢視任務執行情況。比如啟動任務如下:

oozie job -oozie oozie_url -config job.properties_address -run

3.1 程式入口

既然有啟動指令碼,我們就直接去裡面探尋程式入口。

${JAVA_BIN} ${OOZIE_CLIENT_OPTS} -cp ${OOZIECPPATH} org.apache.oozie.cli.OozieCLI "${@}"

這就看到了Client 的入口類,我們去看看。

public class OozieCLI {
public static void main(String[] args) {
if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
}
System.exit(new OozieCLI().run(args));
}
}

我們可以看到,經過驗證之後,程式直接從main函式進入到了run函式。

public class OozieCLI {
public synchronized int run(String[] args) {
final CLIParser parser = getCLIParser();
try {
final CLIParser.Command command = parser.parse(args);
String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION); if (doAsUser != null) {
OozieClient.doAs(doAsUser, new Callable<Void>() {
@Override
public Void call() throws Exception {
processCommand(parser, command);
return null;
}
});
}
else {
processCommand(parser, command);
}
return 0;
}
}
}

看來主要的內容是在這個processCommand裡面,其會根據命令呼叫相應的命令方法。通過command.getName()我們可以清楚的知道Oozie目前支援什麼種類的任務,比如 JOB_CMD,JOBS_CMD,PIG_CMD,SQOOP_CMD,MR_CMD。

public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
switch (command.getName()) {
case JOB_CMD:
jobCommand(command.getCommandLine());
break;
case JOBS_CMD:
jobsCommand(command.getCommandLine());
break;
case HIVE_CMD:
scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
break;
......
default:
parser.showHelp(command.getCommandLine());
}
}

3.2 Hive為例

我們以Hive為例看看如何處理。Hive就是呼叫 scriptLanguageCommand。

private void scriptLanguageCommand(CommandLine commandLine, String jobType){
List<String> args = commandLine.getArgList();
try {
XOozieClient wc = createXOozieClient(commandLine);
Properties conf = getConfiguration(wc, commandLine);
String script = commandLine.getOptionValue(SCRIPTFILE_OPTION);
List<String> paramsList = new ArrayList<>();
......
System.out.println(JOB_ID_PREFIX + wc.submitScriptLanguage(conf, script,
args.toArray(new String[args.size()]),
paramsList.toArray(new String[paramsList.size()]), jobType));
}
}

這裡關鍵程式碼是:wc.submitScriptLanguage,所以我們需要看看XOozieClient.submitScriptLanguage。其註釋表明作用是通過HTTP來提交 Pig 或者 Hive。

public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType) throws IOException, OozieClientException {

    switch (jobType) {
case OozieCLI.HIVE_CMD:
script = XOozieClient.HIVE_SCRIPT;
options = XOozieClient.HIVE_OPTIONS;
scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
break;
case OozieCLI.PIG_CMD:
......
} conf.setProperty(script, readScript(scriptFile));
setStrings(conf, options, args);
setStrings(conf, scriptParams, params); return (new HttpJobSubmit(conf, jobType)).call();
}

HttpJobSubmit就是向Oozie Server提交job,所以我們最終是需要去Oozie Server探究。

private class HttpJobSubmit extends ClientCallable<String> {
@Override
protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
writeToXml(conf, conn.getOutputStream());
if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
JSONObject json = (JSONObject) JSONValue.parse(
new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
return (String) json.get(JsonTags.JOB_ID);
}
return null;
}
}

0x04 Oozie Server

4.1 我是個web程式

前面我們提到,Oozie Server是執行於Java Servlet容器(Tomcat)中的web應用程式。所以具體啟動等配置資訊是在web.xml中。很久沒有看到web.xml了,突然覺得好陌生,嘿嘿。

<!-- Servlets -->
<servlet>
<servlet-name>callback</servlet-name>
<display-name>Callback Notification</display-name>
<servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet> <servlet>
<servlet-name>v1jobs</servlet-name>
<display-name>WS API for Workflow Jobs</display-name>
<servlet-class>org.apache.oozie.servlet.V1JobsServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet> ......

4.2 初始化服務

Ooize的很多基礎工作是由Services來完成的,每一個service都是一個單例。這些服務的配置資訊在ooze-default.xml中

<property>
<name>oozie.services</name>
<value>
org.apache.oozie.service.HadoopAccessorService,
org.apache.oozie.service.LiteWorkflowAppService,
org.apache.oozie.service.JPAService,
org.apache.oozie.service.DBLiteWorkflowStoreService,
org.apache.oozie.service.CallbackService,
org.apache.oozie.service.ActionService,
org.apache.oozie.service.CallableQueueService,
org.apache.oozie.service.CoordinatorEngineService,
org.apache.oozie.service.BundleEngineService,
org.apache.oozie.service.DagEngineService,
......
</value>
</property>

ServicesLoader這個類用來啟動,載入配置的所有service。

public class ServicesLoader implements ServletContextListener {
private static Services services;
/**
* Initialize Oozie services.
*/
public void contextInitialized(ServletContextEvent event) {
services = new Services();
services.init();
}
}

init函式是用來初始化所有配置好的Services,如果有同型別服務,則後來者會被儲存。

public class Services {
public void init() throws ServiceException {
loadServices();
} private void loadServices() throws ServiceException {
try {
Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES); List<Service> list = new ArrayList<Service>();
loadServices(classes, list);
loadServices(classesExt, list); //removing duplicate services, strategy: last one wins
for (Service service : list) {
if (map.containsKey(service.getInterface())) {
service.getClass());
}
map.put(service.getInterface(), service);
}
for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
setService(entry.getValue().getClass());
}
}
}
}

4.3 從Job到DAG

客戶通過oozie指令碼提交job之後,進入org.apache.oozie.cli.OozieCLI。會生成一個OozieClient,然後使用JobCommand,提交執行的資訊到V1JosServlet的doPost介面,Oozier在doPos介面中會呼叫submitJob()方法。此時會生成一個DAG物件,然後DAG.submitJon(JobConf,startJob)。

我們從V1JosServlet.doPost入手。這裡是基類。

public abstract class BaseJobsServlet extends JsonRestServlet {
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
JSONObject json = submitJob(request, conf);
}
}

然後回到 V1JosServlet.submitJob

@Override
protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,IOException {
String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM); if (jobType == null) {
String wfPath = conf.get(OozieClient.APP_PATH); if (wfPath != null) {
json = submitWorkflowJob(request, conf); // 我們的目標在這裡
}
else if (coordPath != null) {
json = submitCoordinatorJob(request, conf);
}
else {
json = submitBundleJob(request, conf);
}
}
else { // This is a http submission job
......
}
return json;
}

然後呼叫到了 DagEngine.submitJob。從其註釋可以看出 The DagEngine provides all the DAG engine functionality for WS calls. 這樣我們就正式來到了DAG的世界

private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
try {
String action = request.getParameter(RestConstants.ACTION_PARAM);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
if (action != null) {
dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
}
if (dryrun) {
id = dagEngine.dryRunSubmit(conf);
}
else {
id = dagEngine.submitJob(conf, startJob); // 我們在這裡
}
json.put(JsonTags.JOB_ID, id);
}
return json;
}

0x06 核心引擎

Oozie有三種核心引擎,其都是繼承抽象類BaseEngine。

這三種引擎是:

  • DAGEngine,負責workflow執行,我們上面程式碼就會來到這裡.....
  • CoordinatorEngine,負責coordinator執行
  • BundleEngine,負責bundle執行

分別對應

  • org.apache.oozie.service.CoordinatorEngineService
  • org.apache.oozie.service.BundleEngineService
  • org.apache.oozie.service.DagEngineService

我們之前提到,這些屬於系統Services,都是Singletgon,在Oozie啟動時候會加入到Services中。當需要時候通過get來獲取。

public class Services {
private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>(); public <T extends Service> T get(Class<T> serviceKlass) {
return (T) services.get(serviceKlass);
}
}

具體在V1JosServlet中呼叫舉例:

String user = conf.get(OozieClient.USER_NAME);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);

0x07 Command推動執行

Oozie把所有命令抽象成Command,這樣其內部把程式執行總結成用Command來推動,類似於訊息驅動

Command分為同步和非同步。其基類都是XCommand。XCommand提供如下模式:

  • single execution: a command instance can be executed only once
  • eager data loading: loads data for eager precondition check
  • eager precondition check: verify precondition before obtaining lock
  • data loading: loads data for precondition check and execution
  • precondition check: verifies precondition for execution is still met
  • locking: obtains exclusive lock on key before executing the command
  • execution: command logic
public abstract class XCommand<T> implements XCallable<T> {
......
private String key;
private String name;
private String type;
private AtomicBoolean used = new AtomicBoolean(false);
private Map<Long, List<XCommand<?>>> commandQueue;
protected Instrumentation instrumentation;
......
}

XCommand的父介面XCallable繼承了java.util.concurrent.Callable。最終目的是當非同步執行時候,基於優先順序來排列命令的執行計劃。

所以XCommand的幾個關鍵函式就是:queue,call,execute:

  • queue :向commandQueue加入一個command,這個command是在當前command執行之後,做delayed execution。在當前command執行過程中加入的具有同樣的delay的commands,會後續順序(single serial)執行。
  • call就是繼承的Callable實現函式,會呼叫到execute。
  • execute則是具體Command實現自己的具體業務。

從我們常見的SubmitXCommand來看,繼承關係如下:

public class SubmitXCommand extends WorkflowXCommand<String>
public abstract class WorkflowXCommand<T> extends XCommand<T>
public abstract class XCommand<T> implements XCallable<T>
public interface XCallable<T> extends Callable<T>

再比如TransitionXCommand的繼承關係:

abstract class TransitionXCommand<T> extends XCommand<T>
public abstract class SubmitTransitionXCommand extends TransitionXCommand<String>

從之前的元件可以看到,任務是有狀態機的概念的,準備,開始,執行中,失敗結束 等等,所以對任務進行操作的命令同時需要處理狀態機的變化,oozie處理任務的命令都需要繼承TransitionXCommand這個抽象類,而TransitionXCommand的父類是XCommand。

0x08 引擎處理提交

前面提到,doPost 會呼叫到 id = dagEngine.submitJob(conf, startJob);

我們看看DAGEngine是如何處理提交的任務。

首先通過SubmitXCommand直接執行其call()來提交job。

public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
validateSubmitConfiguration(conf);
try {
String jobId;
SubmitXCommand submit = new SubmitXCommand(conf);
jobId = submit.call();
if (startJob) {
start(jobId);
}
return jobId;
}
}

然後通過StartXCommand來啟動Job。從註釋中我們可以看到,此時依然是同步執行 (通過主動執行call()函式)。

public void start(String jobId) throws DagEngineException {
// Changing to synchronous call from asynchronous queuing to prevent the
// loss of command if the queue is full or the queue is lost in case of
// failure.
new StartXCommand(jobId).call();
}

8.1 SubmitXCommand

SubmitXCommand處理的是提交工作,將使用者提交的任務解析後更新到資料庫。

主要業務是在execute中實現。

  1. 解析配置,獲取WorkflowApp
  2. 建立WorkflowInstance
  3. 生成 WorkflowJobBean
  4. 通過JPA儲存WorkflowJobBean 到wf_jobs

程式碼摘要如下:

protected String execute() throws CommandException {

    WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
try {
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
FileSystem fs = has.createFileSystem(user, uri, fsConf); // 解析配置,獲取WorkflowApp
WorkflowApp app = wps.parseDef(conf, defaultConf); // 建立WorkflowInstance
WorkflowInstance wfInstance;
wfInstance = workflowLib.createInstance(app, conf); // 生成 WorkflowJobBean
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setId(wfInstance.getId());
workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
workflow.setAppPath(conf.get(OozieClient.APP_PATH));
workflow.setConf(XmlUtils.prettyPrint(conf).toString());
......
workflow.setWorkflowInstance(wfInstance);
workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID)); if (!dryrun) {
workflow.setSlaXml(jobSlaXml);
// 新增到臨時list
insertList.add(workflow);
JPAService jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
// 儲存WorkflowJobBean 到wf_jobs
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
}
}
return workflow.getId();
}
}

其中insertList是用來臨時儲存 WorkflowJobBean

private List<JsonBean> insertList = new ArrayList<JsonBean>();

WorkflowJobBean 對應資料庫中表 WF_JOBS。

public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {
......//省略其他變數
@Transient
private List<WorkflowActionBean> actions;
}

在Oozie為了方便將使用者定義的Action以及Workflow進行管理,底層使用Jpa將這些資料儲存於資料庫中。具體是呼叫executeBatchInsertUpdateDelete來通過JPA插入到資料庫。

BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);

具體BatchQueryExecutor程式碼如下。

public class BatchQueryExecutor {
public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList,Collection<JsonBean> deleteList) {
List<QueryEntry> queryList = new ArrayList<QueryEntry>();
JPAService jpaService = Services.get().get(JPAService.class);
EntityManager em = jpaService.getEntityManager(); if (updateList != null) {
for (UpdateEntry entry : updateList) {
Query query = null;
JsonBean bean = entry.getBean();
if (bean instanceof WorkflowJobBean) {
// 我們程式在這裡
query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
(WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
}
else if (bean instanceof WorkflowActionBean) {
query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
(WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
}
else if {
//此處省略眾多其他型別
}
queryList.add(new QueryEntry(entry.getQueryName(), query));
}
}
// 這裡插入資料庫
jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
}
}

JPA摘要程式碼如下:

public class JPAService implements Service, Instrumentable {

    private OperationRetryHandler retryHandler;

    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans, final List<QueryEntry> updateQueryList, final Collection<JsonBean> deleteBeans, final EntityManager em) {

        try {
retryHandler.executeWithRetry(new Callable<Void>() { public Void call() throws Exception {
......
if (CollectionUtils.isNotEmpty(insertBeans)) {
for (final JsonBean bean : insertBeans) {
em.persist(bean);
}
}
......
}
});
}
}
}

這樣,一個Workflow Job就儲存到了資料庫中。

8.2 workflow生命週期

首先介紹下workflow生命週期,我們程式碼馬上會用到PREP狀態。

  • prep:一個工作流第一次建立就處於prep狀態,表示工作流以及建立但是還沒有執行。

  • running:當一個已經被建立的工作流job開始執行的時候,就處於running狀態。它不會達到結束狀態,只能因為出錯而結束,或者被掛起。

  • suspended:一個running狀態的工作流job會變成suspended狀態,而且它會一直處於該狀態,除非這個工作流job被重新開始執行或者被殺死。

  • killed:當一個工作流job處於被建立後的狀態,或者處於running,suspended狀態時,被殺死,則工作流job的狀態變為killed狀態。

  • failed:當一個工作流job不可預期的錯誤失敗而終止,就會變為failed狀態。

8.3 StartXCommand

處理完SubmitXCommand之後,Oozie Server 馬上處理StartXCommand

StartXCommand 的作用是啟動Command,其繼承了SignalXCommand ,所以 StartXCommand(jobId).call();呼叫到了SignalXCommand的call。

public class StartXCommand extends SignalXCommand

相關程式碼如下:

首先,StartXCommand呼叫基類建構函式

public StartXCommand(String id) {
super("start", 1, id);
InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
}

然後,SignalXCommand得到了jobId,這個就是之前SubmitXCommand生成並且傳回來的。

public SignalXCommand(String name, int priority, String jobId) {
super(name, name, priority);
this.jobId = ParamChecker.notEmpty(jobId, "jobId");
}

call()首先呼叫到 SignalXCommand.loadState。其會根據jobId從資料庫中讀取Workflow job資訊。

protected void loadState() throws CommandException {
try {
jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
if (actionId != null) {
this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId);
}
}
}

SQL語句如下:

@NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

call()接著呼叫SignalXCommand.execute(),這裡具體操作如下:

    1. execute中,因為狀態是PREP,所以呼叫workflowInstance.start,這裡對應的例項是LiteWorkflowInstance
    • 1.1) LiteWorkflowInstance.start 呼叫 signal()

      • 1.1.1) signal() 呼叫 exiting = nodeHandler.enter(context),實際呼叫的是 LiteActionHandler.enter

        • 1.1.1.1) 呼叫 LiteWorkflowStoreService.liteExecute,這裡是生成WorkflowActionBean,然後新增到臨時變數ACTIONS_TO_START

          • 1.1.1.1.1) WorkflowActionBean action = new WorkflowActionBean();
          • 1.1.1.1.2) action.setJobId(jobId); 做其他各種設定
          • 1.1.1.1.3) List list = (List) context.getTransientVar(ACTIONS_TO_START);
          • 1.1.1.1.4) list.add(action); 新增到臨時列表
    • 1.2) 回到 signal(), 因為start 是 同步操作,所以exiting 為 true
      • 1.2.1) signal all new synch transitions。遍歷 pathsToStar,如果有同步跳轉,則開始進行後一步Action的跳轉,即 signal(pathToStart, "::synch:;
    1. 回到execute(),遍歷WorkflowStoreService.getActionsToStart(workflowInstance),即從ACTIONS_TO_START取Action(因為之前剛剛放入一個,所以這次獲取到)
    • 2.1) for 迴圈中遍歷,因為是submit,所以 syncAction = newAction;
    1. execute() 會呼叫BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete把WorkflowActionBean寫入到資料庫
    1. execute() 直接啟動start command:new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();

程式碼如下:

protected Void execute() throws CommandException {
WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
WorkflowJob.Status prevStatus = wfJob.getStatus();
WorkflowActionBean syncAction = null;
List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>(); if (wfAction == null) {
if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
// 對於上面的 1)
completed = workflowInstance.start();
wfJob.setStatus(WorkflowJob.Status.RUNNING);
wfJob.setStartTime(new Date());
wfJob.setWorkflowInstance(workflowInstance);
generateEvent = true;
queue(new WorkflowNotificationXCommand(wfJob));
}
}
else {
......
}
if (completed) {
......
}
else {
// 對於上面最外層的 2)
for (WorkflowActionBean newAction :
WorkflowStoreService.getActionsToStart(workflowInstance)) {
insertList.add(newAction);
if (wfAction != null) { // null during wf job submit
// 註釋指出,wf job 提交時候不走這裡
.....
}
else {
syncAction = newAction; // first action after wf submit should always be sync
}
}
}
// 寫入 WorkflowActionBean,對於上面的 3)
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
......
}
else if (syncAction != null) {
// 直接呼叫 call(),對應上面的 4)
new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
}
}

8.4 資料庫資訊

Workflow 資料庫資訊從WorkflowActionBean中可以看出,我們這裡要重點說明的就是transition欄位,Oozie用transition來記錄本Action下一步要跳轉到哪裡。WorkflowActionBean摘要如下:

public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
@Id
private String id; @Basic
@Index
@Column(name = "wf_id")
private String wfId = null; @Basic
@Index
@Column(name = "status")
private String statusStr = WorkflowAction.Status.PREP.toString(); @Basic
@Column(name = "execution_path", length = 1024)
private String executionPath = null; @Basic
@Column(name = "transition")
private String transition = null; @Basic
@Column(name = "data")
@Lob
@Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
private StringBlob data;
}

8.5 同步執行

同步執行的跳轉主要是在LiteWorkflowInstance.signal這裡體現,如果命令結束後發現後續還有同步跳轉,則就繼續執行。

public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
// signal all new synch transitions
for (String pathToStart : pathsToStart) {
signal(pathToStart, "::synch::");
}
}

至此,程式提交已經完成,後續就是程式在Oozie內部的執行階段,這就是從 ActionStartXCommand 開始了。

0xFF 參考

大資料之Oozie——原始碼分析(一)程式入口

什麼是Oozie——大資料任務排程框架

Oozie基礎小結

【原創】大資料基礎之Oozie(1)簡介、原始碼解析

【原創】大叔經驗分享(6)Oozie如何檢視提交到Yarn上的任務日誌

Oozie和Azkaban的技術選型和對比

Oozie-TransitionXCommand

Oozie-Service-CallableQueueService

YARN基本框架分析

Oozie任務排程阻塞及記憶體優化方法