1. 程式人生 > >使用Hadoop的MapReduce與HDFS處理資料

使用Hadoop的MapReduce與HDFS處理資料

hadoop是一個分散式的基礎架構,利用分散式實現高效的計算與儲存,最核心的設計在於HDFS與MapReduce,HDFS提供了大量資料的儲存,mapReduce提供了大量資料計算的實現,通過Java專案實現hadoop job處理海量資料解決複雜的需求。

一、基本環境及相關軟體的配置

...

二、hadoop專案開發流程

     hadoop主要的開發為job的初始化與分散式處理流程的開發。

1、任務基本配置

    首相根據業務需求,需要在程式碼中配置job在每臺機器上需要的java虛擬機器使用的記憶體與執行過程需要的最大記憶體。

Configuration configuration =
new Configuration(); configuration.set("mapreduce.map.java.opts","-Xmx2048m"); configuration.set("mapreduce.map.memory.mb","3072"); configuration.set("mapreduce.reduce.java.opts","-Xmx2048m"); configuration.set("mapreduce.reduce.memory.mb","3072"); Job job = newJob(configuration, "miuilite-dailyRetain-"
+arg[4]);

2、執行引數配置

job.setJarByClass(MiuiliteRetainJob.class); MultipleOutputs.addNamedOutput(job, MIUIDanfaGeneralMapReduce.MULTI_OUTPUT_NAME_STATUS, SequenceFileOutputFormat.class,Text.class, Text.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); MultipleInputs.addInputPath(job,
new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class); MultipleInputs .addInputPath(job,new Path(arg[1]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.StatusLogMapper.class); FileOutputFormat.setOutputPath(job,new Path(arg[2])); job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setNumReduceTasks(40);//配置節點數量

     hadoop任務處理過程中,各個分散式機器讀取操作資料都是通過分散式儲存檔案系統hdfs,並且分散式計算將中間結果或者最終結果都是儲存到hdfs上的,所以在job開發過程中需要的配置有:

   1)相關的地址:資料hdfs地址,中間狀態快取儲存HDFS地址,以及生成的結果hdfs儲存地址,(如需要本地進一步處理結果,還需要本地地址,需要將hdfs的結果地址拉取到本 地進行處理),本地伺服器地址:

          MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);

        注意:對於要寫入資料的地址要具有寫的許可權,詳細許可權配置請看基本配置介紹。

   2)各種資料格式:

            一種是檔案的讀取格式,可以使用基於行的日誌檔案,也可以使用二進位制格式,多行輸入記錄或其他的格式,Hadoop有自帶的幾種格式:

輸入格式 解釋 key value
TextInputFormat 預設格式,按照行讀取 行的位元組偏移量 行的內容
KeyValueInputFormat 解析每一行的資料 第一個Tab前的字元 剩下的內容
SequenceFileInputFormat 具有高效能的二進位制格式 自定義 自定義

所以在讀取輸入檔案格式中,需要選擇自己合適的格式來初始化  MultipleInputs.addInputPath(job, new Path(arg[0]),SequenceFileInputFormat.class, MiuiliteRetainMapReduce.NewLogMapper.class);

對於自定義的SequenceFileInputFormat,它會讀取特殊的特定於Hadoop的二進位制檔案,會讓Hadoop的mapper快速讀取資料。Sequence檔案是塊壓縮的,並提供了對幾種資料型別(不僅僅是文字型別)直接的序列化與反序列化操作。

     其次檔案讀取key 與value的格式,以及輸出到檔案的格式:BooleanWritable:標準布林型數值,ByteWritable:單位元組數值,DoubleWritable:雙位元組數,FloatWritable:浮點數,IntWritable:整型數,LongWritable:長整型數,Text:使用UTF8格式儲存的文字,NullWritable:當<key,value>中的key或value為空時使用,需要在初始化job的過程中初始化對應輸入輸出的格式。

  3)配置資料處理類,一般分為兩個階段,

        第一步叫做mapping,會對資料作為mapper函式的輸入資料,每條資料對應一個,mapper會吧每次map處理後的結果可以根據相同的key單獨傳到一個輸出資料元素裡面。例子: MultipleInputs.addInputPath(job, new Path(arg[0]), SequenceFileInputFormat.class,MiuiliteRetainMapReduce.NewLogMapper.class);。

        注意:可以同時使用多個數據輸入處理的mapper,但輸出key與value格式必須保持一致。

        第二步叫做reducer,會接收mapping的輸出作為輸入列表的迭代器,會將同一key的值聚合在一起,並做一定的處理而返回處理結果。例子:job.setReducerClass(MiuiliteRetainMapReduce.RetainReducer.class);

3、資料處理流程

    在所有配置好了之後呼叫job.waitForCompletion(true);提交任務執行任務並等待結束。

Mapper階段:

public static class NewLogMapperextends Mapper<Object, BytesWritable, Text, Text> { @Override publicvoid map(Object key, BytesWritable value, Context context) { //..........省略中間處理原始資料過程,比如解密,生成OutPutKey等 context.write(newText(OutPutKey), newText(OutPutValue); } }

Redecer階段:

public static  class RetainReducerextends Reducer<Text, Text, Text, Text> { @Override publicvoid setup(Context context)throws IOException, InterruptedException { super.setup(context); //資料初始化過程,初始化相關的計數工具 } @Override publicvoid reduce(Text key, Iterable<Text> values, Context context) { //對應同一個key進行相關的統計處理階段,並將資料計入到相關的計數工具中。 } @Override protectedvoid cleanup(Reducer.Context context)throws IOException, InterruptedException { stringCounter.output(context); super.cleanup(context); //執行完畢的後續階段,將沒臺分散式計算的機器的結果輸入到hdfs上,清理context, }

reducer完成後需要統一將處理結果寫入到HDFS中,所以在統計工具中應帶有最後的輸出函式:

public void output(Reducer.Context context, intlongTailBar) throwsIOException, InterruptedException { for(Iterator<String> iterator = stringCountMap.keySet().iterator(); iterator.hasNext();) { String key = iterator.next(); longvalue = stringCountMap.get(key); if(value < longTailBar) continue; key = key.replace('\r',' '); key = key.replace('\n',' '); context.write(newText(key), new LongWritable(value)); } }

4、處理結果本地

    hadoop處理後的結果都是儲存在hdfs上的,可以將對應的結果作為行的任務的輸入進一步精確處理,如果需要進一步本地處理,通過呼叫本地shell命令將結果複製到本地:

private void copyToLocal(String hdfsPath, String localPath)throws IOException, InterruptedException { String[] cmd = {"/bin/sh","-c", "hadoop fs -cat "+ hdfsPath + "/part* > "+ localPath}; String tmpDic = loalPath.substring(0,localPath.lastIndexOf("/")); if(!newFile(tmpDic).exists()){ newFile(tmpDic).mkdirs(); } if(!newFile(localPath).exists()){ newFile(localPath).createNewFile(); } Process pid = Runtime.getRuntime().exec(cmd); if(pid != null) { pid.waitFor(); } }

三、執行流程

執行shell命令配置

      hadoop專案執行方式通過shell檔案執行指定的jar包,並指定對應的入口函式,依據專案的需求傳入不同的引數。

      hadoop jar  miuiapp-logs.jar com.xxx.xxx.xxxMain XXX-param-1 XXX-param-2 XXX-param-3

注意:如果通過crontab -e定時指定相關的命令執行,需要在執行的shell檔案中新增 jdk,hadoop的地址到環境變數中。

注意:在配置pom過程中需要將jar包打成大包,將所有依賴的jar包都應該打進去,所以在pom中應該加入下列配置:

<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-dependency-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.3</version> <configuration> <appendAssemblyId>false</appendAssemblyId> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>assembly</goal> </goals> </execution> </executions> </plugin>