1. 程式人生 > >HDFS 手寫mapreduce單詞計數框架

HDFS 手寫mapreduce單詞計數框架

一、資料處理類

package com.css.hdfs;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; /** * 需求:檔案(hello world hello teacher hello john tom ) 統計每個單詞出現的次數? * 資料儲存在hdfs、統計出來的結果儲存到hdfs * * 2004google:dfs/bigtable/mapreduce * * 大資料解決的問題? * 1.海量資料的儲存 * hdfs * 2.海量資料的計算 * mapreduce * * 思路? * hello 2 * world 1 * hello 1 * ... * * 基於使用者體驗: * 使用者輸入資料 * 使用者處理的方式 * 使用者指定結果資料儲存位置
*/ public class HdfsWordCount { public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException { // 反射 Properties pro = new Properties(); // 載入配置檔案 pro.load(HdfsWordCount.class
.getClassLoader().getResourceAsStream("job.properties")); Path inPath = new Path(pro.getProperty("IN_PATH")); Path outPath = new Path(pro.getProperty("OUT_PATH")); Class<?> mapper_class = Class.forName(pro.getProperty("MAPPER_CLASS")); // 例項化 Mapper mapper = (Mapper) mapper_class.newInstance(); Context context = new Context(); // 構建hdfs客戶端物件 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000/"), conf, "root"); // 讀取使用者輸入的檔案 RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inPath, false); while (iter.hasNext()) { LocatedFileStatus file = iter.next(); // 開啟路徑 獲取輸入流 FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String line = null; while ((line = br.readLine()) != null) { // 呼叫map方法執行業務邏輯 mapper.map(line, context); } // 關閉資源 br.close(); in.close(); } // 如果使用者輸入的結果路徑不存在 則建立一個 Path out = new Path("/wc/out/"); if (!fs.exists(out)) { fs.mkdirs(out); } // 將快取的結果放入hdfs中儲存 HashMap<Object, Object> contextMap = context.getContextMap(); FSDataOutputStream out1 = fs.create(outPath); // 遍歷hashmap Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for (Entry<Object, Object> entry : entrySet) { // 寫資料 out1.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes()); } // 關閉資源 out1.close(); fs.close(); System.out.println("資料統計結果完成......"); } }

二、介面類

package com.css.hdfs;

/**
 * 思路:
 * 介面設計
 */
public interface Mapper {
    // 呼叫方法
    public void map(String line, Context context);
}

三、資料傳輸類

package com.css.hdfs;

import java.util.HashMap;

/**
 * 思路:
 * 資料傳輸的類
 * 封裝資料
 * 集合
 * <單詞,1>
 */
public class Context {
    // 資料封裝
    private HashMap<Object, Object> contextMap = new HashMap<>();
    
    // 寫資料
    public void write(Object key, Object value){
        // 放資料到map中
        contextMap.put(key, value);
    }
    
    // 定義根據key拿到值方法
    public Object get(Object key){
        return contextMap.get(key);
    }
    
    // 拿到map中的資料內容
    public HashMap<Object, Object> getContextMap(){
        return contextMap;
    }
}

四、單詞計數類

package com.css.hdfs;

/**
 * 思路:
 * 新增一個map方法 單詞切分 相同key的value ++
 */
public class WordCountMapper implements Mapper{

    @Override
    public void map(String line, Context context) {
        // 拿到這行資料 切分
        String[] words = line.split(" ");
        // 拿到單詞 相同的key value++  hello 1 world 1
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            }else {
                // 不為空
                int v = (int)value;
                context.write(word, v+1);
            }
        }
    }
}

五、配置檔案job.properties

IN_PATH=/wc/in
OUT_PATH=/wc/out/rs.txt
MAPPER_CLASS=com.css.hdfs.WordCountMapper