Writing a MapReduce Program (a Simple Called-Number Analysis Program)
By 阿新 · Published 2018-12-09
Since Hadoop 2.2.0 does not yet have a usable Eclipse plugin, the workflow here is to write the code in Eclipse and then move it to the Hadoop environment to run.
Preparation:
1. Set up the Hadoop environment, create a project, and add all of Hadoop's jar packages to the project's Build Path;
2. Construct the data set: each line consists of two numbers, the calling number and the called number. Generate random test data and put the generated file into HDFS;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.Random;

public class GenerateTestData {

    public static void writeToFile(String fileName) throws Exception {
        OutputStream out = new FileOutputStream(new File(fileName));
        BufferedOutputStream bo = new BufferedOutputStream(out);
        Random rd1 = new Random();
        for (int i = 0; i < 10000; i++) {
            StringBuffer sb = new StringBuffer();
            // Calling number: '1' followed by eight random digits
            sb.append(1);
            for (int j = 1; j < 9; j++) {
                sb.append(rd1.nextInt(10));
            }
            sb.append(" ");
            // Called number: a well-known service number, or another random number
            switch (rd1.nextInt(10)) {
            case 1: sb.append("10086"); break;
            case 2: sb.append("110"); break;
            case 3: sb.append("120"); break;
            case 4: sb.append("119"); break;
            case 5: sb.append("114"); break;
            case 6: sb.append("17951"); break;
            case 7: sb.append("10010"); break;
            case 8: sb.append("13323567897"); break;
            default:
                sb.append(1);
                for (int j = 1; j < 9; j++) {
                    sb.append(rd1.nextInt(10));
                }
                break;
            }
            sb.append("\r\n");
            bo.write(sb.toString().getBytes());
        }
        // Close (and thereby flush) the stream, otherwise buffered
        // data may never reach the file
        bo.close();
    }

    public static void main(String[] args) {
        try {
            writeToFile("d://helloa.txt");
            System.out.println("finish!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
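Once the file is generated, it can be uploaded to HDFS along these lines (a sketch; the local file name and the `/input` directory are illustrative, chosen to match the run command used later):

```shell
# Create the input directory and upload the generated test file
hdfs dfs -mkdir -p /input
hdfs dfs -put helloa.txt /input/helloa.txt
# Spot-check the first few lines
hdfs dfs -cat /input/helloa.txt | head -n 3
```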
The MapReduce program is shown below. It was written with reference to Hadoop: The Definitive Guide, using the org.apache.hadoop.mapreduce API:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FirstTest extends Configured implements Tool {

    // Counter for input lines that fail to parse
    enum Counter {
        LINESKIP,
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            try {
                // Input line: "<caller> <callee>"; emit (callee, caller)
                String[] arr = line.split(" ");
                context.write(new Text(arr[1]), new Text(arr[0]));
            } catch (Exception e) {
                // Malformed line: skip it and bump the counter
                context.getCounter(Counter.LINESKIP).increment(1);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate every caller of this number, separated by '|'
            StringBuilder out = new StringBuilder();
            for (Text t : values) {
                out.append(t.toString()).append("|");
            }
            context.write(key, new Text(out.toString()));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "First Map-Reduce Program");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new FirstTest(), args);
        System.exit(exitCode);
    }
}
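To see what the job computes without a cluster, the map and reduce logic can be mirrored in plain Java: group callers by called number and join them with '|'. This is a minimal sketch (the class name CallAnalysisSketch and the sample numbers are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CallAnalysisSketch {

    // Mirrors the MapReduce job: map emits (callee, caller),
    // reduce concatenates all callers of each callee with '|'.
    public static Map<String, String> analyze(List<String> lines) {
        Map<String, StringBuilder> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] arr = line.split(" ");
            if (arr.length < 2) continue; // the real job counts these via LINESKIP
            grouped.computeIfAbsent(arr[1], k -> new StringBuilder())
                   .append(arr[0]).append('|');
        }
        Map<String, String> out = new TreeMap<>();
        grouped.forEach((k, v) -> out.put(k, v.toString()));
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "135551234 110",
                "134447777 110",
                "178889999 10086");
        analyze(lines).forEach((callee, callers) ->
                System.out.println(callee + "\t" + callers));
        // prints:
        // 10086	178889999|
        // 110	135551234|134447777|
    }
}
```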
After compiling and building the jar file under Linux, run it in the Hadoop environment:
hadoop jar FirstTest.jar /input/helloa.txt /output
Problems encountered:
1. The program was written in Eclipse with a package declaration, but the jar was built under Linux with a plain jar cvfm abc.jar .. command, so when Hadoop ran the jar it kept reporting that the main class could not be found;
2. When compiling under Linux, FirstTest.java was placed under HADOOP_CLASSPATH. Running hadoop jar FirstTest.jar /input/helloa.txt /output from that directory reported that the FirstTest$Map class could not be found; after moving the generated FirstTest.jar to a different directory, it ran normally.
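For problem 1, the key is that the manifest's Main-Class must carry the full package-qualified name. One way to build the jar, assuming a hypothetical package com.example and classes compiled into classes/, might look like:

```shell
# Compile against the Hadoop jars (classpath is illustrative)
javac -cp "$(hadoop classpath)" -d classes FirstTest.java
# 'e' writes the entry point into the manifest's Main-Class;
# it must be the fully qualified class name, including the package
jar cvfe FirstTest.jar com.example.FirstTest -C classes .
```

Alternatively, the main class can be given on the command line instead: hadoop jar FirstTest.jar com.example.FirstTest /input/helloa.txt /output.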