MapReduce提高效率的幾點建議
阿新 • • 發佈:2019-02-03
Cloudera提供給客戶的服務內容之一就是調整和優化MapReduce job執行效能。MapReduce和HDFS組成一個複雜的分散式系統,並且它們執行著各式各樣使用者的程式碼,這樣導致沒有一個快速有效的規則來實現優化程式碼效能的目的。在我看來,調整cluster或job的執行更像一個醫生對待病人一樣,找出關鍵的“症狀”,對於不同的症狀有不同的診斷和處理方式。
在醫學領域,沒有什麼可以代替一位經驗豐富的醫生;在複雜的分散式系統上,這個道理依然正確—有經驗的使用者和操作者在面對很多常見問題上都會有“第六感”。我曾經為Cloudera不同行業的客戶解決過問題,他們面對的工作量、資料集和cluster硬體有很大區別,因此我在這方面積累了很多的經驗,並且想把這些經驗分享給諸位。
在這篇blog裡,我會高亮那些提高MapReduce效能的建議。前面的一些建議是面向整個cluster的,這可能會對cluster 操作者和開發者有幫助。後面一部分建議是為那些用Java編寫MapReduce job的開發者而提出。在每一個建議中,我列出一些“症狀”或是“診斷測試”來說明一些針對這些問題的改進措施,可能會對你有所幫助。
請注意,這些建議中包含很多我以往從各種不同場景下總結出來的直觀經驗。它們可能不太適用於你所面對的特殊的工作量、資料集或cluster,如果你想使用它,就需要測試使用前和使用後它在你的cluster環境中的表現。對於這些建議,我會展示一些對比性的資料,資料產生的環境是一個4個節點的cluster來執行40GB的Wordcount job。應用了我以下所提到的這些建議後,這個job中的每個map task大概執行33秒,job總共執行了差不多8分30秒。
第一點 正確地配置你的Cluster
診斷結果/症狀:
1. Linux top命令的結果顯示slave節點在所有map和reduce slot都有task執行時依然很空閒。
2. top命令顯示核心的程序,如RAID(mdX_raid*)或pdflush佔去大量的CPU時間。
3. Linux的平均負載通常是系統CPU數量的2倍。
4. 即使系統正在執行job,Linux平均負載總是保持在系統CPU數量的一半的狀態。
5. 一些節點上的swap利用率超過幾MB
優化你的MapReduce效能的第一步是確保你整個cluster的配置檔案被調整過。對於新手,請參考這裡關於配置引數的一篇blog:配置引數。 除了這些配置引數 ,在你想修改job引數以期提高效能時,你應該參照下我這裡的一些你應該注意的項:
1. 確保你正在DFS和MapReduce中使用的儲存mount被設定了noatime選項。這項如果設定就不會啟動對磁碟訪問時間的記錄,會顯著提高IO的效能。
2. 避免在TaskTracker和DataNode的機器上執行RAID和LVM操作,這通常會降低效能
3. 在這兩個引數mapred.local.dir 和dfs.data.dir 配置的值應當是分佈在各個磁碟上目錄,這樣可以充分利用節點的IO讀寫能力。執行 Linux sysstat包下的iostat -dx 5命令可以讓每個磁碟都顯示它的利用率。
4. 你應該有一個聰明的監控系統來監控磁碟裝置的健康狀態。MapReduce job的設計是可容忍磁碟失敗,但磁碟的異常會導致一些task重複執行而使效能下降。如果你發現在某個TaskTracker被很多job中列入黑名單,那麼它就可能有問題。
5. 使用像Ganglia這樣的工具監控並繪出swap和網路的利用率圖。如果你從監控的圖看出機器正在使用swap記憶體,那麼減少mapred.child.java.opts 屬性所表示的記憶體分配。
基準測試:
很遺憾我不能為這個建議去生成一些測試資料,因為這需要構建整個cluster。如果你有相關的經驗,請把你的建議及結果附到下面的留言區。
第二點 使用LZO壓縮
診斷結果/症狀:
1. 對 job的中間結果資料使用壓縮是很好的想法。
2. MapReduce job的輸出資料大小是不可忽略的。
3. 在job執行時,通過linux top 和 iostat命令可以看出slave節點的iowait利用率很高。
幾乎每個Hadoop job都可以通過對map task輸出的中間資料做LZO壓縮獲得較好的空間效益。儘管LZO壓縮會增加一些CPU的負載,但在shuffle過程中會減少磁碟IO的資料量,總體上總是可以節省時間的。
當一個job需要輸出大量資料時,應用LZO壓縮可以提高輸出端的輸出效能。這是因為預設情況下每個檔案的輸出都會儲存3個幅本,1GB的輸出檔案你將要儲存3GB的磁碟資料,當採用壓縮後當然更能節省空間並提高效能。
為了使LZO壓縮有效,請設定引數mapred.compress.map.output值為true。
基準測試:
在我的cluster裡,Wordcount例子中不使用LZO壓縮的話,job的執行時間只是稍微增加。但FILE_BYTES_WRITTEN計數器卻從3.5GB增長到9.2GB,這表示壓縮會減少62%的磁碟IO。在我的cluster裡,每個資料節點上磁碟數量對task數量的比例很高,但Wordcount job並沒有在整個cluster中共享,所以cluster中IO不是瓶頸,磁碟IO增長不會有什麼大的問題。但對於磁碟因很多併發活動而受限的環境來說,磁碟IO減少60%可以大幅提高job的執行速度。
第三點 調整map和reduce task的數量到合適的值
診斷結果/症狀:
1. 每個map或reduce task的完成時間少於30到40秒。
2. 大型的job不能完全利用cluster中所有空閒的slot。
3. 大多數map或reduce task被排程執行了,但有一到兩個task還在準備狀態,在其它task完成之後才單獨執行
調整job中map和reduce task的數量是一件很重要且常常被忽略的事情。下面是我在設定這些引數時的一些直觀經驗:
1. 如果每個task的執行時間少於30到40秒,就減少task的數量。Task的建立與排程一般耗費幾秒的時間,如果task完成的很快,我們就是在浪費時間。同時,設定JVM重用也可以解決這個問題。
2. 如果一個job的輸入資料大於1TB,我們就增加block size到256或者512,這樣可以減少task的數量。你可以使用這個命令去修改已存在檔案的block size: hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with/largeblocks。在執行完這個命令後,你就可以刪除原始的輸入檔案了(/path/to/inputdata)。
3. 只要每個task執行至少30到40秒,那麼就增加map task的數量,增加到整個cluster上map slot總數的幾倍。如果你的cluster中有100個map slot,那就避免執行一個有101個map task的job — 如果執行的話,前100個map同時執行,第101個task會在reduce執行之前單獨執行。這個建議對於小型cluste和小型job是很重要的。
4. 不要排程太多的reduce task — 對於大多數job來說,我們推薦reduce task的數量應當等於或是略小於cluster中reduce slot的數量。
基準測試:
為了讓Wordcount job有很多的task執行,我設定瞭如下的引數:Dmapred.max.split.size=$[16*1024*1024]。以前預設會產生360個map task,現在就會有2640個。當完成這個設定之後,每個task執行耗費9秒,並且在JobTracker的Cluster Summar檢視中可以觀看到,正在執行的map task數量在0到24之間浮動。job在17分52秒之後結束,比原來的執行要慢兩倍多。
第四點 為job新增一個Combiner
診斷結果/症狀:
1. job在執行分類的聚合時,REDUCE_INPUT_GROUPS計數器遠小於REDUCE_INPUT_RECORDS計數器。
2. job執行一個大的shuffle任務(例如,map的輸出資料每個節點就是好幾個GB)。
3. 從job計數器中看出,SPILLED_RECORDS遠大於MAP_OUTPUT_RECORDS。
如果你的演算法涉及到一些分類的聚合,那麼你就可以使用Combiner來完成資料到達reduce端之前的初始聚合工作。MapReduce框架很明智地運用Combiner來減少寫入磁碟以及通過網路傳輸到reduce端的資料量。
基準測試:
我刪去Wordcount例子中對setCombinerClass方法的呼叫。僅這個修改就讓map task的平均執行時間由33秒增長到48秒,shuffle的資料量也從1GB提高到1.4GB。整個job的執行時間由原來的8分30秒變成15分42秒,差不多慢了兩倍。這次測試過程中開啟了map輸出結果的壓縮功能,如果沒有開啟這個壓縮功能的話,那麼Combiner的影響就會變得更加明顯。
第五點 為你的資料使用最合適和簡潔的Writable型別
診斷/症狀:
1. Text 物件在非文字或混合資料中使用。
2. 大部分的輸出值很小的時候使用IntWritable 或 LongWritable物件。
當一個開發者是初次編寫MapReduce,或是從開發Hadoop Streaming轉到Java MapReduce,他們會經常在不必要的時候使用Text 物件。儘管Text物件使用起來很方便,但它在由數值轉換到文字或是由UTF8字串轉換到文字時都是低效的,且會消耗大量的CPU時間。當處理那些非文字的資料時,可以使用二進位制的Writable型別,如IntWritable, FloatWritable等。
除了避免檔案轉換的消耗外,二進位制Writable型別作為中間結果時會佔用更少的空間。當磁碟IO和網路傳輸成為大型job所遇到的瓶頸時,減少些中間結果的大小可以獲得更好的效能。在處理整形數值時,有時使用VIntWritable或VLongWritable型別可能會更快些—這些實現了變長整形編碼的型別在序列化小數值時會更節省空間。例如,整數4會被序列化成單位元組,而整數10000會被序列化成兩個位元組。這些變長型別用在統計等任務時更加有效,在這些任務中我們只要確保大部分的記錄都是一個很小的值,這樣值就可以匹配一或兩個位元組。
如果Hadoop自帶的Writable型別不能滿足你的需求,你可以開發自己的Writable型別。這應該是挺簡單的,可能會在處理文字方面更快些。如果你編寫了自己的Writable型別,請務必提供一個RawComparator類—你可以以內建的Writable型別做為例子。
基準測試:
對於Wordcount例子,我修改了它在map計數時的中間變數,由IntWritable改為Text。並且在reduce統計最終和時使用Integer.parseString(value.toString)來轉換出真正的數值。這個版本比原始版本要慢近10%—整個job完成差不多超過9分鐘,且每個map task要執行36秒,比之前的33秒要慢。儘量看起來整形轉換還是挺快的,但這不說明什麼情況。在正常情況下,我曾經看到過選用合適的Writable型別可以有2到3倍的效能提升的例子。
第六點 重用Writable型別
診斷/症狀:
1. 在mapred.child.java.opts引數上增加-verbose:gc -XX:+PriintGCDetails,然後檢視一些task的日誌。如果垃圾回收頻繁工作且消耗一些時間,你需要注意那些無用的物件。
2. 在你的程式碼中搜索"new Text" 或"new IntWritable"。如果它們出現在一個內部迴圈或是map/reduce方法的內部時,這條建議可能會很有用。
3. 這條建議在task記憶體受限的情況下特別有用。
很多MapReduce使用者常犯的一個錯誤是,在一個map/reduce方法中為每個輸出都建立Writable物件。例如,你的Wordcout mapper方法可能這樣寫:
在醫學領域,沒有什麼可以代替一位經驗豐富的醫生;在複雜的分散式系統上,這個道理依然正確—有經驗的使用者和操作者在面對很多常見問題上都會有“第六感”。我曾經為Cloudera不同行業的客戶解決過問題,他們面對的工作量、資料集和cluster硬體有很大區別,因此我在這方面積累了很多的經驗,並且想把這些經驗分享給諸位。
在這篇blog裡,我會高亮那些提高MapReduce效能的建議。前面的一些建議是面向整個cluster的,這可能會對cluster 操作者和開發者有幫助。後面一部分建議是為那些用Java編寫MapReduce job的開發者而提出。在每一個建議中,我列出一些“症狀”或是“診斷測試”來說明一些針對這些問題的改進措施,可能會對你有所幫助。
請注意,這些建議中包含很多我以往從各種不同場景下總結出來的直觀經驗。它們可能不太適用於你所面對的特殊的工作量、資料集或cluster,如果你想使用它,就需要測試使用前和使用後它在你的cluster環境中的表現。對於這些建議,我會展示一些對比性的資料,資料產生的環境是一個4個節點的cluster來執行40GB的Wordcount job。應用了我以下所提到的這些建議後,這個job中的每個map task大概執行33秒,job總共執行了差不多8分30秒。
第一點 正確地配置你的Cluster
診斷結果/症狀:
1. Linux top命令的結果顯示slave節點在所有map和reduce slot都有task執行時依然很空閒。
2. top命令顯示核心的程序,如RAID(mdX_raid*)或pdflush佔去大量的CPU時間。
3. Linux的平均負載通常是系統CPU數量的2倍。
4. 即使系統正在執行job,Linux平均負載總是保持在系統CPU數量的一半的狀態。
5. 一些節點上的swap利用率超過幾MB
優化你的MapReduce效能的第一步是確保你整個cluster的配置檔案被調整過。對於新手,請參考這裡關於配置引數的一篇blog:配置引數。 除了這些配置引數 ,在你想修改job引數以期提高效能時,你應該參照下我這裡的一些你應該注意的項:
1. 確保你正在DFS和MapReduce中使用的儲存mount被設定了noatime選項。這項如果設定就不會啟動對磁碟訪問時間的記錄,會顯著提高IO的效能。
2. 避免在TaskTracker和DataNode的機器上執行RAID和LVM操作,這通常會降低效能
3. 在這兩個引數mapred.local.dir
4. 你應該有一個聰明的監控系統來監控磁碟裝置的健康狀態。MapReduce job的設計是可容忍磁碟失敗,但磁碟的異常會導致一些task重複執行而使效能下降。如果你發現在某個TaskTracker被很多job中列入黑名單,那麼它就可能有問題。
5. 使用像Ganglia這樣的工具監控並繪出swap和網路的利用率圖。如果你從監控的圖看出機器正在使用swap記憶體,那麼減少mapred.child.java.opts
基準測試:
很遺憾我不能為這個建議去生成一些測試資料,因為這需要構建整個cluster。如果你有相關的經驗,請把你的建議及結果附到下面的留言區。
第二點 使用LZO壓縮
診斷結果/症狀:
1. 對 job的中間結果資料使用壓縮是很好的想法。
2. MapReduce job的輸出資料大小是不可忽略的。
3. 在job執行時,通過linux top 和 iostat命令可以看出slave節點的iowait利用率很高。
幾乎每個Hadoop job都可以通過對map task輸出的中間資料做LZO壓縮獲得較好的空間效益。儘管LZO壓縮會增加一些CPU的負載,但在shuffle過程中會減少磁碟IO的資料量,總體上總是可以節省時間的。
當一個job需要輸出大量資料時,應用LZO壓縮可以提高輸出端的輸出效能。這是因為預設情況下每個檔案的輸出都會儲存3個幅本,1GB的輸出檔案你將要儲存3GB的磁碟資料,當採用壓縮後當然更能節省空間並提高效能。
為了使LZO壓縮有效,請設定引數mapred.compress.map.output值為true。
基準測試:
在我的cluster裡,Wordcount例子中不使用LZO壓縮的話,job的執行時間只是稍微增加。但FILE_BYTES_WRITTEN計數器卻從3.5GB增長到9.2GB,這表示壓縮會減少62%的磁碟IO。在我的cluster裡,每個資料節點上磁碟數量對task數量的比例很高,但Wordcount job並沒有在整個cluster中共享,所以cluster中IO不是瓶頸,磁碟IO增長不會有什麼大的問題。但對於磁碟因很多併發活動而受限的環境來說,磁碟IO減少60%可以大幅提高job的執行速度。
第三點 調整map和reduce task的數量到合適的值
診斷結果/症狀:
1. 每個map或reduce task的完成時間少於30到40秒。
2. 大型的job不能完全利用cluster中所有空閒的slot。
3. 大多數map或reduce task被排程執行了,但有一到兩個task還在準備狀態,在其它task完成之後才單獨執行
調整job中map和reduce task的數量是一件很重要且常常被忽略的事情。下面是我在設定這些引數時的一些直觀經驗:
1. 如果每個task的執行時間少於30到40秒,就減少task的數量。Task的建立與排程一般耗費幾秒的時間,如果task完成的很快,我們就是在浪費時間。同時,設定JVM重用也可以解決這個問題。
2. 如果一個job的輸入資料大於1TB,我們就增加block size到256或者512,這樣可以減少task的數量。你可以使用這個命令去修改已存在檔案的block size: hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with/largeblocks。在執行完這個命令後,你就可以刪除原始的輸入檔案了(/path/to/inputdata)。
3. 只要每個task執行至少30到40秒,那麼就增加map task的數量,增加到整個cluster上map slot總數的幾倍。如果你的cluster中有100個map slot,那就避免執行一個有101個map task的job — 如果執行的話,前100個map同時執行,第101個task會在reduce執行之前單獨執行。這個建議對於小型cluste和小型job是很重要的。
4. 不要排程太多的reduce task — 對於大多數job來說,我們推薦reduce task的數量應當等於或是略小於cluster中reduce slot的數量。
基準測試:
為了讓Wordcount job有很多的task執行,我設定瞭如下的引數:Dmapred.max.split.size=$[16*1024*1024]。以前預設會產生360個map task,現在就會有2640個。當完成這個設定之後,每個task執行耗費9秒,並且在JobTracker的Cluster Summar檢視中可以觀看到,正在執行的map task數量在0到24之間浮動。job在17分52秒之後結束,比原來的執行要慢兩倍多。
第四點 為job新增一個Combiner
診斷結果/症狀:
1. job在執行分類的聚合時,REDUCE_INPUT_GROUPS計數器遠小於REDUCE_INPUT_RECORDS計數器。
2. job執行一個大的shuffle任務(例如,map的輸出資料每個節點就是好幾個GB)。
3. 從job計數器中看出,SPILLED_RECORDS遠大於MAP_OUTPUT_RECORDS。
如果你的演算法涉及到一些分類的聚合,那麼你就可以使用Combiner來完成資料到達reduce端之前的初始聚合工作。MapReduce框架很明智地運用Combiner來減少寫入磁碟以及通過網路傳輸到reduce端的資料量。
基準測試:
我刪去Wordcount例子中對setCombinerClass方法的呼叫。僅這個修改就讓map task的平均執行時間由33秒增長到48秒,shuffle的資料量也從1GB提高到1.4GB。整個job的執行時間由原來的8分30秒變成15分42秒,差不多慢了兩倍。這次測試過程中開啟了map輸出結果的壓縮功能,如果沒有開啟這個壓縮功能的話,那麼Combiner的影響就會變得更加明顯。
第五點 為你的資料使用最合適和簡潔的Writable型別
診斷/症狀:
1. Text 物件在非文字或混合資料中使用。
2. 大部分的輸出值很小的時候使用IntWritable 或 LongWritable物件。
當一個開發者是初次編寫MapReduce,或是從開發Hadoop Streaming轉到Java MapReduce,他們會經常在不必要的時候使用Text 物件。儘管Text物件使用起來很方便,但它在由數值轉換到文字或是由UTF8字串轉換到文字時都是低效的,且會消耗大量的CPU時間。當處理那些非文字的資料時,可以使用二進位制的Writable型別,如IntWritable, FloatWritable等。
除了避免檔案轉換的消耗外,二進位制Writable型別作為中間結果時會佔用更少的空間。當磁碟IO和網路傳輸成為大型job所遇到的瓶頸時,減少些中間結果的大小可以獲得更好的效能。在處理整形數值時,有時使用VIntWritable或VLongWritable型別可能會更快些—這些實現了變長整形編碼的型別在序列化小數值時會更節省空間。例如,整數4會被序列化成單位元組,而整數10000會被序列化成兩個位元組。這些變長型別用在統計等任務時更加有效,在這些任務中我們只要確保大部分的記錄都是一個很小的值,這樣值就可以匹配一或兩個位元組。
如果Hadoop自帶的Writable型別不能滿足你的需求,你可以開發自己的Writable型別。這應該是挺簡單的,可能會在處理文字方面更快些。如果你編寫了自己的Writable型別,請務必提供一個RawComparator類—你可以以內建的Writable型別做為例子。
基準測試:
對於Wordcount例子,我修改了它在map計數時的中間變數,由IntWritable改為Text。並且在reduce統計最終和時使用Integer.parseString(value.toString)來轉換出真正的數值。這個版本比原始版本要慢近10%—整個job完成差不多超過9分鐘,且每個map task要執行36秒,比之前的33秒要慢。儘量看起來整形轉換還是挺快的,但這不說明什麼情況。在正常情況下,我曾經看到過選用合適的Writable型別可以有2到3倍的效能提升的例子。
第六點 重用Writable型別
診斷/症狀:
1. 在mapred.child.java.opts引數上增加-verbose:gc -XX:+PriintGCDetails,然後檢視一些task的日誌。如果垃圾回收頻繁工作且消耗一些時間,你需要注意那些無用的物件。
2. 在你的程式碼中搜索"new Text" 或"new IntWritable"。如果它們出現在一個內部迴圈或是map/reduce方法的內部時,這條建議可能會很有用。
3. 這條建議在task記憶體受限的情況下特別有用。
很多MapReduce使用者常犯的一個錯誤是,在一個map/reduce方法中為每個輸出都建立Writable物件。例如,你的Wordcout mapper方法可能這樣寫: