BloomFilter 簡介及在 Hadoop reduce side join 中的應用
1、BloomFilter能解決什麼問題? 以少量的記憶體空間判斷一個元素是否屬於這個集合, 代價是有一定的錯誤率 2、工作原理 1. 初始化一個數組, 所有位標為0, A={x1, x2, x3,…,xm} (x1, x2, x3,…,xm 初始為0) 2. 將已知集合S中的每一個數組, 按以下方式對映到A中 2.0 選取n個互相獨立的hash函式 h1, h2, … hk 2.1 將元素通過以上hash函式得到一組索引值 h1(xi), h2(xi),…,hk(xi) 2.2 將集合A中的上述索引值標記為1(如果不同元素有重複, 則重複覆蓋為1, 這是一個覓等操作) 3. 對於一個元素x, 將其根據2.0中選取的hash函式, 進行hash, 得到一組索引值 h1(x), h2(x), …,hk(x) 如果集合A中的這些索引位置上的值都是1, 表示這個元素屬於集合S, 否則則不屬於S 舉例說明: 建立一個容量為500萬的Bit Array結構(Bit Array的大小和keyword的數量決定了誤判的機率),將集合中的每個keyword通過32個hash函式分別計算出32個數字,然後對這32個數字分別用500萬取模,然後將Bit Array中對應的位置為1,我們將其稱為特徵值。簡單的說就是將每個keyword對應到Bit Array中的32個位置上,見下圖:
當需要快速查詢某個keyword時,只要將其通過同樣的32個hash函式運算,然後對映到Bit Array中的對應位,如果Bit Array中的對應位全部是1,那麼說明該keyword匹配成功(會有誤判的機率)。 3、幾個前提 1. hash函式的計算不能效能太差, 否則得不償失 2. 任意兩個hash函式之間必須是獨立的. 即任意兩個hash函式不存在單一相關性, 否則hash到其中一個索引上的元素也必定會hash到另一個相關的索引上, 這樣多個hash沒有意義
4、錯誤率 工作原理的第3步, 的出來的結論, 一個是絕對靠譜的, 一個是不能100%靠譜的。在判斷一個元素是否屬於某個集合時,有可能會把不屬於這個集合的元素誤認為屬於這個集合(false positive)。因此,Bloom Filter不適合那些“零錯誤”的應用場合。而在能容忍低錯誤率的應用場合下,Bloom Filter通過極少的錯誤換取了儲存空間的極大節省。關於具體的錯誤率,這和最優的雜湊函式個數以及位陣列的大小有關,而這是可以估算求得一個最優解的: 雜湊函式個數k、位陣列大小m及字串數量n之間存在相互關係。相關文獻證明了對於給定的m、n,當 k = ln(2)* m/n 時出錯的概率是最小的。 具體的請看:http://blog.csdn.net/jiaomeng/article/details/1495500 5、基本特徵 從以上對基本原理和數學基礎的分析,我們可以得到Bloom filter的如下基本特徵,用於指導實際應用。 (1)存在一定錯誤率,發生在正向判斷上(存在性),反向判斷不會發生錯誤(不存在性); (2)錯誤率是可控制的,通過改變位陣列大小、hash函式個數或更低碰撞率的hash函式來調節; (3)保持較低的錯誤率,位陣列空位至少保持在一半以上; (4)給定m和n,可以確定最優hash個數,即k = ln2 * (m/n),此時錯誤率最小; (5)給定允許的錯誤率E,可以確定合適的位陣列大小,即m >= log2(e) * (n * log2(1/E)),繼而確定hash函式個數k; (6)正向錯誤率無法完全消除,即使不對位陣列大小和hash函式個數進行限制,即無法實現零錯誤率; (7)空間效率高,僅儲存“存在狀態”,但無法儲存完整資訊,需要其他資料結構輔助儲存; (8)不支援元素刪除操作,因為不能保證刪除的安全性。
6、應用場景舉例: (1)拼寫檢查、資料庫系統、檔案系統 (2)假設要你寫一個網路蜘蛛(web crawler)。由於網路間的連結錯綜複雜,蜘蛛在網路間爬行很可能會形成“環”。為了避免形成“環”,就需要知道蜘蛛已經訪問過那些URL。給一個URL,怎樣知道蜘蛛是否已經訪問過呢? (3)網路應用 P2P網路中查詢資源操作,可以對每條網路通路儲存Bloom Filter,當命中時,則選擇該通路訪問。 廣播訊息時,可以檢測某個IP是否已發包。 檢測廣播訊息包的環路,將Bloom Filter儲存在包裡,每個節點將自己新增入Bloom Filter。 資訊佇列管理,使用Counter Bloom Filter管理資訊流量。 (4)垃圾郵件地址過濾 像網易,QQ這樣的公眾電子郵件(email)提供商,總是需要過濾來自發送垃圾郵件的人(spamer)的垃圾郵件。一個辦法就是記錄下那些發垃圾郵件的email 地址。由於那些傳送者不停地在註冊新的地址,全世界少說也有幾十億個發垃圾郵件的地址,將他們都存起來則需要大量的網路伺服器。如果用雜湊表,每儲存一億個 email 地址,就需要1.6GB 的記憶體(用雜湊表實現的具體辦法是將每一個email 地址對應成一個八位元組的資訊指紋,然後將這些資訊指紋存入雜湊表,由於雜湊表的儲存效率一般只有50%,因此一個email 地址需要佔用十六個位元組。一億個地址大約要1.6GB, 即十六億位元組的記憶體)。因此存貯幾十億個郵件地址可能需要上百GB 的記憶體。而Bloom Filter只需要雜湊表1/8 到1/4 的大小就能解決同樣的問題。Bloom Filter決不會漏掉任何一個在黑名單中的可疑地址。而至於誤判問題,常見的補救辦法是在建立一個小的白名單,儲存那些可能別誤判的郵件地址。 (5)Bloomfilter在HBase中的作用 HBase利用Bloomfilter來提高隨機讀(Get)的效能,對於順序讀(Scan)而言,設定Bloomfilter是沒有作用的(0.92以後,如果設定了bloomfilter為ROWCOL,對於指定了qualifier的Scan有一定的優化,但不是那種直接過濾檔案,排除在查詢範圍的形式) Bloomfilter在HBase中的開銷? Bloomfilter是一個列族(cf)級別的配置屬性,如果你在表中設定了Bloomfilter,那麼HBase會在生成StoreFile時包含一份bloomfilter結構的資料,稱其為MetaBlock;MetaBlock與DataBlock(真實的KeyValue資料)一起由LRUBlockCache維護。所以,開啟bloomfilter會有一定的儲存及記憶體cache開銷。 Bloomfilter如何提高隨機讀(Get)的效能? 對於某個region的隨機讀,HBase會遍歷讀memstore及storefile(按照一定的順序),將結果合併返回給客戶端。如果你設定了bloomfilter,那麼在遍歷讀storefile時,就可以利用bloomfilter,忽略某些storefile。 注意:hbase的bloom filter是惰性載入的,在寫壓力比較大的情況下,會有不停的compact併產生storefile,那麼新的storefile是不會馬上將bloom filter載入到記憶體的,等到讀請求來的時候才載入。 這樣問題就來了,第一,如果storefile設定的比較大,max size為2G,這會導致bloom filter也比較大;第二,系統的讀寫壓力都比較大。這樣或許會經常出現單個 GET請求花費3-5秒的超時現象。 7、reduce side join + BloomFilter 在hadoop中的應用舉例: 在某些情況下,SemiJoin抽取出來的小表的key集合在記憶體中仍然存放不下,這時候可以使用BloomFiler以節省空間。將小表中的key儲存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關係,只不過增加了少量的網路IO而已。最後再在reduce階段做表間join即可。 這個過程其實需要先對小表的資料做BloomFilter訓練,構造一個BloomFilter樣本檔案(二進位制的),放到分散式快取,然後在map階段被讀入用來過濾大表。而hadoop早已經支援 BloomFilter 了,我們只需調相應的API即可,ok 下面上程式碼了。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;
public class TrainingBloomfilter {
public static int getOptimalBloomFilterSize(int numRecords,
float falsePosRate) {
int size = (int) (-numRecords * (float) Math.log(falsePosRate) / Math
.pow(Math.log(2), 2));
return size;
}
public static int getOptimalK(float numMembers, float vectorSize) {
return (int) Math.round(vectorSize / numMembers * Math.log(2));
}
public static void main(String[] args) throws IOException {
Path inputFile = new Path("/tmp/decli/user1.txt");
int numMembers = Integer.parseInt("10");
float falsePosRate = Float.parseFloat("0.01");
Path bfFile = new Path("/tmp/decli/bloom.bin");
// Calculate our vector size and optimal K value based on approximations
int vectorSize = getOptimalBloomFilterSize(numMembers, falsePosRate);
int nbHash = getOptimalK(numMembers, vectorSize);
// create new Bloom filter
BloomFilter filter = new BloomFilter(vectorSize, nbHash,
Hash.MURMUR_HASH);
// Open file for read
System.out.println("Training Bloom filter of size " + vectorSize
+ " with " + nbHash + " hash functions, " + numMembers
+ " approximate number of records, and " + falsePosRate
+ " false positive rate");
String line = null;
int numRecords = 0;
FileSystem fs = FileSystem.get(new Configuration());
for (FileStatus status : fs.listStatus(inputFile)) {
BufferedReader rdr;
// if file is gzipped, wrap it in a GZIPInputStream
if (status.getPath().getName().endsWith(".gz")) {
rdr = new BufferedReader(new InputStreamReader(
new GZIPInputStream(fs.open(status.getPath()))));
} else {
rdr = new BufferedReader(new InputStreamReader(fs.open(status
.getPath())));
}
System.out.println("Reading " + status.getPath());
while ((line = rdr.readLine()) != null) {
filter.add(new Key(line.getBytes()));
++numRecords;
}
rdr.close();
}
System.out.println("Trained Bloom filter with " + numRecords
+ " entries.");
System.out.println("Serializing Bloom filter to HDFS at " + bfFile);
FSDataOutputStream strm = fs.create(bfFile);
filter.write(strm);
strm.flush();
strm.close();
System.out.println("Done training Bloom filter.");
}
}
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
public class BloomFilteringDriver {
public static class BloomFilteringMapper extends
Mapper<Object, Text, Text, NullWritable> {
private BloomFilter filter = new BloomFilter();
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
BufferedReader in = null;
try {
// 從當前作業中獲取要快取的檔案
Path[] paths = DistributedCache.getLocalCacheFiles(context
.getConfiguration());
for (Path path : paths) {
if (path.toString().contains("bloom.bin")) {
DataInputStream strm = new DataInputStream(
new FileInputStream(path.toString()));
// Read into our Bloom filter.
filter.readFields(strm);
strm.close();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// Get the value for the comment
String comment = value.toString();
// If it is null, skip this record
if (comment == null || comment.isEmpty()) {
return;
}
StringTokenizer tokenizer = new StringTokenizer(comment);
// For each word in the comment
while (tokenizer.hasMoreTokens()) {
// Clean up the words
String cleanWord = tokenizer.nextToken().replaceAll("'", "")
.replaceAll("[^a-zA-Z]", " ");
// If the word is in the filter, output it and break
if (cleanWord.length() > 0
&& filter.membershipTest(new Key(cleanWord.getBytes()))) {
context.write(new Text(cleanWord), NullWritable.get());
// break;
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
System.out.println("================ " + otherArgs[0]);
if (otherArgs.length != 3) {
System.err.println("Usage: BloomFiltering <in> <out>");
System.exit(1);
}
FileSystem.get(conf).delete(new Path(otherArgs[2]), true);
Job job = new Job(conf, "TestBloomFiltering");
job.setJarByClass(BloomFilteringDriver.class);
job.setMapperClass(BloomFilteringMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
DistributedCache.addCacheFile(new Path("/tmp/decli/bloom.bin").toUri(),
job.getConfiguration());
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
測試檔案:
user1.txt
test xiaowang xiao wang test user2.txt
test xiaowang xiao wang test test1 2xiaowang 1xiao wa2ng atest 執行命令:
hadoop jar trainbloom.jar TrainingBloomfilter hadoop jar bloom.jar BloomFilteringDriver /tmp/decli/user2.txt /tmp/decli/result
結果:
root@master 192.168.120.236 ~/lijun06 > hadoop fs -cat /tmp/decli/result/p* test xiaowang xiao wang test root@master 192.168.120.236 ~/lijun06 >
8、關於 hadoop mapreduce join 的幾種方式,請參考:
http://my.oschina.net/leejun2005/blog/95186
http://my.oschina.net/leejun2005/blog/111963
9、本文參考 or 推薦閱讀:
http://www.jiacheo.org/blog/304 http://blog.csdn.net/jiaomeng/article/details/1495500 http://www.iteye.com/blogs/tag/BloomFilter http://www.cnblogs.com/dong008259/archive/2012/01/04/2311332.html http://blog.csdn.net/liuben/article/details/6602683 http://ourmysql.com/archives/510?f=wb https://zh.wikipedia.org/wiki/%E5%B8%83%E9%9A%86%E8%BF%87%E6%BB%A4%E5%99%A8 http://www.oratea.net/?p=1248 http://zjushch.iteye.com/blog/1530143 https://github.com/adamjshook/mapreducepatterns/blob/master/MRDP/src/main/java/mrdp/appendixA/BloomFilterDriver.java https://github.com/adamjshook/mapreducepatterns/tree/master/MRDP/src/main/java/mrdp/ch3 https://github.com/alexholmes/hadoop-book/tree/master/src/main/java/com/manning/hip/ch7/bloom
bloom filter可以看做是對bit-map的擴充套件,只是 bitmap 一般只用了一個hash做對映,
具體可以參考:
http://www.cnblogs.com/pangxiaodong/archive/2011/08/14/2137748.html
http://kb.cnblogs.com/page/77440/