eclipse+hadoop2.7.5的map-reduce的API的配置
阿新 • • 發佈:2019-01-09
1.環境配置:
a.將map reduce所需架包全部載入到eclipse下,
b.將
鍵:HADOOP_USER_NAME
值:hadoop
配置到系統環境中,需要重新啟動。
c.將架包hadoop-eclipse-plugin-2.7.1.jar放入eclipse安裝目錄的plugin下。
下載winutils.exe將下載下的hadoop.dll和winutils.exe放到hadoop-2.7.5/bin下
下載地址
d.配置eclipse。windows–>pref..–>hadoop map/reduce–>在路徑下新增hadoop-2.7.1的路徑
e pre…–>show –>mapreduce tool—>點選小象–>配置ip和埠(9000)
f hadoop.tmp.dir :/tmp/hadoop-hadoop
將hadoop下的上傳的資料夾的許可權修改為777
hadoop fs -chmod 777 /wcin
2.Java程式碼進行操做
——————————-map類的API操作——————–
package www.oeacle.wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
* KEYIN:輸入鍵的型別,者一行的開始的偏移量:long(每一行的第一個的字,位於第幾個,就是此行的偏移量)
* VALUEIN, 輸入的值的型別。在這裡當前行的內容,String
* KEYOUT, 輸出的鍵值對的型別,每一個單詞,String
* VALUEOUT,輸出的值的型別,表示單詞標記一個1,int
* map乾的事就是,將單詞分隔出來,然後進行將每個單詞標記1
* map的資料要網路傳輸,所以需要序列化,實現介面Serializabler,
* 但是這個方式的序列化太過冗餘,所以Hadoop提供了自己的序列化方式
* 序列化介面叫:Writable,並且常用資料型別幫你序列化了。
* 常用資料烈性對應的序列化格式
* byte--ByteWritable
* short--ShortWirtable
* int IntWirtable
* long--同上
* double--同上
* String--Text
* null--同上
* 就String不一樣
*
*/
public class WorldCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
/**
* 這個函式一行呼叫一次
* LongWritable key;輸入鍵,偏移量
* Text value:輸入的值,一行的內容
* Context:是上下文的物件,用育傳輸map的處理結果,提供Write方法
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//1.取到一行的內容
String line=value.toString();
//2.切分
String [] words=line.split(" ");
//3.迴圈遍歷打標記併發送
for (String w : words) {
context.write(new Text(w), new IntWritable(1));
}
}
}
-----------------------reduce類的API操作------------------------------
package www.oeacle.wc;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*
* reduce的輸入其實是map的輸出
*
*/
public class WordCoountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* ruduce處理的最終結果儲存在hdfs
* 多久呼叫一次這個方法:一組呼叫一次方法
* 啥是一組:把key相同一類就是一組
* Text arg0:這一組的第一個key
* Iterable<IntWritable> arg1,這一組所有key對應的value值封裝在迭代器中
* Iterable<IntWritable> arg1,上下文物件,也是傳輸作用,往hdfs進行寫
*
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//1.定義一個求和變數
int sum=0;
//迴圈遍歷迭代器進行求和
for (IntWritable value : values) {
sum+=value.get();
}
context.write(key,new IntWritable(sum));
}
}
--------------------------------主函式的類--------------------
package www.oeacle.wc;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1.獲取Configuration物件
Configuration conf=new Configuration();
//2.需要建立一個任務,啟動一個job,一個程式就是一個作業
Job job=Job.getInstance(conf);
//3.設定job提交的jar包
job.setJarByClass(Driver.class);//內部是一個反射機制
//4.設定map和reduce對應的類
job.setMapperClass(WorldCountMap.class);
job.setReducerClass(WordCoountReduce.class);
//5.設定map輸出的鍵和值的型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//6.最終結果輸出的值
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//7.指頂輸入路徑和輸出路徑
FileInputFormat.addInputPath(job, new Path("/wcin"));//指定一個路徑
//最終輸出路徑要求不能存在,怕覆蓋原始檔案
FileOutputFormat.setOutputPath(job, new Path("/wcout"));
// job.submit();
job.waitForCompletion(true);
}
}