案例5-挖掘微博廣告高權重詞條
微博內容(如圖):ID content
公式:
TF:詞條在某個微博中出現的詞頻(出現次數).
N:微博總數
DF:詞條在多少個微博中出現過
案例用到四個reduceTask,下標計數從0開始,三個統計詞頻TF,一個統計微博總數N。
FirstMapper.java
對輸入檔案的每行記錄微博內容進行分詞,統計微博詞頻TF及微博總數,每個詞條輸出詞頻數1;每個微博輸出一個count=1
package com.jeff.mr.tf; import java.io.IOException; import java.io.StringReader; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.wltea.analyzer.core.IKSegmenter; import org.wltea.analyzer.core.Lexeme; /** * TF:詞條在某個微博中出現的詞頻(出現次數). N:微博總數 DF:詞條在多少個微博中出現過 -------------------------------- * 第一個MR,計算TF和計算N(微博總數) * @author root * */ public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //value是微博檔案每一行以製表符\t隔開 String[] v =value.toString().trim().split("\t"); if(v.length>=2){ String id=v[0].trim(); String content =v[1].trim(); //對微博內容進行中文分詞處理 StringReader sr =new StringReader(content); IKSegmenter ikSegmenter =new IKSegmenter(sr, true); Lexeme word=null; while( (word=ikSegmenter.next()) !=null ){ String w= word.getLexemeText();//w就是微博內容的每一個詞彙 //輸出格式為:key為:詞彙_微博ID value是1,出現次數 context.write(new Text(w+"_"+id), new IntWritable(1)); } //每執行一次這個方法,就表示統計了一條微博數,將來在第四個reduce分割槽執行,參見FirstPartition,自定義分割槽規則 context.write(new Text("count"), new IntWritable(1)); }else{ System.out.println(value.toString()+"-------------"); } } }
FirstPartition.java
自定義分割槽,使得key為count的分割槽到最後一個分割槽(編號3),其他的分別分割槽編號為0/1/2三個reduceTask
package com.jeff.mr.tf; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * 第一個MR自定義分割槽,把key為count的,即用來計算微博總數的資料分割槽到第四個reduce分割槽, * 前三個reduce分割槽用來計算TF,就是單個微博中詞彙出現次數 * @author root * */ public class FirstPartition extends HashPartitioner<Text, IntWritable>{ public int getPartition(Text key, IntWritable value, int reduceCount) { if(key.equals(new Text("count"))) return 3; else return super.getPartition(key, value, reduceCount-1); } }
FirstReduce.java
計算單個詞條的詞頻TF,輸入資料為FirstMapper.java的輸出,key為詞條_id.或者count,值為詞頻個數或者count個數,當key為count時不參與計算只輸出檢視。
輸出格式:詞條_ID 詞頻
package com.jeff.mr.tf; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * c1_001,2 * c2_001,1 * count,10000 * @author root * */ public class FirstReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ protected void reduce(Text arg0, Iterable<IntWritable> arg1, Context arg2) throws IOException, InterruptedException { int sum =0; for( IntWritable i :arg1 ){ sum= sum+i.get(); } if(arg0.equals(new Text("count"))){ System.out.println(arg0.toString() +"___________"+sum); } arg2.write(arg0, new IntWritable(sum)); } }
在dfs-location上新建路徑:/usr/input/tf-idf並上傳檔案微博內容:
接下來就可以執行FirstJob.java來執行第一個MR:
package com.jeff.mr.tf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FirstJob {
public static void main(String[] args) {
Configuration config =new Configuration();
config.set("fs.defaultFS", "hdfs://node4:8020");
config.set("yarn.resourcemanager.hostname", "node4");
try {
FileSystem fs =FileSystem.get(config);
// JobConf job =new JobConf(config);
Job job =Job.getInstance(config);
job.setJarByClass(FirstJob.class);
job.setJobName("weibo1");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// job.setMapperClass();
job.setNumReduceTasks(4);
job.setPartitionerClass(FirstPartition.class);
job.setMapperClass(FirstMapper.class);
job.setCombinerClass(FirstReduce.class);
job.setReducerClass(FirstReduce.class);
FileInputFormat.addInputPath(job, new Path("/usr/input/tf-idf"));
Path path =new Path("/usr/output/weibo1");
if(fs.exists(path)){
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job,path);
boolean f= job.waitForCompletion(true);
if(f){
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
執行成功:
重新整理DFS-Location,看到在/usr/output/weibo1的目錄下生成了四個分割槽檔案,每一個分割槽檔案都是四個reduceTask的輸出檔案
其中第四個分割槽檔案就是用來計算Count微博總數N的,其他三個都是微博中詞彙即出現次數。
比如:0.03元_3824213951437432 1
這個就表示0.03元這個詞在ID為3824213951437432微博中出現了1次
TwoMapper.java
統計DF,詞條在多少個微博中出現過
輸出格式:詞條 出現的微博個數
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
//統計df:詞在多少個微博中出現過。
public class TwoMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/**
* 1 獲取當前 mapper Task的資料片段(split)
* 2 當前mapper Task的資料來源於第一個MR輸出的四個檔案
*/
FileSplit fs = (FileSplit) context.getInputSplit();
//可以從fs獲取第一個MR的檔名,除了最後一個檔案是用來計算微博總數的,其他都是TF
if (!fs.getPath().getName().contains("part-r-00003")) {
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
//獲取{0.03元_3824213951437432 1},這種第一個MR的輸出資料,即每一行
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];//得到每一個詞彙,輸出次數1,此處所有微博的詞彙都會輸出1次
context.write(new Text(w), new IntWritable(1));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
TwoReduce.java
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 計算詞彙在所有微博中出現的次數
* @author jeffSheng
* 2018年10月17日
*/
public class TwoReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
/**
* 輸入資料:
* key:0.03元 value:1(次)
* Iterable<IntWritable> arg1,即key相等的一組資料
*/
protected void reduce(Text key, Iterable<IntWritable> arg1,Context context)
throws IOException, InterruptedException {
int sum =0;
for( IntWritable i :arg1 ){
sum= sum + i.get();
}
context.write(key, new IntWritable(sum));
}
}
執行TwoJob.java第二個MR,計算每個詞彙在所有微博出現次數即DF
package com.jeff.mr.tf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TwoJob {
public static void main(String[] args) {
Configuration config =new Configuration();
config.set("fs.defaultFS", "hdfs://node4:8020");
config.set("yarn.resourcemanager.hostname", "node4");
try {
// JobConf job =new JobConf(config);
Job job =Job.getInstance(config);
job.setJarByClass(TwoJob.class);
job.setJobName("weibo2");
//設定map任務的輸出key型別、value型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// job.setMapperClass();
job.setMapperClass(TwoMapper.class);
job.setCombinerClass(TwoReduce.class);
job.setReducerClass(TwoReduce.class);
//mr執行時的輸入資料從hdfs的哪個目錄中獲取
FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
FileOutputFormat.setOutputPath(job, new Path("/usr/output/weibo2"));
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("執行job成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
重新整理DFS-Location看到/usr/output/weibo2下的DF輸出檔案:
比如0.03元 在所有微博中出現了1次
根據公式計算微博詞彙權重:
LastMapper.java
輸入資料為所有詞的TF,所有詞的DF,微博總數N,根據這三個變數計算詞條最終權重。
輸出格式:微博ID 詞條:權重
package com.jeff.mr.tf;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* 最後計算
* @author root
*
*/
public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
//存放微博總數
public static Map<String, Integer> cmap = null;
//存放df
public static Map<String, Integer> df = null;
// 在map方法執行之前,即mapperTask初始化的時候執行
/**
* mapReduce的執行過程回顧:
* 比如一個檔案被分割成1024個碎片段,則一定有與之對應的1024個mapTask去執行每個碎片段。
* mapTask在有碎片段的節點上執行,即 dataNode上有碎片段,在dataNode上執行。所以每個DataNode上就
* 有一個NodeManager來執行mapReduce程式,NodeManager裡面有一個與之對應的ApplicationMatser
* 負責從resourceManager中請求資源即Contianer中文是容器,其實是資源。申請資源後,ApplicationMatser
* 則可以通過一個Executor物件執行mapperTask,並監控和記錄執行狀態、進度等資料彙報給NodeManager,NodeManager
* 再彙報給resourceManager。
* Executor物件執行mapperTask的時候先初始化對應的MapTask,其實就是我們的LastMapper.
* java自定義的xxxMapper,只要初始化成功就呼叫LastMapper的setUp方法,這個時候map方法還沒執行,
* map方法是迴圈呼叫的,即每一行都呼叫一次,但是setUp方法只會呼叫一次。不過1024個碎片段對應1024個mapTask,
* 就會執行setup方法1024次,還是狠多次,所以我們可以考慮從共享記憶體中取得一部分資料,比如微博總數N和DF記錄。
* 我們使用cmap和df兩個Map來存放,判斷是否為空,即保證存過就不用再存了。
*
*
*/
protected void setup(Context context) throws IOException,
InterruptedException {
System.out.println("******************");
if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
URI[] ss = context.getCacheFiles();
if (ss != null) {
for (int i = 0; i < ss.length; i++) {
URI uri = ss[i];
if (uri.getPath().endsWith("part-r-00003")) {//微博總數
Path path =new Path(uri.getPath());
// FileSystem fs =FileSystem.get(context.getConfiguration());
// fs.open(path);
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line = br.readLine();
if (line.startsWith("count")) {
String[] ls = line.split("\t");
cmap = new HashMap<String, Integer>();
cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
} else if (uri.getPath().endsWith("part-r-00000")) {//詞條的DF
df = new HashMap<String, Integer>();
Path path =new Path(uri.getPath());
BufferedReader br = new BufferedReader(new FileReader(path.getName()));
String line;
while ((line = br.readLine()) != null) {
String[] ls = line.split("\t");
df.put(ls[0], Integer.parseInt(ls[1].trim()));
}
br.close();
}
}
}
}
}
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
FileSplit fs = (FileSplit) context.getInputSplit();
// System.out.println("--------------------");
if (!fs.getPath().getName().contains("part-r-00003")) {
String[] v = value.toString().trim().split("\t");
if (v.length >= 2) {
int tf =Integer.parseInt(v[1].trim());//tf值
String[] ss = v[0].split("_");
if (ss.length >= 2) {
String w = ss[0];
String id=ss[1];
//根據公式計算權重,輸出:微博Id 詞彙1:權重1 詞彙2:權重2
double s=tf * Math.log(cmap.get("count")/df.get(w));
NumberFormat nf =NumberFormat.getInstance();
nf.setMaximumFractionDigits(5);
context.write(new Text(id), new Text(w+":"+nf.format(s)));
}
} else {
System.out.println(value.toString() + "-------------");
}
}
}
}
LastReduce.java
計算所有詞條的最終權重,相同微博在後邊顯示其所有的詞條:權重,並使用製表符\t隔開。
輸出格式:微博ID 詞條:權重 詞條:權重
package com.jeff.mr.tf;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LastReduce extends Reducer<Text, Text, Text, Text>{
protected void reduce(Text key, Iterable<Text> arg1,
Context context)
throws IOException, InterruptedException {
StringBuffer sb =new StringBuffer();
for( Text i :arg1 ){
sb.append(i.toString()+"\t");
}
context.write(key, new Text(sb.toString()));
}
}
執行LastJob計算最終輸出結果:
我們這裡採用的是在本地提交到Linux環境下進行執行測試的
package com.jeff.mr.tf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LastJob {
public static void main(String[] args) {
Configuration config =new Configuration();
// config.set("fs.defaultFS", "hdfs://node1:8020");
// config.set("yarn.resourcemanager.hostname", "node1");
config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
try {
FileSystem fs =FileSystem.get(config);
// JobConf job =new JobConf(config);
Job job =Job.getInstance(config);
job.setJarByClass(LastJob.class);
job.setJobName("weibo3");
// DistributedCache.addCacheFile(uri, conf);
//2.5
/**
* 之所以以下兩行可以載入到記憶體因為微博總數的檔案和df檔案其實都不大,所有可以在任務啟動之初先載入到記憶體
*/
//把微博總數N載入到記憶體
job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
//把df載入到記憶體
job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());
//設定map任務的輸出key型別、value型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// job.setMapperClass();
job.setMapperClass(LastMapper.class);
job.setReducerClass(LastReduce.class);
//mr執行時的輸入資料從hdfs的哪個目錄中獲取
FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
Path outpath =new Path("/usr/output/weibo3");
if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job,outpath );
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("執行job成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
需要做的是將工程打包放在桌面weibo3.jar,然後在LastJob中新增:
config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\weibo3.jar");
配置檔案放在src下:
開始執行:
觀察剛開始執行
觀察執行完成;
重新整理DFS-Location
比如:3823890239358658 繼續:4.89035 支援:3.04452
表示在微博ID為3823890239358658微博中,[繼續]的全部微博中權重為4.89035,[支援]的全部微博中權重為3.04452
有了這些結果,我們就可以做出一些商業或者其他領域的重要選擇!
當然也可以在本地進行測試,就是在LastMapper的setUp中註釋掉的程式碼:
FileSystem fs =FileSystem.get(context.getConfiguration());
FSDataInputStream fsdInputStream = fs.open(path);
將輸入流封裝進BufferedReader即可。