在Java應用中通過SparkLauncher啟動Spark任務
本部落格內容基於Spark2.2版本,在閱讀文章並想實際操作前,請確保你有:
- 一臺配置好Spark和yarn的伺服器
- 支援正常
spark-submit --master yarn xxxx
的任務提交
老版本
老版本任務提交是基於啟動本地程序,執行指令碼spark-submit xxx
** 的方式做的。其中一個關鍵的問題就是獲得提交Spark任務的Application-id,因為這個id是跟任務狀態的跟蹤有關係的。如果你的資源管理框架用的是yarn,應該知道每個執行的任務都有一個applicaiton_id,這個id的生成規則是:
appplication_時間戳_數字
老版本的spark通過修改SparkConf引數spark.app.id
就可以手動指定id,新版本的程式碼是直接讀取的taskBackend中的applicationId()方法,這個方法具體的實現是根據實現類來定的。在yarn中,是通過Yarn的YarnClusterSchedulerBackend實現的。
感興趣的同學可以看一下,生成applicaiton_id的邏輯在hadoop-yarn工程的ContainerId中定義。
總結一句話就是,想要自定義id,甭想了!!!!
於是當時腦袋瓜不靈光的我,就想到那就等應用建立好了之後,直接寫到資料庫裡面唄。怎麼寫呢?
- 我事先生成一個自定義的id,當做引數傳遞到spark應用裡面;
- 等spark初始化後,就可以通過sparkContext取得對應的application_id以及url
- 然後再driver連線資料庫,插入一條關聯關係
新版本
還是歸結於網際網路時代的資訊大爆炸,我看到群友的聊天,知道了SparkLauncer這個東西,調查後發現他可以基於Java程式碼自動提交Spark任務。SparkLauncher支援兩種模式:
- new SparkLauncher().launch() 直接啟動一個Process,效果跟以前一樣
- new SparkLauncher().startApplicaiton(監聽器) 返回一個SparkAppHandler,並(可選)傳入一個監聽器
當然是更傾向於第二種啦,因為好處很多:
- 自帶輸出重定向(Output,Error都有,支援寫到檔案裡面),超級爽的功能
- 可以自定義監聽器,當資訊或者狀態變更時,都能進行操作(對我沒啥用)
- 返回的SparkAppHandler支援 暫停、停止、斷連、獲得AppId、獲得State等多種功能,我就想要這個!!!!
一步一步,程式碼展示
首先建立一個最基本的Spark程式:
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
public class HelloWorld {
public static void main(String[] args) throws InterruptedException {
SparkSession spark = SparkSession
.builder()
//.master("yarn")
//.appName("hello-wrold")
//.config("spark.some.config.option", "some-value")
.getOrCreate();
List<Person> persons = new ArrayList<>();
persons.add(new Person("zhangsan", 22, "male"));
persons.add(new Person("lisi", 25, "male"));
persons.add(new Person("wangwu", 23, "female"));
spark.createDataFrame(persons, Person.class).show(false);
spark.close();
}
}
然後建立SparkLauncher類:
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import java.io.IOException;
public class Launcher {
public static void main(String[] args) throws IOException {
SparkAppHandle handler = new SparkLauncher()
.setAppName("hello-world")
.setSparkHome(args[0])
.setMaster(args[1])
.setConf("spark.driver.memory", "2g")
.setConf("spark.executor.memory", "1g")
.setConf("spark.executor.cores", "3")
.setAppResource("/home/xinghailong/launcher/launcher_test.jar")
//此處應寫類的全限定名
.setMainClass("HelloWorld")
.addAppArgs("I come from Launcher")
.setDeployMode("cluster")
.startApplication(new SparkAppHandle.Listener(){
@Override
public void stateChanged(SparkAppHandle handle) {
System.out.println("********** state changed **********");
}
@Override
public void infoChanged(SparkAppHandle handle) {
System.out.println("********** info changed **********");
}
});
while(!"FINISHED".equalsIgnoreCase(handler.getState().toString()) && !"FAILED".equalsIgnoreCase(handler.getState().toString())){
System.out.println("id "+handler.getAppId());
System.out.println("state "+handler.getState());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
打包完成後上傳到部署Spark的伺服器上。由於SparkLauncher所在的類引用了SparkLauncher,所以還需要把這個jar也上傳到伺服器上。
[[email protected] launcher]$ ls
launcher_test.jar spark-launcher_2.11-2.2.0.jar
[[email protected] launcher]$ pwd
/home/xiangcong/launcher
由於SparkLauncher需要指定SPARK_HOME,因此如果你的機器可以執行spark-submit,那麼就看一下spark-submit裡面,SPARK_HOME是在哪
[[email protected] launcher]$ which spark2-submit
/var/lib/hadoop-hdfs/bin/spark2-submit
最後幾行就能看到:
export SPARK2_HOME=/var/lib/hadoop-hdfs/app/spark
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK2_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "[email protected]"
綜上,我們需要的是:
- 一個自定義的Jar,裡面包含Spark應用和SparkLauncher類
- 一個SparkLauncher的jar,spark-launcher_2.11-2.2.0.jar 版本根據你自己的來就行
- 一個當前目錄的路徑
- 一個SARK_HOME環境變數指定的目錄
然後執行命令啟動測試:
java -Djava.ext.dirs=/home/xinghailong/launcher -cp launcher_test.jar Launcher /var/lib/hadoop-hdfs/app/spark yarn
說明:
-Djava.ext.dirs
設定當前目錄為java類載入的目錄- 傳入兩個引數,一個是SPARK_HOME;一個是啟動模式
觀察發現成功啟動運行了:
id null
state UNKNOWN
Mar 10, 2018 12:00:52 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
********** state changed **********
...省略一大堆拷貝jar的日誌
********** info changed **********
********** state changed **********
Mar 10, 2018 12:00:55 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:00:55 INFO yarn.Client: Application report for application_1518263195995_37615 (state: ACCEPTED)
... 省略一堆重定向的日誌
application_1518263195995_37615 (state: ACCEPTED)
id application_1518263195995_37615
state SUBMITTED
Mar 10, 2018 12:01:00 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:00 INFO yarn.Client: Application report for application_1518263195995_37615 (state: RUNNING)
********** state changed **********
... 省略一堆重定向的日誌
INFO: user: hdfs
********** state changed **********
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Shutdown hook called
Mar 10, 2018 12:01:08 PM org.apache.spark.launcher.OutputRedirector redirect
INFO: 18/03/10 12:01:08 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-f07e0213-61fa-4710-90f5-2fd2030e0701
總結
這樣就實現了基於Java應用提交Spark任務,並獲得其Appliation_id和狀態進行定位跟蹤的需求了。