Spark Streaming實時處理應用
1 框架一覽
事件處理的架構圖如下所示。
2 優化總結
當我們第一次部署整個方案時,kafka
和flume
元件都執行得非常好,但是spark streaming
應用需要花費4-8分鐘來處理單個batch
。這個延遲的原因有兩點,一是我們使用DataFrame
來強化資料,而強化資料需要從hive
中讀取大量的資料; 二是我們的引數配置不理想。
為了優化我們的處理時間,我們從兩方面著手改進:第一,快取合適的資料和分割槽;第二,改變配置引數優化spark應用。執行spark應用的spark-submit
命令如下所示。通過引數優化和程式碼改進,我們顯著減少了處理時間,處理時間從4-8分鐘降到了低於25秒。
/opt/app/dev/spark-1.5.2/bin/spark-submit \
--jars \
/opt/cloudera/parcels/CDH/jars/zkclient-0.3.jar,/opt/cloudera/parcels/CDH/jars/kafka_2.10-0.8.1.1.jar,\
/opt/app/dev/jars/datanucleus-core-3.2.2.jar,/opt/app/dev/jars/datanucleus-api-jdo-3.2.1.jar,/opt/app/dev/jars/datanucleus-rdbms-3.2.1.jar \
--files /opt/app/dev/spark-1.5.2/conf/hive-site.xml,/opt/app/dev/jars/log4j-eir.properties \
--queue spark_service_pool \
--master yarn \
--deploy-mode cluster \
--conf "spark.ui.showConsoleProgress=false" \
--conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=6G -XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \ --conf "spark.sql.tungsten.enabled=false" \ --conf "spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory" \ --conf "spark.eventLog.enabled=true" \ --conf "spark.sql.codegen=false" \ --conf "spark.sql.unsafe.enabled=false" \ --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \ --conf "spark.streaming.backpressure.enabled=true" \ --conf "spark.locality.wait=1s" \ --conf "spark.streaming.blockInterval=1500ms" \ --conf "spark.shuffle.consolidateFiles=true" \ --driver-memory 10G \ --executor-memory 8G \ --executor-cores 20 \ --num-executors 20 \ --class com.bigdata.streaming.OurApp \ /opt/app/dev/jars/OurStreamingApplication.jar external_props.conf
下面我們將詳細介紹這些改變的引數。
2.1 driver選項
這裡需要注意的是,driver
執行在spark on yarn
的叢集模式下。因為spark streaming
應用是一個長期執行的任務,生成的日誌檔案會很大。為了解決這個問題,我們限制了寫入日誌的訊息的條數, 並且用RollingFileAppender
限制了它們的大小。我們也關閉了spark.ui.showConsoleProgress
選項來禁用控制檯日誌訊息。
通過測試,我們的driver
因為永久代空間填滿而頻繁發生記憶體耗盡(永久代空間是類、方法等儲存的地方,不會被重新分配)。將永久代空間的大小升高到6G可以解決這個問題。
spark.driver.extraJavaOptions=-XX:MaxPermSize=6G
2.2 垃圾回收
因為我們的spark streaming
應用程式是一個長期執行的程序,在處理一段時間之後,我們注意到GC
暫停時間過長,我們想在後臺減少或者保持這個時間。調整UseConcMarkSweepGC
引數是一個技巧。
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -Dlog4j.configuration=log4j-eir.properties" \
2.3 禁用Tungsten
Tungsten
是spark
執行引擎主要的改進。但是它的第一個版本是有問題的,所以我們暫時禁用它。
spark.sql.tungsten.enabled=false spark.sql.codegen=false spark.sql.unsafe.enabled=false
2.4 啟用反壓
Spark Streaming
在批處理時間大於批間隔時間時會出現問題。換一句話說,就是spark
讀取資料的速度慢於kafka
資料到達的速度。如果按照這個吞吐量執行過長的時間,它會造成不穩定的情況。 即接收executor
的記憶體溢位。設定下面的引數解決這個問題。
spark.streaming.backpressure.enabled=true
2.5 調整本地化和塊配置
下面的兩個引數是互補的。一個決定了資料本地化到task
或者executor
等待的時間,另外一個被spark streaming receiver
使用對資料進行組塊。塊越大越好,但是如果資料沒有本地化到executor
,它將會通過網路移動到 任務執行的地方。我們必須在這兩個引數間找到一個好的平衡,因為我們不想資料塊太大,並且也不想等待本地化太長時間。我們希望所有的任務都在幾秒內完成。
因此,我們改變本地化選項從3s到1s,我們也改變塊間隔為1.5s。
--conf "spark.locality.wait=1s" \
--conf "spark.streaming.blockInterval=1500ms" \
2.6 合併臨時檔案
在ext4
檔案系統中,推薦開啟這個功能。因為這會產生更少的臨時檔案。
--conf "spark.shuffle.consolidateFiles=true" \
2.7 開啟executor配置
在你配置kafka Dstream
時,你能夠指定併發消費執行緒的數量。然而,kafka Dstream
的消費者會執行在相同的spark driver
節點上面。因此,為了從多臺機器上面並行消費kafka topic
, 我們必須例項化多個Dstream
。雖然可以在處理之前合併相應的RDD
,但是執行多個應用程式例項,把它們都作為相同kafka consumer group
的一部分。
為了達到這個目的,我們設定20個executor
,並且每個executor
有20個核。
--executor-memory 8G --executor-cores 20 --num-executors 20
2.8 快取方法
使用RDD
之前快取RDD
,但是記住在下次迭代之前從快取中刪除它。快取那些需要使用多次的資料非常有用。然而,不要使分割槽數目過大。保持分割槽數目較低可以減少,最小化排程延遲。下面的公式是我們使用的分割槽數的計算公式。
# of executors * # of cores = # of partitions