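A MapReduce Debugging Tool: Introduction to and Usage of MRUnit
Introduction to MRUnit:
When a Hadoop MapReduce job is submitted to a cluster, locating the cause of a failure is awkward: tracking down even a small problem can mean repeatedly modifying the code and printing logs, and with large data sets each debugging round takes a long time. Good unit tests are therefore needed to eliminate obvious bugs as early as possible. Unit testing MapReduce code has an obstacle, however: some of the argument objects used by Map and Reduce, such as OutputCollector, Reporter, and InputSplit, are passed in by the Hadoop framework at run time, so a test needs some other way to supply them. MRUnit is a unit testing framework written specifically for Hadoop MapReduce; its API is simple, clear, and practical. It also has weak spots: for example, it does not support MultipleOutputs (MultipleOutputs is often used to write output to multiple files; a later section describes how to extend MRUnit so that it supports MultipleOutputs).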
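The obstacle described above can be illustrated without Hadoop. The sketch below is not MRUnit and does not use any Hadoop class; the names RecordingCollectorSketch, mapLine, and run are hypothetical. It only shows the general idea that logic writing to a framework-supplied collector becomes testable once a recording stand-in is passed in its place.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch: none of these names come from Hadoop or MRUnit.
class RecordingCollectorSketch {

    // Map logic written against a callback instead of a framework-supplied object.
    static void mapLine(String line, BiConsumer<String, Integer> collector) {
        for (String word : line.split(" ")) {
            collector.accept(word, 1);
        }
    }

    // Runs the map logic with a stand-in collector that records every emitted pair.
    static List<String> run(String line) {
        List<String> emitted = new ArrayList<>();
        mapLine(line, (key, value) -> emitted.add(key + "=" + value));
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run("hello world hello")); // [hello=1, world=1, hello=1]
    }
}
```

A test can then compare the recorded list with the expected pairs, which is roughly the role a test driver plays for a real Mapper.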
Installing MRUnit:
To use MRUnit in an existing Hadoop project, follow these steps:
(1) Download the latest MRUnit release from http://mrunit.apache.org/. I use Hadoop 1.0.4, so the file I downloaded is apache-mrunit-1.0.0-hadoop1-bin.tar.gz.
(2) Extract the downloaded archive to obtain hamcrest-core-1.1.jar, junit-4.10.jar, mockito-all-1.8.5.jar, and mrunit-1.0.0-hadoop1.jar.
(3) Add these four files to the project's build path. In Eclipse, select the project --> right-click Build Path --> Configure Build Path --> Add External JARs.
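MRUnit example:
Ordinary JUnit testing uses different test modules for different objects under test. In the same way, MRUnit provides a separate driver for each kind of test target:
MapDriver: tests a Map on its own.
ReduceDriver: tests a Reduce on its own.
MapReduceDriver: tests a Map and a Reduce chained together.
PipelineMapReduceDriver: tests several Map-Reduce pairs run in sequence.
We first look at how to test a custom Mapper with MRUnit, using the classic introductory program wordcount to show MRUnit in action.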
Map program:
package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TxtMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Emits (word, 1) for every space-separated token in the input line.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] strs = value.toString().split(" ");
        for (String str : strs) {
            context.write(new Text(str), new IntWritable(1));
        }
    }
}
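One detail of the mapper above is worth a quick check: it tokenizes with split(" "), which yields an empty token wherever two spaces are adjacent, so the mapper would emit an empty key for such input. The plain-Java sketch below shows this behaviour of String.split. The class SplitSketch and the whitespaceRuns alternative are hypothetical illustrations, not part of the original program.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: compares two ways of tokenizing a line.
class SplitSketch {

    // Same tokenization as the mapper: split on a single space.
    static List<String> singleSpace(String line) {
        return Arrays.asList(line.split(" "));
    }

    // Hypothetical alternative: trim, then split on runs of whitespace.
    static List<String> whitespaceRuns(String line) {
        return Arrays.asList(line.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        String line = "hello  world";               // two spaces between the words
        System.out.println(singleSpace(line));      // [hello, , world]
        System.out.println(whitespaceRuns(line));   // [hello, world]
    }
}
```

A unit test with such an input line is a cheap way to decide which behaviour the job actually needs.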
Reduce program:
package com.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TxtReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Sums the counts emitted for one word and writes (word, total).
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
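To make the expected end-to-end result concrete, here is a minimal plain-Java sketch of what the Map and Reduce programs compute together. It does not use Hadoop or MRUnit. The class WordCountPipelineSketch and its methods are hypothetical, and the TreeMap only stands in for the framework's grouping and sorting of keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of map -> group by key -> reduce for word count.
class WordCountPipelineSketch {

    // Map plus shuffle stand-in: record a 1 for each word under its key, sorted by key.
    static Map<String, List<Integer>> mapAndGroup(String line) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : line.split(" ")) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    // Reduce step: sum the values collected for each key.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int value : entry.getValue()) {
                sum += value;
            }
            totals.put(entry.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String line = "hello world goodbye world hello hadoop goodbye hadoop";
        System.out.println(reduce(mapAndGroup(line)));
        // {goodbye=2, hadoop=2, hello=2, world=2}
    }
}
```

These totals are the kind of values one would list as expected outputs when testing the Mapper and Reducer chained together.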
Test program:
package com.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

public class MapTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper;
    private MapDriver<LongWritable, Text, Text, IntWritable> driver;

    @Before
    public void init() {
        // Wrap the mapper under test in a MapDriver.
        mapper = new TxtMapper();
        driver = new MapDriver<LongWritable, Text, Text, IntWritable>(mapper);
    }

    @Test
    public void testMap() throws Exception {
        String text = "hello world goodbye world hello hadoop goodbye hadoop";
        // Expected outputs are listed in the order the mapper emits them.
        driver.withInput(new LongWritable(), new Text(text))
              .withOutput(new Text("hello"), new IntWritable(1))
              .withOutput(new Text("world"), new IntWritable(1))
              .withOutput(new Text("goodbye"), new IntWritable(1))
              .withOutput(new Text("world"), new IntWritable(1))
              .withOutput(new Text("hello"), new IntWritable(1))
              .withOutput(new Text("hadoop"), new IntWritable(1))
              .withOutput(new Text("goodbye"), new IntWritable(1))
              .withOutput(new Text("hadoop"), new IntWritable(1))
              .runTest();
    }
}
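Select the test method and choose Run As > JUnit Test. The progress bar turns green, which shows that the JUnit test passed.
If the last expected output is changed to .withOutput(new Text("hadoop"), new IntWritable(2)).runTest(), the following errors appear instead: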
13/09/26 15:58:16 ERROR mrunit.TestDriver: Received unexpected output (hadoop, 1) at position 7.
13/09/26 15:58:16 ERROR mrunit.TestDriver: Missing expected output (hadoop, 2) at position 7.
This shows that MRUnit is working.
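The two log lines above report a mismatch at a single position, which suggests that expected and actual records are compared in order. The plain-Java sketch below illustrates that kind of position-by-position comparison. It is not MRUnit's implementation; the class OrderedCompareSketch and the diff method are hypothetical, and the message wording is simply modelled on the log lines above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of an ordered expected-vs-actual comparison.
class OrderedCompareSketch {

    // Compares the two lists position by position and describes each mismatch.
    static List<String> diff(List<String> expected, List<String> actual) {
        List<String> errors = new ArrayList<>();
        int n = Math.max(expected.size(), actual.size());
        for (int i = 0; i < n; i++) {
            String exp = i < expected.size() ? expected.get(i) : null;
            String act = i < actual.size() ? actual.get(i) : null;
            if (exp != null && exp.equals(act)) {
                continue; // records match at this position
            }
            if (act != null) {
                errors.add("Received unexpected output " + act + " at position " + i + ".");
            }
            if (exp != null) {
                errors.add("Missing expected output " + exp + " at position " + i + ".");
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("(goodbye, 1)", "(hadoop, 2)");
        List<String> actual = Arrays.asList("(goodbye, 1)", "(hadoop, 1)");
        for (String error : diff(expected, actual)) {
            System.out.println(error);
        }
    }
}
```

With the two-record lists in main, the sketch reports one unexpected and one missing record, both at zero-based position 1, mirroring the shape of the real log.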
You can also refer to the example on the official MRUnit site:
The following example uses MRUnit to unit test a MapReduce program that analyzes SMS CDRs (call detail records).
The records look like
- CDRID;CDRType;Phone1;Phone2;SMS Status Code
655209;1;796764372490213;804422938115889;6
353415;0;356857119806206;287572231184798;4
835699;1;252280313968413;889717902341635;0
The MapReduce program analyzes these records, finds all records whose CDRType is 1, and notes the SMS Status Code of each. For example, the Mapper outputs are
6, 1
0, 1
The Reducer takes these as input and outputs the number of times each status code occurs in the CDR records.
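As a worked example of the analysis just described, the plain-Java sketch below applies the same filter and count to the three sample records. It uses no Hadoop classes. The class SmsCdrSketch and the method statusCounts are hypothetical, and the map and reduce steps are folded into one loop for brevity.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the SMS CDR analysis on in-memory records.
class SmsCdrSketch {

    // Counts SMS status codes over records whose CDRType (field 1) equals 1.
    static Map<String, Integer> statusCounts(String[] records) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String record : records) {
            String[] fields = record.split(";");
            if (Integer.parseInt(fields[1]) == 1) {        // map step: keep SMS CDRs only
                counts.merge(fields[4], 1, Integer::sum);  // reduce step: sum the 1s per status code
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] records = {
            "655209;1;796764372490213;804422938115889;6",
            "353415;0;356857119806206;287572231184798;4",
            "835699;1;252280313968413;889717902341635;0"
        };
        System.out.println(statusCounts(records)); // {0=1, 6=1}
    }
}
```

The second record has CDRType 0 and is skipped, so only status codes 6 and 0 are counted, once each.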
The corresponding Mapper and Reducer are
public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text status = new Text();
    private final static IntWritable addOne = new IntWritable(1);

    /**
     * Returns the SMS status code and its count
     */
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {

        // 655209;1;796764372490213;804422938115889;6 is the Sample record format
        String[] line = value.toString().split(";");
        // If record is of SMS CDR
        if (Integer.parseInt(line[1]) == 1) {
            status.set(line[4]);
            context.write(status, addOne);
        }
    }
}
The corresponding Reducer code is
public class SMSCDRReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}