使用Hadoop的MapReduce與HDFS處理資料
hadoop是一個分散式的基礎架構,利用分散式實現高效的計算與儲存,最核心的設計在於HDFS與MapReduce,HDFS提供了大量資料的儲存,mapReduce提供了大量資料計算的實現,通過Java專案實現hadoop job處理海量資料解決複雜的需求。
一、基本環境及相關軟體的配置
...
二、hadoop專案開發流程
hadoop主要的開發為job的初始化與分散式處理流程的開發。
1、任務基本配置
首相根據業務需求,需要在程式碼中配置job在每臺機器上需要的java虛擬機器使用的記憶體與執行過程需要的最大記憶體。
Configuration configuration = new
Configuration();
configuration.set( "mapreduce.map.java.opts" , "-Xmx2048m" );
configuration.set( "mapreduce.map.memory.mb" , "3072" );
configuration.set( "mapreduce.reduce.java.opts" , "-Xmx2048m" );
configuration.set( "mapreduce.reduce.memory.mb" , "3072" );
Job job = new Job(configuration,
"miuilite-dailyRetain-" +arg[ 4 ]);
|
2、執行引數配置
job.setJarByClass(MiuiliteRetainJob. class );
MultipleOutputs.addNamedOutput(job, MIUIDanfaGeneralMapReduce.MULTI_OUTPUT_NAME_STATUS, SequenceFileOutputFormat. class ,Text. class ,
Text. class );
job.setOutputFormatClass(SequenceFileOutputFormat. class );
MultipleInputs.addInputPath(job, new
Path(arg[ 0 ]), SequenceFileInputFormat. class , MiuiliteRetainMapReduce.NewLogMapper. class );
MultipleInputs .addInputPath(job, new
Path(arg[ 1 ]), SequenceFileInputFormat. class , MiuiliteRetainMapReduce.StatusLogMapper. class );
FileOutputFormat.setOutputPath(job, new
Path(arg[ 2 ]));
job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer. class );
job.setMapOutputKeyClass(Text. class );
job.setMapOutputValueClass(Text. class );
job.setOutputKeyClass(Text. class );
job.setOutputValueClass(LongWritable. class );
job.setNumReduceTasks( 40 ); //配置節點數量
|
hadoop任務處理過程中,各個分散式機器讀取操作資料都是通過分散式儲存檔案系統hdfs,並且分散式計算將中間結果或者最終結果都是儲存到hdfs上的,所以在job開發過程中需要的配置有:
1)相關的地址:資料hdfs地址,中間狀態快取儲存HDFS地址,以及生成的結果hdfs儲存地址,(如需要本地進一步處理結果,還需要本地地址,需要將hdfs的結果地址拉取到本 地進行處理),本地伺服器地址:
MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);
注意:對於要寫入資料的地址要具有寫的許可權,詳細許可權配置請看基本配置介紹。
2)各種資料格式:
一種是檔案的讀取格式,可以使用基於行的日誌檔案,也可以使用二進位制格式,多行輸入記錄或其他的格式,Hadoop有自帶的幾種格式:
輸入格式 | 解釋 | key | value |
---|---|---|---|
TextInputFormat | 預設格式,按照行讀取 | 行的位元組偏移量 | 行的內容 |
KeyValueInputFormat | 解析每一行的資料 | 第一個Tab前的字元 | 剩下的內容 |
SequenceFileInputFormat | 具有高效能的二進位制格式 | 自定義 | 自定義 |
所以在讀取輸入檔案格式中,需要選擇自己合適的格式來初始化 MultipleInputs.addInputPath(job, new Path(arg[0]),SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);
對於自定義的SequenceFileInputFormat,它會讀取特殊的特定於Hadoop的二進位制檔案,會讓Hadoop的mapper快速讀取資料。Sequence檔案是塊壓縮的,並提供了對幾種資料型別(不僅僅是文字型別)直接的序列化與反序列化操作。
其次檔案讀取key 與value的格式,以及輸出到檔案的格式:BooleanWritable:標準布林型數值,ByteWritable:單位元組數值,DoubleWritable:雙位元組數,FloatWritable:浮點數,IntWritable:整型數,LongWritable:長整型數,Text:使用UTF8格式儲存的文字,NullWritable:當<key,value>中的key或value為空時使用,需要在初始化job的過程中初始化對應輸入輸出的格式。
3)配置資料處理類,一般分為兩個階段,
第一步叫做mapping,會對資料作為mapper函式的輸入資料,每條資料對應一個,mapper會吧每次map處理後的結果可以根據相同的key單獨傳到一個輸出資料元素裡面。例子: MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class,MiuiliteRetainMapReduce.NewLogMapper.class);。
注意:可以同時使用多個數據輸入處理的mapper,但輸出key與value格式必須保持一致。
第二步叫做reducer,會接收mapping的輸出作為輸入列表的迭代器,會將同一key的值聚合在一起,並做一定的處理而返回處理結果。例子:job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class);
3、資料處理流程
在所有配置好了之後呼叫job.waitForCompletion(true);提交任務執行任務並等待結束。
Mapper階段:
public
static class NewLogMapper extends
Mapper<Object, BytesWritable, Text, Text> {
@Override
public void
map(Object key, BytesWritable value, Context context) {
//..........省略中間處理原始資料過程,比如解密,生成OutPutKey等
context.write( new Text(OutPutKey),
new Text(OutPutValue);
}
}
|
Redecer階段:
public
static class RetainReducer extends
Reducer<Text, Text, Text, Text> {
@Override
public void
setup(Context context) throws
IOException, InterruptedException {
super .setup(context);
//資料初始化過程,初始化相關的計數工具
}
@Override
public void
reduce(Text key, Iterable<Text> values, Context context) {
//對應同一個key進行相關的統計處理階段,並將資料計入到相關的計數工具中。
}
@Override
protected void
cleanup(Reducer.Context context) throws
IOException, InterruptedException {
stringCounter.output(context);
super .cleanup(context);
//執行完畢的後續階段,將沒臺分散式計算的機器的結果輸入到hdfs上,清理context,
}
|
reducer完成後需要統一將處理結果寫入到HDFS中,所以在統計工具中應帶有最後的輸出函式:
public
void output(Reducer.Context context, int longTailBar)
throws IOException, InterruptedException {
for (Iterator<String> iterator = stringCountMap.keySet().iterator(); iterator.hasNext();) {
String key = iterator.next();
long value = stringCountMap.get(key);
if (value < longTailBar)
continue ;
key = key.replace( '\r' , ' ' );
key = key.replace( '\n' , ' ' );
context.write( new Text(key),
new LongWritable(value));
}
}
|
4、處理結果本地
hadoop處理後的結果都是儲存在hdfs上的,可以將對應的結果作為行的任務的輸入進一步精確處理,如果需要進一步本地處理,通過呼叫本地shell命令將結果複製到本地:
private
void copyToLocal(String hdfsPath, String localPath) throws
IOException, InterruptedException {
String[] cmd = { "/bin/sh" , "-c" ,
"hadoop fs -cat " + hdfsPath +
"/part* > " + localPath};
String tmpDic = loalPath.substring( 0 ,localPath.lastIndexOf( "/" ));
if (! new File(tmpDic).exists()){
new File(tmpDic).mkdirs();
}
if (! new File(localPath).exists()){
new File(localPath).createNewFile();
}
Process pid = Runtime.getRuntime().exec(cmd);
if (pid !=
null ) {
pid.waitFor();
}
}
|
三、執行流程
執行shell命令配置
hadoop專案執行方式通過shell檔案執行指定的jar包,並指定對應的入口函式,依據專案的需求傳入不同的引數。
hadoop jar miuiapp-logs.jar com.xxx.xxx.xxxMain XXX-param-1 XXX-param-2 XXX-param-3
注意:如果通過crontab -e定時指定相關的命令執行,需要在執行的shell檔案中新增 jdk,hadoop的地址到環境變數中。
注意:在配置pom過程中需要將jar包打成大包,將所有依賴的jar包都應該打進去,所以在pom中應該加入下列配置:
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-dependency-plugin</ artifactId >
</ plugin >
< plugin >
< groupId >org.apache.maven.plugins</ groupId >
< artifactId >maven-assembly-plugin</ artifactId >
< version >2.3</ version >
< configuration >
< appendAssemblyId >false</ appendAssemblyId >
< descriptorRefs >
< descriptorRef >jar-with-dependencies</ descriptorRef >
</ descriptorRefs >
</ configuration >
< executions >
< execution >
< id >make-assembly</ id >
< phase >package</ phase >
< goals >
< goal >assembly</ goal >
</ goals >
</ execution >
</ executions >
</ plugin >
|