1. 程式人生 > 程式設計 >Spark調優多執行緒並行處理任務實現方式

Spark調優多執行緒並行處理任務實現方式

方式1:

1. 明確 Spark中Job 與 Streaming中 Job 的區別

1.1 Spark Core

一個 RDD DAG Graph 可以生成一個或多個 Job(Action操作)

一個Job可以認為就是會最終輸出一個結果RDD的一條由RDD組織而成的計算

Job在spark裡應用裡是一個被排程的單位

1.2 Streaming

一個 batch 的資料對應一個 DStreamGraph

而一個 DStreamGraph 包含一或多個關於 DStream 的輸出操作

每一個輸出對應於一個Job,一個 DStreamGraph 對應一個JobSet,裡面包含一個或多個Job

2. Streaming Job的並行度

Job的並行度由兩個配置決定:

spark.scheduler.mode(FIFO/FAIR)
spark.streaming.concurrentJobs

一個 Batch 可能會有多個 Action 執行,比如註冊了多個 Kafka 資料流,每個Action都會產生一個Job

所以一個 Batch 有可能是一批 Job,也就是 JobSet 的概念

這些 Job 由 jobExecutor 依次提交執行

而 JobExecutor 是一個預設池子大小為1的執行緒池,所以只能執行完一個Job再執行另外一個Job

這裡說的池子,大小就是由spark.streaming.concurrentJobs 控制的

concurrentJobs 決定了向 Spark Core 提交Job的並行度

提交一個Job,必須等這個執行完了,才會提交第二個

假設我們把它設定為2,則會併發的把 Job 提交給 Spark Core

Spark 有自己的機制決定如何執行這兩個Job,這個機制其實就是FIFO或者FAIR(決定了資源的分配規則)

預設是 FIFO,也就是先進先出,把 concurrentJobs 設定為2,但是如果底層是FIFO,那麼會優先執行先提交的Job

雖然如此,如果資源夠兩個job執行,還是會並行執行兩個Job

Spark Streaming 不同Batch任務可以平行計算麼https://developer.aliyun.com/article/73004

conf.setMaster("local[4]")
conf.set("spark.streaming.concurrentJobs","3") //job 並行對
conf.set("spark.scheduler.mode","FIFO")
val sc = new StreamingContext(conf,Seconds(5))

你會發現,不同batch的job其實也可以並行執行的,這裡需要有幾個條件:

有延時發生了,batch無法在本batch完成

concurrentJobs > 1

如果scheduler mode 是FIFO則需要某個Job無法一直消耗掉所有資源

Mode是FAIR則盡力保證你的Job是並行執行的,毫無疑問是可以並行的。

方式2:

場景1:

程式每次處理的資料量是波動的,比如週末比工作日多很多,晚八點比凌晨四點多很多。

一個spark程式處理的時間在1-2小時波動是OK的。而spark streaming程式不可以,如果每次處理的時間是1-10分鐘,就很蛋疼。
設定10分鐘吧,實際上10分鐘的也就那一段高峰時間,如果設定每次是1分鐘,很多時候會出現程式處理不過來,排隊過多的任務延遲更久,還可能出現程式崩潰的可能。

場景2:

  • 程式需要處理的相似job數隨著業務的增長越來越多
  • 我們知道spark的api裡無相互依賴的stage是並行處理的,但是job之間是序列處理的。
  • spark程式通常是離線處理,比如T+1之類的延遲,時間變長是可以容忍的。而spark streaming是準實時的,如果業務增長導致延遲增加就很不合理。

spark雖然是序列執行job,但是是可以把job放到執行緒池裡多執行緒執行的。如何在一個SparkContext中提交多個任務

DStream.foreachRDD{
   rdd =>
    //建立執行緒池
    val executors=Executors.newFixedThreadPool(rules.length)
    //將規則放入執行緒池
    for( ru <- rules){
     val task= executors.submit(new Callable[String] {
      override def call(): String ={
       //執行規則
       runRule(ru,spark)
      }
     })
    }
    //每次建立的執行緒池執行完所有規則後shutdown
    executors.shutdown()
  }

注意點

1.最後需要executors.shutdown()。

  • 如果是executors.shutdownNow()會發生未執行完的task強制關閉執行緒。
  • 如果使用executors.awaitTermination()則會發生阻塞,不是我們想要的結果。
  • 如果沒有這個shutdowm操作,程式會正常執行,但是長時間會產生大量無用的執行緒池,因為每次foreachRDD都會建立一個執行緒池。

2.可不可以將建立執行緒池放到foreachRDD外面?

不可以,這個關係到對於scala閉包到理解,經測試,第一次或者前幾次batch是正常的,後面的batch無執行緒可用。

3.執行緒池executor崩潰了就會導致資料丟失

原則上是這樣的,但是正常的程式碼一般不會發生executor崩潰。至少我在使用的時候沒遇到過。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。