1. 程式人生 > >布隆過濾器(Bloom Filters)的原理及程式碼實現(Python + Java)

布隆過濾器(Bloom Filters)的原理及程式碼實現(Python + Java)

本文介紹了布隆過濾器的概念及變體,這種描述非常適合程式碼模擬實現。重點在於標準布隆過濾器和計算布隆過濾器,其他的大都在此基礎上優化。文末附上了標準布隆過濾器和計算布隆過濾器的程式碼實現(Java版和Python版)

本文內容皆來自 《Foundations of Computers Systems Research》一書,自己翻譯的,轉載請註明出處,不準確的部分請告知,歡迎討論。

 

 

  • 布隆過濾器是什麼?

    布隆過濾器是一個高效的資料結構,用於集合成員查詢,具有非常低的空間複雜度。
     
 
  • 標準布隆過濾器(Standard Bloom Filters,SBF)

    基本情況

    布隆過濾器是一個含有 m 個元素的位陣列(元素為0或1),在剛開始的時候,它的每一位都被設為0。同時還有 k 個獨立的雜湊函式 h1, h2,..., hk 。需要將集合中的元素加入到布隆過濾器中,然後就可以支援查詢了。說明如下:
    1. 計算h1(x), h2(x),...,hk(x),其計算結果對應陣列的位置,並將其全部置1。一個位置可以被多次置1,但只有一次有效。
    2. 當查詢某個元素是否在集合中時,計算這 k 個雜湊函式,只有當其計算結果全部為1時,我們就認為該元素在集合內,否則認為不在。
    3. 布隆過濾器存在假陽性的可能,即當所有雜湊值都為1時,該元素也可能不在集合內,但該演算法認為在裡面。
    4. 假陽性出現的概率被雜湊函式的數量、位陣列大小、以及集合元素等因素決定。

     

    假陽性率評估   為了評估假陽性率,需要基於一個假設:雜湊函式都是完美隨機的。約定幾個變數:  
    1. k 雜湊函式的數量
    2. n 集合 S 中元素的數量
    3. m 位陣列的大小
    4. p 位陣列中某一位為0的概率
    5. f 假陽性的概率

     

    最後得出:

                                    

     最佳的雜湊函式數量


    根據數學推理得(過程就算了):當 p = 1/2, k = ln2 * (m/n)時,f 最小為(1/2)^k
    可以看出,當位陣列中有一半零一半一時,結果最好。
    事實上,m 是 n 的倍數,而且 k 常取最接近但小於理論值的整數值。

    部分佈隆過濾器(partial bloom filters)  
     
  • 計算布隆過濾器(Counting Bloom Filters,CBF)

    標準的布隆過濾器有一個致命的缺點:不支援刪除元素。CBF協議解決的這個問題。
    1. 將標準布隆過濾器中的位陣列變成整數陣列,即可以用多位表示。
    2. 標準布隆過濾器每個位置可以被多次置1,但只有一次有效,這樣,某一個位置被多個元素雜湊對映,當要刪除其中一個元素時,該元素雜湊對映的位置都應該變為零,那麼就會破壞其他元素的對映,會出現假陰性。
    3. 由於計算布隆過濾器的陣列可以表示更大的整數,那麼當某個位置被對映到時,該位置的計數值就自增1,而當某個元素被刪除時,就將其對映位置的計數值減1。這樣就解決了SBF的問題。
    4. CBF同樣存在問題,因為當計數值自增時可能會溢位,當計數值為4位元時,溢位的概率為:1.37 * 10^-15 * m,雖然很低,但對某些應用可能不夠。一個簡單的解決方法是,當計數值到達最大值時,就不在自增,但這導致假陰性。
     
  • 壓縮布隆過濾器(Compressed Bloom Filters)

    在網路應用中,布隆過濾器通常被作為資訊在各節點間傳送,為了節約資源,自然而然就想能不能壓縮布隆過濾器後再傳送。
    1. 由前面我們知道,要使得布隆過濾器有最小的假陽性概率,陣列中包含的0或1的概率應該是一樣的,根據夏農編碼原理(Shannon coding principle),這樣的布隆過濾器不能被壓縮。雖然這樣的布隆過濾器不能被直接壓縮,但我們可以用其他方法達到一樣的效果。
    2. 要使得布隆過濾器 x 與布隆過濾器 y( 包含的0或1的概率應該是一樣的)具有相同的假陽性概率,那麼,x 的大小要大於 y 的,x 的雜湊函式的數量不同於 y 的,這樣 x 中包含的0和1的數量就不同,x 就可以被壓縮。 
    3. 問題出來了,壓縮布隆過濾器的原因是更節省空間,我們找了個更大的布隆過濾器壓縮,那麼壓縮後的布隆過濾器的空間效率比原布隆過濾器更加優秀嗎?是的。
    4. 壓縮後,布隆過濾器的本地儲存空間會變大,但雜湊函式數量會變小(更少的對映操作)、傳送的位更少。
     
  • D-left 計算布隆過濾器(D-left Counting Bloom Filters)

    上面提到的計算布隆過濾器存在這樣的缺點:儲存空間是標準布隆過濾器的數倍(取決於計數值的位數)和計數值的不均勻(有些始終為0,有些則可能溢位)。下面看看 D-left Counting Bloom Filters 的特點。D-left Counting Bloom Filters 基於 D-left Hashing。

    D-left Hashing 基本結構

    1. 將一個雜湊表分成幾個不相交的子表(subtable)
    2. 每個子表裡都有數量相同的桶(bucket)
    3. 每個桶裡都有一定數量的單元(cell,單元包括特徵值和計數值)
    4. 每個單元都是固定的位陣列成,用來儲存元素的特徵值(fingerprint)
    5. 只有一個雜湊函式,該雜湊函式可以生成和子表數量相同的桶地址和一個特徵值
      插入操作
    假設有 d 個子表,元素為 x,雜湊函式為 f
    1. 計算 f(x),生成桶地址 addr0, addr1, ..., addr(d-1),特徵值 p 
    2. 我們檢查子表 i 中地址為 addri 的桶中的所有單元(i = 0,1,...,d-1)
    3. 如果某個單元中的特徵值和 p 相等,那麼元素 x 就在該雜湊表中
    4. 若沒有找到這樣的單元,那麼需要找到儲存特徵值最少的桶(在上面生成的桶地址中找),然後將該特徵值 p 隨機放入該桶的一個空單元中,該單元的計數值變為1,這考慮了裝載平衡
     

    D-left Counting Bloom Filters


    由上可知,d-left Hashing 的計數值最大為零,不支援刪除操作,為了將它變成可 Counting,可以讓它的計數值變成由多位組成。但這樣依然會出現問題,如下:

    1. 假設 d-left counting bloom filter 包含 4 個子表,每個子表又包含 4 個桶,初始為空。
    2. 現在有兩個元素 x 和 y 需要對映到過濾器中,f(x) = (1, 1, 1, 1,r), f(y) = (1, 2, 3, 4, r)
    3. 已知插如 x 時,第四個子表的第一個桶最空,x 的特徵值 r 被插入該桶的某一個單元中,該單元計數值變為1,而插入 y 時,第一個子表的第一個桶最空,y 的特徵值 r 被插入該桶的某一個單元中,該單元計計數值變為1
    4. 現在要刪除 x,那麼就會尋找每個子表的第一個桶中的單元,這時,在第一個子表的第一個桶中找到了特徵值 r,接下來就會將該單元的計數值減 1 變為 0,同時,儲存的特徵值被刪除,變為空。
    5. 現在查詢 x 是否在表中,結果返回真,而查詢 y 是否在表中,結果返回假,導致錯誤。  

     

    為什麼會出現上面的情況?由三個因素促成

    1. x 和 y 有相同的特徵值 r 
    2. f(x) 和 f(y) 生成的地址有相同的
    3. x 和 y 特徵值儲存的地方還不一樣(存一樣就不會出錯) 

     

    如何解決?

    說實話,沒看懂英文描述的內容。。。。大致是做了排列置換等操作


    效能分析

    比普通的計算布隆過濾器空間少了一半甚至更多,而且效率也有提升(假陽性更低) 

 

  • Spectral Bloom Filters

    Counting Bloom Filters 可以進行元素的刪除操作,然而卻不能記錄一個元素被對映的頻率,而且很多應用中元素出現的頻率相差很大,也就是說,CBF中每個計數值的位數一樣,那麼有些計數值很快就會溢位,而另一些則一直都很小。這些問題可以被 Spectral Bloom Filters 解決。
    在SBF中,每一個計數值的位數都是動態改變的。它的構造我沒看懂,先留著吧
 
  • Dynamic Counting Filters

    Spectral bloom filter 被提出來解決元素頻率查詢問題,但是,它構造了一個複雜的索引資料結構去解決動態計算器的儲存問題。Dynamic counting bloom filter(比SBF好理解多了) 是一個空間時間都很高效的資料結構,支援元素頻率查詢。相比於SBF,在實際應用中(計數器不是很大,改變不是很頻繁時)它有更快的訪問時間和更小的記憶體消耗。

    構成部分

    1. DCBF由兩部分組成,第一部分是基礎的計算布隆過濾器
    2. 第二部分是一個同樣大小的向量,用於記錄第一部分中計算器溢位的次數 
    3. 第一部分中的計算器位數固定,第二部分中每個溢位計算器位數動態改變

     

    特點

    1. 當第二部分溢位計算器也面臨溢位時,會重新申請一個向量,給要溢位部分增加位數,其他溢位計算器直接拷貝到新的向量中的對應位置,舊的向量會被釋放
 
  • 學習案例

    Summary Cache

        在網路中有極大的資源請求,如果所有的請求都由伺服器來處理,網路就會出現擁堵,效能就會下降。所以網路中有大量的中間代理節點。這些代理會把一部分資源放在自己的本地快取,當用戶向伺服器請求資源時,該代理先會檢查該資源是否在自己的快取中,如果在就直接傳送給使用者,否則再向伺服器請求。一個代理能夠儲存的資源是非常有限的,為了進一步減輕伺服器的負載,網路中相鄰的代理都可以共享自己的快取。這樣,當代理 A 本地快取沒有時,就會向相鄰代理廣播請求,查詢他們是否有該快取。
        然而,這樣依舊有很大問題,假設,這裡有 N 個代理,每個代理的命中率為 H,一個代理平均請求 R 次,那麼廣播中,一個代理收到的查詢資訊共有 (N-1) * (1-H) * R 條,總共的請求也就是 
    N * (N-1) * (1-H) * R。這是非常低效的。
        再次改進,各個代理之間交換自己快取的摘要資訊。這樣,當代理 A 失敗後,會先查詢各個代理的摘要資訊,然後決定是定向向某個代理請求,還是向伺服器請求資源。這就大大的減少了網路通訊量。為了滿足快速查詢、更新摘要資訊,一個非常好的選擇就是計算布隆過濾器(Counting bloom filters)。

    IP Traceback

       網路中存在許多攻擊,有時候需要根據一些資料包去還原IP路徑,找到攻擊者。一個可行的辦法是在路由器中儲存資料包資訊。然而,有些網路中通訊量巨大,儲存所有的包是不現實的,因此可以儲存這些包的摘要資訊。這時,選用布隆過濾器可以極大的節省空間,而且具有非常快的查詢。

     

 

 

  • 程式碼實現

標準布隆過濾器構建、測試程式碼(Python 面向過程版)

 1 import math
 2 import random
 3 import time
 4 
 5 
 6 def hash_function(a, b, c, item, tablelen):
 7     return (a * item ** 2 + b * item + c) % tablelen  #雜湊函式
 8 
 9 
10 def construction_of_SBF(tablelen = 1000, set = []):
11     
12     k = int(math.log(2, math.e) * (tablelen / len(set)))
13     hash = []
14     random.seed(time.time())
15     for i in range(k):  #隨機生成雜湊函式的三個引數
16         a = random.randint(1, 1000)
17         b = random.randint(1, 1000)
18         c = random.randint(1, 1000)
19         hash.append((a, b, c))
20 
21     bitArray = [0] * tablelen
22 
23     for element in set:     #對映集合元素到位陣列
24         for i in range(k):
25             hx = hash_function(hash[i][0], hash[i][1], hash[i][2], element, tablelen)
26             bitArray[hx] = 1
27 
28     filter = [bitArray, hash]
29     return filter
30     
31 # 測試
32 def test_bloom_filters(bloom_filter = None):
33     if bloom_filter == None:
34         return False
35     
36     testSet = [1, 3, 7, 111, 99, 54, 34, 67, 81, 121, 101, 100, 23, 0, 845, 3339, 44]
37     for item in testSet:
38         flag = True
39         for i in range(len(filter[1])):
40             hx = hash_function(filter[1][i][0], filter[1][i][1], filter[1][i][2], item, len(filter[0]))
41             if bloom_filter[0][hx] != 1:
42                 flag = False 
43                 break
44 
45         if flag is True:
46             print("%d is in filter\n" % item)
47         else:
48             print("%d is not in filter\n" % item)
49     
50     return True
51     
52 
53 if __name__ == "__main__":
54     filter = construction_of_SBF(set = list(range(10)))
55     test_bloom_filters(filter)
View Code

 

計算布隆過濾器構建、測試程式碼(Python 面向過程版)

  1 import math
  2 import random
  3 import time
  4 
  5 """
  6 結構沒有設定好,按下寫:
  7 0. 封裝函式
  8 1. 雜湊函式:計算雜湊值
  9 2. 生成雜湊隨機引數函式
 10 3. 插入函式:被呼叫
 11 4. 刪除函式:被呼叫
 12 5. 查詢函式:測試函式呼叫
 13 6. 測試函式:測試插入和刪除
 14 
 15 """
 16 
 17 
 18 def hash_function(params, item, tlen):
 19     return (params[0] * item ** 2 + params[1] * item + params[2]) % tlen
 20 
 21 
 22 def deletion_counting_bloom_filter(cbfilter = None, item = None):
 23     if (cbfilter is None) or (item is None):
 24         return False
 25     for params in cbfilter[2]:
 26         cbfilter[0][hash_function(params, item, len(cbfilter[0]))] -= 1
 27     return True
 28 
 29 
 30 def insertion_counting_bloom_filter(item = None, cbfilter = None):
 31     if (item == None) or (cbfilter == None):
 32         return False 
 33     for params in cbfilter[2]:
 34         cbfilter[0][hash_function(params, item, len(cbfilter[0]))] += 1
 35     return True
 36 
 37 
 38 def query_counting_bloom_filter(item = None, cbfilter = None):
 39     for params in cbfilter[2]:
 40         if(cbfilter[0][hash_function(params, item, len(cbfilter[0]))]) is 0:
 41             return False
 42     return True
 43 
 44 
 45 def construction_counting_bloom_filter(filterSet = None, filterArray = None):
 46     if (filterSet is None) or (filterArray is None):
 47         return None
 48     # 最佳的雜湊函式數量
 49     hashNum = int(math.log(2, math.e) * (len(filterArray) / len(filterSet)))
 50     hashParam = []
 51     random.seed(time.time())
 52     # 隨機生成雜湊引數
 53     for i in range(hashNum):
 54         a = random.randint(1, 9999)
 55         b = random.randint(1, 9999)
 56         c = random.randint(1, 9999)
 57         hashParam.append((a, b, c))
 58     
 59     # 將初始集合元素對映到過濾器陣列中
 60     for item in filterSet:
 61         for params in hashParam:
 62             filterArray[hash_function(params, item, len(filterArray))] += 1
 63 
 64     # 返回過濾器陣列、過濾器集合、過濾器雜湊引數
 65     return (filterArray, filterSet, hashParam)
 66 
 67 
 68 def test_counting_bloom_filters(cbfilter = None):
 69     if cbfilter is None:
 70         return None
 71     testSet = cbfilter[1][10:20]
 72     
 73     # 先測試原有元素是否正常對映
 74     for item in testSet:
 75         if query_counting_bloom_filter(item, cbfilter) is True:
 76             print("%d is in filter\n" % item)
 77         else:
 78             print("%d is not in filter\n" % item)
 79 
 80     # 刪除後再查詢
 81     if deletion_counting_bloom_filter(cbfilter, testSet[0]) is True:
 82         print("delete successfully!\n")
 83     else :
 84         print("delete fails\n")
 85 
 86     if query_counting_bloom_filter(testSet[0], cbfilter) is True:
 87         print("%d is in filter\n" % testSet[0])
 88     else :
 89         print("%d is not in filter\n" % testSet[0])
 90 
 91     # 插入後再測試
 92     if insertion_counting_bloom_filter(testSet[0], cbfilter) is True:
 93         print("insert %d successfully\n" % testSet[0])
 94     else:
 95         print("insert %d fails\n")
 96     
 97     if query_counting_bloom_filter(testSet[0], cbfilter) is True:
 98         print("%d is in filter\n" % testSet[0])
 99     else :
100         print("%d is not in filter\n" % testSet[0])
101 
102 
103 # 封裝後的函式
104 def counting_bloom_filters(filterSet = None, filterArray = None):
105     if (filterSet is None) or (filterArray is None):
106         return False
107     # 構造:初始集合元素的對映、雜湊函式引數生成
108     cbfilter = construction_counting_bloom_filter(filterSet, filterArray)
109 
110     # 測試:測試插入、刪除、查詢
111     test_counting_bloom_filters(cbfilter)
112 
113 
114 if __name__ == "__main__":
115     filterSet = list(range(100))
116     filterArray = [0] * 10000
117     counting_bloom_filters(filterSet, filterArray)
View Code

 

標準布隆過濾器構建、測試程式碼(Java 面向物件版)

  1 // package BloomFilters;
  2 
  3 import java.util.Arrays;
  4 import java.util.Random;
  5 import java.io.*;
  6 import java.math.BigInteger;
  7 import java.nio.*;
  8 import java.nio.charset.StandardCharsets;
  9 import java.nio.file.Path;
 10 import java.util.*;
 11 
 12 /**
 13  * 實現標準布隆過濾器的類
 14  */
 15 public class SBFilters {
 16     // 例項欄位
 17     private boolean[] bitArray; //位陣列
 18     private int[][] hashParams; //隨機的雜湊函式引數
 19 
 20     // 方法欄位
 21     public SBFilters(int tLen, int[] iSet)
 22     {
 23         this.bitArray = new boolean[tLen];
 24         Arrays.fill(this.bitArray, Boolean.FALSE);
 25         this.construction_filter(iSet);
 26     }
 27 
 28     private boolean construction_filter(int[] iSet)
 29     {
 30         if(iSet == null || iSet.length == 0)
 31         {
 32             return false;
 33         }
 34         var hashNum = (int)(Math.log(2) * (this.bitArray.length / iSet.length));
 35         this.construction_hashParams(hashNum);
 36         for(var item: iSet)
 37         {
 38             for(var params: this.hashParams)
 39             {
 40                 this.bitArray[hash_function(params, item)] = true;
 41             }
 42         }
 43         return true;
 44     }
 45 
 46     private boolean construction_hashParams(int hashNum)
 47     {
 48         this.hashParams = new int[hashNum][3];
 49         var time = System.currentTimeMillis();
 50         var rd = new Random(time);
 51         for(int i = 0; i < hashNum; i++)
 52         {
 53             this.hashParams[i][0] = rd.nextInt(9999) + 1;
 54             this.hashParams[i][1] = rd.nextInt(9999) + 1;
 55             this.hashParams[i][2] = rd.nextInt(9999) + 1;
 56         }
 57         return true;
 58     }
 59 
 60     private int hash_function(int[] params, int item)
 61     {
 62         return (int)((params[0] * Math.pow(item, 2.0) + 
 63             params[1] * item + params[2]) % bitArray.length);
 64     }
 65 
 66     public boolean query_filter(int item)
 67     {
 68         for(var params: this.hashParams)
 69         {
 70             if(this.bitArray[hash_function(params, item)] == false)
 71             {
 72                 return false;
 73             }
 74         }
 75         return true;
 76     }
 77     
 78 }
 79 
 80 
 81 
 82 // package BloomFilters;
 83 
 84 
 85 
 86 
 87 /**
 88  * 用來測試實現的布隆過濾器是否正常工作
 89  */
 90 public class FiltersTest
 91 {  
 92     public static void main(final String[] args) 
 93     {
 94         test_counting_bloom_filters();
 95     }
 96 
 97 
 98     private static void test_counting_bloom_filters()
 99     {
100         var iSet = new int[10000];
101         for(int i = 0; i < 10000; iSet[i] = i++);
102         SBFilters sbFilter = new SBFilters(999999, iSet);
103        
104         for(var item: new int[]{1, 3, 5, 78, 99, 100, 101, 9999, 10000, 3534})
105         {
106             var isIn = sbFilter.query_filter(item);
107             if(isIn == false)
108             {
109                 System.out.printf("%d is not in the filter\n", item);
110             }
111             else
112             {
113                 System.out.printf("%d is in the filter\n", item);
114             }
115         }
116     }
117 
118     
119 }
View Code