1. 程式人生 > >hadoop處理不同輸入目錄檔案

hadoop處理不同輸入目錄檔案

在寫mapred任務的時候免不了要處理join。
在join中最簡單的就是一對一的join。
下面通過一個小例子介紹如果在mapred中實現一對一的join。

name.txt
100 tom
101 mary
102 kate

score.txt
100 90
101 85
102 80

要得到如下的join結果
100 tom 90
101 mary 85
102 kate 80

處理思路:
mapred的輸入檔案為name.txt和score.txt兩個,我們要通過標誌區分出每條記錄是來自哪個檔案,所以在map的輸出結果要增加檔案的標誌。
map的輸出類似
100 name+tom
100 score+90
然後在red的過程中根據不同的字首來區分不同記錄。因為是一對一的join,所以只要將相同key的不同value連線起來後輸出即可。

程式程式碼如下:

 package org.myorg; 
 
 import java.io.IOException; 
 import java.util.*; 
 import java.lang.String;
 
 import org.apache.hadoop.fs.Path; 
 import org.apache.hadoop.conf.*; 
 import org.apache.hadoop.io.*; 
 import org.apache.hadoop.mapred.*; 
 import org.apache.hadoop.util.*; 
 
 public class OneOnOneJoin { 
 
    public static class Map extends MapReduceBase implements Mapper<Text, Text, Text ,Text> {
   
      public void map(Text key, Text value, OutputCollector<Text , Text> output, Reporter reporter) throws IOException { 
     
       String path=((FileSplit)reporter.getInputSplit()).getPath().toString();
       Text kv = new Text();
       
       if(path.indexOf(”name.txt”)>0) {
        kv.set(”name”+”+”+value);
       } else if(path.indexOf(”score.txt”)>0) {
        kv.set(”score”+”+”+value);
       }
       
       output.collect(key,kv);
      } 
    } 

    public static class Reduce extends MapReduceBase implements Reducer<Text ,Text,  Text,Text> { 
      public void reduce(Text key, Iterator<Text> values, OutputCollector<Text,Text> output, Reporter reporter) throws IOException { 
       
 String name=”";
 String score=”";
 
        while(values.hasNext()) {
         String value = values.next().toString();
         
         if(value.startsWith(”name+”)) {
          name=value.substring(5 , value.length());  
         } else if(value.startsWith(”score+”)) {      
          score=value.substring(6 , value.length()); 
         }
        }
       
 if(!name.equals(”") && !score.equals(”")) {
  output.collect(key,new Text(name + “\t” + score));
 }
       
      } 
    } 
   
    public static void main(String[] args) throws Exception { 
      JobConf conf = new JobConf(OneOnOneJoin.class); 
      conf.setJobName(”oneononejoin”); 
 
      conf.setOutputKeyClass(Text.class); 
      conf.setOutputValueClass(Text.class); 
 
      conf.setMapperClass(Map.class); 
      conf.setReducerClass(Reduce.class); 
 
      conf.setInputFormat(KeyValueTextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
 
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
 
      JobClient.runJob(conf); 
    } 
 }  
 
編譯並執行
javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d OneOnOneJoin OneOnOneJoin.java
jar -cvf OneOnOneJoin.jar -C OneOnOneJoin/ .

hadoop fs -rmr /sunwg/output
hadoop jar OneOnOneJoin.jar org.myorg.OneOnOneJoin /sunwg/input /sunwg/output  

檢視結果
[[email protected] sunwg]$ hadoop fs -cat /sunwg/output/part-00000
100     tom     90
101     mary    85
102     kate    80

進一步考慮外連線的情況,有可能在score中沒有對應的記錄
比如:

name.txt
100 tom
101 mary
102 kate

score.txt
100 90
102 80

要得到如下的join結果
100 tom 90
101 mary 
102 kate 80

只要修改reduce中的最後輸出結果的檢驗條件為

if(!name.equals(”")) {
 output.collect(key,new Text(name + “\t” + score));
}

結果
[[email protected] sunwg]$ hadoop fs -cat /sunwg/output/part-00000
100     tom     90
101     mary
102     kate    80

在進一步考慮全連線的情況,有可能在name中沒有對應的記錄
比如:
name.txt
100 tom
101 mary

score.txt
100 90
102 80

要得到如下的join結果
100 tom 90
101 mary 
102   80
只要修改reduce中的最後輸出結果的檢驗條件為

if(!name.equals(”")) {
 output.collect(key,new Text(name + “\t” + score));
}
else if(!score.equals(”")) {
 output.collect(key,new Text(name + “\t” + score));
}

結果
[[email protected] sunwg]$ hadoop fs -cat /sunwg/output/part-00000
100     tom     90
101     mary
102             80

以上實現了兩個檔案的JOIN操作,採用相同的策略,可以對不同輸入目錄的檔案新增相同標記,採用不同的map、reduce策略來實現對不同的目錄實現不同的操作

相關推薦

hadoop處理不同輸入目錄檔案

在寫mapred任務的時候免不了要處理join。 在join中最簡單的就是一對一的join。 下面通過一個小例子介紹如果在mapred中實現一對一的join。 name.txt 100 tom 101 mary 102 kate score.txt 100 90 10

maprecue將兩類ip分類去重並且輸出到不同目錄檔案

有一份含有兩類ip的資料,根據一個欄位標記來區分,現在需要將去重,兩類Ip分類儲存到不同檔案中,第三類資料捨棄。 主要知識點##: 1、 自定義分割槽:繼承Partitoner類,重寫getPartitoin()方法; 2、多路徑輸出:MultipleOutputs類的

自定義OutputFormat -實現往不同目錄輸出檔案

程式碼地址 : https://gitee.com/tanghongping/hadoopMapReduce/tree/master/src/com/thp/bigdata/myInputFormat 需求: 現有一些原始日誌需要做增強解析處理,流程: 1、 從原始日誌檔案中讀取資料

springboot程式logback日誌基本配置,多個包不同日誌級別輸入檔案

日誌是程式中必不可少的內容。依據日誌我們可以有效診斷程式bug,統計使用者訪問和各主要功能的使用頻率時間段等資訊。因此我們會需要不同package使用不同的日誌級別,以及不同業務的日誌輸出到不同的檔案。下面本文簡要概述如何使用logback將不同包的不同級別日誌輸出到info1.txt中

javaSE (三十)IO流異常處理、圖片加密處理輸入檔案路徑並拷貝檔案內容、在檔案中鍵入內容

1、IO流異常處理: IO流異常處理一般可以寫得如下這麼複雜(面試備用) alt + shift + z 直接try-catch ,不過沒有加finally 因為作用域的問題,需要在外面建立BufferedInputStream物件並且初始化為null(要是不初始化,就

目錄檔案處理

  >>> import os >>> cwd = os.getcwd() >>> cwd '/home/dinsdale' cwd stands for “current working directory”. The resu

處理-CMD檢視目錄檔案

注意:本文適用於目錄下檔案較多的情況,複製檔名較慢,可採用此方法,結合Notepad++的列模式 1. 進入所要檢視檔案的目錄: cd /D d:test 2. 開啟echo命令: echo on 3. 輸出檔案下目錄到檔案: dir>content.txt

使用spark對輸入目錄檔案進行過濾

使用spark進行檔案過濾 在使用spark的很多情形下, 我們需要計算某個目錄的資料. 但這個資料夾下面並不是所有的檔案都是我們想要計算的 比如 : 對於某一天的資料,我們只想計算其中的幾個小時,這

DOS命令FOR批處理COPY指定目錄檔案到指定目錄

這段時間,俺把自己寫的Qt小專案的執行結果都截了個圖sample.png儲存,可是要想更新到git上的時候發現問題了,專案很多個,這要一個個COPY也還是有點工作量的。 思來想去,印象中DOS命令應該有

Linux學習日記 —— 4.1.2 檔案處理命令之目錄處理命令

目錄處理命令 ————建立目錄———— 命令名稱:mkdir 命令英文原意:make directories 命令所在路徑:/bin/mkdir 執行許可權:所有使用者 語法:mkdir [-p] [目錄名] 功能: 建立新目錄(資料夾) -p

MapReduce處理多個不同的出入檔案

MultipleInputs類指定不同的輸入檔案路徑以及輸入文化格式 現有兩份資料 phone 123,good number 124,common number 125,bad number user zhangsan,123 lisi,124 wangwu,125 現在

如何將程式的執行檔案和靜態載入動態庫放在不同目錄

一般windows程式的exe和dll需要放在同一個目錄,靜態載入才不會報錯,否則需要修改path環境變數,將所有沒有和exe放在同一目錄的dll的路徑加在path環境變數中。 有沒有一種方法不去手動修改path環境變數並且可以將exe和dll隨心所欲的改變路徑呢?我沒有發

第4章 處理使用者輸入與顯示資料------------(標準檔案描述符、STDIN/STDERR/STDOUT、臨時重定向、永久重定向exec)

4.4 顯示資料 4.4.1 標準檔案描述符          Linux系統將每個物件當作檔案處理。這包括輸入和輸出過程。Linux使用檔案描述符(file descriptor)標識每個檔案物件。檔案描述符是一個非負整數,可以唯一地標識會話中開啟的檔案。每個進行最多可以

Hadoop NameNode元資料相關檔案目錄解析

 下面所有的內容是針對Hadoop 2.x版本進行說明的,Hadoop 1.x和這裡有點不一樣。   在第一次部署好Hadoop叢集的時候,我們需要在NameNode(NN)節點上格式化磁碟:1[wyp@wyp hadoop-2.2.0]$  $HADOOP_HOME

Web---檔案上傳-用apache的工具處理、打散目錄、簡單檔案上傳進度

我們需要先準備好2個apache的類: 上一個部落格文章只講了最簡單的入門,現在來開始慢慢加深。 先過渡一下:只上傳一個file項 index.jsp: <h2>用apache的工具處理檔案上傳</h2>

MapReduce對輸入檔案處理

 MultipleInputs類指定不同的輸入檔案路徑以及輸入文化格式  現有兩份資料  phone  123,good number  124,common number  125,bad number  user  zhangsan,123  lisi,124  wan

[批處理] 把盤內指定檔案複製到指定目錄

  使用for 指令,    FOR %%variable IN (set) DO command [command-parameters]   /R 遞迴  進入根目錄樹 [Drive:]Path,

【總結】Hadoop NameNode元資料相關檔案目錄解析

原文:https://www.iteblog.com/archives/967.html 在第一次部署好叢集的時候,我們需要在NameNode(NN)節點上格式化磁碟: [[email protected]]$  $HADOOP_HOME/bin/hdfs

自定義hadoop map/reduce輸入檔案切割InputFormat 更改輸入value的分隔符

本文轉載自:http://hi.baidu.com/lzpsky/blog/item/99d58738b08a68e7b311c70d.html hadoop會對原始輸入檔案進行檔案切割,然後把每個split傳入mapper程式中進行處理,FileInputForma

hadoop 處理檔案的過程

注意:       1、如果一個檔案的大小小於這個機器上獲取的檔案的1.1倍時,則會預設為這個檔案為一個檔案進行處理       2、檔案的大小越小對像處理的時間要求就越大       3、一個切片就會交給一個mapTask,也真是消耗時間的原因       4、預設