1. 程式人生 > >eclipse+hadoop2.7.5的map-reduce的API的配置

eclipse+hadoop2.7.5的map-reduce的API的配置

1.環境配置:
a.將map reduce所需架包全部載入到eclipse下,
b.將
鍵:HADOOP_USER_NAME
值:hadoop
配置到系統環境中,需要重新啟動。
c.將架包hadoop-eclipse-plugin-2.7.1.jar放入eclipse安裝目錄的plugin下。
下載winutils.exe將下載下的hadoop.dll和winutils.exe放到hadoop-2.7.5/bin下
下載地址
d.配置eclipse。windows–>pref..–>hadoop map/reduce–>在路徑下新增hadoop-2.7.1的路徑
e pre…–>show –>mapreduce tool—>點選小象–>配置ip和埠(9000)
這裡寫圖片描述


f hadoop.tmp.dir :/tmp/hadoop-hadoop
將hadoop下的上傳的資料夾的許可權修改為777
hadoop fs -chmod 777 /wcin

2.Java程式碼進行操做
——————————-map類的API操作——————–

package www.oeacle.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Mapper; /* * KEYIN:輸入鍵的型別,者一行的開始的偏移量:long(每一行的第一個的字,位於第幾個,就是此行的偏移量) * VALUEIN, 輸入的值的型別。在這裡當前行的內容,String * KEYOUT, 輸出的鍵值對的型別,每一個單詞,String * VALUEOUT,輸出的值的型別,表示單詞標記一個1,int * map乾的事就是,將單詞分隔出來,然後進行將每個單詞標記1 * map的資料要網路傳輸,所以需要序列化,實現介面Serializabler, * 但是這個方式的序列化太過冗餘,所以Hadoop提供了自己的序列化方式 * 序列化介面叫:Writable,並且常用資料型別幫你序列化了。 * 常用資料烈性對應的序列化格式 * byte--ByteWritable * short--ShortWirtable * int IntWirtable * long--同上 * double--同上 * String--Text * null--同上 * 就String不一樣 * */
public class WorldCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{ /** * 這個函式一行呼叫一次 * LongWritable key;輸入鍵,偏移量 * Text value:輸入的值,一行的內容 * Context:是上下文的物件,用育傳輸map的處理結果,提供Write方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.取到一行的內容 String line=value.toString(); //2.切分 String [] words=line.split(" "); //3.迴圈遍歷打標記併發送 for (String w : words) { context.write(new Text(w), new IntWritable(1)); } } } -----------------------reduce類的API操作------------------------------ package www.oeacle.wc; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * reduce的輸入其實是map的輸出 * */ public class WordCoountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ /** * ruduce處理的最終結果儲存在hdfs * 多久呼叫一次這個方法:一組呼叫一次方法 * 啥是一組:把key相同一類就是一組 * Text arg0:這一組的第一個key * Iterable<IntWritable> arg1,這一組所有key對應的value值封裝在迭代器中 * Iterable<IntWritable> arg1,上下文物件,也是傳輸作用,往hdfs進行寫 * */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { //1.定義一個求和變數 int sum=0; //迴圈遍歷迭代器進行求和 for (IntWritable value : values) { sum+=value.get(); } context.write(key,new IntWritable(sum)); } } --------------------------------主函式的類-------------------- package www.oeacle.wc; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Driver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.獲取Configuration物件 Configuration conf=new Configuration(); //2.需要建立一個任務,啟動一個job,一個程式就是一個作業 Job job=Job.getInstance(conf); //3.設定job提交的jar包 job.setJarByClass(Driver.class);//內部是一個反射機制 //4.設定map和reduce對應的類 job.setMapperClass(WorldCountMap.class); job.setReducerClass(WordCoountReduce.class); //5.設定map輸出的鍵和值的型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //6.最終結果輸出的值 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //7.指頂輸入路徑和輸出路徑 FileInputFormat.addInputPath(job, new Path("/wcin"));//指定一個路徑 //最終輸出路徑要求不能存在,怕覆蓋原始檔案 FileOutputFormat.setOutputPath(job, new Path("/wcout")); // job.submit(); job.waitForCompletion(true); } }