Hadoop word count with per-file position statistics
1. Problem Description
Given several input files, produce, for every word, the names of the files that contain it, how many times it occurs in each file, and the exact positions of those occurrences.
For example, with the following input files:
1.txt:
it is what it is
what is it
it is a banana
2.txt:
i is what he is
haoop is it
it he a banana
3.txt:
hadoop is what hello is
what is it
hello is a he
the output should be as follows:
a {1:1;(3,3) };{3:1;(3,3) };{2:1;(3,3) };
banana {2:1;(3,4) };{1:1;(3,4) };
hadoop {3:1;(1,1) };
haoop {2:1;(2,1) };
he {2:2;(1,4) (3,2) };{3:1;(3,4) };
hello {3:2;(1,4) (3,1) };
i {2:1;(1,1) };
is {2:3;(1,2) (1,5) (2,2) };{3:4;(1,2) (1,5) (2,2) (3,2) };{1:4;(1,2) (1,5) (2,2) (3,2) };
it {1:4;(1,1) (1,4) (2,3) (3,1) };{2:2;(2,3) (3,1) };{3:1;(2,3) };
what {3:2;(1,3) (2,1) };{2:1;(1,3) };{1:2;(1,3) (2,1) };
For example, the output line "he {2:2;(1,4) (3,2) };{3:1;(3,4) };" means that the word he occurs twice in 2.txt, at line 1 word 4 and line 3 word 2, and once in 3.txt, at line 3 word 4.
2. Implementation Approach
First, the name of the current file is obtained from the InputSplit returned by context.getInputSplit(); the mapper records it once per input split in setup(). The mapper, combiner and reducer all use Text as their input and output key/value types.
The mapper's job is to attach the file name and emit every word it reads with a count of 1, so that the counts of identical words can be summed later. map() is invoked once per line of the file; a counter a records the line the word is on, and b records its column (the word's index within the line). The output key is a string of the form word:file:line:column, and the output value is the count 1.
Each (key, value) pair is then handed, as Hadoop Text records, to the combine() function.
The combine() function splits the incoming string key with split(), sums the counts of identical keys into sum, and rebuilds the pair in the format the problem requires: the word itself becomes the key, and the rest is reformatted as file:count(line,column) and stored in the value, which is passed on to reduce() as Text.
The reducer gathers, for every word, all of its file/position entries and writes them to the output file in the form word { file:count(line,column); file:count(line,column); ... }.
main() sets the input and output paths, clears any existing output, and configures the mapper, combiner and reducer.
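To make the data flow concrete, here is a trace (derived from the code in the next section) of how the word he from the sample files travels through the three phases; note that Hadoop does not guarantee the order in which the values reach the reducer:
mapper output for 2.txt:   ("he:2:1:4", "1"), ("he:2:3:2", "1")
combiner output:           ("he", "2:1(1,4)"), ("he", "2:1(3,2)")
reducer input (all files): ("he", ["2:1(1,4)", "2:1(3,2)", "3:1(3,4)"])
reducer output:            he {2:1(1,4);2:1(3,2);3:1(3,4);}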
3. Complete Code
package wordcount;
import java.io.IOException;
import java.util.*;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WordCount {
    // Line counter used by the mapper; it is reset in setup() for every new split.
    // This assumes each input file is small enough to be read as a single split.
    public static int a = 1;

    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
        private Text valueInfo = new Text();
        private Text keyInfo = new Text();
        public String fileName = "";

        // Runs once per split: remember the current file's name (without the
        // extension) and restart the line counter.
        public void setup(Context context) throws IOException, InterruptedException {
            InputSplit inputSplit = context.getInputSplit();
            String fileFormat = ((FileSplit) inputSplit).getPath().getName();
            fileName = fileFormat.substring(0, fileFormat.lastIndexOf("."));
            a = 1;
            System.out.println("fileName: " + fileName);
        }

        // Emits one record per word: key = "word:file:line:column", value = "1".
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            int b = 1; // column of the current word within this line
            StringTokenizer stk = new StringTokenizer(value.toString());
            System.out.println("Line sentence : " + value.toString());
            while (stk.hasMoreElements()) {
                keyInfo.set(stk.nextToken() + ":" + fileName + ":" + a + ":" + b);
                valueInfo.set("1");
                context.write(keyInfo, valueInfo);
                b++;
            }
            a++; // move on to the next line of the file
        }
    }
    public int cNum = 1;

    // Unused helper left over from development; the MapReduce job never calls it.
    private void setLocationFormat(String docID, String val) {
        String res = new String();
        for (int i = 0; i < val.length(); i++)
            res = "{" + docID + ":" + i + "};";
        cNum++;
    }
    // Combiner: sums the counts for every "word:file:line:column" key and rewrites
    // the pair as key = word, value = "file:count(line,column)".
    // Hadoop treats the combiner as an optional optimization, so this design assumes
    // it is applied to every map output.
    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        Text info = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text val : values) {
                sum += Integer.parseInt(val.toString());
            }
            // key format: word:file:line:column
            String[] str = key.toString().split(":");
            key.set(str[0]);
            System.out.println("str[0]: " + str[0] + " " + "str[1]: " + str[1]);
            info.set(str[1] + ":" + sum + "(" + str[2] + "," + str[3] + ")");
            context.write(key, info);
        }
    }
    // Reducer: concatenates all "file:count(line,column)" entries for a word and
    // writes them out as word {entry;entry;...}.
    public static class InvertedIndexReduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String fileList = new String();
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set("{" + fileList + "}");
            context.write(key, result);
        }
    }
    // Unused helper left over from development; the MapReduce job never calls it.
    // Counts occurrences of a word per document in an in-memory TreeMap.
    public static void wordDeal(String wordOfDoc, String docID, TreeMap<String, TreeMap<String, Integer>> tmp) {
        wordOfDoc = wordOfDoc.toLowerCase();
        if (!tmp.containsKey(wordOfDoc)) {
            // first time this word is seen: start a per-document count of 1
            TreeMap<String, Integer> tmpST = new TreeMap<String, Integer>();
            tmpST.put(docID, 1);
            tmp.put(wordOfDoc, tmpST);
        } else {
            // word already known: start this document at 1 or increment its count
            TreeMap<String, Integer> tmpST = tmp.get(wordOfDoc);
            Integer count = tmpST.get(docID);
            count = (count == null) ? 1 : count + 1;
            tmpST.put(docID, count);
            tmp.put(wordOfDoc, tmpST);
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Run in local mode against the local file system; switch to the
        // commented-out settings to run against an HDFS cluster.
        //conf.set("fs.default.name", "hdfs://master:9000");
        //conf.set("mapred.job.tracker", "hdfs://master:9001");
        conf.set("fs.default.name", "file:///");
        conf.set("mapred.job.tracker", "local");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Delete any previous output so the job can be rerun.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(otherArgs[1]), true);
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
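To run the job, the class can be packaged into a jar (the name wordcount.jar below is only an example) and started with the standard hadoop jar command, pointing it at a directory containing the three sample files: hadoop jar wordcount.jar wordcount.WordCount input output. Because main() sets fs.default.name to file:/// and mapred.job.tracker to local, the paths are resolved against the local file system and the job runs in local mode; the result is written to a part-r-00000 file inside the output directory.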
4. Output Results
a {1:1(3,3);2:1(3,3);3:1(3,3);}
banana {1:1(3,4);2:1(3,4);}
hadoop {3:1(1,1);}
haoop {2:1(2,1);}
he {2:1(1,4);2:1(3,2);3:1(3,4);}
hello {3:1(1,4);3:1(3,1);}
i {2:1(1,1);}
is {2:1(1,2);2:1(1,5);2:1(2,2);3:1(1,2);3:1(1,5);3:1(2,2);3:1(3,2);1:1(1,2);1:1(1,5);1:1(2,2);1:1(3,2);}
it {1:1(1,1);1:1(1,4);1:1(2,3);1:1(3,1);2:1(2,3);2:1(3,1);3:1(2,3);}
what {3:1(1,3);3:1(2,1);2:1(1,3);1:1(1,3);1:1(2,1);}