1. 程式人生 > >6.手寫MR框架

6.手寫MR框架

在這裡插入圖片描述

myjob.properties:

IN_PATH=/mrtest/in
OUT_PATH=/mrtest/out/rs.txt
MAPPER_CLASS=com.mydemo.mr.WordCountMapper

1.HdfsWordCount:

/**
 * Driver for a minimal, single-process "MapReduce-like" word count over HDFS.
 *
 * <p>Job settings are read from the classpath resource {@code myjob.properties}:
 * <ul>
 *   <li>{@code IN_PATH} - HDFS directory whose files are read line by line</li>
 *   <li>{@code OUT_PATH} - HDFS file that receives the {@code key<TAB>value} result lines</li>
 *   <li>{@code MAPPER_CLASS} - fully qualified name of the {@link Mapper} implementation to run</li>
 * </ul>
 */
public class HdfsWordCount {

	/** Classpath resource that holds the job settings. */
	private static final String JOB_CONFIG = "myjob.properties";

	/**
	 * Runs the job: loads the configuration, feeds every line of every file directly under
	 * {@code IN_PATH} to the configured mapper, then writes the collected pairs to {@code OUT_PATH}.
	 *
	 * @param args unused
	 * @throws IOException if the configuration cannot be found or an HDFS read/write fails
	 * @throws IllegalArgumentException if a required property is missing from the configuration
	 * @throws ClassCastException if {@code MAPPER_CLASS} does not implement {@link Mapper}
	 */
	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException {
		Properties pro = loadJobProperties();

		Path inpath = new Path(requireProperty(pro, "IN_PATH"));
		Path outpath = new Path(requireProperty(pro, "OUT_PATH"));

		// Shared collector that the mapper writes its key/value pairs into.
		Context con_map = new Context();

		// Instantiate the mapper reflectively and use it through the Mapper interface,
		// so MAPPER_CLASS may name any implementation rather than only WordCountMapper.
		Class<?> mapperClass = Class.forName(requireProperty(pro, "MAPPER_CLASS"));
		Mapper mapper = (Mapper) mapperClass.newInstance();

		Configuration conf = new Configuration();

		// try-with-resources guarantees the file system handle is released even on failure.
		try (FileSystem fs = FileSystem.get(new URI("hdfs://192.168.232.132:9000"), conf, "root")) {

			// Non-recursive listing: only files directly under the input directory are processed.
			RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(inpath, false);
			while (listFiles.hasNext()) {
				LocatedFileStatus file = listFiles.next();
				try (FSDataInputStream in = fs.open(file.getPath());
						BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"))) {
					String read;
					while ((read = br.readLine()) != null) {
						mapper.Mapper_map(read, con_map);
					}
				}
			}

			// Create the output directory only when it is missing (the original check was
			// inverted), and derive it from OUT_PATH instead of a hard-coded location.
			Path outDir = outpath.getParent();
			if (outDir != null && !fs.exists(outDir)) {
				fs.mkdirs(outDir);
			}

			try (FSDataOutputStream out1 = fs.create(outpath)) {
				// Emit one "key<TAB>value" line per collected entry, encoded with the same
				// charset used for reading rather than the platform default.
				for (Entry<Object, Object> entry : con_map.get_Conmap().entrySet()) {
					out1.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes("utf-8"));
				}
			}
		}

		System.out.println("資料統計結果完成....");

	}

	/**
	 * Loads the job settings from the classpath.
	 *
	 * @return the loaded properties
	 * @throws IOException if the resource is missing or cannot be read
	 */
	private static Properties loadJobProperties() throws IOException {
		Properties props = new Properties();
		try (java.io.InputStream in = HdfsWordCount.class.getClassLoader().getResourceAsStream(JOB_CONFIG)) {
			if (in == null) {
				throw new IOException("Job configuration not found on classpath: " + JOB_CONFIG);
			}
			props.load(in);
		}
		return props;
	}

	/**
	 * Returns the value of a mandatory property.
	 *
	 * @param props loaded job settings
	 * @param key property name to look up
	 * @return the property value, never {@code null}
	 * @throws IllegalArgumentException if the property is absent
	 */
	private static String requireProperty(Properties props, String key) {
		String value = props.getProperty(key);
		if (value == null) {
			throw new IllegalArgumentException("Missing required property '" + key + "' in " + JOB_CONFIG);
		}
		return value;
	}

}

2.Mapper:

/**
 * Contract for the map step of the job: an implementation receives input one
 * line at a time together with the {@link Context} it may read from and write to.
 */
public interface Mapper {

	/**
	 * Handles a single line of input.
	 *
	 * @param line the text of one input line
	 * @param con  the context available to the implementation for reading and writing pairs
	 */
	void Mapper_map(String line, Context con);

}

3.WordCountMapper:

/**
 * {@link Mapper} that counts word occurrences, accumulating the running total
 * for each word in the supplied {@link Context}.
 */
public class WordCountMapper implements Mapper {

	/**
	 * Splits the line into whitespace-separated words and increments each word's
	 * count in the context. Blank lines contribute nothing.
	 *
	 * @param line one line of input text; must not be {@code null}
	 * @param con  context holding the word-to-count pairs; counts are stored as {@code Integer}
	 */
	@Override
	public void Mapper_map(String line, Context con) {
		// Trim first so leading whitespace cannot produce an empty first token.
		String trimmed = line.trim();
		if (trimmed.isEmpty()) {
			return;
		}

		// Split on runs of whitespace: repeated spaces no longer yield "" as a word, and
		// tabs act as separators, so a key can never contain a tab character.
		for (String word : trimmed.split("\\s+")) {
			// Single lookup; an absent word starts from zero.
			Object current = con.read_map(word);
			int count = (current == null) ? 0 : (Integer) current;
			con.write_map(word, count + 1);
		}
	}

}

4.Context:

/**
 * Simple in-memory key/value holder backed by a {@link HashMap}.
 */
public class Context {

	/** Backing map for every pair written through {@link #write_map(Object, Object)}. */
	private final HashMap<Object, Object> store = new HashMap<>();

	/**
	 * Stores {@code value} under {@code key}, replacing any value already stored for that key.
	 *
	 * @param key   the key to store under
	 * @param value the value to associate with the key
	 */
	public void write_map(Object key, Object value) {
		store.put(key, value);
	}

	/**
	 * Looks up the value stored under {@code key}.
	 *
	 * @param key the key to look up
	 * @return the stored value, or {@code null} if the backing map has no value for the key
	 */
	public Object read_map(Object key) {
		Object found = store.get(key);
		return found;
	}

	/**
	 * Returns the backing map itself (not a copy).
	 *
	 * @return the live map holding all pairs
	 */
	public HashMap<Object, Object> get_Conmap() {
		return store;
	}

}