6.手寫MR框架
阿新 • • 發佈:2018-12-18
myjob.properties:
IN_PATH=/mrtest/in
OUT_PATH=/mrtest/out/rs.txt
MAPPER_CLASS=com.mydemo.mr.WordCountMapper
1.HdfsWordCount:
public class HdfsWordCount { public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException, ClassNotFoundException, InstantiationException, IllegalAccessException { // TODO Auto-generated method stub //反射,載入配置資訊 Properties pro = new Properties(); pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("myjob.properties")); Path inpath = new Path(pro.getProperty("IN_PATH")); Path outpath = new Path(pro.getProperty("OUT_PATH")); //構造工具類 Context con_map = new Context(); Class<?> Mapper = Class.forName(pro.getProperty("MAPPER_CLASS")); WordCountMapper Map = (WordCountMapper) Mapper.newInstance(); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.232.132:9000"), conf, "root"); RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(inpath, false); while(listFiles.hasNext()) { LocatedFileStatus file = listFiles.next(); FSDataInputStream in = fs.open(file.getPath()); BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8")); String read = null; while((read=br.readLine())!=null) { Map.Mapper_map(read, con_map); } br.close(); in.close(); } Path Out = new Path("/mrtest/out"); if(fs.exists(Out)) { fs.mkdirs(Out); } HashMap<Object, Object> contextMap = con_map.get_Conmap(); FSDataOutputStream out1 = fs.create(outpath); //遍歷hashmap Set<Entry<Object, Object>> entrySet = contextMap.entrySet(); for(Entry<Object, Object> entry:entrySet) { //寫資料 out1.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes()); } out1.close(); fs.close(); System.out.println("資料統計結果完成...."); } }
2.Mapper:
/**
 * Pluggable map step of the hand-written MR framework.
 *
 * <p>An implementation is handed input one line at a time and records whatever it emits
 * in the supplied {@link Context}.
 */
public interface Mapper {

    /**
     * Processes a single line of input.
     *
     * @param line the input line to process
     * @param con  the context into which results are written
     */
    void Mapper_map(String line, Context con);
}
3.WordCountMapper:
public class WordCountMapper implements Mapper { @Override public void Mapper_map(String line, Context con) { // TODO Auto-generated method stub String [] line_sq = line.split(" "); for(String k: line_sq) { Object key = con.read_map(k); if(key==null) { con.write_map(k, 1); }else { Object va = con.read_map(k); int value = (int) va; con.write_map(k, value+1); } } } }
4.Context:
/**
 * Key/value store shared between a mapper (which writes into it) and the driver
 * (which reads the final contents). Backed by a plain {@link HashMap}; no synchronization
 * is performed.
 */
public class Context {

    /** All emitted pairs; this exact instance is what {@link #get_Conmap()} hands out. */
    private final HashMap<Object, Object> store;

    /** Creates an empty context. */
    public Context() {
        this.store = new HashMap<Object, Object>();
    }

    /**
     * Stores {@code value} under {@code key}, replacing any previous value.
     *
     * @param key   the key to write (may be {@code null}, as HashMap allows)
     * @param value the value to associate with the key
     */
    public void write_map(Object key, Object value) {
        store.put(key, value);
    }

    /**
     * Looks up the value stored under {@code key}.
     *
     * @param key the key to look up
     * @return the stored value, or {@code null} when the key is absent
     */
    public Object read_map(Object key) {
        final Object found = store.get(key);
        return found;
    }

    /**
     * Exposes the live backing map (not a copy); changes through it are visible to this context.
     *
     * @return the internal map
     */
    public HashMap<Object, Object> get_Conmap() {
        return this.store;
    }
}