布隆過濾器(Bloom Filters)的原理及程式碼實現(Python + Java)
本文介紹了布隆過濾器的概念及變體,這種描述非常適合程式碼模擬實現。重點在於標準布隆過濾器和計算布隆過濾器,其他的大都在此基礎上優化。文末附上了標準布隆過濾器和計算布隆過濾器的程式碼實現(Java版和Python版)
本文內容皆來自 《Foundations of Computers Systems Research》一書,自己翻譯的,轉載請註明出處,不準確的部分請告知,歡迎討論。
-
布隆過濾器是什麼?
布隆過濾器是一個高效的資料結構,用於集合成員查詢,具有非常低的空間複雜度。
-
標準布隆過濾器(Standard Bloom Filters,SBF)
基本情況
布隆過濾器是一個含有 m 個元素的位陣列(元素為0或1),在剛開始的時候,它的每一位都被設為0。同時還有 k 個獨立的雜湊函式 h1, h2,..., hk 。需要將集合中的元素加入到布隆過濾器中,然後就可以支援查詢了。說明如下:- 計算h1(x), h2(x),...,hk(x),其計算結果對應陣列的位置,並將其全部置1。一個位置可以被多次置1,但只有一次有效。
- 當查詢某個元素是否在集合中時,計算這 k 個雜湊函式,只有當其計算結果全部為1時,我們就認為該元素在集合內,否則認為不在。
- 布隆過濾器存在假陽性的可能,即當所有雜湊值都為1時,該元素也可能不在集合內,但該演算法認為在裡面。
- 假陽性出現的概率被雜湊函式的數量、位陣列大小、以及集合元素等因素決定。
假陽性率評估 為了評估假陽性率,需要基於一個假設:雜湊函式都是完美隨機的。約定幾個變數: - k 雜湊函式的數量
- n 集合 S 中元素的數量
- m 位陣列的大小
- p 位陣列中某一位為0的概率
- f 假陽性的概率
最佳的雜湊函式數量
根據數學推理得(過程就算了):當 p = 1/2, k = ln2 * (m/n)時,f 最小為(1/2)^k
可以看出,當位陣列中有一半零一半一時,結果最好。
事實上,m 是 n 的倍數,而且 k 常取最接近但小於理論值的整數值。部分佈隆過濾器(partial bloom filters) -
計算布隆過濾器(Counting Bloom Filters,CBF)
標準的布隆過濾器有一個致命的缺點:不支援刪除元素。CBF協議解決的這個問題。 - 將標準布隆過濾器中的位陣列變成整數陣列,即可以用多位表示。
- 標準布隆過濾器每個位置可以被多次置1,但只有一次有效,這樣,某一個位置被多個元素雜湊對映,當要刪除其中一個元素時,該元素雜湊對映的位置都應該變為零,那麼就會破壞其他元素的對映,會出現假陰性。
- 由於計算布隆過濾器的陣列可以表示更大的整數,那麼當某個位置被對映到時,該位置的計數值就自增1,而當某個元素被刪除時,就將其對映位置的計數值減1。這樣就解決了SBF的問題。
- CBF同樣存在問題,因為當計數值自增時可能會溢位,當計數值為4位元時,溢位的概率為:1.37 * 10^-15 * m,雖然很低,但對某些應用可能不夠。一個簡單的解決方法是,當計數值到達最大值時,就不在自增,但這導致假陰性。
-
壓縮布隆過濾器(Compressed Bloom Filters)
在網路應用中,布隆過濾器通常被作為資訊在各節點間傳送,為了節約資源,自然而然就想能不能壓縮布隆過濾器後再傳送。 - 由前面我們知道,要使得布隆過濾器有最小的假陽性概率,陣列中包含的0或1的概率應該是一樣的,根據夏農編碼原理(Shannon coding principle),這樣的布隆過濾器不能被壓縮。雖然這樣的布隆過濾器不能被直接壓縮,但我們可以用其他方法達到一樣的效果。
- 要使得布隆過濾器 x 與布隆過濾器 y( 包含的0或1的概率應該是一樣的)具有相同的假陽性概率,那麼,x 的大小要大於 y 的,x 的雜湊函式的數量不同於 y 的,這樣 x 中包含的0和1的數量就不同,x 就可以被壓縮。
- 問題出來了,壓縮布隆過濾器的原因是更節省空間,我們找了個更大的布隆過濾器壓縮,那麼壓縮後的布隆過濾器的空間效率比原布隆過濾器更加優秀嗎?是的。
- 壓縮後,布隆過濾器的本地儲存空間會變大,但雜湊函式數量會變小(更少的對映操作)、傳送的位更少。
-
D-left 計算布隆過濾器(D-left Counting Bloom Filters)
上面提到的計算布隆過濾器存在這樣的缺點:儲存空間是標準布隆過濾器的數倍(取決於計數值的位數)和計數值的不均勻(有些始終為0,有些則可能溢位)。下面看看 D-left Counting Bloom Filters 的特點。D-left Counting Bloom Filters 基於 D-left Hashing。 D-left Hashing 基本結構
- 將一個雜湊表分成幾個不相交的子表(subtable)
- 每個子表裡都有數量相同的桶(bucket)
- 每個桶裡都有一定數量的單元(cell,單元包括特徵值和計數值)
- 每個單元都是固定的位陣列成,用來儲存元素的特徵值(fingerprint)
- 只有一個雜湊函式,該雜湊函式可以生成和子表數量相同的桶地址和一個特徵值
假設有 d 個子表,元素為 x,雜湊函式為 f- 計算 f(x),生成桶地址 addr0, addr1, ..., addr(d-1),特徵值 p
- 我們檢查子表 i 中地址為 addri 的桶中的所有單元(i = 0,1,...,d-1)
- 如果某個單元中的特徵值和 p 相等,那麼元素 x 就在該雜湊表中
- 若沒有找到這樣的單元,那麼需要找到儲存特徵值最少的桶(在上面生成的桶地址中找),然後將該特徵值 p 隨機放入該桶的一個空單元中,該單元的計數值變為1,這考慮了裝載平衡
D-left Counting Bloom Filters
由上可知,d-left Hashing 的計數值最大為零,不支援刪除操作,為了將它變成可 Counting,可以讓它的計數值變成由多位組成。但這樣依然會出現問題,如下:- 假設 d-left counting bloom filter 包含 4 個子表,每個子表又包含 4 個桶,初始為空。
- 現在有兩個元素 x 和 y 需要對映到過濾器中,f(x) = (1, 1, 1, 1,r), f(y) = (1, 2, 3, 4, r)
- 已知插如 x 時,第四個子表的第一個桶最空,x 的特徵值 r 被插入該桶的某一個單元中,該單元計數值變為1,而插入 y 時,第一個子表的第一個桶最空,y 的特徵值 r 被插入該桶的某一個單元中,該單元計計數值變為1
- 現在要刪除 x,那麼就會尋找每個子表的第一個桶中的單元,這時,在第一個子表的第一個桶中找到了特徵值 r,接下來就會將該單元的計數值減 1 變為 0,同時,儲存的特徵值被刪除,變為空。
- 現在查詢 x 是否在表中,結果返回真,而查詢 y 是否在表中,結果返回假,導致錯誤。
為什麼會出現上面的情況?由三個因素促成
- x 和 y 有相同的特徵值 r
- f(x) 和 f(y) 生成的地址有相同的
- x 和 y 特徵值儲存的地方還不一樣(存一樣就不會出錯)
如何解決?
說實話,沒看懂英文描述的內容。。。。大致是做了排列置換等操作
效能分析比普通的計算布隆過濾器空間少了一半甚至更多,而且效率也有提升(假陽性更低)
-
Spectral Bloom Filters
Counting Bloom Filters 可以進行元素的刪除操作,然而卻不能記錄一個元素被對映的頻率,而且很多應用中元素出現的頻率相差很大,也就是說,CBF中每個計數值的位數一樣,那麼有些計數值很快就會溢位,而另一些則一直都很小。這些問題可以被 Spectral Bloom Filters 解決。 在SBF中,每一個計數值的位數都是動態改變的。它的構造我沒看懂,先留著吧
-
Dynamic Counting Filters
Spectral bloom filter 被提出來解決元素頻率查詢問題,但是,它構造了一個複雜的索引資料結構去解決動態計算器的儲存問題。Dynamic counting bloom filter(比SBF好理解多了) 是一個空間時間都很高效的資料結構,支援元素頻率查詢。相比於SBF,在實際應用中(計數器不是很大,改變不是很頻繁時)它有更快的訪問時間和更小的記憶體消耗。 構成部分
- DCBF由兩部分組成,第一部分是基礎的計算布隆過濾器
- 第二部分是一個同樣大小的向量,用於記錄第一部分中計算器溢位的次數
- 第一部分中的計算器位數固定,第二部分中每個溢位計算器位數動態改變
特點
- 當第二部分溢位計算器也面臨溢位時,會重新申請一個向量,給要溢位部分增加位數,其他溢位計算器直接拷貝到新的向量中的對應位置,舊的向量會被釋放
-
學習案例
Summary Cache
在網路中有極大的資源請求,如果所有的請求都由伺服器來處理,網路就會出現擁堵,效能就會下降。所以網路中有大量的中間代理節點。這些代理會把一部分資源放在自己的本地快取,當用戶向伺服器請求資源時,該代理先會檢查該資源是否在自己的快取中,如果在就直接傳送給使用者,否則再向伺服器請求。一個代理能夠儲存的資源是非常有限的,為了進一步減輕伺服器的負載,網路中相鄰的代理都可以共享自己的快取。這樣,當代理 A 本地快取沒有時,就會向相鄰代理廣播請求,查詢他們是否有該快取。
然而,這樣依舊有很大問題,假設,這裡有 N 個代理,每個代理的命中率為 H,一個代理平均請求 R 次,那麼廣播中,一個代理收到的查詢資訊共有 (N-1) * (1-H) * R 條,總共的請求也就是
N * (N-1) * (1-H) * R。這是非常低效的。
再次改進,各個代理之間交換自己快取的摘要資訊。這樣,當代理 A 失敗後,會先查詢各個代理的摘要資訊,然後決定是定向向某個代理請求,還是向伺服器請求資源。這就大大的減少了網路通訊量。為了滿足快速查詢、更新摘要資訊,一個非常好的選擇就是計算布隆過濾器(Counting bloom filters)。IP Traceback
網路中存在許多攻擊,有時候需要根據一些資料包去還原IP路徑,找到攻擊者。一個可行的辦法是在路由器中儲存資料包資訊。然而,有些網路中通訊量巨大,儲存所有的包是不現實的,因此可以儲存這些包的摘要資訊。這時,選用布隆過濾器可以極大的節省空間,而且具有非常快的查詢。
- 程式碼實現
標準布隆過濾器構建、測試程式碼(Python 面向過程版) 1 import math 2 import random 3 import time 4 5 6 def hash_function(a, b, c, item, tablelen): 7 return (a * item ** 2 + b * item + c) % tablelen #雜湊函式 8 9 10 def construction_of_SBF(tablelen = 1000, set = []): 11 12 k = int(math.log(2, math.e) * (tablelen / len(set))) 13 hash = [] 14 random.seed(time.time()) 15 for i in range(k): #隨機生成雜湊函式的三個引數 16 a = random.randint(1, 1000) 17 b = random.randint(1, 1000) 18 c = random.randint(1, 1000) 19 hash.append((a, b, c)) 20 21 bitArray = [0] * tablelen 22 23 for element in set: #對映集合元素到位陣列 24 for i in range(k): 25 hx = hash_function(hash[i][0], hash[i][1], hash[i][2], element, tablelen) 26 bitArray[hx] = 1 27 28 filter = [bitArray, hash] 29 return filter 30 31 # 測試 32 def test_bloom_filters(bloom_filter = None): 33 if bloom_filter == None: 34 return False 35 36 testSet = [1, 3, 7, 111, 99, 54, 34, 67, 81, 121, 101, 100, 23, 0, 845, 3339, 44] 37 for item in testSet: 38 flag = True 39 for i in range(len(filter[1])): 40 hx = hash_function(filter[1][i][0], filter[1][i][1], filter[1][i][2], item, len(filter[0])) 41 if bloom_filter[0][hx] != 1: 42 flag = False 43 break 44 45 if flag is True: 46 print("%d is in filter\n" % item) 47 else: 48 print("%d is not in filter\n" % item) 49 50 return True 51 52 53 if __name__ == "__main__": 54 filter = construction_of_SBF(set = list(range(10))) 55 test_bloom_filters(filter)View Code
計算布隆過濾器構建、測試程式碼(Python 面向過程版) 1 import math 2 import random 3 import time 4 5 """ 6 結構沒有設定好,按下寫: 7 0. 封裝函式 8 1. 雜湊函式:計算雜湊值 9 2. 生成雜湊隨機引數函式 10 3. 插入函式:被呼叫 11 4. 刪除函式:被呼叫 12 5. 查詢函式:測試函式呼叫 13 6. 測試函式:測試插入和刪除 14 15 """ 16 17 18 def hash_function(params, item, tlen): 19 return (params[0] * item ** 2 + params[1] * item + params[2]) % tlen 20 21 22 def deletion_counting_bloom_filter(cbfilter = None, item = None): 23 if (cbfilter is None) or (item is None): 24 return False 25 for params in cbfilter[2]: 26 cbfilter[0][hash_function(params, item, len(cbfilter[0]))] -= 1 27 return True 28 29 30 def insertion_counting_bloom_filter(item = None, cbfilter = None): 31 if (item == None) or (cbfilter == None): 32 return False 33 for params in cbfilter[2]: 34 cbfilter[0][hash_function(params, item, len(cbfilter[0]))] += 1 35 return True 36 37 38 def query_counting_bloom_filter(item = None, cbfilter = None): 39 for params in cbfilter[2]: 40 if(cbfilter[0][hash_function(params, item, len(cbfilter[0]))]) is 0: 41 return False 42 return True 43 44 45 def construction_counting_bloom_filter(filterSet = None, filterArray = None): 46 if (filterSet is None) or (filterArray is None): 47 return None 48 # 最佳的雜湊函式數量 49 hashNum = int(math.log(2, math.e) * (len(filterArray) / len(filterSet))) 50 hashParam = [] 51 random.seed(time.time()) 52 # 隨機生成雜湊引數 53 for i in range(hashNum): 54 a = random.randint(1, 9999) 55 b = random.randint(1, 9999) 56 c = random.randint(1, 9999) 57 hashParam.append((a, b, c)) 58 59 # 將初始集合元素對映到過濾器陣列中 60 for item in filterSet: 61 for params in hashParam: 62 filterArray[hash_function(params, item, len(filterArray))] += 1 63 64 # 返回過濾器陣列、過濾器集合、過濾器雜湊引數 65 return (filterArray, filterSet, hashParam) 66 67 68 def test_counting_bloom_filters(cbfilter = None): 69 if cbfilter is None: 70 return None 71 testSet = cbfilter[1][10:20] 72 73 # 先測試原有元素是否正常對映 74 for item in testSet: 75 if query_counting_bloom_filter(item, cbfilter) is True: 76 print("%d is in filter\n" % item) 77 else: 78 print("%d is not in filter\n" % item) 79 80 # 刪除後再查詢 81 if deletion_counting_bloom_filter(cbfilter, testSet[0]) is True: 82 print("delete successfully!\n") 83 else : 84 print("delete fails\n") 85 86 if query_counting_bloom_filter(testSet[0], cbfilter) is True: 87 print("%d is in filter\n" % testSet[0]) 88 else : 89 print("%d is not in filter\n" % testSet[0]) 90 91 # 插入後再測試 92 if insertion_counting_bloom_filter(testSet[0], cbfilter) is True: 93 print("insert %d successfully\n" % testSet[0]) 94 else: 95 print("insert %d fails\n") 96 97 if query_counting_bloom_filter(testSet[0], cbfilter) is True: 98 print("%d is in filter\n" % testSet[0]) 99 else : 100 print("%d is not in filter\n" % testSet[0]) 101 102 103 # 封裝後的函式 104 def counting_bloom_filters(filterSet = None, filterArray = None): 105 if (filterSet is None) or (filterArray is None): 106 return False 107 # 構造:初始集合元素的對映、雜湊函式引數生成 108 cbfilter = construction_counting_bloom_filter(filterSet, filterArray) 109 110 # 測試:測試插入、刪除、查詢 111 test_counting_bloom_filters(cbfilter) 112 113 114 if __name__ == "__main__": 115 filterSet = list(range(100)) 116 filterArray = [0] * 10000 117 counting_bloom_filters(filterSet, filterArray)View Code
標準布隆過濾器構建、測試程式碼(Java 面向物件版) 1 // package BloomFilters; 2 3 import java.util.Arrays; 4 import java.util.Random; 5 import java.io.*; 6 import java.math.BigInteger; 7 import java.nio.*; 8 import java.nio.charset.StandardCharsets; 9 import java.nio.file.Path; 10 import java.util.*; 11 12 /** 13 * 實現標準布隆過濾器的類 14 */ 15 public class SBFilters { 16 // 例項欄位 17 private boolean[] bitArray; //位陣列 18 private int[][] hashParams; //隨機的雜湊函式引數 19 20 // 方法欄位 21 public SBFilters(int tLen, int[] iSet) 22 { 23 this.bitArray = new boolean[tLen]; 24 Arrays.fill(this.bitArray, Boolean.FALSE); 25 this.construction_filter(iSet); 26 } 27 28 private boolean construction_filter(int[] iSet) 29 { 30 if(iSet == null || iSet.length == 0) 31 { 32 return false; 33 } 34 var hashNum = (int)(Math.log(2) * (this.bitArray.length / iSet.length)); 35 this.construction_hashParams(hashNum); 36 for(var item: iSet) 37 { 38 for(var params: this.hashParams) 39 { 40 this.bitArray[hash_function(params, item)] = true; 41 } 42 } 43 return true; 44 } 45 46 private boolean construction_hashParams(int hashNum) 47 { 48 this.hashParams = new int[hashNum][3]; 49 var time = System.currentTimeMillis(); 50 var rd = new Random(time); 51 for(int i = 0; i < hashNum; i++) 52 { 53 this.hashParams[i][0] = rd.nextInt(9999) + 1; 54 this.hashParams[i][1] = rd.nextInt(9999) + 1; 55 this.hashParams[i][2] = rd.nextInt(9999) + 1; 56 } 57 return true; 58 } 59 60 private int hash_function(int[] params, int item) 61 { 62 return (int)((params[0] * Math.pow(item, 2.0) + 63 params[1] * item + params[2]) % bitArray.length); 64 } 65 66 public boolean query_filter(int item) 67 { 68 for(var params: this.hashParams) 69 { 70 if(this.bitArray[hash_function(params, item)] == false) 71 { 72 return false; 73 } 74 } 75 return true; 76 } 77 78 } 79 80 81 82 // package BloomFilters; 83 84 85 86 87 /** 88 * 用來測試實現的布隆過濾器是否正常工作 89 */ 90 public class FiltersTest 91 { 92 public static void main(final String[] args) 93 { 94 test_counting_bloom_filters(); 95 } 96 97 98 private static void test_counting_bloom_filters() 99 { 100 var iSet = new int[10000]; 101 for(int i = 0; i < 10000; iSet[i] = i++); 102 SBFilters sbFilter = new SBFilters(999999, iSet); 103 104 for(var item: new int[]{1, 3, 5, 78, 99, 100, 101, 9999, 10000, 3534}) 105 { 106 var isIn = sbFilter.query_filter(item); 107 if(isIn == false) 108 { 109 System.out.printf("%d is not in the filter\n", item); 110 } 111 else 112 { 113 System.out.printf("%d is in the filter\n", item); 114 } 115 } 116 } 117 118 119 }View Code
|