Spark如何在一個SparkContext中提交多個任務
阿新 • • 發佈:2019-01-04
在使用spark處理資料的時候,大多數都是提交一個job執行,然後job內部會根據具體的任務,生成task任務,執行在多個程序中,比如讀取的HDFS檔案的資料,spark會載入所有的資料,然後根據block個數生成task數目,多個task執行中不同的程序中,是並行的,如果在同一個程序中一個JVM裡面有多個task,那麼多個task也可以並行,這是常見的使用方式。
考慮下面一種場景,在HDFS上某個目錄下面有10個檔案,我想要同時並行的去統計每個檔案的數量,應該怎麼做? 其實spark是支援在一個spark context中可以通過多執行緒同時提交多個任務執行,然後spark context接到這所有的任務之後,通過中央排程,在來分配執行各個task,最終任務完成程式退出。
下面就來看下如何使用多執行緒提交任務,可以直接使用new Thread來建立執行緒提交,但是不建議這麼做,推薦的做法是通過Executors執行緒池來非同步管理執行緒,尤其是在提交的任務比較多的時候用這個會更加方便。
核心程式碼如下:
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf()
//例項化spark context
val sc=new SparkContext(sparkConf)
sparkConf.setAppName("multi task submit ")
//儲存任務返回值
val list=new util.ArrayList[Future[String]]()
//並行任務讀取的path
val task_paths=new util.ArrayList[String]()
task_paths.add("/tmp/data/path1/")
task_paths.add("/tmp/data/path2/")
task_paths.add("/tmp/data/path3/")
//執行緒數等於path的數量
val nums_threads=task_paths.size()
//構建執行緒池
val executors=Executors.newFixedThreadPool(nums_threads)
for (i<-0 until nums_threads){
val task= executors.submit(new Callable[String] {
override def call(): String ={
val count=sc.textFile(task_paths.get(i)).count()//獲取統計檔案數量
return task_paths.get(i)+" 檔案數量: "+count
}
})
list.add(task)//新增集合裡面
}
//遍歷獲取結果
list.asScala.foreach(result=>{
log.info(result.get())
})
//停止spark
sc.stop()
}
可以看到使用scala寫的程式碼比較精簡,這樣就完成了一個並行task提交的spark任務,最後我們打包完畢後,上傳到linux上進行提交,命令如下:
/opt/bigdata/spark/bin/spark-submit
--class MultiTaskSubmit
--master yarn
--deploy-mode cluster
--executor-cores 3
--driver-memory 1g
--executor-memory 1g
--num-executors 10
--jars $jars task.jar
最後需要注意一點,線上程裡面呼叫的方法如果包含一些全域性載入的屬性,最好放線上程的成員變數裡面進行初始化,否則多個執行緒去更改全域性屬性,有可能會造成一些未知的問題。