最新版azkaban-3.40.0原始碼解析
web上傳zip以及解析入庫
web服務上傳zip包
入口:
azkaban.webapp.servlet.LoginAbstractAzkabanServlet.doPost(HttpServletRequest, HttpServletResponse)
跟蹤程式碼,進入下面的方法 上傳zip包
azkaban.webapp.servlet.ProjectManagerServlet.ajaxHandleUpload(HttpServletRequest, HttpServletResponse, Map
logger.info("Uploading file " + name);
final File archiveFile = new File(tempDir, name);
out = new BufferedOutputStream(new FileOutputStream(archiveFile));
IOUtils.copy(item.getInputStream(), out);
out.close();
在這個方法中,
azkaban.project.AzkabanProjectLoader.uploadProject(Project, File, String, User, Props),通過上傳檔案的型別載入loader
loader = this.flowLoaderFactory.createFlowLoader(file);
手工執行flow
入口
azkaban.webapp.servlet.LoginAbstractAzkabanServlet.doGet(HttpServletRequest, HttpServletResponse)
進入下面的方法通過ajaxName來判斷是什麼型別的操作,然後呼叫不用的方法。
azkaban.webapp.servlet.ExecutorServlet.handleAJAXAction(HttpServletRequest, HttpServletResponse, Session)
比如執行flow的ajaxName是executeFlow,最後通過ExecutorApiGateway.callForJsonString方法生成一個類似下面的uri,傳送get請求去執行flow。
定時執行flow
定時任務的觸發是在execserver服務啟動的時候初始化的。
在AzkabanWebServer的main方法中,呼叫launch方法載入服務,具體是webServer.prepareAndStartServer();
之後呼叫configureRoutes方法,在這個方法中,通過 getTriggerManager().start();來啟動TriggerManager。在start方法中,啟動TriggerScannerThread執行緒來掃描所有的定時任務,
最終在azkaban.trigger.TriggerManager.TriggerScannerThread.checkAllTriggers()方法中做檢查,如果滿足了執行的條件,則通過onTriggerTrigger(t);觸發排程。
execserver執行流程
提交flow
web傳送了一個get請求,exec伺服器接受請求的入庫方法是
azkaban.execapp.ExecutorServlet doGet方法
action為execute,處理的方法是handleAjaxExecute,在這個方法中,提交了一個flow,具體的處理方法在azkaban.execapp.FlowRunnerManager.submitFlow(int)
1.根據flowid從資料庫查詢出對應的flow。
2.設定執行的目錄等
3.獲取flow的引數。
4.提交flow
//根據flowid從資料庫查詢出對應的flow。
ExecutableFlow flow = null;
flow = this.executorLoader.fetchExecutableFlow(execId);
if (flow == null) {
throw new ExecutorManagerException("Error loading flow with exec "
+ execId);
}
//設定執行的目錄等
// Sets up the project files and execution directory.
this.flowPreparer.setup(flow);
//獲取flow的引數
// Setup flow runner
FlowWatcher watcher = null;
final ExecutionOptions options = flow.getExecutionOptions();
......
//提交flow
final Future<?> future = this.executorService.submit(runner);
具體執行邏輯
具體執行在FlowRunner的run方法,最後呼叫了azkaban.execapp.FlowRunner.runFlow()方法。
private void runFlow() throws Exception {
this.logger.info("Starting flows");
runReadyJob(this.flow);//嘗試開始執行
updateFlow();
while (!this.flowFinished) {
synchronized (this.mainSyncObj) {
if (this.flowPaused) {
try {
this.mainSyncObj.wait(CHECK_WAIT_MS);
} catch (final InterruptedException e) {
}
continue;
} else {
if (this.retryFailedJobs) {
retryAllFailures();
} else if (!progressGraph()) {//推進DAG的執行
try {
this.mainSyncObj.wait(CHECK_WAIT_MS);
} catch (final InterruptedException e) {
}
}
}
}
}
this.logger.info("Finishing up flow. Awaiting Termination");
this.executorService.shutdown();
updateFlow();
this.logger.info("Finished Flow");
}
runReadyJob邏輯
private boolean runReadyJob(final ExecutableNode node) throws IOException {
..............
} else if (nextNodeStatus == Status.READY) {
//如果node型別是一個子flow,也就是ExecutableFlowBase
if (node instanceof ExecutableFlowBase) {
final ExecutableFlowBase flow = ((ExecutableFlowBase) node);
this.logger.info("Running flow '" + flow.getNestedId() + "'.");
flow.setStatus(Status.RUNNING);
flow.setStartTime(System.currentTimeMillis());
prepareJobProperties(flow);
//迴圈遞迴執行子flow
for (final String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
final ExecutableNode startNode = flow.getExecutableNode(startNodeId);
runReadyJob(startNode);
}
} else {
//普通的job ,直接執行job
runExecutableNode(node);
}
}
return true;
}
執行完了之後,會在progressGraph方法進行查詢下一個節點,推進DAG的執行,這個裡面主要是收集已完成節點的下一個節點,放入nodesToCheck,然後迴圈呼叫runReadyJob執行。