1. 程式人生 > 實用技巧 >Spark Streaming整合Kafka調優

Spark Streaming整合Kafka調優

調優

Spark Streaming整合Kafka時,當資料量較小時預設配置一般都能滿足我們的需要,但是當資料量大的時候,就需要進行一定的調整和優化。

  1. 合理的批處理時間(batchDuration)
    幾乎所有的Spark Streaming調優文件都會提及批處理時間的調整,在StreamingContext初始化的時候,有一個引數便是批處理時間的設定。如果這個值設定的過短,即個batchDuration所產生的Job並不能在這期間完成處理,那麼就會造成資料不斷堆積,最終導致Spark Streaming發生阻塞。而且,一般對於batchDuration的設定不會小於500ms,因為過小會導致SparkStreaming頻繁的提交作業,對整個streaming造成額外的負擔。在平時的應用中,根據不同的應用場景和硬體配置,我設在1~10s之間,我們可以根據SparkStreaming的視覺化監控介面,觀察Total Delay來進行batchDuration的調整,如下圖:

    調整批處理時間
  2. 合理的Kafka拉取量(maxRatePerPartition重要)
    對於Spark Streaming消費kafka中資料的應用場景,這個配置是非常關鍵的,配置引數為:spark.streaming.kafka.maxRatePerPartition。這個引數預設是沒有上線的,即kafka當中有多少資料它就會直接全部拉出。而根據生產者寫入Kafka的速率以及消費者本身處理資料的速度,同時這個引數需要結合上面的batchDuration,使得每個partition拉取在每個batchDuration期間拉取的資料能夠順利的處理完畢,做到儘可能高的吞吐量,而這個引數的調整可以參考視覺化監控介面中的Input Rate和Processing Time,如下圖:

    maxRatePerPartition1 maxRatePerPartition2

    spark.streaming.kafka.maxRatePerPartition這個引數是控制吞吐量的,一般和spark.streaming.backpressure.enabled=true一起使用。那麼應該怎麼算這個值呢。

    如例要10分鐘的吞吐量控制在5000,0000,kafka分割槽是10個。

    spark.streaming.kafka.maxRatePerPartition=8400這個值是怎麼算的呢。如下是公式

    spark.streaming.kafka.maxRatePerPartition的值 * kafka分割槽數 * (10 *60)(每秒時間)

  3. 快取反覆使用的Dstream(RDD)
    Spark中的RDD和SparkStreaming中的Dstream,如果被反覆的使用,最好利用cache(),將該資料流快取起來,防止過度的排程資源造成的網路開銷。可以參考觀察Scheduling Delay引數,如下圖:

    Dstream
  4. 設定合理的GC
    長期使用Java的小夥伴都知道,JVM中的垃圾回收機制,可以讓我們不過多的關注與記憶體的分配回收,更加專注於業務邏輯,JVM都會為我們搞定。對JVM有些瞭解的小夥伴應該知道,在Java虛擬機器中,將記憶體分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費一定時間的,尤其是老年代的GC回收,需要對記憶體碎片進行整理,通常採用標記-清楚的做法。同樣的在Spark程式中,JVM GC的頻率和時間也是影響整個Spark效率的關鍵因素。在通常的使用中建議:
    --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
    
  5. 設定合理的CPU資源數
    CPU的core數量,每個executor可以佔用一個或多個core,可以通過觀察CPU的使用率變化來了解計算資源的使用情況,例如,很常見的一種浪費是一個executor佔用了多個core,但是總的CPU使用率卻不高(因為一個executor並不總能充分利用多核的能力),這個時候可以考慮讓麼個executor佔用更少的core,同時worker下面增加更多的executor,或者一臺host上面增加更多的worker來增加並行執行的executor的數量,從而增加CPU利用率。但是增加executor的時候需要考慮好記憶體消耗,因為一臺機器的記憶體分配給越多的executor,每個executor的記憶體就越小,以致出現過多的資料spill over甚至out of memory的情況。
  6. 設定合理的parallelism
    partition和parallelism,partition指的就是資料分片的數量,每一次task只能處理一個partition的資料,這個值太小了會導致每片資料量太大,導致記憶體壓力,或者諸多executor的計算能力無法利用充分;但是如果太大了則會導致分片太多,執行效率降低。在執行action型別操作的時候(比如各種reduce操作),partition的數量會選擇parent RDD中最大的那一個。而parallelism則指的是在RDD進行reduce類操作的時候,預設返回資料的paritition數量(而在進行map類操作的時候,partition數量通常取自parent RDD中較大的一個,而且也不會涉及shuffle,因此這個parallelism的引數沒有影響)。所以說,這兩個概念密切相關,都是涉及到資料分片的,作用方式其實是統一的。通過spark.default.parallelism可以設定預設的分片數量,而很多RDD的操作都可以指定一個partition引數來顯式控制具體的分片數量。
    在SparkStreaming+kafka的使用中,我們採用了Direct連線方式,前文闡述過Spark中的partition和Kafka中的Partition是一一對應的,我們一般預設設定為Kafka中Partition的數量。
  7. 使用高效能的運算元
    這裡參考了美團技術團隊的博文,並沒有做過具體的效能測試,其建議如下:
  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter之後進行coalesce操作
  • 使用repartitionAndSortWithinPartitions替代repartition與sort類操作
  1. 使用Kryo優化序列化效能
    這個優化原則我本身也沒有經過測試,但是好多優化文件有提到,這裡也記錄下來。
    在Spark中,主要有三個地方涉及到了序列化:

在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸(見“原則七:廣播大變數”中的講解)。
將自定義的型別作為RDD的泛型型別時(比如JavaRDD,Student是自定義型別),所有自定義型別物件,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。
使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組陣列。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的效能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,效能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義型別,因此對於開發者來說,這種方式比較麻煩。

以下是使用Kryo的程式碼示例,我們只要設定序列化類,再註冊要序列化的自定義型別即可(比如運算元函式中使用到的外部變數型別、作為RDD泛型型別的自定義型別等):

// 建立SparkConf物件。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設定序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 註冊要序列化的自定義型別。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
  1. 結果
    經過種種除錯優化,我們最終要達到的目的是,Spark Streaming能夠實時的拉取Kafka當中的資料,並且能夠保持穩定,如下圖所示:

    結果

    當然不同的應用場景會有不同的圖形,這是本文詞頻統計優化穩定後的監控圖,我們可以看到Processing Time這一柱形圖中有一Stable的虛線,而大多數Batch都能夠在這一虛線下處理完畢,說明整體Spark Streaming是執行穩定的。

附:

使用SparkStreaming整合kafka時有幾個比較重要的引數:

(1)spark.streaming.stopGracefullyOnShutdown (true / false)預設fasle

確保在kill任務時,能夠處理完最後一批資料,再關閉程式,不會發生強制kill導致資料處理中斷,沒處理完的資料丟失

(2)spark.streaming.backpressure.enabled (true / false) 預設false

開啟後spark自動根據系統負載選擇最優消費速率

(3)spark.streaming.backpressure.initialRate (整數) 預設直接讀取所有

在(2)開啟的情況下,限制第一次批處理應該消費的資料,因為程式冷啟動 佇列裡面有大量積壓,防止第一次全部讀取,造成系統阻塞

(4)spark.streaming.kafka.maxRatePerPartition (整數) 預設直接讀取所有

限制每秒每個消費執行緒讀取每個kafka分割槽最大的資料量

注意:

只有(4)啟用的時候,每次消費的最大資料量,就是設定的資料量,如果不足這個數,就有多少讀多少,如果超過這個數字,就讀取這個數字的設定的值

只有(2)+(4)啟用的時候,每次消費讀取的數量最大會等於(4)設定的值,最小是spark根據系統負載自動推斷的值,消費的資料量會在這兩個範圍之內變化根據系統情況,但第一次啟動會有多少讀多少資料。此後按(2)+(4)設定規則執行

(2)+(3)+(4)同時啟用的時候,跟上一個消費情況基本一樣,但第一次消費會得到限制,因為我們設定第一次消費的頻率了。

參考:

https://www.jianshu.com/p/2623a145c508

https://blog.csdn.net/u010454030/article/details/54629049