基於springboot建立Spark應用的submit服務
目錄
背景
一直很好奇web後臺如何啟動Spark應用程式,查詢Api後發現可以使用org.apache.spark.launcher.SparkLauncher來做到這一點。我想得動手測試一下,而且要做的體面一些,所以搞個簡易的web工程吧,順便學習熟悉一下使用springboot框架。在這裡將整個折騰的過程記錄下來,新手上路,有任何搞錯的地方,或者走了彎路,還請大家不吝指出,幫我進步。
準備工作
1. 搭建hadoop叢集,我這邊用的是兩臺主機的分散式叢集
2. 安裝Spark,測試能執行spark-submit即可,然後配置好HistoryServer
3. 安裝Mysql,建立一個Spark應用資訊表,只有mainClass和jarPath兩個欄位
4. 熟悉Springboot框架的基本使用
主要流程
我設想主要有三個html頁面:
1. 查詢已經開發好的spark應用(應用資訊提前入到資料庫裡)
2. 設定執行引數後提交(引數包括mainclass、jar包路徑、driver記憶體、executor記憶體等)
3. 顯示應用執行結果
效果截圖
1. 查詢Spark應用,點選應用進入submit頁面
2. 執行引數設定
3. 提交應用程式,正在執行中
4. 執行結束後跳轉,檢視執行結果。點選Tracking URL會跳轉到Yarn的Application管理
頁面,還能檢視Spark應用的job資訊。
主要程式碼
-
搭建一個springboot專案,配置依賴DevTools
DevTools模組使Spring Boot應用支援熱部署,提高開發者的開發效率,修改後無需手動重啟Spring Boot應用。可以先不配,需要用的時候再說。
-
Spark應用資訊表,只有三個欄位:mainClass是應用程式的main方法,jarPath是jar包存放路徑,note是應用說明
-
實體類
這裡只用到兩個實體類:Spark應用資訊AppInfo和Spark應用執行引數SparkAppParapublic class AppInfo { String mainClass;//應用程式的mainClass
public class SparkAppPara { String mainClass; String jarPath; String master;//可以是Yarn或StandAlone String deployMode;//可以是Cluster或Client String driverMemory ;//driver記憶體 String executorMemory;//executor記憶體 String executorInstances;//executor個數 String executorCores;//executor核數 String defaultParallelism;//引數spark.default.parallelism的值 //省略getter和setter }
-
Controller
(1)訪問應用資訊頁面@RequestMapping("/appInfo") public String appInfo(){ return "appInfo"; }
(2)查詢Spark應用資訊
@RequestMapping("/getAllAppInfo") @ResponseBody public String getAllAppInfo(){ return sparkAppInfoService.getAllAppInfo(); }
(3)點選某個應用,跳轉到提交頁面
@RequestMapping("/submitApp") public ModelAndView submitApp(String mainClass,String jarPath){ ModelAndView mav = new ModelAndView(); mav.setViewName("submitApp"); mav.addObject("mainClass",mainClass); mav.addObject("jarPath",jarPath); return mav; }
這裡我希望跳轉之後,自動填寫mainClass和jarPath,我的做法是把這倆引數通過後臺轉給新頁面。由於頁面不是jsp,所以不能用el表示式獲取model值。需要靠Thymeleaf的語法th:xxx=${…}來獲取渲染資料。
<div class="icon"> <label class="cd-label" for="mainClass">mainClass</label> <input class="mainClass" type="text" name="mainClass" id="mainClass" th:value=${mainClass}> </div> <div class="icon"> <label class="cd-label" for="jarPath">jarPath</label> <input class="jarPath" type="text" name="jarPath" id="jarPath" th:value=${jarPath}> </div>
(4)提交任務
@RequestMapping(value = "/submit") @ResponseBody public String Submit(@RequestBody SparkAppPara sparkAppPara) throws IOException, InterruptedException { return submitService.submitApp(sparkAppPara); }
(5)執行完後跳轉到結果頁面
在這裡我希望拿到執行結果json之後,跳轉到結果頁面展示。我的做法是在Ajax請求成功後帶引數跳轉頁面,我覺得肯定有更好的辦法,在此拋磚引玉。success: function(data) { window.location.href=host+'/result?resultJson='+ encodeURIComponent(data); }
因為url請求裡不能有大小括號等特殊字元,所以請求之前需要使用encodeURIComponent方法進行編碼。
@RequestMapping("/result") public ModelAndView toResult(String resultJson){ ModelAndView mav = new ModelAndView(); mav.setViewName("result"); mav.addObject("resultJson",resultJson); return mav; }
關於在結果頁面的JS程式碼裡獲取resultJson:
第(3)步中,Thymeleaf直接把model值渲染到html標籤中。而在結果頁面中,我需要先拿到resultJson,進行一些處理後再渲染。在JS程式碼裡,我們可以像下面這樣來獲取resultJson。<script th:inline="javascript"> var resultJson = JSON.parse([[${resultJson}]]); $("#trackingUrl").attr("href",yarnAppUrl+resultJson.id); $("#applicationId").html(resultJson.id); $("#applicationName").html(resultJson.name); //次要程式碼省略 </script>
這裡需要注意的是,這部分JS程式碼只能內嵌在html頁面中,外聯JS中不會生效。
-
Service和Mapper
(1)獲取Spark應用資訊的Service和Mapper@Service public class SparkAppInfoService { @Autowired private AppInfoMapper appInfo; public String getAllAppInfo(){ List<AppInfo> list = appInfo.getAllAppInfo(); return JSONObject.toJSONString(list); } }
@Component public interface AppInfoMapper { @Select("SELECT * FROM appinfo") @Results({ @Result(property = "mainClass", column = "mainclass"), @Result(property = "jarPath", column = "jarpath"), @Result(property = "note", column = "note") }) List<AppInfo> getAllAppInfo(); }
(2)提交Spark應用的Service
提交spark應用的API不止一種,我用的是org.apache.spark.launcher.SparkLauncher<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-launcher_2.12</artifactId> <version>2.4.0</version> </dependency>
@Service public class SparkSubmitService { public String submitApp(SparkAppPara sparkAppPara) throws IOException, InterruptedException { HashMap env = new HashMap(); //這兩個屬性必須設定 env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/hadoop"); env.put("JAVA_HOME", "/usr/lib/jdk/jdk1.8.0_191/"); CountDownLatch countDownLatch = new CountDownLatch(1); SparkAppHandle handle = new SparkLauncher(env) .setSparkHome("/usr/local/spark/") .setAppResource(sparkAppPara.getJarPath()) .setMainClass(sparkAppPara.getMainClass()) .setMaster(sparkAppPara.getMaster()) .setDeployMode(sparkAppPara.getDeployMode()) .setConf("spark.driver.memory", sparkAppPara.getDriverMemory()+"g") .setConf("spark.executor.memory", sparkAppPara.getExecutorMemory()+"g") .setConf("spark.executor.instances", sparkAppPara.getExecutorInstances()) .setConf("spark.executor.cores", sparkAppPara.getExecutorCores()) .setConf("spark.default.parallelism", sparkAppPara.getDefaultParallelism()) .setVerbose(true).startApplication(new SparkAppHandle.Listener() { @Override public void stateChanged(SparkAppHandle sparkAppHandle) { if (sparkAppHandle.getState().isFinal()) { countDownLatch.countDown(); } System.out.println("state:" + sparkAppHandle.getState().toString()); } @Override public void infoChanged(SparkAppHandle sparkAppHandle) { System.out.println("Info:" + sparkAppHandle.getState().toString()); } }); System.out.println("The task is executing, please wait ...."); //執行緒等待任務結束 countDownLatch.await(); System.out.println("The task is finished!"); //通過Spark原生的監測api獲取執行結果資訊 String restUrl = "http://master:18080/api/v1/applications/"+handle.getAppId(); String resultJson = RestUtil.httpGet(restUrl,null); return resultJson; } }
-
Http請求工具
我們使用這個工具,傳送rest請求,就可以獲取Spark應用執行結果的json資訊(我覺得有一個前提是需要配置好History Server服務並啟動)。public class RestUtil { public static String httpGet(String urlStr, List<String> urlParam) throws IOException, InterruptedException { // 例項一個URL資源 URL url = new URL(urlStr); HttpURLConnection connet = null; int i = 0; while(connet==null || connet.getResponseCode() != 200 ){ connet = (HttpURLConnection) url.openConnection(); connet.setRequestMethod("GET"); connet.setRequestProperty("Charset", "UTF-8"); connet.setRequestProperty("Content-Type", "application/json"); connet.setConnectTimeout(15000);// 連線超時 單位毫秒 connet.setReadTimeout(15000);// 讀取超時 單位毫秒 i++; if (i==50)break; Thread.sleep(500); } //將返回的值存入到String中 BufferedReader brd = new BufferedReader(new InputStreamReader(connet.getInputStream(),"UTF-8")); StringBuilder sb = new StringBuilder(); String line; while((line = brd.readLine()) != null){ sb.append(line); } brd.close(); connet.disconnect(); return sb.toString(); } }
外部引用
專案裡引用的第三方模板和外掛如下,如有侵權請聯絡我刪除。
- 應用查詢頁面——https://www.lanrenzhijia.com/others/6564.html
- 任務提交頁面——https://www.lanrenzhijia.com/jquery/3981.html
- ajax非同步請求等待特效——http://www.jq22.com/jquery-info15050
參考資料
https://blog.csdn.net/sparkexpert/article/details/51045762
https://blog.csdn.net/u011244682/article/details/79170134
http://spark.apache.org/docs/latest/monitoring.html