java提交spark任務到yarn平臺
一、背景
採用spark的方式處理,所以需要將spark的功能整合到程式碼,採用yarn客戶端的方式管理spark任務。不需要將cdh的一些配置檔案放到resource路徑下,只需要配置一些配置即可,非常方便
二、任務管理架構
三、介面
1、任務提交
1. /**
2. * 提交任務到yarn叢集
3. *
4. * @param conditions
5. * yarn叢集,spark,hdfs具體資訊,引數等
6. * @return appid
7. */
8. public String submitSpark(YarnSubmitConditions conditions) {
9. logger.info("初始化spark on yarn引數");
10.
11. // 初始化yarn客戶端
12. logger.info("初始化spark on yarn客戶端");
13. List<String> args = Lists.newArrayList("--jar", conditions.getApplicationJar(), "--class",
14. conditions.getMainClass());
15. if (conditions.getOtherArgs() != null && conditions.getOtherArgs().size() >
16. for (String s : conditions.getOtherArgs()) {
17. args.add("--arg");
18. args.add(org.apache.commons.lang.StringUtils.join(new String[] { s }, ","));
19. }
20. }
21.
22. // identify that you will be using Spark as YARN mode
23. System.setProperty("SPARK_YARN_MODE"
24. SparkConf sparkConf = new SparkConf();
25. if (org.apache.commons.lang.StringUtils.isNotEmpty(conditions.getJobName())) {
26. sparkConf.setAppName(conditions.getJobName());
27. }
28.
29. sparkConf.set("spark.yarn.jars", conditions.getSparkYarnJars());
30. if (conditions.getAdditionalJars() != null && conditions.getAdditionalJars().length > 0) {
31. sparkConf.set("spark.jars", org.apache.commons.lang.StringUtils.join(conditions.getAdditionalJars(), ","));
32. }
33.
34. if (conditions.getFiles() != null && conditions.getFiles().length > 0) {
35. sparkConf.set("spark.files", org.apache.commons.lang.StringUtils.join(conditions.getFiles(), ","));
36. }
37. for (Map.Entry e : conditions.getSparkProperties().entrySet()) {
38. sparkConf.set(e.getKey().toString(), e.getValue().toString());
39. }
40.
41. // 新增這個引數,不然spark會一直請求0.0.0.0:8030,一直重試
42. sparkConf.set("yarn.resourcemanager.hostname", conditions.getYarnResourcemanagerAddress().split(":")[0]);
43. // 設定為true,不刪除快取的jar包,因為現在提交yarn任務是使用的程式碼配置,沒有配置檔案,刪除快取的jar包有問題,
44. sparkConf.set("spark.yarn.preserve.staging.files", "true");
45.
46. // 初始化 yarn的配置
47. Configuration cf = new Configuration();
48. String os = System.getProperty("os.name");
49. boolean cross_platform = false;
50. if (os.contains("Windows")) {
51. cross_platform = true;
52. }
53. cf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務
54. // 設定yarn資源,不然會使用localhost:8032
55. cf.set("yarn.resourcemanager.address", conditions.getYarnResourcemanagerAddress());
56. // 設定namenode的地址,不然jar包會分發,非常噁心
57. cf.set("fs.defaultFS", conditions.getSparkFsDefaultFS());
58.
59. ClientArguments cArgs = new ClientArguments(args.toArray(new String[args.size()]));
60. Client client = new Client(cArgs, cf, sparkConf);
61. logger.info("提交任務,任務名稱:" + conditions.getJobName());
62.
63. try {
64.
65. ApplicationId appId = client.submitApplication();
66.
67. return appId.toString();
68.
69. } catch (Exception e) {
70. logger.error("提交spark任務失敗", e);
71. returnnull;
72. } finally {
73. if (client != null) {
74. client.stop();
75. }
76. }
77. }
2、任務進度獲取
1. /**
2. * 停止spark任務
3. *
4. * @param yarnResourcemanagerAddress
5. * yarn資源管理器地址,例如:master:8032,檢視yarn叢集獲取具體地址
6. * @param appIdStr
7. * 需要取消的任務id
8. */
9. publicvoid killJob(String yarnResourcemanagerAddress, String appIdStr) {
10. logger.info("取消spark任務,任務id:" + appIdStr);
11. // 初始化 yarn的配置
12. Configuration cf = new Configuration();
13. String os = System.getProperty("os.name");
14. boolean cross_platform = false;
15. if (os.contains("Windows")) {
16. cross_platform = true;
17. }
18. cf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務
19. // 設定yarn資源,不然會使用localhost:8032
20. cf.set("yarn.resourcemanager.address", yarnResourcemanagerAddress);
21.
22. // 建立yarn的客戶端,此類中有殺死任務的方法
23. YarnClient yarnClient = YarnClient.createYarnClient();
24. // 初始化yarn的客戶端
25. yarnClient.init(cf);
26. // yarn客戶端啟動
27. yarnClient.start();
28. try {
29. // 根據應用id,殺死應用
30. yarnClient.killApplication(getAppId(appIdStr));
31. } catch (Exception e) {
32. logger.error("取消spark任務失敗", e);
33. }
34. // 關閉yarn客戶端
35. yarnClient.stop();
36.
37. }
3、任務取消
1. /**
2. * 獲取spark任務狀態
3. *
4. *
5. * @param yarnResourcemanagerAddress
6. * yarn資源管理器地址,例如:master:8032,檢視yarn叢集獲取具體地址
7. * @param appIdStr
8. * 需要取消的任務id
9. */
10. public SparkTaskState getStatus(String yarnResourcemanagerAddress, String appIdStr) {
11. logger.info("獲取任務狀態啟動,任務id:" + appIdStr);
12. // 初始化 yarn的配置
13. Configuration cf = new Configuration();
14. String os = System.getProperty("os.name");
15. boolean cross_platform = false;
16. if (os.contains("Windows")) {
17. cross_platform = true;
18. }
19. cf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平臺提交任務
20. // 設定yarn資源,不然會使用localhost:8032
21. cf.set("yarn.resourcemanager.address", yarnResourcemanagerAddress);
22. logger.info("獲取任務狀態,任務id:" + appIdStr);
23.
24. SparkTaskState taskState = new SparkTaskState();
25. // 設定任務id
26. taskState.setAppId(appIdStr);
27. YarnClient yarnClient = YarnClient.createYarnClient();
28. // 初始化yarn的客戶端
29. yarnClient.init(cf);
30. // yarn客戶端啟動
31. yarnClient.start();
32. ApplicationReport report = null;
33. try {
34. report = yarnClient.getApplicationReport(getAppId(appIdStr));
35. } catch (Exception e) {
36. logger.error("獲取spark任務狀態失敗");
37. }
38.
39. if(report != null){
40. YarnApplicationState state = report.getYarnApplicationState();
41. taskState.setState(state.name());
42. // 任務執行進度
43. float progress = report.getProgress();
44. taskState.setProgress(progress);
45. // 最終狀態
46. FinalApplicationStatus status = report.getFinalApplicationStatus();
47. taskState.setFinalStatus(status.name());
48. }else{
49. taskState.setState(ConstParam.SPARK_FAILED);
50. taskState.setProgress(0.0f);
51. taskState.setFinalStatus(ConstParam.SPARK_FAILED);
52. }
53.
54. // 關閉yarn客戶端
55. yarnClient.stop();
56. logger.info("獲取任務狀態結束,任務狀態:" + JSON.toJSONString(taskState));
57. return taskState;
58. }
四、yarn引數調節
1、可分配給容器的實體記憶體數量,一個nodemanage分配的記憶體,如果機器記憶體是128g,儘量分配2/3
yarn.nodemanager.resource.memory-mb:80g
2、可以為容器分配的虛擬 CPU 核心的數量。該引數在 CDH 4.4 以前版本中無效。一個nodemanage分配的核數。如果機器是64和,儘量分配2/3.
yarn.nodemanager.resource.cpu-vcores:40
3、Java 程序堆疊記憶體的最大大小(以位元組為單位)。已傳遞到 Java -Xmx。
ResourceManager 的 Java 堆疊大小(位元組)
ResourceManager Default Group
B千位元組兆位元組吉位元組