1. 程式人生 > 其它 >BloomFilter 簡介及在 Hadoop reduce side join 中的應用

BloomFilter 簡介及在 Hadoop reduce side join 中的應用

1、BloomFilter能解決什麼問題?      以少量的記憶體空間判斷一個元素是否屬於這個集合, 代價是有一定的錯誤率 2、工作原理      1. 初始化一個數組, 所有位標為0,  A={x1, x2, x3,…,xm}  (x1, x2, x3,…,xm 初始為0)      2. 將已知集合S中的每一個數組, 按以下方式對映到A中           2.0  選取n個互相獨立的hash函式 h1, h2, … hk           2.1  將元素通過以上hash函式得到一組索引值 h1(xi), h2(xi),…,hk(xi)           2.2  將集合A中的上述索引值標記為1(如果不同元素有重複, 則重複覆蓋為1, 這是一個覓等操作)      3.  對於一個元素x, 將其根據2.0中選取的hash函式, 進行hash, 得到一組索引值 h1(x), h2(x), …,hk(x)           如果集合A中的這些索引位置上的值都是1, 表示這個元素屬於集合S, 否則則不屬於S 舉例說明: 建立一個容量為500萬的Bit Array結構(Bit Array的大小和keyword的數量決定了誤判的機率),將集合中的每個keyword通過32個hash函式分別計算出32個數字,然後對這32個數字分別用500萬取模,然後將Bit Array中對應的位置為1,我們將其稱為特徵值。簡單的說就是將每個keyword對應到Bit Array中的32個位置上,見下圖:

當需要快速查詢某個keyword時,只要將其通過同樣的32個hash函式運算,然後對映到Bit Array中的對應位,如果Bit Array中的對應位全部是1,那麼說明該keyword匹配成功(會有誤判的機率)。 3、幾個前提      1. hash函式的計算不能效能太差, 否則得不償失      2. 任意兩個hash函式之間必須是獨立的.           即任意兩個hash函式不存在單一相關性, 否則hash到其中一個索引上的元素也必定會hash到另一個相關的索引上, 這樣多個hash沒有意義

4、錯誤率      工作原理的第3步, 的出來的結論, 一個是絕對靠譜的, 一個是不能100%靠譜的。在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認為屬於這個集合(false positive)。因此,Bloom Filter不適合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了儲存空間的極大節省。關於具體的錯誤率,這和最優的雜湊函式個數以及位陣列的大小有關,而這是可以估算求得一個最優解的: 雜湊函式個數k、位陣列大小m及字串數量n之間存在相互關係。相關文獻證明了對於給定的m、n,當 k = ln(2)* m/n 時出錯的概率是最小的。  具體的請看:http://blog.csdn.net/jiaomeng/article/details/1495500 5、基本特徵 從以上對基本原理和數學基礎的分析,我們可以得到Bloom filter的如下基本特徵,用於指導實際應用。 (1)存在一定錯誤率,發生在正向判斷上(存在性),反向判斷不會發生錯誤(不存在性); (2)錯誤率是可控制的,通過改變位陣列大小、hash函式個數或更低碰撞率的hash函式來調節; (3)保持較低的錯誤率,位陣列空位至少保持在一半以上; (4)給定m和n,可以確定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小; (5)給定允許的錯誤率E,可以確定合適的位陣列大小,即m >= log2(e) * (n * log2(1/E)),繼而確定hash函式個數k; (6)正向錯誤率無法完全消除,即使不對位陣列大小和hash函式個數進行限制,即無法實現零錯誤率; (7)空間效率高,僅儲存“存在狀態”,但無法儲存完整資訊,需要其他資料結構輔助儲存; (8)不支援元素刪除操作,因為不能保證刪除的安全性。

6、應用場景舉例: (1)拼寫檢查、資料庫系統、檔案系統 (2)假設要你寫一個網路蜘蛛(web crawler)。由於網路間的連結錯綜複雜,蜘蛛在網路間爬行很可能會形成“環”。為了避免形成“環”,就需要知道蜘蛛已經訪問過那些URL。給一個URL,怎樣知道蜘蛛是否已經訪問過呢? (3)網路應用   P2P網路中查詢資源操作,可以對每條網路通路儲存Bloom Filter,當命中時,則選擇該通路訪問。   廣播訊息時,可以檢測某個IP是否已發包。   檢測廣播訊息包的環路,將Bloom Filter儲存在包裡,每個節點將自己新增入Bloom Filter。   資訊佇列管理,使用Counter Bloom Filter管理資訊流量。 (4)垃圾郵件地址過濾   像網易,QQ這樣的公眾電子郵件(email)提供商,總是需要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的email 地址。由於那些傳送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則需要大量的網路伺服器。如果用雜湊表,每儲存一億個 email 地址,就需要1.6GB 的記憶體(用雜湊表實現的具體辦法是將每一個email 地址對應成一個八位元組的資訊指紋,然後將這些資訊指紋存入雜湊表,由於雜湊表的儲存效率一般只有50%,因此一個email 地址需要佔用十六個位元組。一億個地址大約要1.6GB, 即十六億位元組的記憶體)。因此存貯幾十億個郵件地址可能需要上百GB 的記憶體。而Bloom Filter只需要雜湊表1/8 到1/4 的大小就能解決同樣的問題。Bloom Filter決不會漏掉任何一個在黑名單中的可疑地址。而至於誤判問題,常見的補救辦法是在建立一個小的白名單,儲存那些可能別誤判的郵件地址。 (5)Bloomfilter在HBase中的作用       HBase利用Bloomfilter來提高隨機讀(Get)的效能,對於順序讀(Scan)而言,設定Bloomfilter是沒有作用的(0.92以後,如果設定了bloomfilter為ROWCOL,對於指定了qualifier的Scan有一定的優化,但不是那種直接過濾檔案,排除在查詢範圍的形式)        Bloomfilter在HBase中的開銷?  Bloomfilter是一個列族(cf)級別的配置屬性,如果你在表中設定了Bloomfilter,那麼HBase會在生成StoreFile時包含一份bloomfilter結構的資料,稱其為MetaBlock;MetaBlock與DataBlock(真實的KeyValue資料)一起由LRUBlockCache維護。所以,開啟bloomfilter會有一定的儲存及記憶體cache開銷。       Bloomfilter如何提高隨機讀(Get)的效能?  對於某個region的隨機讀,HBase會遍歷讀memstore及storefile(按照一定的順序),將結果合併返回給客戶端。如果你設定了bloomfilter,那麼在遍歷讀storefile時,就可以利用bloomfilter,忽略某些storefile。       注意:hbase的bloom filter是惰性載入的,在寫壓力比較大的情況下,會有不停的compact併產生storefile,那麼新的storefile是不會馬上將bloom filter載入到記憶體的,等到讀請求來的時候才載入。  這樣問題就來了,第一,如果storefile設定的比較大,max size為2G,這會導致bloom filter也比較大;第二,系統的讀寫壓力都比較大。這樣或許會經常出現單個 GET請求花費3-5秒的超時現象。 7、reduce side join + BloomFilter 在hadoop中的應用舉例: 在某些情況下,SemiJoin抽取出來的小表的key集合在記憶體中仍然存放不下,這時候可以使用BloomFiler以節省空間。將小表中的key儲存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關係,只不過增加了少量的網路IO而已。最後再在reduce階段做表間join即可。 這個過程其實需要先對小表的資料做BloomFilter訓練,構造一個BloomFilter樣本檔案(二進位制的),放到分散式快取,然後在map階段被讀入用來過濾大表。而hadoop早已經支援 BloomFilter 了,我們只需調相應的API即可,ok 下面上程式碼了。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class TrainingBloomfilter {

	public static int getOptimalBloomFilterSize(int numRecords,
			float falsePosRate) {
		int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
				.pow(Math.log(2), 2));
		return size;
	}

	public static int getOptimalK(float numMembers, float vectorSize) {
		return (int) Math.round(vectorSize / numMembers * Math.log(2));
	}

	public static void main(String[] args) throws IOException {

		Path inputFile = new Path("/tmp/decli/user1.txt");
		int numMembers = Integer.parseInt("10");
		float falsePosRate = Float.parseFloat("0.01");
		Path bfFile = new Path("/tmp/decli/bloom.bin");

		// Calculate our vector size and optimal K value based on approximations
		int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
		int nbHash = getOptimalK(numMembers, vectorSize);

		// create new Bloom filter
		BloomFilter filter = new BloomFilter(vectorSize, nbHash,
				Hash.MURMUR_HASH);

		// Open file for read

		System.out.println("Training Bloom filter of size " + vectorSize
				+ " with " + nbHash + " hash functions, " + numMembers
				+ " approximate number of records, and " + falsePosRate
				+ " false positive rate");

		String line = null;
		int numRecords = 0;
		FileSystem fs = FileSystem.get(new Configuration());
		for (FileStatus status : fs.listStatus(inputFile)) {
			BufferedReader rdr;
			// if file is gzipped, wrap it in a GZIPInputStream
			if (status.getPath().getName().endsWith(".gz")) {
				rdr = new BufferedReader(new InputStreamReader(
						new GZIPInputStream(fs.open(status.getPath()))));
			} else {
				rdr = new BufferedReader(new InputStreamReader(fs.open(status
						.getPath())));
			}

			System.out.println("Reading " + status.getPath());
			while ((line = rdr.readLine()) != null) {
				filter.add(new Key(line.getBytes()));
				++numRecords;
			}

			rdr.close();
		}

		System.out.println("Trained Bloom filter with " + numRecords
				+ " entries.");

		System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
		FSDataOutputStream strm = fs.create(bfFile);
		filter.write(strm);

		strm.flush();
		strm.close();

		System.out.println("Done training Bloom filter.");

	}

}
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

public class BloomFilteringDriver {

	public static class BloomFilteringMapper extends
			Mapper<Object, Text, Text, NullWritable> {

		private BloomFilter filter = new BloomFilter();

		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {

			BufferedReader in = null;

			try {
				// 從當前作業中獲取要快取的檔案
				Path[] paths = DistributedCache.getLocalCacheFiles(context
						.getConfiguration());
				for (Path path : paths) {
					if (path.toString().contains("bloom.bin")) {
						DataInputStream strm = new DataInputStream(
								new FileInputStream(path.toString()));
						// Read into our Bloom filter.
						filter.readFields(strm);
						strm.close();
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				try {
					if (in != null) {
						in.close();
					}
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		@Override
		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			// Get the value for the comment
			String comment = value.toString();

			// If it is null, skip this record
			if (comment == null || comment.isEmpty()) {
				return;
			}

			StringTokenizer tokenizer = new StringTokenizer(comment);
			// For each word in the comment
			while (tokenizer.hasMoreTokens()) {

				// Clean up the words
				String cleanWord = tokenizer.nextToken().replaceAll("'", "")
						.replaceAll("[^a-zA-Z]", " ");

				// If the word is in the filter, output it and break
				if (cleanWord.length() > 0
						&& filter.membershipTest(new Key(cleanWord.getBytes()))) {
					context.write(new Text(cleanWord), NullWritable.get());
					// break;
				}
			}
		}
	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		System.out.println("================ " + otherArgs[0]);
		if (otherArgs.length != 3) {
			System.err.println("Usage: BloomFiltering <in> <out>");
			System.exit(1);
		}

		FileSystem.get(conf).delete(new Path(otherArgs[2]), true);

		Job job = new Job(conf, "TestBloomFiltering");
		job.setJarByClass(BloomFilteringDriver.class);
		job.setMapperClass(BloomFilteringMapper.class);
		job.setNumReduceTasks(0);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

		DistributedCache.addCacheFile(new Path("/tmp/decli/bloom.bin").toUri(),
				job.getConfiguration());

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

測試檔案:

user1.txt

test xiaowang xiao wang test user2.txt

test xiaowang xiao wang test test1 2xiaowang 1xiao wa2ng atest 執行命令:

hadoop jar trainbloom.jar TrainingBloomfilter  hadoop jar bloom.jar BloomFilteringDriver /tmp/decli/user2.txt /tmp/decli/result

結果:

root@master 192.168.120.236 ~/lijun06 > hadoop fs -cat /tmp/decli/result/p* test xiaowang xiao wang test root@master 192.168.120.236 ~/lijun06 >

8、關於 hadoop mapreduce join 的幾種方式,請參考:

http://my.oschina.net/leejun2005/blog/95186

http://my.oschina.net/leejun2005/blog/111963

9、本文參考 or 推薦閱讀:

http://www.jiacheo.org/blog/304 http://blog.csdn.net/jiaomeng/article/details/1495500 http://www.iteye.com/blogs/tag/BloomFilter http://www.cnblogs.com/dong008259/archive/2012/01/04/2311332.html http://blog.csdn.net/liuben/article/details/6602683 http://ourmysql.com/archives/510?f=wb https://zh.wikipedia.org/wiki/%E5%B8%83%E9%9A%86%E8%BF%87%E6%BB%A4%E5%99%A8 http://www.oratea.net/?p=1248 http://zjushch.iteye.com/blog/1530143 https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/appendixA/BloomFilterDriver.java https://github.com/adamjshook/mapreducepatterns/tree/master/MRDP/src/main/java/mrdp/ch3 https://github.com/alexholmes/hadoop-book/tree/master/src/main/java/com/manning/hip/ch7/bloom

bloom filter可以看做是對bit-map的擴充套件,只是 bitmap 一般只用了一個hash做對映,

具體可以參考:

http://www.cnblogs.com/pangxiaodong/archive/2011/08/14/2137748.html

http://kb.cnblogs.com/page/77440/

http://hongweiyi.com/2012/03/data-structure-bitmap/

http://blog.csdn.net/hit_kongquan/article/details/6255673