
Another Azkaban Source Code Analysis

Reposted from http://www.ivanopt.com/deep-research-source-code-azkaban/

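This article examines the following questions about the Azkaban scheduler:

1. How do the Executor and the Web Server interact?
2. How is Executor scaling implemented?
3. How does the Executor run an immediate job?

We take a user's flow submission as the entry point and trace how Azkaban executes tasks.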

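How the Web Server Submits a Flow

When a user submits a flow from the web UI, an Ajax request with the action executeFlow is sent. It is handled by ExecutorServlet in azkaban-web-server, so let's look at how ExecutorServlet.java is set up and at the part that receives Ajax requests.

The action that executes a flow is: executeFlow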
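As a rough illustration, the request the UI sends can be modeled as a query string. This is a sketch under assumptions: the parameter names ajax, project and flow mirror the getParam(...) calls in the servlet code below, while the "/executor" path, the host and port, and the helper name buildExecuteFlowUrl are hypothetical.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

class ExecuteFlowRequest {
  // Hypothetical helper: builds the URL of an executeFlow Ajax call.
  // The "/executor" path is an assumption; parameter names follow ExecutorServlet.
  static String buildExecuteFlowUrl(String baseUrl, String project, String flow) {
    Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
    params.put("ajax", "executeFlow"); // selects the branch in handleAJAXAction()
    params.put("project", project);
    params.put("flow", flow);

    StringBuilder url = new StringBuilder(baseUrl).append("/executor?");
    boolean first = true;
    for (Map.Entry<String, String> e : params.entrySet()) {
      if (!first) {
        url.append('&');
      }
      first = false;
      url.append(encode(e.getKey())).append('=').append(encode(e.getValue()));
    }
    return url.toString();
  }

  // URL-encodes a query component (spaces become '+').
  private static String encode(String s) {
    try {
      return URLEncoder.encode(s, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }

  public static void main(String[] args) {
    // prints http://localhost:8081/executor?ajax=executeFlow&project=my+project&flow=daily_etl
    System.out.println(buildExecuteFlowUrl("http://localhost:8081", "my project", "daily_etl"));
  }
}
```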

  • Routing entry point (ExecutorServlet.java)
    public class ExecutorServlet extends LoginAbstractAzkabanServlet {
      private ProjectManager projectManager;
      private ExecutorManagerAdapter executorManager;
      private ScheduleManager scheduleManager;
      private ExecutorVelocityHelper velocityHelper;
      private UserManager userManager;

      @Override
      public void init(ServletConfig config) throws ServletException {
        // How does the parent class LoginAbstractAzkabanServlet initialize its configuration?
        super.init(config);
        AzkabanWebServer server = (AzkabanWebServer) getApplication();
        userManager = server.getUserManager();
        projectManager = server.getProjectManager();
        executorManager = server.getExecutorManager();
        scheduleManager = server.getScheduleManager();
        velocityHelper = new ExecutorVelocityHelper();
      }

      @Override
      protected void handleGet(HttpServletRequest req, HttpServletResponse resp,
          Session session) throws ServletException, IOException {
        if (hasParam(req, "ajax")) {
          // Ajax requests are handled here
          handleAJAXAction(req, resp, session);
        } else if (hasParam(req, "execid")) {
          if (hasParam(req, "job")) {
            handleExecutionJobDetailsPage(req, resp, session);
          } else {
            handleExecutionFlowPage(req, resp, session);
          }
        } else {
          handleExecutionsPage(req, resp, session);
        }
      }

      // handleAJAXAction() parses the request parameters.
      private void handleAJAXAction(HttpServletRequest req,
          HttpServletResponse resp, Session session) throws ServletException,
          IOException {
        ...
        String projectName = getParam(req, "project");
        ret.put("project", projectName);
        if (ajaxName.equals("executeFlow")) {
          ajaxAttemptExecuteFlow(req, resp, ret, session.getUser());
        }
        ...
      }

      // Parameter checks before submitting the flow
      private void ajaxAttemptExecuteFlow(HttpServletRequest req,
          HttpServletResponse resp, HashMap<String, Object> ret, User user)
          throws ServletException {
        String projectName = getParam(req, "project");
        String flowId = getParam(req, "flow");

        // Check that the project exists; flows live at the project level
        Project project =
            getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
        if (project == null) {
          ret.put("error", "Project '" + projectName + "' doesn't exist.");
          return;
        }

        // Check that the flow exists
        ret.put("flow", flowId);
        Flow flow = project.getFlow(flowId);
        if (flow == null) {
          ret.put("error", "Flow '" + flowId + "' cannot be found in project "
              + project);
          return;
        }

        ajaxExecuteFlow(req, resp, ret, user); // submit the flow
      }

      // Flow submission
      private void ajaxExecuteFlow(HttpServletRequest req,
          HttpServletResponse resp, HashMap<String, Object> ret, User user)
          throws ServletException {
        // The project and flow existence checks are repeated here, presumably
        // because the author expected this method to be called on its own.
        // Personally, I think the existence checks for projects, flows, tasks,
        // etc. would be better extracted into a separate validation class to
        // avoid duplication (the checks in ajaxAttemptExecuteFlow repeat this part).
        String projectName = getParam(req, "project");
        String flowId = getParam(req, "flow");

        Project project =
            getProjectAjaxByPermission(ret, projectName, user, Type.EXECUTE);
        if (project == null) {
          ret.put("error", "Project '" + projectName + "' doesn't exist.");
          return;
        }

        ret.put("flow", flowId);
        Flow flow = project.getFlow(flowId);
        if (flow == null) {
          ret.put("error", "Flow '" + flowId + "' cannot be found in project "
              + project);
          return;
        }

        ExecutableFlow exflow = new ExecutableFlow(project, flow);
        exflow.setSubmitUser(user.getUserId());
        exflow.addAllProxyUsers(project.getProxyUsers());

        // Set the execution options, e.g. who is emailed on success or failure
        ExecutionOptions options = HttpRequestUtils.parseFlowOptions(req);
        exflow.setExecutionOptions(options);
        if (!options.isFailureEmailsOverridden()) {
          options.setFailureEmails(flow.getFailureEmails());
        }
        if (!options.isSuccessEmailsOverridden()) {
          options.setSuccessEmails(flow.getSuccessEmails());
        }
        options.setMailCreator(flow.getMailCreator());

        try {
          HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
          // Submit the flow to the executor. How is executorManager initialized?
          String message =
              executorManager.submitExecutableFlow(exflow, user.getUserId());
          ret.put("message", message);
        } catch (Exception e) {
          e.printStackTrace();
          ret.put("error",
              "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
        }

        ret.put("execid", exflow.getExecutionId());
      }
    }
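The comment in ajaxExecuteFlow() suggests pulling the duplicated project/flow checks into a separate validation class. A minimal sketch of that refactoring follows. FlowSubmissionValidator and the simplified SimpleProject/SimpleFlow stand-ins are hypothetical, not Azkaban classes; only the error messages are copied from the servlet, and the permission check done by getProjectAjaxByPermission() is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for Azkaban's Flow and Project (hypothetical).
class SimpleFlow {
  final String id;

  SimpleFlow(String id) {
    this.id = id;
  }
}

class SimpleProject {
  final String name;
  final Map<String, SimpleFlow> flows = new HashMap<>();

  SimpleProject(String name) {
    this.name = name;
  }

  SimpleFlow getFlow(String flowId) {
    return flows.get(flowId);
  }

  @Override
  public String toString() {
    return name;
  }
}

// Hypothetical validator: one home for the existence checks that
// ajaxAttemptExecuteFlow() and ajaxExecuteFlow() both repeat.
class FlowSubmissionValidator {
  private final Map<String, SimpleProject> projects;

  FlowSubmissionValidator(Map<String, SimpleProject> projects) {
    this.projects = projects;
  }

  /** Returns an error message, or empty if both the project and the flow exist. */
  Optional<String> validate(String projectName, String flowId) {
    SimpleProject project = projects.get(projectName);
    if (project == null) {
      return Optional.of("Project '" + projectName + "' doesn't exist.");
    }
    if (project.getFlow(flowId) == null) {
      return Optional.of("Flow '" + flowId + "' cannot be found in project " + project);
    }
    return Optional.empty();
  }
}
```

Both servlet methods could then call validate() once and, when a message is present, put it into ret under "error" and return.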
  • The parent class of ExecutorServlet.java: the init() of LoginAbstractAzkabanServlet.java
    public abstract class LoginAbstractAzkabanServlet extends
        AbstractAzkabanServlet {
      private MultipartParser multipartParser;
      private boolean shouldLogRawUserAgent = false;

      @Override
      public void init(ServletConfig config) throws ServletException {
        super.init(config);
        multipartParser = new MultipartParser(DEFAULT_UPLOAD_DISK_SPOOL_SIZE);
        shouldLogRawUserAgent =
            getApplication().getServerProps().getBoolean("accesslog.raw.useragent",
                false);
      }
    }
  • The parent class of LoginAbstractAzkabanServlet.java: the init() of AbstractAzkabanServlet.java
    public abstract class AbstractAzkabanServlet extends HttpServlet {
      @Override
      public void init(ServletConfig config) throws ServletException {
        // Fetch the AzkabanServer instance stored in the servlet context; the
        // following sections look at how a server instance is created and stored
        // there. AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY is the
        // string "azkaban_app".
        application =
            (AzkabanServer) config.getServletContext().getAttribute(
                AzkabanServletContextListener.AZKABAN_SERVLET_CONTEXT_KEY);

        if (application == null) {
          throw new IllegalStateException(
              "No batch application is defined in the servlet context!");
        }

        Props props = application.getServerProps();
        name = props.getString("azkaban.name", "");
        label = props.getString("azkaban.label", "");
        color = props.getString("azkaban.color", "#FF0000");

        if (application instanceof AzkabanWebServer) {
          AzkabanWebServer server = (AzkabanWebServer) application;
          viewerPlugins = PluginRegistry.getRegistry().getViewerPlugins();
          triggerPlugins =
              new ArrayList<TriggerPlugin>(server.getTriggerPlugins().values());
        }
      }
    }

From the code above we can see that the AzkabanServer is obtained from the ServletContext via getAttribute("azkaban_app"), and that userManager, projectManager, executorManager, scheduleManager and the other managers all come from it. So when is this AzkabanServer initialized?
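The three-level init() chain and the "azkaban_app" lookup can be reduced to the following sketch. A plain Map stands in for the ServletContext, and all class names are hypothetical; only the key string and the IllegalStateException message come from AbstractAzkabanServlet. It shows that each subclass calls super.init() first, so by the time the ExecutorServlet-like class runs its own body, getApplication() already returns the server stored in the context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A Map stands in for the ServletContext attributes (hypothetical).
class FakeServletContext {
  final Map<String, Object> attributes = new HashMap<>();
}

abstract class BaseServletSketch {
  static final String AZKABAN_SERVLET_CONTEXT_KEY = "azkaban_app";
  final List<String> initOrder = new ArrayList<>();
  private Object application;

  void init(FakeServletContext context) {
    // Mirrors AbstractAzkabanServlet.init(): fetch the server from the context.
    application = context.attributes.get(AZKABAN_SERVLET_CONTEXT_KEY);
    if (application == null) {
      throw new IllegalStateException(
          "No batch application is defined in the servlet context!");
    }
    initOrder.add("AbstractAzkabanServlet");
  }

  Object getApplication() {
    return application;
  }
}

abstract class LoginServletSketch extends BaseServletSketch {
  @Override
  void init(FakeServletContext context) {
    super.init(context); // parent first
    initOrder.add("LoginAbstractAzkabanServlet");
  }
}

class ExecutorServletSketch extends LoginServletSketch {
  Object server;

  @Override
  void init(FakeServletContext context) {
    super.init(context); // the whole parent chain has run at this point
    server = getApplication();
    initOrder.add("ExecutorServlet");
  }
}
```

If nothing was stored under "azkaban_app", the exception is thrown from the base class before any subclass code runs, which is why the server object must be put into the context before the servlets are initialized.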

Executor Server Initialization and Communication with the Web Server

Let's look at the executor package. The entry point of the jar, main(), lives in AzkabanExecutorServer.java. What does main() do?

    public static void main(String[] args) throws Exception {
      // Redirect all std out and err messages into log4j
      StdOutErrRedirect.redirectOutAndErrToLog();

      logger.info("Starting Jetty Azkaban Executor...");
      Props azkabanSettings = AzkabanServer.loadProps(args);

      if (azkabanSettings == null) {
        logger.error("Azkaban Properties not loaded.");
        logger.error("Exiting Azkaban Executor Server...");
        return;
      }

      // Setup time zone
      if (azkabanSettings.containsKey(DEFAULT_TIMEZONE_ID)) {
        String timezone = azkabanSettings.getString(DEFAULT_TIMEZONE_ID);
        System.setProperty("user.timezone", timezone);
        TimeZone.setDefault(TimeZone.getTimeZone(timezone));
        DateTimeZone.setDefault(DateTimeZone.forID(timezone));

        logger.info("Setting timezone to " + timezone);
      }

      // Create the executor server. The questions worth asking here are:
      // 1. What container does it start? 2. How does it talk to the web server?
      app = new AzkabanExecutorServer(azkabanSettings);

      // Register a shutdown hook that logs the processes using the most memory
      Runtime.getRuntime().addShutdownHook(new Thread() {

        @Override
        public void run() {
          try {
            // Log the top memory consumers
            logTopMemoryConsumers();
          } catch (Exception e) {
            logger.info(("Exception when logging top memory consumers"), e);
          }

          logger.info("Shutting down...");
          try {
            app.shutdownNow();
          } catch (Exception e) {
            logger.error("Error while shutting down http server.", e);
          }
        }

        public void logTopMemoryConsumers() throws Exception, IOException {
          if (new File("/bin/bash").exists() && new File("/bin/ps").exists()
              && new File("/usr/bin/head").exists()) {
            logger.info("logging top memeory consumer");

            java.lang.ProcessBuilder processBuilder =
                new java.lang.ProcessBuilder("/bin/bash", "-c",
                    "/bin/ps aux --sort -rss | /usr/bin/head");
            Process p = processBuilder.start();
            p.waitFor();

            InputStream is = p.getInputStream();
            java.io.BufferedReader reader =
                new java.io.BufferedReader(new InputStreamReader(is));
            String line = null;
            while ((line = reader.readLine()) != null) {
              logger.info(line);
            }
            is.close();
          }
        }
      });
    }
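The shutdown hook in main() follows a common pattern: the diagnostic step is wrapped in its own try/catch so that a failure there cannot prevent the real shutdown. A minimal sketch of that structure, where the Runnable parameters are hypothetical stand-ins for logTopMemoryConsumers() and app.shutdownNow(), and a list stands in for the logger:

```java
import java.util.ArrayList;
import java.util.List;

class ShutdownHookSketch {
  // Builds (but does not register) a hook thread shaped like the one in main().
  static Thread buildShutdownHook(Runnable logTopMemoryConsumers, Runnable shutdownNow,
      List<String> log) {
    return new Thread(() -> {
      try {
        logTopMemoryConsumers.run(); // best-effort diagnostics
      } catch (Exception e) {
        log.add("Exception when logging top memory consumers");
      }
      log.add("Shutting down...");
      try {
        shutdownNow.run(); // always attempted, even if diagnostics failed
      } catch (Exception e) {
        log.add("Error while shutting down http server.");
      }
    });
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    Thread hook = buildShutdownHook(
        () -> { throw new IllegalStateException("ps not available"); },
        () -> log.add("server stopped"),
        log);
    // A real server would call Runtime.getRuntime().addShutdownHook(hook);
    // here the hook is run synchronously to observe its behavior.
    hook.run();
    // prints [Exception when logging top memory consumers, Shutting down..., server stopped]
    System.out.println(log);
  }
}
```

One detail in the original logTopMemoryConsumers(): it calls p.waitFor() before reading the process output. That is safe there because head prints only a few lines, but with larger output the stream should be drained before waiting, or the child process can block on a full pipe.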

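Next, let's see how the server starts, what configuration it needs, and how it communicates with the outside world, i.e. the AzkabanExecutorServer.java constructor called in the block above.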

  • AzkabanExecutorServer.java
    public AzkabanExecutorServer(Props props) throws Exception {
      this.props = props;
      // The container is Jetty, so createJettyServer() is where to look for the
      // configuration and for how the server communicates with the outside world
      server = createJettyServer(props);

      executionLoader = new JdbcExecutorLoader(props);
      projectLoader = new JdbcProjectLoader(props);
      runnerManager =
          new FlowRunnerManager(props, executionLoader, projectLoader, getClass()
              .getClassLoader());

      JmxJobMBeanManager.getInstance().initialize(props);

      // make sure this happens before
      configureJobCallback(props);

      configureMBeanServer();
      configureMetricReports();

      SystemMemoryInfo.init(props.getInt("executor.memCheck.interval", 30));

      loadCustomJMXAttributeProcessor(props);

      try {
        server.start();
      } catch (Exception e) {
        logger.error(e);
        Utils.croak(e.getMessage(), 1);
      }

      // On every startup, the executor writes its host and port into the DB
      insertExecutorEntryIntoDB();
      dumpPortToFile();

      logger.info("Started Executor Server on " + getExecutorHostPort());

      if (props.getBoolean(ServerProperties.IS_METRICS_ENABLED, false)) {
        startExecMetrics();
      }
    }
  • Creating the Jetty server: createJettyServer() [AzkabanExecutorServer.java]
    private Server createJettyServer(Props props) {
      int maxThreads = props.getInt("executor.maxThreads", DEFAULT_THREAD_NUMBER);

      /*
       * Default to a port number 0 (zero)
       * The Jetty server automatically finds an unused port when the port number is set to zero
       * TODO: This is using a highly outdated version of jetty [year 2010]. needs to be updated.
       */
      // Server comes from package: org.mortbay.jetty;
      Server server = new Server(props.getInt("executor.port", 0));
      QueuedThreadPool httpThreadPool = new QueuedThreadPool(maxThreads);
      server.setThreadPool(httpThreadPool);

      boolean isStatsOn = props.getBoolean("executor.connector.stats", true);
      logger.info("Setting up connector with stats on: " + isStatsOn);

      for (Connector connector : server.getConnectors()) {
        connector.setStatsOn(isStatsOn);
        logger.info(String.format(
            "Jetty connector name: %s, default header buffer size: %d",
            connector.getName(), connector.getHeaderBufferSize()));
        connector.setHeaderBufferSize(props.getInt("jetty.headerBufferSize",
            DEFAULT_HEADER_BUFFER_SIZE));
        logger.info(String.format(
            "Jetty connector name: %s, (if) new header buffer size: %d",
            connector.getName(), connector.getHeaderBufferSize()));
      }

      // Create a servlet context rooted at "/" with sessions enabled
      Context root = new Context(server, "/", Context.SESSIONS);
      root.setMaxFormContentSize(MAX_FORM_CONTENT_SIZE);

      root.addServlet(new ServletHolder(new ExecutorServlet()), "/executor");
      root.addServlet(new ServletHolder(new JMXHttpServlet()), "/jmx");
      root.addServlet(new ServletHolder(new StatsServlet()), "/stats");
      root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");

      // The server instance is stored under ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY,
      // whose value is "azkaban_app", so the servlets registered above can fetch it
      // in their init(). (This ExecutorServlet is the executor-side servlet, not the
      // web server's ExecutorServlet shown earlier.) The web server runs its own
      // Jetty context, and AbstractAzkabanServlet finds the web server instance
      // through the same key in that context.
      root.setAttribute(ServerInternals.AZKABAN_SERVLET_CONTEXT_KEY, this);
      return server;
    }
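Two ideas from createJettyServer() and the constructor, binding to port 0 so the operating system picks a free port and then recording the real host:port where the web server can look it up, can be sketched with the JDK's built-in com.sun.net.httpserver.HttpServer standing in for the 2010-era org.mortbay Jetty that Azkaban uses. The in-memory EXECUTOR_REGISTRY (standing in for the DB row written by insertExecutorEntryIntoDB()), the class name, the response text and the action/execid query parameters are all hypothetical; only the /executor path comes from the code above. The client half shows the general shape of a web-server-to-executor HTTP call, not Azkaban's exact protocol.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ExecutorEndpointSketch {
  // Hypothetical stand-in for the executor entry written to the DB at startup.
  static final Map<String, Boolean> EXECUTOR_REGISTRY = new ConcurrentHashMap<>();

  static HttpServer startExecutor() throws IOException {
    // Port 0: the OS assigns a free port, like new Server(0) in Jetty.
    HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
    server.createContext("/executor", exchange -> {
      // Echo the query string, e.g. "action=execute&execid=42" (assumed format).
      String query = exchange.getRequestURI().getQuery();
      byte[] body = ("received " + query).getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(200, body.length);
      try (OutputStream out = exchange.getResponseBody()) {
        out.write(body);
      }
    });
    server.start();
    // The real port is only known after binding; register host:port now.
    String hostPort = "127.0.0.1:" + server.getAddress().getPort();
    EXECUTOR_REGISTRY.put(hostPort, true);
    return server;
  }

  // What the web server side would do: pick a registered executor and call it over HTTP.
  static String callExecutor(String hostPort, String query) throws IOException {
    HttpURLConnection conn = (HttpURLConnection)
        URI.create("http://" + hostPort + "/executor?" + query).toURL().openConnection();
    try (InputStream in = conn.getInputStream()) {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      byte[] chunk = new byte[4096];
      int n;
      while ((n = in.read(chunk)) != -1) {
        buf.write(chunk, 0, n);
      }
      return new String(buf.toByteArray(), StandardCharsets.UTF_8);
    } finally {
      conn.disconnect();
    }
  }

  public static void main(String[] args) throws IOException {
    HttpServer server = startExecutor();
    try {
      String hostPort = EXECUTOR_REGISTRY.keySet().iterator().next();
      // prints received action=execute&execid=42
      System.out.println(callExecutor(hostPort, "action=execute&execid=42"));
    } finally {
      server.stop(0);
    }
  }
}
```

Registering only after start() mirrors the order in the constructor: insertExecutorEntryIntoDB() runs after server.start(), so the recorded address belongs to a server that is already listening.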

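Analysis of Immediate Flow Execution in the Executor

So far we have seen how the executor server is started and how each server's servlets find their server instance. How, then, does the executor actually run a flow?

Back in ExecutorServlet.java, the method ajaxExecuteFlow() contains the concrete submission step: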

  • ExecutorServlet.java
    try {
      HttpRequestUtils.filterAdminOnlyFlowParams(userManager, options, user);
      String message =
          executorManager.submitExecutableFlow(exflow, user.getUserId());
      ret.put("message", message);
    } catch (Exception e) {
      e.printStackTrace();
      ret.put("error",
          "Error submitting flow " + exflow.getFlowId() + ". " + e.getMessage());
    }

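The code above shows that this execution request is issued through executorManager. So how is executorManager instantiated?

It is not hard to see that ExecutorServlet.java already fetches executorManager from the server in init(). The cast to AzkabanWebServer does not create anything, so the manager must already exist by then; it is presumably created in the AzkabanWebServer.java constructor.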

  • loadExecutorManager() [AzkabanWebServer.java]
    private ExecutorManager loadExecutorManager(Props props) throws Exception {
      JdbcExecutorLoader loader = new JdbcExecutorLoader(props);
      ExecutorManager execManager = new ExecutorManager(props, loader, alerters);
      return execManager;
    }
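The pattern implied here, where the server builds each manager once at startup and servlets only fetch the shared instance in init(), can be sketched as follows. WebServerSketch, ExecutorManagerSketch and ServletSketch are hypothetical stand-ins for AzkabanWebServer, ExecutorManager and ExecutorServlet, and the "db.url" property key is made up for the example.

```java
import java.util.Properties;

class ExecutorManagerSketch {
  final String dbUrl;

  ExecutorManagerSketch(String dbUrl) {
    this.dbUrl = dbUrl;
  }
}

class WebServerSketch {
  private final ExecutorManagerSketch executorManager;

  WebServerSketch(Properties props) {
    // Built once at startup, like loadExecutorManager(props) in the constructor
    this.executorManager = loadExecutorManager(props);
  }

  private ExecutorManagerSketch loadExecutorManager(Properties props) {
    // "db.url" is a hypothetical key used only for this sketch
    return new ExecutorManagerSketch(props.getProperty("db.url", "jdbc:example"));
  }

  ExecutorManagerSketch getExecutorManager() {
    return executorManager;
  }
}

class ServletSketch {
  private ExecutorManagerSketch executorManager;

  void init(WebServerSketch server) {
    // The servlet never creates a manager; it reuses the server's instance.
    executorManager = server.getExecutorManager();
  }

  ExecutorManagerSketch getExecutorManager() {
    return executorManager;
  }
}
```

Every servlet initialized against the same server ends up sharing one manager, which is what lets all submissions go through a single queue.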

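We now know that submitExecutableFlow() is called on the ExecutorManager instance created there. So what does submitExecutableFlow() do?

Before reading the source, let's think about what running the command the user issued in the UI should involve.

The user sends the command to the web server, but the execution logic lives in the executor, so the executor has to learn about the command:
1. The web server records the execution command in the DB.
2. The web server notifies the executor that there is a task to run.
3. The executor receives the notification, reads the execution parameters from the DB, and starts the execution.
This raises several questions:
1. Is there a transaction spanning the DB write and the request sent to the executor?
2. Is there a status flag for the execution result?
3. If an execution fails, does Azkaban retry it, and if so, how?
4. How do the web server and the executor communicate?
5. Which parameters need to be stored in the DB?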
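The three steps above can be modeled in memory to make questions 1, 2 and 5 concrete. This is a hypothetical sketch, not Azkaban's code: a ConcurrentHashMap plays the execution table, a direct method call plays the web-to-executor request, and the Status values are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class SubmissionModelSketch {
  enum Status { PREPARING, RUNNING, SUCCEEDED, FAILED }

  // A stored execution: its parameters plus a status flag.
  static final class ExecutionRecord {
    final String project;
    final String flow;
    volatile Status status = Status.PREPARING;

    ExecutionRecord(String project, String flow) {
      this.project = project;
      this.flow = flow;
    }
  }

  // Hypothetical stand-in for the execution table in the database.
  final Map<Integer, ExecutionRecord> db = new ConcurrentHashMap<>();
  private final AtomicInteger nextId = new AtomicInteger(1);

  // Step 1 (web server): persist the request and allocate an execution id.
  int submit(String project, String flow) {
    int execId = nextId.getAndIncrement();
    db.put(execId, new ExecutionRecord(project, flow));
    // Step 2 (web server -> executor): only the id is sent.
    executorReceive(execId);
    return execId;
  }

  // Step 3 (executor): load the parameters by id, run, record the outcome.
  void executorReceive(int execId) {
    ExecutionRecord rec = db.get(execId);
    rec.status = Status.RUNNING;
    boolean ok = runFlow(rec.project, rec.flow);
    rec.status = ok ? Status.SUCCEEDED : Status.FAILED;
  }

  // Placeholder for real job execution: an empty flow name counts as a failure.
  private boolean runFlow(String project, String flow) {
    return !flow.isEmpty();
  }
}
```

Because only the execution id travels to the executor, everything else it needs must be in the store (question 5), and the status field is what answers question 2. If step 2 failed after step 1, the record would be left in PREPARING, which is exactly the gap question 1 asks about.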

With these questions in mind, let's look at the source of submitExecutableFlow().

  • submitExecutableFlow() [ExecutorManager.java]
    @Override
    public String submitExecutableFlow(ExecutableFlow exflow, String userId)
        throws ExecutorManagerException {
      synchronized (exflow) {
        String flowId = exflow.getFlowId();

        logger.info("Submitting execution flow " + flowId + " by " + userId);

        String message = "";
        // Queue management is also worth a closer look. It is implemented in
        // QueuedExecutions.java, which keeps the data in two structures at once,
        // a ConcurrentHashMap and a PriorityBlockingQueue, for efficiency.
        if (queuedFlows.isFull()) {
          message =
              String
                  .format(
                      "Failed to submit %s for project %s. Azkaban has overrun its webserver queue capacity",
                      flowId, exflow.getProjectName());
          logger.error(message);
        } else {
          int projectId = exflow.getProjectId();
          exflow.setSubmitUser(userId);
          exflow.setSubmitTime(System.currentTimeMillis());

          List<Integer> running = getRunningFlows(projectId, flowId);

          ExecutionOptions options = exflow.getExecutionOptions();
          if (options == null) {
            options = new ExecutionOptions();
          }

          if (options.getDisabledJobs() != null) {
            FlowUtils.applyDisabledJobs(options.getDisabledJobs(), exflow);
          }

          // If this flow is already running, what happens next depends on the
          // concurrency setting in this execution's ExecutionOptions.
          if (!running.