1. 程式人生 > >布隆過濾器簡述及應用

布隆過濾器簡述及應用

一、布隆過濾器

1、維基百科

  布隆過濾器(Bloom Filter)是1970年由布隆提出的。

  實際上是一個很長的二進位制向量和一系列隨機對映函式。布隆過濾器可以用於檢索一個元素是否在一個集合中。

  優點是不需要儲存 key,節省空間,空間效率和查詢時間都遠遠超過一般的演算法,缺點是有一定的誤識別率和刪除困難。

2、原理概念

  如果想判斷一個元素是不是在一個集合裡,一般想到的是將集合中所有元素儲存起來,然後通過比較確定。

  連結串列、樹、散列表(雜湊表)等等資料結構都是這種思路,但是隨著集合中元素的增加,需要的儲存空間越來越大;同時檢索速度也越來越慢,檢索時間複雜度分別是O(n)、O(log n)、O(1)。

  布隆過濾器的原理是,當一個元素被加入集合時,通過 K 個雜湊函式將這個元素對映成一個位數組(Bit array)中的 K 個點,把它們置為 1 。檢索時,只要看看這些點是不是都是1就知道元素是否在集合中;如果這些點有任何一個 0,則被檢元素一定不在;如果都是1,則被檢元素很可能在(之所以說“可能”是誤差的存在)。

3、自我理解

  直觀的說,Bloom 演算法類似於一個 HashSet(通過雜湊演算法得出元素的雜湊地址,通過對比雜湊地址就可以確定兩個物件是否為同一個地址),用來判斷某個元素(key)是否在某個集合中。

  和一般的 HashSet 不同的是,Bloom Filter 演算法無需儲存 key 的值,對於每個 key,只需要 k 個位元位,每個儲存一個標誌,用來判斷 key 是否在集合中。

 

二、演算法解析

1、BloomFilter 流程

  1. 首先需要 k 個 hash 函式,每個函式可以把 key 雜湊成為 1 個整數;

  2. 初始化時,需要一個長度為 n 位元的陣列,每個位元位初始化為 0;

  3. 某個 key 加入集合時,用 k 個 hash 函式計算出 k 個雜湊值,並把陣列中對應的位元位置為 1;

  4. 判斷某個 key 是否在集合時,用 k 個 hash 函式計算出 k 個雜湊值,並查詢陣列中對應的位元位,如果所有的位元位都是1,認為在集合中。

2、關於雜湊衝突

  假設 Hash 函式是良好的,如果我們的位陣列長度為 m 個點,那麼如果我們想將衝突率降低到例如 1%, 這個散列表就只能容納 m/100個元素。顯然這就不叫空間效率了(Space-efficient)了。解決方法,就是使用多個 Hash,如果它們有一個說元素不在集合中,那肯定就不在。如果它們都說在,雖然也有一定可能性它們都在說謊,不過直覺上判斷這種事情的概率是比較低的。--- 如上 BloomFilter 流程

  一個 Bloom Filter 是基於一個 m 位的位向量(b1,…bm),這些位向量的初始值為0。另外,還有一系列的hash函式(h1,…,hk),這些 hash 函式的值域屬於1~m。

3、演算法實現示意圖

  一個 bloom filter 插入 {x, y, z},並判斷某個值 w 是否在該資料集:

  解析:m=18,k=3;插入 x 是,三個 hash 函式分別得到藍線對應的三個值,並將對應的位向量改為1,插入 y,z 時,類似的,分別將紅線,紫線對應的位向量改為1。查詢時,當查詢 x 時,三個 hash 值對應的位向量都為1,因此判斷 x 在此資料集中。y,z 也是如此。但是查詢 w 時,w 有個 hash 值對應的位向量為0,因此可以判斷不在此集合中。但是,假如 w 的最後那個 hash 值是1,這時就會認為 w 在此集合中,而事實上,w 可能不在此集合中,因此可能出現誤報。顯然的,插入資料越多,1的位數越多,誤報的概率越大。

  Wiki的Bloom Filter詞條有關於誤報的概率的詳細分析:Probability of false positives。從分析可以看出,當 k 比較大時,誤報概率還是比較小的。

 

三、BloomFilter 的應用

1、一些應用場景

  黑名單:比如郵件黑名單過濾器,判斷郵件地址是否在黑名單中。

  排序(僅限於 BitSet) 。

  網路爬蟲:判斷某個URL是否已經被爬取過。

  K-V系統快速判斷某個key是否存在:典型的例子有 Hbase,Hbase 的每個 Region 中都包含一個 BloomFilter,用於在查詢時快速判斷某個 key 在該 region 中是否存在,如果不存在,直接返回,節省掉後續的查詢。

2、一致性校驗(ConsistencyCheck)

  Background:Database migration(SQL Server migrate to MySQL),遷移後的資料一致性校驗。

  Design:使用 BloomFilter 進行 ConsistencyCheck

  Process:

    ① Migrate

    ② Hash the MySQL tables to BloomFilter

    ③ Use the SQL Server tables data to check

3、Python Code:

 1 import pymysql
 2 import pymssql
 3 import time
 4 from bloompy import ScalableBloomFilter
 5 
 6 def timenow():
 7     timestr = time.localtime(int(time.time()))
 8     now = time.strftime("%Y-%m-%d %H:%M:%S", timestr)
 9     return now 
10 
11 #configure sql server connect
12 def mssql_conn(): 
13     conn = pymssql.connect(
14                 server="***", 
15                 user="***", 
16                 password="***", 
17                 database="***")
18     return conn     
19     
20 #configure mysql connect
21 def mysql_conn(): 
22     conn = pymysql.connect(
23                 host="***",
24                 port=3306,
25                 user="***", 
26                 password="***", 
27                 database="***")
28     return conn 
29 
30 def bloomf():
31     bloom = ScalableBloomFilter(initial_capacity=100, error_rate=0.001, mode=ScalableBloomFilter.LARGE_SET_GROWTH) 
32     conn = mysql_conn() 
33     cur = conn.cursor()    
34     print('*** Target table data add to BloomFilter ***\n...')
35     try:
36         cur.execute(t_sql)
37         result = cur.fetchone()
38         while result != None:
39             bloom.add(result)
40             result = cur.fetchone()
41     except:
42         print ("Error: unable to fetch data.")    
43     finally:
44         print('Finished add.\n')
45         cur.close() 
46         conn.close() 
47 
48     print(timenow(),'\n*** Compare source to target data ***\n...')        
49     conn = mssql_conn() 
50     cur = conn.cursor()        
51     try:
52         cur.execute(s_sql)
53         num = 0
54         result = cur.fetchone()
55         while result != None:        
56             if result in bloom:
57                 pass
58             else:
59                 print('{} is not in the bloom filter,not in Target table {}.'.format(result,tab))
60                 num += 1
61             result = cur.fetchone()
62         if num == 0:
63         
64             print('Result: {} ==> Target table data matches source table data.'.format(tab))
65         else:
66             print('\nResult: Need to compare output to repair data.')
67     except:
68         print ("Error: unable to fetch data.")
69     finally:
70         cur.close() 
71         conn.close() 
72         
73 
74 if __name__ == '__main__': 
75     tab  ='***'
76     t_sql='select concat(***, ***, ***, UpdateDate) from ***;'
77     s_sql="select convert(varchar(20),***)+convert(varchar(20),***)+convert(varchar(20),***,20)+convert(varchar(25),UpdateDate,21)+'000' from ***"
78     print('#Start:',timenow(),'\n')
79     bloomf()
80     print('\n#End:',timenow())

&n