MapReduce部分練習使用API程式設計示例之PageRank
阿新 • 發佈:2018-12-11
package com.sxt.hadoop.mr.pagerank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Iterative PageRank driver. Each MapReduce iteration reads the previous
 * iteration's output (the raw graph on the first pass), recomputes every
 * page's rank, and accumulates the total scaled rank change into a counter.
 * The loop stops once the average per-page change falls below a threshold.
 */
public class RunJob {

    /** Accumulates |newPR - oldPR| * 1000 over all pages; used to test convergence. */
    public static enum Mycounter {
        my
    }

    /** Number of pages in the toy data set (appears in the damping formula and the convergence average). */
    private static final double PAGE_COUNT = 4.0;

    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        // Bug fix: the property key was misspelled ("corss-paltform"), so the
        // setting was silently ignored. Correct key per mapred-default.xml:
        conf.set("mapreduce.app-submission.cross-platform", "true");
        // Run as a local single-process simulation; no jar packaging needed.
        // For distributed execution switch to "yarn" and configure the job jar
        // (e.g. job.setJar(...)) when the client is launched outside the cluster.
        conf.set("mapreduce.framework.name", "local");

        double d = 0.00001; // convergence threshold on the average per-page PR delta
        int i = 0;
        while (true) {
            i++;
            try {
                // Tell the mapper which iteration this is (1 = raw graph input).
                conf.setInt("runCount", i);
                FileSystem fs = FileSystem.get(conf);
                Job job = Job.getInstance(conf);
                job.setJarByClass(RunJob.class);
                job.setJobName("pr" + i);
                job.setMapperClass(PageRankMapper.class);
                job.setReducerClass(PageRankReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                // job.setJar("/ooxx/jar");
                // Input lines are "page<TAB>rest": key = page name, value = the rest.
                job.setInputFormatClass(KeyValueTextInputFormat.class);

                // First iteration reads the raw graph; later iterations read the
                // previous iteration's output directory.
                Path inputPath = new Path("/data/pagerank/input/");
                if (i > 1) {
                    inputPath = new Path("/data/pagerank/output/pr" + (i - 1));
                }
                FileInputFormat.addInputPath(job, inputPath);

                Path outpath = new Path("/data/pagerank/output/pr" + i);
                if (fs.exists(outpath)) {
                    fs.delete(outpath, true);
                }
                FileOutputFormat.setOutputPath(job, outpath);

                boolean f = job.waitForCompletion(true);
                if (f) {
                    System.out.println("success.");
                    long sum = job.getCounters().findCounter(Mycounter.my).getValue();
                    System.out.println(sum);
                    // sum totals |delta| * 1000 over all pages, so dividing by
                    // PAGE_COUNT * 1000 (= the original 4000.0) yields the
                    // average per-page rank change.
                    double avgd = sum / (PAGE_COUNT * 1000.0);
                    if (avgd < d) {
                        break;
                    }
                } else {
                    // Bug fix: the original looped forever resubmitting after a
                    // failed job; abort instead.
                    System.err.println("iteration " + i + " failed; aborting");
                    break;
                }
            } catch (Exception e) {
                e.printStackTrace();
                // Bug fix: without this break, a persistent error (e.g. missing
                // input path) caused an infinite retry loop.
                break;
            }
        }
    }

    /**
     * Emits two record types per input page:
     *   1) "page -> pr<TAB>links..." carrying the old rank and the adjacency
     *      list forward to the reducer, and
     *   2) "outlink -> voteValue" for each outgoing link, where the vote is the
     *      page's rank divided evenly among its outlinks.
     */
    static class PageRankMapper extends Mapper<Text, Text, Text, Text> {
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // On the first iteration the input has no rank column, so every
            // page is seeded with rank 1.0.
            int runCount = context.getConfiguration().getInt("runCount", 1);
            String page = key.toString();
            Node node = null;
            if (runCount == 1) {
                node = Node.fromMR("1.0", value.toString());
            } else {
                node = Node.fromMR(value.toString());
            }
            // Forward the old rank plus the page relationships, e.g. "A -> 1.0 B D".
            context.write(new Text(page), new Text(node.toString()));
            if (node.containsAdjacentNodes()) {
                // Vote value: this page's rank split evenly across its outlinks.
                double outValue = node.getPageRank() / node.getAdjacentNodeNames().length;
                for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
                    String outPage = node.getAdjacentNodeNames()[i];
                    // e.g. B:0.5, D:0.5 — the linked page is the key, the vote the value.
                    context.write(new Text(outPage), new Text(outValue + ""));
                }
            }
        }
    }

    /**
     * For each page, separates the single structural record (the one with an
     * adjacency list, carrying the old rank) from the incoming vote values,
     * computes the damped new rank, and adds the scaled absolute rank change
     * to the convergence counter.
     */
    static class PageRankReducer extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text key, Iterable<Text> iterable, Context context)
                throws IOException, InterruptedException {
            // Values for one page mix two kinds of records, e.g. for B:
            //   "1.0<TAB>C"  — structural record: old rank + outlinks
            //   "0.5"        — a vote from some page linking to B
            double sum = 0.0;
            Node sourceNode = null;
            for (Text i : iterable) {
                Node node = Node.fromMR(i.toString());
                if (node.containsAdjacentNodes()) {
                    sourceNode = node;
                } else {
                    sum = sum + node.getPageRank();
                }
            }
            // Bug fix: a page that only appears as a link target (it has no
            // outlinks of its own, so no structural record arrives) made the
            // original code throw a NullPointerException below. Treat it as a
            // dangling page with the default rank and no adjacency.
            if (sourceNode == null) {
                sourceNode = new Node();
            }
            // Damped PageRank: damping factor 0.85, PAGE_COUNT (= 4) total pages.
            double newPR = (0.15 / PAGE_COUNT) + (0.85 * sum);
            System.out.println("*********** new pageRank value is " + newPR);
            // Compare the new rank against the pre-iteration rank and feed the
            // scaled absolute delta into the global convergence counter.
            double d = newPR - sourceNode.getPageRank();
            int j = (int) (d * 1000.0);
            j = Math.abs(j);
            System.out.println(j + "___________");
            context.getCounter(Mycounter.my).increment(j);
            sourceNode.setPageRank(newPR);
            // e.g. "A -> 0.8<TAB>B<TAB>D"
            context.write(key, new Text(sourceNode.toString()));
        }
    }
}
package com.sxt.hadoop.mr.pagerank;

import java.io.IOException;
import java.util.Arrays;

/**
 * A PageRank graph node: a page's current rank plus the names of the pages it
 * links to. The serialized form is "rank<TAB>link1<TAB>link2..." with the
 * adjacency part omitted when the page has no outlinks.
 *
 * Note: the former dependency on commons-lang StringUtils was replaced with
 * the equivalent java.lang String.join / String.split calls.
 */
public class Node {

    // Rank defaults to 1.0 so a freshly constructed node matches the
    // first-iteration seed value used by the driver.
    private double pageRank = 1.0;
    private String[] adjacentNodeNames;

    /** Field separator used in the serialized representation. */
    public static final char fieldSeparator = '\t';

    public double getPageRank() {
        return pageRank;
    }

    public Node setPageRank(double pageRank) {
        this.pageRank = pageRank;
        return this;
    }

    public String[] getAdjacentNodeNames() {
        return adjacentNodeNames;
    }

    public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
        this.adjacentNodeNames = adjacentNodeNames;
        return this;
    }

    /** @return true if this node has at least one outgoing link. */
    public boolean containsAdjacentNodes() {
        return adjacentNodeNames != null && adjacentNodeNames.length > 0;
    }

    /** Serializes as "rank" or "rank<TAB>link1<TAB>link2...". */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(pageRank);
        if (getAdjacentNodeNames() != null) {
            // String.join replaces commons-lang StringUtils.join.
            sb.append(fieldSeparator).append(
                    String.join(String.valueOf(fieldSeparator), getAdjacentNodeNames()));
        }
        return sb.toString();
    }

    /**
     * Parses a serialized record, e.g. "1.0<TAB>B<TAB>D", back into a Node.
     *
     * @param value serialized record: rank, then optional tab-separated links
     * @return the parsed node (no adjacency array when only the rank is present)
     * @throws IOException if the value is empty
     */
    public static Node fromMR(String value) throws IOException {
        // commons-lang splitPreserveAllTokens returned an empty array for "";
        // String.split would return {""} instead, so guard explicitly to keep
        // the original IOException behavior for empty input.
        if (value.isEmpty()) {
            throw new IOException("Expected 1 or more parts but received 0");
        }
        // limit -1 preserves empty trailing tokens, matching the commons-lang
        // splitPreserveAllTokens semantics for non-empty input.
        String[] parts = value.split(String.valueOf(fieldSeparator), -1);
        if (parts.length < 1) {
            throw new IOException("Expected 1 or more parts but received " + parts.length);
        }
        Node node = new Node().setPageRank(Double.valueOf(parts[0]));
        if (parts.length > 1) {
            node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1, parts.length));
        }
        return node;
    }

    /**
     * Convenience overload: joins a rank string and a link-list string into one
     * record (e.g. "1.0" + "B<TAB>D" -> "1.0<TAB>B<TAB>D") and parses it.
     */
    public static Node fromMR(String v1, String v2) throws IOException {
        return fromMR(v1 + fieldSeparator + v2);
    }
}