多執行緒批量提交Spark任務
阿新 • • 發佈:2021-01-17
技術標籤:大資料Javaspark排程系統多執行緒提交任務批量提交spark任務
一.目的
1.避免資源的浪費
2.提高任務執行的效率
3.防止任務未執行完畢,session和執行緒池已關閉,導致任務失敗
二、異常coding:
1)method 1
for (String s : list) { Dataset<Row> sql = sparkSession.sql(s); sql.show(); } sparkSession.close(); System.out.println("=====任務執行完畢====");
2)method 2
//啟動多執行緒 ExecutorService executorService = Executors.newFixedThreadPool(list.size()); for (String s : list) { executorService.submit(new Runnable() { @Override public void run() { Dataset<Row> sql = sparkSession.sql(s); sql.show(); } }); } //關閉session和執行緒池 executorService.shutdown(); sparkSession.close(); System.out.println("=====任務執行完畢====");
上述兩段程式碼有很大的問題,以及會有異常產生
method 1:
任務是一個一個序列執行,例如
若該任務申請到10個core,10G記憶體,而在執行第一個sql1時,job只使用了4個Core和2G記憶體,那麼就造成了資源的浪費和剩下的任務還在等待該job的執行,不等做到資源的使用,而且任務時序列執行效率慢。
弊端:a.浪費資源 b.執行效率慢
method2:
雖然是使用多執行緒提交任務,但是會發生任務未執行完畢session提前關閉的異常
三.coding--解決方案
1.使用CountDownLatch的計數器批量提交任務
1)計數器的初始大小和任務數量保持一致(和執行緒數無關)
2)每執行完一次任務計數器減一
3)await()方法會一致阻塞,直到計數器的值減為0,才會釋放鎖,以便所有任務執行完畢後繼續執行下一步操作
/**
* 批量執行sql任務
*/
public class Test {
public static void main(String[] args) throws Exception {
//建立批量sql任務
String sql1 = "select count(1) from pub_penalty where dt=20210106";
String sql2 = "select count(1) from pub_penalty_tmp";
String sql3 = "select count(1) from pub_permission_tmp";
String sql4 = "select count(1) from pub_permission";
String sql5 = "select count(1) from test_sort";
ArrayList<String> list = new ArrayList<>();
list.add(sql1);
list.add(sql2);
list.add(sql3);
list.add(sql4);
list.add(sql5);
//初始化SparkSession
SparkSession sparkSession = initSparkSession();
//初始化CountDownLatch計數器,計數器大小和任務數保持一致
CountDownLatch countDownLatch = new CountDownLatch(list.size());
//啟動多執行緒
ExecutorService executorService = Executors.newFixedThreadPool(list.size());
for (String s : list) {
executorService.submit(new Runnable() {
@Override
public void run() {
Dataset<Row> sql = sparkSession.sql(s);
System.out.println(s + "---->runing..........." + sql.count());
//計數器減一
countDownLatch.countDown();
}
});
}
//阻塞等待
countDownLatch.await();
System.out.println("----->執行完畢");
//關閉session和執行緒池
sparkSession.close();
executorService.shutdown();
}
private static SparkSession initSparkSession() {
System.setProperty("hadoop.home.dir", "D:\\appinstall");
System.setProperty("HADOOP_USER_NAME", "bbdoffline");
SparkConf conf = new SparkConf();
conf.setAppName("bbd-wgj");
SparkSession sparkSession = SparkSession.builder().config(conf).master("local[*]").enableHiveSupport().getOrCreate();
sparkSession.sparkContext().setLogLevel("WARN");
return sparkSession;
}
2.使用Callable提交任務,通過返回值Future<T>的阻塞方法get()批量提交任務
1)使用Callable具有返回值的多執行緒方法提交任務
2)返回值Future<T> 的get()方法是個阻塞方法,會等待結果的返回,直到任務結束
......主體程式碼......
//啟動多執行緒
ExecutorService executorService = Executors.newFixedThreadPool(list.size());
ArrayList<Future<String>> list1 = new ArrayList<>();
for (String s : list) {
//使用Callable具有返回值的多執行緒方法提交任務
Future<String> submit = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
Dataset<Row> sql = sparkSession.sql(s);
sql.show();
return "success" + s;
}
});
//將任務返回值新增到list集合
list1.add(submit);
}
for (Future<String> result : list1) {
try {
//get是一個阻塞方法,獲取結果值
String retult = result.get();
System.out.println(retult);
} catch (Exception e) {
e.printStackTrace();
}
}
//關閉session和執行緒池
executorService.shutdown();
sparkSession.close();
System.out.println("=====任務執行完畢====");
spark ui圖:
如圖所示,多執行緒提交任務時,當資源申請足夠多時,會同時執行!!!即使資源不足,也會在上一個任務結束釋放資源後立即執行
注意:任務中視圖表的建立、臨時表的建立、等共享變數的建立,多個任務同時執行時,會造成同時使用!!!
該方法親試成功程式碼!!!
Github: https://github.com/wjy9517/myjob
----值得推薦