1. 程式人生 > >Hadoop中共享全域性資訊的幾種方法

Hadoop中共享全域性資訊的幾種方法

在編寫Hadoop MapReduce程式的過程中有時候需要在各個Mapper或者Reducer中使用一些共享的全域性資料,例如在處理整數資料表格的時候有時候需要讓每個Reducer知道各個列的取值範圍或是一些圖演算法中需要讓各個Reducer知道圖的連通關係。

加入key/value對 通用,但效率不高
將共享檔案放在HDFS上,採用Hadoop的檔案操作API訪問
通用,效率一般(可讀可寫)
將共享資訊加入JobConf/Configure物件,使用set/get系列方法訪存 較適用於小資訊,效率最高
將共享資訊加入DistributedCache物件 較適用於大量共享資訊(只能讀)

1, 最基本的方法是把需要共享的資訊加到key/value對中。這種方法簡單易行(用Text表示value,然後在正常資料後面加間隔符和全域性資料),但是網路效率和處理效率都受到非常嚴重的影響。另外有時候還需要重新設計MR的內容。

2, 把共享檔案放在HDFS上,在每個Mapper/Reducer中使用HDFS的檔案API去訪問。這種方法比較通用,但是需要涉及HDFS的檔案操作,較為複雜且效率會受到影響。

讀寫HDFS的API與標準Java檔案API有一點差異,需要使用特定的物件來建立InputStream/OutputStream。下面舉一個從HDFS檔案中讀取資訊的例子。

其中的關鍵點在於:首先根據當前的JobConf獲得當前的檔案系統(它預設從hadoop下的配置檔案中讀取相關資訊,同樣適用於單節點模式);然後要使用FileSystem的成員方法open開啟檔案(它返回一個FSDataInputStream,它是InputStream的子類),千萬不要試圖使用

一般的Java檔案API開啟輸入流或直接使用Hadoop的Path開啟檔案,如new Scanner(p.toString())或new Scanner(new Path(hdfs.getHomeDirectory(),p).toString()),會出現找不到檔案的異常(即使檔案就在所顯示的目錄裡面)

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class XXX{
private int N;
List<Integer> D=new ArrayList<Integer>();
.....

	private void setConfByHDFS(Path p, JobConf conf) throws IOException {
		FileSystem hdfs = FileSystem.get(conf);
		Scanner s = new Scanner(hdfs.open(p));//使用hdfs.open開啟檔案輸入流
		N = s.nextInt();for (int i = 0; i < N; i++) {
			D.add(s.nextInt());
		}
		s.close();
	}
}
3, 使用JobConf的set*方法寫入配置資訊,再在Mapper/Reducer的configure方法裡面使用JobConf的get*方法讀取相關資訊。

由於資訊是寫入JobConf的,讀取的時候不設計HDFS的讀寫,效率最高。但是這種方法難以共享大量資訊。比較適合設定一些全域性變數。

實現的時候需要過載Mapper/Reducer的configure方法。

set*方法在JobConf中根據指定的名字建立一個指定型別值,get*方法根據名字訪問已經存入的值,對於基本型別可以通過一個額外的引數指定訪問失敗時返回的預設值(class方法失敗時返回null)。可以使用setInt/getInt,setFloat/getFloat這樣的方法存取如int、float這樣的型別;存取單個字串直接使用set/get方法;setStrings/getStrings方法的訪問的是一個String型別的陣列。

class XXX{
...
	public static class CSVReducer extends MapReduceBase implements
			Reducer<IntWritable, IntWritable, IntWritable, VectorIntWritable> {
		private int N=0;
		private ArrayList<Integer> D = new ArrayList<Integer>();

		@Override
		public void configure(JobConf job) {//只有這裡能訪問到JobConf
			super.configure(job);
			N=job.getInt("csvcount.conf.num", -1);//訪問共享資訊
			String str = job.get("csvcount.conf.d");
			for (String s : str.split(",")) {
				D.add(Integer.parseInt(s));
			}
		}

		@Override
		public void reduce(IntWritable key, Iterator<IntWritable> values,
				OutputCollector<IntWritable, VectorIntWritable> output, Reporter reporter) throws IOException {
			int[] res = new int[D.get(key.get())];
			// System.out.println(D.get(key.get()));
			...
		}
	}

	private void setConfByConfigure(Path p, JobConf conf) throws IOException {//建立任務後呼叫本函式類寫入全域性共享資訊
		FileSystem hdfs = FileSystem.get(conf);
		Scanner s = new Scanner(hdfs.open(p));
		int N = s.nextInt();
		ArrayList<Integer> D = new ArrayList<Integer>();
		for (int i = 0; i < N; i++) {
			D.add(s.nextInt());
		}
		s.close();
		conf.setInt("csvcount.conf.num", N);//寫入共享資訊
		conf.set("csvcount.conf.d", D.toString().replaceAll("[\\[\\] ]", ""));
	}

4, 寫入DistributedCache。它是Hadoop專門為共享一些只讀的全域性資訊提供的一個較為簡單的機制。Hadoop將所有加入DistributedCache的檔案都copy了一份到相關節點的本地臨時目錄中(還記得配置hadoop時候的配過的那個需要寫本地路徑的臨時目錄項嗎?),因此對這些檔案的讀寫完全是本地檔案的讀寫操作。因為這些檔案只被從HDFS複製到了本地而不回傳,所以對它們的寫操作是沒有意義的也是無法共享的。

使用的時候需要先呼叫DistributedCache的靜態方法addCacheFile將共享檔案/目錄的URI加入到任務JobConf中;訪問之前使用DistributedCache的另一個靜態方法getLocalCachedFiles將job中的共享檔案全都列出來,然後就可以使用標準的Java檔案API開啟檔案了。

在Mapper/Reducer中需要過載configure方法。

public class WordCount2 extends Configured implements Tool {

	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		private Set<String> patternsToSkip = new HashSet<String>();
		public void configure(JobConf job) {//過載的configure方法,用來從job中獲取DistributedCache資訊
			if (job.getBoolean("wordcount2.skip.patterns", false)) {
				Path[] patternsFiles = new Path[0];
				try {
					patternsFiles = DistributedCache.getLocalCacheFiles(job);//獲取DistributedCache檔案陣列
				} catch (IOException ioe) {
					System.err.println("Caught exception while getting cached files: "
							+ StringUtils.stringifyException(ioe));
				}
				for (Path patternsFile : patternsFiles)
					parseSkipFile(patternsFile);
			}
		}

		private void parseSkipFile(Path patternsFile) {
			try {
				BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));//正常開啟檔案
				String pattern = null;
				while ((pattern = fis.readLine()) != null) {
					patternsToSkip.add(pattern);
				}
				fis.close();
			} catch (IOException ioe) {}
		}

		public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {....}
	}

	public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {...}

	public static void main(String[] args) throws Exception {
		JobConf conf = new JobConf(getConf(), WordCount2.class);
		conf.setJobName("wordcount2");
		...
		List<String> other_args = new ArrayList<String>();
		for (int i = 0; i < args.length; ++i) {
			if ("-skip".equals(args[i])) {
				DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);//設定DistributedCache
				conf.setBoolean("wordcount2.skip.patterns", true);
			} else {
				other_args.add(args[i]);
			}
		}

		FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
		FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

		JobClient.runJob(conf);
		return 0;
	}
}

轉載請註明出處