1. 程式人生 > >一步一步跟我學習hadoop(5)----hadoop Map/Reduce教程(2)

一步一步跟我學習hadoop(5)----hadoop Map/Reduce教程(2)

submit calc run submitjob des conf sam ner 打開

Map/Reduce用戶界面

本節為用戶採用框架要面對的各個環節提供了具體的描寫敘述,旨在與幫助用戶對實現、配置和調優進行具體的設置。然而,開發時候還是要相應著API進行相關操作。

首先我們須要了解Mapper和Reducer接口,應用通常須要提供map和reduce方法以實現他們。

接著我們須要對JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等進行討論。

最後,我們將通過討論框架中一些實用的功能點(比如:DistributedCache

IsolationRunner等等)。

核心功能描寫敘述

應用程序一般會通過提供mapreduce來實現MapperReducer接口,它們組成作業的核心。

Mapper

mapper對輸入的鍵值對映射成一組中間格式的鍵值對。

映射是一個獨立的任務,它將輸入記錄集轉換為中間格式記錄集。這樣的轉換的中間格式記錄不須要與輸入的記錄集類型一致,一個輸入的鍵值能夠相應多個輸出的鍵值對。

Hadoop Map/Reduce為每一個由作業中InputFormat產生的InputSplit產生一個map任務。

總的來說。對Mapper的實現者須要重寫JobConfigurable.configure(JobConf)方法。此方法須要傳遞JobConf參數。以此來完畢mapper的初始化。

接著。框架調用map方法。對任務中的InputSplit中每一個key/value pair調用以此。

應用程序通過重寫Closable.close()來運行必要的清理工作。

輸出的鍵值對不須要跟輸入的鍵值對的類型一致。輸入的鍵值可能映射成0到多個輸出的鍵值對,然後調用OutputCollector.collect(WritableComparable,Writable)來收集輸出的鍵值對。

應用程序能夠使用Reporter報告進度。設定應用級別的狀態消息,更新Counters(計數器),或者僅是表明自己執行正常。

框架隨後會把與一個特定key關聯的全部中間過程的值(value)分成組。然後把它們傳給Reducer

以產出終於的結果。用戶能夠通過 JobConf.setOutputKeyComparatorClass(Class)來指定詳細負責分組的Comparator

Mapper的輸出被排序後。就被劃分給每一個Reducer。分塊的總數目和一個作業的reduce任務的數目是一樣的。

用戶能夠通過實現自己定義的Partitioner來控制哪個key被分配給哪個Reducer

用戶可選擇通過 JobConf.setCombinerClass(Class)指定一個combiner,它負責對中間過程的輸出進行本地的聚集,這會有助於減少從MapperReducer傳輸數據量。

這些被排好序的中間過程的輸出結果保存的格式是(key-len, key, value-len, value),應用程序能夠通過JobConf控制對這些中間結果是否進行壓縮以及怎麽壓縮,使用哪種 CompressionCodec。

須要多少個Map?

Map的數目一般是由輸入數據的大小決定的,一般就是全部輸入文件的總塊(block)數。

Map正常的並行規模大致是每一個節點(node)大約10到100個map,對於CPU 消耗較小的map任務能夠設到300個左右。因為每一個任務初始化須要一定的時間,因此。比較合理的情況是map運行的時間至少超過1分鐘。

這樣,假設你輸入10TB的數據。每一個塊(block)的大小是128MB,你將須要大約82,000個map來完畢任務,除非使用 setNumMapTasks(int)(註意:這裏不過對框架進行了一個提示(hint),實際決定因素見這裏)將這個數值設置得更高。

Reducer

對於reducer,官方給出的說明為:

Reducer reduces a set of intermediate values which share a key to a smaller set of values.


大意是Reducer對中間的值集合轉換成一個key相應一個更小的數據集。

Reducer的個數取決於用戶設置,用戶通過JobConf.setNumReduceTasks(int)來設置。

總的來說。Reducer的實現須要通過重寫JobConfigurable.configure(JobConf)方法。這種方法須要傳遞一個JobConf參數,目的是完畢Reducer的初始化工作。然後。框架為成組的輸入數據中的每一個<key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。

之後。應用程序能夠通過重寫Closeable.close()來運行對應的清理工作。

Reducer有3個主要階段:shuffle、sort和reduce。

Shuffle

reducer的輸入相應的是mapper的已排序的輸出。

Sort

框架在此階段依據輸入key的值對reducer的輸入進行分組(由於不同mapper的輸出中可能會有同樣的key);

Shuffle和sort兩個階段是同一時候進行的;map的輸出也是邊取回邊合並的。

Secondary Sort

假設須要中間過程對key的分組規則和reduce前對key的分組規則不同,那麽能夠通過 JobConf.setOutputValueGroupingComparator(Class)來指定一個Comparator

再加上 JobConf.setOutputKeyComparatorClass(Class)可用於控制中間過程的key怎樣被分組,所以結合兩者能夠實現按值的二次排序

Reduce

本階段框架為已分組的輸入數據中的每一個 <key, (list of values)>對調用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。

reduce任務的輸出一般是通過調用OutputCollector.collect(WritableComparable, Writable)來寫入文件系統的。

應用能夠利用Reporter來報告進度。設置程序級別狀態消息和更新計數器,或是只告知程序執行正常。

Reducer的輸出沒有排序處理。

須要多少Reduce

Reduce的數目建議是0.951.75乘以 (<no. of nodes> *mapred.tasktracker.reduce.tasks.maximum)。

用0.95。全部reduce能夠在maps一完畢時就立馬啟動,開始傳輸map的輸出結果。用1.75,速度快的節點能夠在完畢第一輪reduce任務後,能夠開始第二輪,這樣能夠得到比較好的負載均衡的效果。

添加reduce的數目會添加整個框架的開銷,但能夠改善負載均衡。減少因為運行失敗帶來的負面影響。

上述比例因子比總體數目稍小一些是為了給框架中的猜測性任務(speculative-tasks) 或失敗的任務預留一些reduce的資源。

無Reducer

假設沒有歸約要進行。那麽設置reduce任務的數目為是合法的。

這樣的情況下,map任務的輸出會直接被寫入由setOutputPath(Path)指定的輸出路徑。框架在把它們寫入FileSystem之前沒有對它們進行排序。

Partitioner

Partitioner對值空間進行劃分。

Partitioner負責控制map輸出結果key的切割。Key(或者一個key子集)被用於產生分區,通常使用的是Hash函數。分區的數目與一個作業的reduce任務的數目是一樣的。因此,它控制將中間過程的key(也就是這條記錄)應該發送給m個reduce任務中的哪一個來進行reduce操作。

HashPartitioner是默認的 Partitioner

Reporter

Reporter用於Map/Reduce應用程序報告進度。設定應用級別的狀態消息, 更新Counters(計數器)的機制。

MapperReducer的實現能夠利用Reporter來報告進度。或者僅是表明自己執行正常。在那種應用程序須要花非常長時間處理個別鍵值對的場景中。這樣的機制是非常關鍵的,由於框架可能會以為這個任務超時了,從而將它強行殺死。

還有一個避免這樣的情況發生的方式是。將配置參數mapred.task.timeout設置為一個足夠高的值(或者幹脆設置為零,則沒有超時限制了)。

應用程序能夠用Reporter來更新Counter(計數器)。

OutputCollector

OutputCollector是一個Map/Reduce框架提供的用於收集MapperReducer輸出數據的通用機制(包含中間輸出結果和作業的輸出結果)。

Hadoop Map/Reduce框架附帶了一個包括很多有用型的mapper、reducer和partitioner 的類庫。

任務的運行和環境

TaskTracker是在一個單獨的jvm上以子進程的形式運行 Mapper/Reducer任務(Task)的。

子任務會繼承父TaskTracker的環境。用戶能夠通過JobConf中的 mapred.child.java.opts配置參數來設定子jvm上的附加選項。比如: 通過-Djava.library.path=<> 將一個非標準路徑設為執行時的鏈接用以搜索共享庫,等等。

假設mapred.child.java.opts包括一個符號@taskid@。 它會被替換成map/reduce的taskid的值。

以下是一個包括多個參數和替換的樣例,當中包括:記錄jvm GC日誌; JVM JMX代理程序以無password的方式啟動,這樣它就能連接到jconsole上,從而能夠查看子進程的內存和線程,得到線程的dump;還把子jvm的最大堆尺寸設置為512MB, 並為子jvm的java.library.path加入了一個附加路徑。

<property>
<name>mapred.child.java.opts</name>
<value>
-Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:[email protected]@.gc
-Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
</value>
</property>

用戶或管理員也能夠使用mapred.child.ulimit設定執行的子任務的最大虛擬內存。mapred.child.ulimit的值以(KB)為單位。而且必須大於或等於-Xmx參數傳給JavaVM的值。否則VM會無法啟動。

註意:mapred.child.java.opts僅僅用於設置task tracker啟動的子任務。為守護進程設置內存選項請查看 cluster_setup.html

${mapred.local.dir}/taskTracker/是task tracker的本地文件夾, 用於創建本地緩存和job。它能夠指定多個文件夾(跨越多個磁盤),文件會半隨機的保存到本地路徑下的某個文件夾。當job啟動時,task tracker依據配置文檔創建本地job文件夾,文件夾結構例如以下面所看到的:

  • ${mapred.local.dir}/taskTracker/archive/ :分布式緩存。這個文件夾保存本地的分布式緩存。因此本地分布式緩存是在全部task和job間共享的。

  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job文件夾。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享文件夾。各個任務能夠使用這個空間做為暫存空間。用於它們之間共享文件。

      這個文件夾通過job.local.dir 參數暴露給用戶。這個路徑能夠通過API JobConf.getJobLocalDir()來訪問。它也能夠被做為系統屬性獲得。因此。用戶(比方執行streaming)能夠調用System.getProperty("job.local.dir")獲得該文件夾。

    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路徑,用於存放作業的jar文件和展開的jar。job.jar是應用程序的jar文件。它會被自己主動分發到各臺機器,在task啟動前會被自己主動展開。使用api JobConf.getJar() 函數能夠得到job.jar的位置。使用JobConf.getJar().getParent()能夠訪問存放展開的jar包的文件夾。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一個job.xml文件。本地的通用的作業配置文件。

    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每一個任務有一個文件夾task-id,它裏面有例如以下的文件夾結構:
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一個job.xml文件,本地化的任務作業配置文件。

        任務本地化是指為該task設定特定的屬性值。

        這些值會在以下詳細說明。

      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output一個存放中間過程的輸出文件的文件夾。它保存了由framwork產生的暫時map reduce數據,比方map的輸出文件等。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的當前工作文件夾。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的暫時文件夾。(用戶能夠設定屬性mapred.child.tmp來為map和reduce task設定暫時文件夾。

        缺省值是./tmp

        假設這個值不是絕對路徑。 它會把task的工作路徑加到該路徑前面作為task的暫時文件路徑。假設這個值是絕對路徑則直接使用這個值。 假設指定的文件夾不存在,會自己主動創建該文件夾。

        之後,依照選項 -Djava.io.tmpdir=‘暫時文件的絕對路徑‘運行java子任務。

        pipes和streaming的暫時文件路徑是通過環境變量TMPDIR=‘the absolute path of the tmp dir‘設定的)。

        假設mapred.child.tmp./tmp值,這個文件夾會被創建。

以下的屬性是為每一個task運行時使用的本地參數,它們保存在本地化的任務作業配置文件中:

名稱 類型 描寫敘述
mapred.job.id String job id
mapred.jar String job文件夾下job.jar的位置
job.local.dir String job指定的共享存儲空間
mapred.tip.id String task id
mapred.task.id String task嘗試id
mapred.task.is.map boolean 是否是map task
mapred.task.partition int task在job中的id
map.input.file String map讀取的文件名稱
map.input.start long map輸入的數據塊的起始位置偏移
map.input.length long map輸入的數據塊的字節數
mapred.work.output.dir String task暫時輸出文件夾

task的標準輸出和錯誤輸出流會被讀到TaskTracker中,而且記錄到 ${HADOOP_LOG_DIR}/userlogs

DistributedCache可用於map或reduce task中分發jar包和本地庫。子jvm總是把 當前工作文件夾 加到 java.library.path LD_LIBRARY_PATH

因此,能夠通過 System.loadLibrary或 System.load裝載緩存的庫。有關使用分布式緩存載入共享庫的細節請參考 native_libraries.html

作業的提交與監控

JobClient是用戶提交的作業與JobTracker交互的主要接口。

JobClient 提供提交作業,追蹤進程,訪問子任務的日誌記錄。獲得Map/Reduce集群狀態信息等功能。

作業提交過程包含:

  1. 檢查作業輸入輸出樣式細節
  2. 為作業計算InputSplit值。

  3. 假設須要的話,為作業的DistributedCache建立必須的統計信息。
  4. 拷貝作業的jar包和配置文件到FileSystem上的Map/Reduce系統文件夾下。
  5. 提交作業到JobTracker而且監控它的狀態。

作業的歷史文件記錄到指定文件夾的"_logs/history/"子文件夾下。這個指定文件夾由hadoop.job.history.user.location設定。默認是作業輸出的文件夾。因此默認情況下,文件會存放在mapred.output.dir/_logs/history文件夾下。

用戶能夠設置hadoop.job.history.user.locationnone來停止日誌記錄。

用戶使用以下的命令能夠看到在指定文件夾下的歷史日誌記錄的摘要。
$ bin/hadoop job -history output-dir
這個命令會打印出作業的細節,以及失敗的和被殺死的任務細節。


要查看有關作業的很多其它細節比如成功的任務、每一個任務嘗試的次數(task attempt)等,能夠使用以下的命令
$ bin/hadoop job -history all output-dir

用戶能夠使用 OutputLogFilter從輸出文件夾列表中篩選日誌文件。

普通情況,用戶利用JobConf創建應用程序並配置作業屬性, 然後用 JobClient 提交作業並監視它的進程。

作業的控制

有時候。用一個單獨的Map/Reduce作業並不能完畢一個復雜的任務,用戶或許要鏈接多個Map/Reduce作業才行。這是easy實現的。由於作業通常輸出到分布式文件系統上的。所以能夠把這個作業的輸出作為下一個作業的輸入實現串聯。

然而,這也意味著。確保每一作業完畢(成功或失敗)的責任就直接落在了客戶身上。在這樣的情況下,能夠用的控制作業的選項有:

  • runJob(JobConf):提交作業,僅當作業完畢時返回。
  • submitJob(JobConf):僅僅提交作業,之後須要你輪詢它返回的 RunningJob句柄的狀態。並依據情況調度。

  • JobConf.setJobEndNotificationURI(String):設置一個作業完畢通知,可避免輪詢。

作業的輸入

InputFormat 為Map/Reduce作業描寫敘述輸入的細節規範。

Map/Reduce框架依據作業的InputFormat來:

  1. 檢查作業輸入的有效性。
  2. 把輸入文件切分成多個邏輯InputSplit實例。 並把每一實例分別分發給一個 Mapper
  3. 提供RecordReader的實現,這個RecordReader從邏輯InputSplit中獲得輸入記錄, 這些記錄將由Mapper處理。

基於文件的InputFormat實現(一般是 FileInputFormat的子類) 默認行為是依照輸入文件的字節大小,把輸入數據切分成邏輯分塊(logicalInputSplit )。 當中輸入文件所在的FileSystem的數據塊尺寸是分塊大小的上限。下限能夠設置mapred.min.split.size的值。

考慮到邊界情況。對於非常多應用程序來說,非常明顯依照文件大小進行邏輯切割是不能滿足需求的。

在這樣的情況下,應用程序須要實現一個RecordReader來處理記錄的邊界並為每一個任務提供一個邏輯分塊的面向記錄的視圖。

TextInputFormat 是默認的InputFormat

假設一個作業的InputformatTextInputFormat, 而且框架檢測到輸入文件的後綴是.gz.lzo,就會使用相應的CompressionCodec自己主動解壓縮這些文件。 可是須要註意。上述帶後綴的壓縮文件不會被切分,而且整個壓縮文件會分給一個mapper來處理。

InputSplit

InputSplit 是一個單獨的Mapper要處理的數據塊。

一般的InputSplit 是字節樣式輸入,然後由RecordReader處理並轉化成記錄樣式。

FileSplit 是默認的InputSplit。 它把 map.input.file 設定為輸入文件的路徑,輸入文件是邏輯分塊文件。

RecordReader

RecordReader 從InputSlit讀入<key, value>對。

一般的。RecordReader 把由InputSplit提供的字節樣式的輸入文件,轉化成由Mapper處理的記錄樣式的文件。

因此RecordReader負責處理記錄的邊界情況和把數據表示成keys/values對形式。

作業的輸出

OutputFormat 描寫敘述Map/Reduce作業的輸出樣式。

Map/Reduce框架依據作業的OutputFormat來:

  1. 檢驗作業的輸出,比如檢查輸出路徑是否已經存在。
  2. 提供一個RecordWriter的實現,用來輸出作業結果。

    輸出文件保存在FileSystem上。

TextOutputFormat是默認的 OutputFormat

任務的Side-Effect File

在一些應用程序中,子任務須要產生一些side-file。這些文件與作業實際輸出結果的文件不同。

在這樣的情況下,同一個Mapper或者Reducer的兩個實例(比方預防性任務)同一時候打開或者寫 FileSystem上的同一文件就會產生沖突。因此應用程序在寫文件的時候須要為每次任務嘗試(不不過每次任務,每一個任務能夠嘗試運行非常多次)選取一個獨一無二的文件名稱(使用attemptid,比如task_200709221812_0001_m_000000_0)。

為了避免沖突,Map/Reduce框架為每次嘗試運行任務都建立和維護一個特殊的 ${mapred.output.dir}/_temporary/_${taskid}子文件夾。這個文件夾位於本次嘗試運行任務輸出結果所在的FileSystem上。能夠通過 ${mapred.work.output.dir}來訪問這個子文件夾。

對於成功完畢的任務嘗試,僅僅有${mapred.output.dir}/_temporary/_${taskid}下的文件會移動${mapred.output.dir}。當然。框架會丟棄那些失敗的任務嘗試的子文件夾。

這樣的處理過程對於應用程序來說是全然透明的。

在任務運行期間,應用程序在寫文件時能夠利用這個特性,比方 通過FileOutputFormat.getWorkOutputPath()獲得${mapred.work.output.dir}文件夾, 並在其下創建隨意任務運行時所需的side-file,框架在任務嘗試成功時會立即移動這些文件。因此不須要在程序內為每次任務嘗試選取一個獨一無二的名字。

註意:在每次任務嘗試運行期間,${mapred.work.output.dir} 的值實際上是 ${mapred.output.dir}/_temporary/_{$taskid},這個值是Map/Reduce框架創建的。 所以使用這個特性的方法是,在FileOutputFormat.getWorkOutputPath() 路徑下創建side-file就可以。

對於僅僅使用map不使用reduce的作業,這個結論也成立。這樣的情況下。map的輸出結果直接生成到HDFS上。

RecordWriter

RecordWriter 生成<key, value>對到輸出文件。

RecordWriter的實現把作業的輸出結果寫到 FileSystem

其它實用的特性

Counters

Counters 是多個由Map/Reduce框架或者應用程序定義的全局計數器。

每個Counter能夠是不論什麽一種 Enum類型。

同一特定Enum類型的Counter能夠匯集到一個組,其類型為Counters.Group

應用程序能夠定義隨意(Enum類型)的Counters而且能夠通過 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long)更新。之後框架會匯總這些全局counters。

DistributedCache

DistributedCache 可將詳細應用相關的、大尺寸的、僅僅讀的文件有效地分布放置。

DistributedCache 是Map/Reduce框架提供的功能,可以緩存應用程序所需的文件 (包含文本,檔案文件,jar文件等)。

應用程序在JobConf中通過url(hdfs://)指定須要被緩存的文件。

DistributedCache假定由hdfs://格式url指定的文件已經在 FileSystem上了。

Map-Redcue框架在作業全部任務執行之前會把必要的文件復制到slave節點上。 它執行高效是由於每一個作業的文件僅僅拷貝一次而且為那些沒有文檔的slave節點緩存文檔。

DistributedCache 依據緩存文檔改動的時間戳進行追蹤。

在作業運行期間。當前應用程序或者外部程序不能改動緩存文件。

distributedCache能夠分發簡單的僅僅讀數據或文本文件,也能夠分發復雜類型的文件比如歸檔文件和jar文件。

歸檔文件(zip,tar,tgz和tar.gz文件)在slave節點上會被解檔(un-archived)。 這些文件能夠設置運行權限

用戶能夠通過設置mapred.cache.{files|archives}來分發文件。

假設要分發多個文件,能夠使用逗號分隔文件所在路徑。

也能夠利用API來設置該屬性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf)當中URI的形式是 hdfs://host:port/absolute-path#link-name在Streaming程序中,能夠通過命令行選項 -cacheFile/-cacheArchive分發文件。

用戶能夠通過DistributedCache.createSymlink(Configuration)方法讓DistributedCache當前工作文件夾下創建到緩存文件的符號鏈接。 或者通過設置配置文件屬性mapred.create.symlinkyes。 分布式緩存會截取URI的片段作為鏈接的名字。

比如。URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當前工作文件夾會有名為lib.so的鏈接, 它會鏈接分布式緩存中的lib.so.1

DistributedCache可在map/reduce任務中作為 一種基礎軟件分發機制使用。

它能夠被用於分發jar包和本地庫(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API可以被用於 緩存文件和jar包,並把它們增加子jvm的classpath。也可以通過設置配置文檔裏的屬性 mapred.job.classpath.{files|archives}達到同樣的效果。緩存文件可用於分發和裝載本地庫。

Tool

Tool接口支持處理經常使用的Hadoop命令行選項。

Tool 是Map/Reduce工具或應用的標準。應用程序應僅僅處理其定制參數。 要把標準命令行選項通過 ToolRunner.run(Tool, String[])托付給 GenericOptionsParser處理。

Hadoop命令行的經常使用選項有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>

IsolationRunner

IsolationRunner 是幫助調試Map/Reduce程序的工具。

使用IsolationRunner的方法是,首先設置 keep.failed.task.files屬性為true(同一時候參考keep.task.files.pattern)。

然後,登錄到任務執行失敗的節點上,進入 TaskTracker的本地路徑執行 IsolationRunner
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

IsolationRunner會把失敗的任務放在單獨的一個可以調試的jvm上執行,而且採用和之前全然一樣的輸入數據。

Profiling

Profiling是一個工具,它使用內置的java profiler工具進行分析獲得(2-3個)map或reduce例子執行分析報告。

用戶能夠通過設置屬性mapred.task.profile指定系統是否採集profiler信息。

利用api JobConf.setProfileEnabled(boolean)能夠改動屬性值。假設設為true, 則開啟profiling功能。profiler信息保存在用戶日誌文件夾下。

缺省情況。profiling功能是關閉的。

假設用戶設定使用profiling功能,能夠使用配置文檔裏的屬性 mapred.task.profile.{maps|reduces}設置要profile map/reduce task的範圍。設置該屬性值的api是 JobConf.setProfileTaskRange(boolean,String)。

範圍的缺省值是0-2

用戶能夠通過設定配置文檔裏的屬性mapred.task.profile.params來指定profiler配置參數。改動屬性要使用api JobConf.setProfileParams(String)。當執行task時,假設字符串包括%s。 它會被替換成profileing的輸出文件名稱。這些參數會在命令行裏傳遞到子JVM中。缺省的profiling 參數是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s

調試

Map/Reduce框架能夠執行用戶提供的用於調試的腳本程序。 當map/reduce任務失敗時。用戶能夠通過執行腳本在任務日誌(比如任務的標準輸出、標準錯誤、系統日誌以及作業配置文件)上做興許處理工作。用戶提供的調試腳本程序的標準輸出和標準錯誤會輸出為診斷文件。假設須要的話這些輸出結果也能夠打印在用戶界面上。

在接下來的章節。我們討論怎樣與作業一起提交調試腳本。為了提交調試腳本。 首先要把這個腳本分發出去,並且還要在配置文件中設置。

怎樣分發腳本文件:

用戶要用 DistributedCache機制來分發鏈接腳本文件

怎樣提交腳本:

一個高速提交調試腳本的方法是分別為須要調試的map任務和reduce任務設置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 屬性的值。這些屬性也能夠通過 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API來設置。

對於streaming, 能夠分別為須要調試的map任務和reduce任務使用命令行選項-mapdebug 和 -reducedegug來提交調試腳本。

腳本的參數是任務的標準輸出、標準錯誤、系統日誌以及作業配置文件。

在執行map/reduce失敗的節點上執行調試命令是:
$script $stdout $stderr $syslog $jobconf

Pipes 程序依據第五個參數獲得c++程序名。 因此調試pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program

默認行為

對於pipes,默認的腳本會用gdb處理core dump, 打印 stack trace而且給出正在執行線程的信息。

JobControl

JobControl是一個工具,它封裝了一組Map/Reduce作業以及他們之間的依賴關系。

數據壓縮

Hadoop Map/Reduce框架為應用程序的寫入文件操作提供壓縮工具,這些工具能夠為map輸出的中間數據和作業終於輸出數據(比如reduce的輸出)提供支持。它還附帶了一些 CompressionCodec的實現。比方實現了 zlib和lzo壓縮算法。 Hadoop相同支持gzip文件格式。

考慮到性能問題(zlib)以及Java類庫的缺失(lzo)等因素,Hadoop也為上述壓縮解壓算法提供本地庫的實現。很多其它的細節請參考 這裏。

中間輸出

應用程序能夠通過 JobConf.setCompressMapOutput(boolean)api控制map輸出的中間結果。而且能夠通過 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec

作業輸出

應用程序能夠通過 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制輸出是否須要壓縮而且能夠使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec

假設作業輸出要保存成 SequenceFileOutputFormat格式。須要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,來設定 SequenceFile.CompressionType (i.e. RECORD / BLOCK - 默認是RECORD)。

樣例:WordCount v2.0

這裏是一個更全面的WordCount樣例。它使用了我們已經討論過的非常多Map/Reduce框架提供的功能。

執行這個樣例須要HDFS的某些功能。特別是 DistributedCache相關功能。因此這個樣例僅僅能執行在 偽分布式 或者 全然分布式模式的 Hadoop上。

package org.myorg;

import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount extends Configured implements Tool {
	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {
		static enum Counters {
			INPUT_WORDS
		}

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		private boolean caseSensitive = true;
		private Set<String> patternsToSkip = new HashSet<String>();
		private long numRecords = 0;
		private String inputFile;

		public void configure(JobConf job) {
			caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
			inputFile = job.get("map.input.file");
			if (job.getBoolean("wordcount.skip.patterns", false)) {
				Path[] patternsFiles = new Path[0];
				try {
					patternsFiles = DistributedCache.getLocalCacheFiles(job);
				} catch (IOException ioe) {
					System.err
							.println("Caught exception while getting cached files: "
									+ StringUtils.stringifyException(ioe));
				}
				for (Path patternsFile : patternsFiles) {
					parseSkipFile(patternsFile);
				}
			}
		}

		private void parseSkipFile(Path patternsFile) {
			try {
				BufferedReader fis = new BufferedReader(new FileReader(
						patternsFile.toString()));
				String pattern = null;
				while ((pattern = fis.readLine()) != null) {
					patternsToSkip.add(pattern);
				}
			} catch (IOException ioe) {
				System.err
						.println("Caught exception while parsing the cached file ‘"
								+ patternsFile
								+ "‘ : "
								+ StringUtils.stringifyException(ioe));
			}
		}

		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			String line = (caseSensitive) ? value.toString() : value.toString()
					.toLowerCase();
			for (String pattern : patternsToSkip) {
				line = line.replaceAll(pattern, "");
			}
			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				output.collect(word, one);
				reporter.incrCounter(Counters.INPUT_WORDS, 1);
			}
			if ((++numRecords % 100) == 0) {
				reporter.setStatus("Finished processing " + numRecords
						+ " records " + "from the input file: " + inputFile);
			}
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}

	public int run(String[] args) throws Exception {
		JobConf conf = new JobConf(getConf(), WordCount.class);
		conf.setJobName("wordcount");
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);
		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
		List<String> other_args = new ArrayList<String>();
		for (int i = 0; i < args.length; ++i) {
			if ("-skip".equals(args[i])) {
				DistributedCache
						.addCacheFile(new Path(args[++i]).toUri(), conf);
				conf.setBoolean("wordcount.skip.patterns", true);
			} else {
				other_args.add(args[i]);
			}
		}
		FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
		FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
		JobClient.runJob(conf);
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int res = ToolRunner.run(new Configuration(), new WordCount(), args);
		System.exit(res);
	}
}


執行例子

輸入例子:

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

執行程序:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

輸出:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

註意此時的輸入與第一個版本號的不同,輸出的結果也有不同。

如今通過DistributedCache插入一個模式文件,文件裏保存了要被忽略的單詞模式。

$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

再執行一次。這次使用很多其它的選項:

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

應該得到這種輸出:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

再執行一次,這一次關閉大寫和小寫敏感性(case-sensitivity):

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

輸出:

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

程序要點

通過使用一些Map/Reduce框架提供的功能,WordCount的第二個版本號在原始版本號基礎上有了例如以下的改進:

  • 展示了應用程序怎樣在Mapper (和Reducer)中通過configure方法 改動配置參數(28-43行)。
  • 展示了作業怎樣使用DistributedCache 來分發僅僅讀數據。 這裏同意用戶指定單詞的模式。在計數時忽略那些符合模式的單詞(104行)。
  • 展示Tool接口和GenericOptionsParser處理Hadoop命令行選項的功能 (87-116, 119行)。
  • 展示了應用程序怎樣使用Counters(68行),怎樣通過傳遞給map(和reduce) 方法的Reporter實例來設置應用程序的狀態信息(72行)。

Java和JNI是Sun Microsystems, Inc.在美國和其他國家的註冊商標。

一步一步跟我學習hadoop(5)----hadoop Map/Reduce教程(2)