MapReduce資料處理兩表join連線
阿新 • • 發佈:2019-01-22
現在這裡有兩個text文件,需要把它合併成一個文件,並且裡面的資料不能有冗餘..
- <strong><span style="color:#FF0000;">info.txt檔案:</span></strong>
- 1003 kaka
- 1004 da
- 1005 jue
- 1006 zhao
- <span style="color:#FF0000;"><strong>cpdata.txt檔案:</strong></span>
- 201001 1003 abc
-
201002 1005 def
- 201003 1006 ghi
- 201004 1003 jkl
- 201005 1004 mno
- 201006 1005 pqr
- 201001 1003 abc
- 201004 1003 jkl
- 201006 1005 mno
- 200113 1007 zkl
生成檔案:
- 1003 201001 abc kaka
- 1003 201004 jkl kaka
- 1004 201005 mno da
- 1005 201002 def jue
- 1005 201006 pqr jue
- 1005 201006 mno jue
- 1006 201003 ghi zhao
這裡先申明下,這個純屬個人想法,如果有跟好的方法可以告訴我
因為info.txt文件的第一個欄位與cpdata.txt的第二個欄位是相同的,所以我把他們做為key值,這樣通過Map他們就會組合了.去冗餘,
主要是用了個List記錄已經讀取過的變數,如果有一樣的就不讀取了.
程式碼如下:
- publicclass Advanced extends Configured implements Tool {
-
publicstaticclass AdMap extends Mapper<LongWritable, Text, Text, TextPair>{
- @Override
- protectedvoid map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- // String filePath = ((FileSplit)context.getInputSplit()).getPath().toString();
- Text word = new Text();
- String line = value.toString();
- String[] childline = line.split(" "); //以空格擷取
- //判斷是哪一張表,其實個人覺得這樣判斷還不合理,可以使用上面注視掉的獲取路徑值來判斷
- if(childline.length == 3){
- TextPair pair = new TextPair();
- pair.setFlag("0"); //這是個標識 0.表示 cpdata.txt 1表示info.txt
- pair.setKey(childline[1]);
- pair.setValue(childline[0]+" "+childline[2]);
- pair.setContent(pair.toString());
- word.clear();
- word.set(pair.getKey());
- context.write(word, pair); //傳遞一個物件要實現WritableComparable介面
- }else{
- TextPair pair = new TextPair();
- pair.setFlag("1");
- pair.setKey(childline[0]);
- pair.setValue(childline[1]);
- pair.setContent(pair.toString());
- word.clear();
- word.set(pair.getKey());
- context.write(word, pair);
- }
- }
- }
- publicstaticclass AdReduce extends Reducer<Text, TextPair, Text, Text>{
- @Override
- publicvoid reduce(Text key, Iterable<TextPair> values,
- Context context)
- throws IOException, InterruptedException {
- // TODO Auto-generated method stub
- //list0裝載的都是cpdata的資料,list1裝載的是info的資料
- List<Text> list0 = new ArrayList<Text>();
- List<Text> list1 = new ArrayList<Text>();
- Iterator<TextPair> it = values.iterator();
- TextPair pair = new TextPair();
- while(it.hasNext()){
- pair = it.next();
- if("1".equals(pair.getFlag()))
- list1.add(new Text(pair.getValue()));
- else
- list0.add(new Text(pair.getValue()));
- }
- List<Text> sublist = new ArrayList<Text>(); //sublist用來新增已經寫過的資料,然後再判斷,如果存在就不用操作
- for(int i = 0 ; i<list1.size(); i++){
- for(int j = 0 ;j<list0.size();j++){
- if(!sublist.contains(list0.get(j))){
- sublist.add(list0.get(j));
- context.write(key, new Text(list0.get(j)+" " +list1.get(i)));
- }
- }
- }
- }
- }
- /**
- * @param args
- */
- publicstaticvoid main(String[] args) {
- try {
- int res = ToolRunner.run(new Configuration(), new Advanced(), args);
- System.exit(res);
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- @Override
- publicint run(String[] args) throws Exception {
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- if(fs.exists(new Path(args[2]))){
- //如果檔案已近存在就刪除檔案
- // System.out.println("error : file is exists");
- // System.exit(-1);
- fs.delete(new Path(args[2]), true);
- }
- Job job = new Job(conf , "Advanced");
- job.setJarByClass(Advanced.class