1. 程式人生 > 其它 >多執行緒批量提交Spark任務

多執行緒批量提交Spark任務

技術標籤:大資料Javaspark排程系統多執行緒提交任務批量提交spark任務

一.目的

1.避免資源的浪費

2.提高任務執行的效率

3.防止任務未執行完畢,session和執行緒池已關閉,導致任務失敗

二、異常coding:

1)method 1

  for (String s : list) {
            Dataset<Row> sql = sparkSession.sql(s);
            sql.show();
        }
        sparkSession.close();
        System.out.println("=====任務執行完畢====");

2)method 2

      //啟動多執行緒
        ExecutorService executorService = Executors.newFixedThreadPool(list.size());
        for (String s : list) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    Dataset<Row> sql = sparkSession.sql(s);
                    sql.show();
                }
            });
        }

        //關閉session和執行緒池
        executorService.shutdown();
        sparkSession.close();
        System.out.println("=====任務執行完畢====");

上述兩段程式碼有很大的問題,以及會有異常產生

method 1:

任務是一個一個序列執行,例如

若該任務申請到10個core,10G記憶體,而在執行第一個sql1時,job只使用了4個Core和2G記憶體,那麼就造成了資源的浪費和剩下的任務還在等待該job的執行,不等做到資源的使用,而且任務時序列執行效率慢。

弊端:a.浪費資源 b.執行效率慢

method2:

雖然是使用多執行緒提交任務,但是會發生任務未執行完畢session提前關閉的異常

三.coding--解決方案

1.使用CountDownLatch的計數器批量提交任務

1)計數器的初始大小和任務數量保持一致(和執行緒數無關)

2)每執行完一次任務計數器減一

3)await()方法會一致阻塞,直到計數器的值減為0,才會釋放鎖,以便所有任務執行完畢後繼續執行下一步操作

/**
 * 批量執行sql任務
 */
public class Test {

    public static void main(String[] args) throws Exception {
        //建立批量sql任務
        String sql1 = "select count(1) from pub_penalty where dt=20210106";
        String sql2 = "select count(1) from pub_penalty_tmp";
        String sql3 = "select count(1) from pub_permission_tmp";
        String sql4 = "select count(1) from pub_permission";
        String sql5 = "select count(1) from test_sort";
        ArrayList<String> list = new ArrayList<>();
        list.add(sql1);
        list.add(sql2);
        list.add(sql3);
        list.add(sql4);
        list.add(sql5);

        //初始化SparkSession
        SparkSession sparkSession = initSparkSession();
        
        //初始化CountDownLatch計數器,計數器大小和任務數保持一致
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        //啟動多執行緒
        ExecutorService executorService = Executors.newFixedThreadPool(list.size());
        for (String s : list) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    Dataset<Row> sql = sparkSession.sql(s);
                    System.out.println(s + "---->runing..........." + sql.count());
                    //計數器減一
                    countDownLatch.countDown();
                }
            });
        }
       //阻塞等待
        countDownLatch.await();
        System.out.println("----->執行完畢");
        //關閉session和執行緒池
        sparkSession.close();
        executorService.shutdown();
    }

    private static SparkSession initSparkSession() {
        System.setProperty("hadoop.home.dir", "D:\\appinstall");
        System.setProperty("HADOOP_USER_NAME", "bbdoffline");
        SparkConf conf = new SparkConf();
        conf.setAppName("bbd-wgj");
        SparkSession sparkSession =             SparkSession.builder().config(conf).master("local[*]").enableHiveSupport().getOrCreate();
        sparkSession.sparkContext().setLogLevel("WARN");
        return sparkSession;
    }

2.使用Callable提交任務,通過返回值Future<T>的阻塞方法get()批量提交任務

1)使用Callable具有返回值的多執行緒方法提交任務

2)返回值Future<T> 的get()方法是個阻塞方法,會等待結果的返回,直到任務結束


......主體程式碼......
//啟動多執行緒
        ExecutorService executorService = Executors.newFixedThreadPool(list.size());
        ArrayList<Future<String>> list1 = new ArrayList<>();
        for (String s : list) {
            //使用Callable具有返回值的多執行緒方法提交任務
            Future<String> submit = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    Dataset<Row> sql = sparkSession.sql(s);
                    sql.show();
                    return "success" + s;
                }
            });
            //將任務返回值新增到list集合
            list1.add(submit);
        }
        for (Future<String> result : list1) {
            try {
                //get是一個阻塞方法,獲取結果值
                String retult = result.get();
                System.out.println(retult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //關閉session和執行緒池
        executorService.shutdown();
        sparkSession.close();
        System.out.println("=====任務執行完畢====");

spark ui圖:

如圖所示,多執行緒提交任務時,當資源申請足夠多時,會同時執行!!!即使資源不足,也會在上一個任務結束釋放資源後立即執行

注意:任務中視圖表的建立、臨時表的建立、等共享變數的建立,多個任務同時執行時,會造成同時使用!!!

該方法親試成功程式碼!!!

Github: https://github.com/wjy9517/myjob

----值得推薦