Spark-利用SparkLauncher 類以JAVA API 程式設計的方式提交spark job
一.環境說明和使用軟體的版本說明:
hadoop-version:hadoop-2.9.0.tar.gz
spark-version:spark-2.2.0-bin-hadoop2.7.tgz
java-version:jdk1.8.0_151
叢集環境:單機偽分散式環境。
二.適用背景
在學習Spark過程中,資料中介紹的提交Spark Job的方式主要有兩種(我所知道的):第一種是通過命令列的方式提交Job,使用spark 自帶的spark-submit工具提交,官網和大多數參考資料都是已這種方式提交的,提交命令示例如下:
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
引數含義就不解釋了,請參考官網資料。
第二種提交方式是已JAVA API程式設計的方式提交,這種方式不需要使用命令列,直接可以在IDEA中點選Run 執行包含Job的Main類就行,Spark 提供了以SparkLanuncher 作為唯一入口的API來實現。這種方式很方便(試想如果某個任務需要重複執行,但是又不會寫linux 指令碼怎麼搞?我想到的是以JAV API的方式提交Job, 還可以和Spring整合,讓應用在tomcat中執行),官網的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
三.文章的目地
官網已有demo和API的情況下寫這篇文章的目地:官網給出的demo 放在本機跑不了。出現的現象是程式結束了,什麼輸出都沒有或者輸出JAVA_HOME is not set,雖然我呼叫方法設定了,然而沒啥用,因此把我搜索和加上在自己思考後能夠執行的demo記錄下來。
四.相關demo
根據官網的示例這裡有兩種方式:
第一種是呼叫SparkLanuncher例項的startApplication方法,但是這種方式在所有配置都正確的情況下使用執行都會失敗的,原因是startApplication方法會呼叫LauncherServer啟動一個程序與叢集互動,這個操作貌似是非同步的,所以可能結果是main主執行緒結束了這個程序都沒有起起來,導致執行失敗。解決辦法是呼叫new SparkLanuncher
package com.learn.spark;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
public class LanuncherAppV {
public static void main(String[] args) throws IOException, InterruptedException {
HashMap env = new HashMap();
//這兩個屬性必須設定
env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
//可以不設定
//env.put("YARN_CONF_DIR","");
CountDownLatch countDownLatch = new CountDownLatch(1);
//這裡呼叫setJavaHome()方法後,JAVA_HOME is not set 錯誤依然存在
SparkAppHandle handle = new SparkLauncher(env)
.setSparkHome("/usr/local/spark")
.setAppResource("/usr/local/spark/spark-demo.jar")
.setMainClass("com.learn.spark.SimpleApp")
.setMaster("yarn")
.setDeployMode("cluster")
.setConf("spark.app.id", "11222")
.setConf("spark.driver.memory", "2g")
.setConf("spark.akka.frameSize", "200")
.setConf("spark.executor.memory", "1g")
.setConf("spark.executor.instances", "32")
.setConf("spark.executor.cores", "3")
.setConf("spark.default.parallelism", "10")
.setConf("spark.driver.allowMultipleContexts", "true")
.setVerbose(true).startApplication(new SparkAppHandle.Listener() {
//這裡監聽任務狀態,當任務結束時(不管是什麼原因結束),isFinal()方法會返回true,否則返回false
@Override
public void stateChanged(SparkAppHandle sparkAppHandle) {
if (sparkAppHandle.getState().isFinal()) {
countDownLatch.countDown();
}
System.out.println("state:" + sparkAppHandle.getState().toString());
}
@Override
public void infoChanged(SparkAppHandle sparkAppHandle) {
System.out.println("Info:" + sparkAppHandle.getState().toString());
}
});
System.out.println("The task is executing, please wait ....");
//執行緒等待任務結束
countDownLatch.await();
System.out.println("The task is finished!");
}
}
注意:如果部署模式是cluster,但是程式碼中有標準輸出的話將看不到,需要把結果寫到HDFS中,如果是client模式則可以看到輸出。
第二種方式是:通過SparkLanuncher.lanunch()方法獲取一個程序,然後呼叫程序的process.waitFor()方法等待執行緒返回結果,但是使用這種方式需要自己管理執行過程中的輸出資訊,比較麻煩,好處是一切都在掌握之中,即獲取的輸出資訊和通過命令提交的方式一樣,很詳細,實現如下:
package com.learn.spark;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
import java.util.HashMap;
public class LauncherApp {
public static void main(String[] args) throws IOException, InterruptedException {
HashMap env = new HashMap();
//這兩個屬性必須設定
env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf");
env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151");
//env.put("YARN_CONF_DIR","");
SparkLauncher handle = new SparkLauncher(env)
.setSparkHome("/usr/local/spark")
.setAppResource("/usr/local/spark/spark-demo.jar")
.setMainClass("com.learn.spark.SimpleApp")
.setMaster("yarn")
.setDeployMode("cluster")
.setConf("spark.app.id", "11222")
.setConf("spark.driver.memory", "2g")
.setConf("spark.akka.frameSize", "200")
.setConf("spark.executor.memory", "1g")
.setConf("spark.executor.instances", "32")
.setConf("spark.executor.cores", "3")
.setConf("spark.default.parallelism", "10")
.setConf("spark.driver.allowMultipleContexts","true")
.setVerbose(true);
Process process =handle.launch();
InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();
InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();
System.out.println("Waiting for finish...");
int exitCode = process.waitFor();
System.out.println("Finished! Exit code:" + exitCode);
}
}
使用的自定義InputStreamReaderRunnable類實現如下:
package com.learn.spark;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
public class InputStreamReaderRunnable implements Runnable {
private BufferedReader reader;
private String name;
public InputStreamReaderRunnable(InputStream is, String name) {
this.reader = new BufferedReader(new InputStreamReader(is));
this.name = name;
}
public void run() {
System.out.println("InputStream " + name + ":");
try {
String line = reader.readLine();
while (line != null) {
System.out.println(line);
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
五.總結
官網給出的例子在某種配置下肯定是可以執行的,後續需要繼續探索。