Three ways to submit a Spark job
While learning Spark, the reference material describes three main ways to submit a Spark job:
Method 1:
Submit the job from the command line using the spark-submit tool that ships with Spark. The official documentation and most references submit jobs this way. A sample submit command:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
The meaning of each parameter is not explained here; please refer to the official documentation.
Method 2:
Submit the job programmatically through the Java API. This approach needs no command line: you can simply click Run in IDEA on the main class that contains the job. Spark provides an API with SparkLauncher as its single entry point. This is very convenient (imagine a task that must run repeatedly, but you cannot write Linux shell scripts; submitting the job via the Java API solves that, and it can also be integrated with Spring so the application runs in Tomcat). The official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
Following the official example, there are two ways to submit via the Java API:
The first is to call the startApplication method on a SparkLauncher instance. However, even with all configuration correct, a naive use of this method will fail. The reason is that startApplication uses LauncherServer to start a process that interacts with the cluster, and this appears to be asynchronous, so the main thread may finish before that process has even started, causing the run to fail. The fix is either to let the main thread sleep for a while after calling new SparkLauncher(...).startApplication(), or to use the example below:
package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two environment variables must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional:
        //env.put("YARN_CONF_DIR", "");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Note: even after calling setJavaHome(), the "JAVA_HOME is not set" error still occurs,
        // which is why JAVA_HOME is passed through the environment map above.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    // Listen for job state changes: when the job ends (for whatever reason),
                    // isFinal() returns true; otherwise it returns false.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });

        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the job reaches a final state.
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}
Note: in cluster deploy mode, any standard output produced by the job will not be visible; write results to HDFS instead. In client mode the output is visible.
The second way is to obtain a Process via SparkLauncher.launch(), then call process.waitFor() to wait for it to return. With this approach you must manage the process's output streams yourself, which is more work, but the advantage is that everything is under your control: the output you capture is just as detailed as what you would see submitting from the command line. Implementation:
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two environment variables must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR", "");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = handle.launch();
        // Read stdout and stderr of the spark-submit child process on separate threads.
        InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
The custom InputStreamReaderRunnable class used above is implemented as follows:
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            // Echo every line of the stream until it is closed.
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The third way is to submit through YARN's REST API (not commonly used, but introduced here as well):
POST request example: http://<rm http address:port>/ws/v1/cluster/apps
The parameters carried in the request:
Item | Data Type | Description |
---|---|---|
application-id | string | The application id |
application-name | string | The application name |
queue | string | The name of the queue to which the application should be submitted |
priority | int | The priority of the application |
am-container-spec | object | The application master container launch context, described below |
unmanaged-AM | boolean | Is the application using an unmanaged application master |
max-app-attempts | int | The max number of attempts for this application |
resource | object | The resources the application master requires, described below |
application-type | string | The application type (MapReduce, Pig, Hive, etc.) |
keep-containers-across-application-attempts | boolean | Should YARN keep the containers used by this application instead of destroying them |
application-tags | object | List of application tags; please see the request examples on how to specify the tags |
log-aggregation-context | object | Represents all of the information needed by the NodeManager to handle the logs for this application |
attempt-failures-validity-interval | long | Attempt failures that happen outside the validity interval are not counted toward the failure count |
reservation-id | string | Represent the unique id of the corresponding reserved resource allocation in the scheduler |
am-black-listing-requests | object | Contains blacklisting information such as “enable/disable AM blacklisting” and “disable failure threshold” |
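To make the REST submission concrete, here is a minimal sketch that builds the JSON body for the POST above. All concrete values (the application id, jar path, queue, and resource sizes) are illustrative assumptions, not taken from a real cluster; in practice you first obtain a fresh application id via POST /ws/v1/cluster/apps/new-application, then send this body with any HTTP client.

```java
// Minimal sketch: builds the JSON body for POST /ws/v1/cluster/apps.
// The field names follow the YARN ResourceManager REST API; the class
// name and all values here are hypothetical examples.
public class YarnRestSubmitBody {

    public static String buildBody(String appId, String appName, String command) {
        // Plain string concatenation keeps the sketch dependency-free;
        // a real client would use a JSON library and escape the values.
        return "{\n"
             + "  \"application-id\": \"" + appId + "\",\n"
             + "  \"application-name\": \"" + appName + "\",\n"
             + "  \"queue\": \"default\",\n"
             + "  \"application-type\": \"SPARK\",\n"
             + "  \"am-container-spec\": {\n"
             + "    \"commands\": { \"command\": \"" + command + "\" }\n"
             + "  },\n"
             + "  \"resource\": { \"memory\": 2048, \"vCores\": 1 }\n"
             + "}";
    }

    public static void main(String[] args) {
        String body = buildBody(
                "application_1234567890123_0001",   // from /apps/new-application
                "spark-demo",
                "{{SPARK_HOME}}/bin/spark-submit --class com.learn.spark.SimpleApp spark-demo.jar");
        System.out.println(body);
    }
}
```

The resulting string is what goes in the request body (Content-Type: application/json); the response to a successful submission is 202 Accepted, and you can then track the application under /ws/v1/cluster/apps/{appid}.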