1. 程式人生 > >加權隨機取樣 (Weighted Random Sampling)

加權隨機取樣 (Weighted Random Sampling)

一個集合裡有 n 個元素,每個元素有不同的權重,現在要不放回地隨機抽取 m 個元素,每個元素被抽中的概率為元素的權重佔總權重的比例。要怎麼做呢?

簡單的解法

現在考慮只抽取一個元素,假設權重之和為 1。我們可以從 [0, 1] 中隨機得到一個權重,假設為 0.71,而後從第一個元素開始,不斷累加它們的權重,直到有一個元素的累加權重包含 0.71,則選取該元素。下面是個示意圖:

要選取 m 個元素,則可以按上面的方法先選取一個,將該元素從集合中去除,再反覆按上面的方法抽取剩餘的元素。這種方法的複雜度是 O(mn),並且將元素從集合中刪除其實不太方便實現。

當然,最重要的是這個演算法需要多次遍歷資料,不適合用在流處理的場景中。

Algorithm A

  1. 對於集合 $V$ 中的元素 $v_i \in V$,選取均勻分佈的隨機數 $u_i = rand(0, 1)$ ,計算元素的特徵 $k_i = u_i^{(1/w_i)}$
  2. 將集合按 $k_i$ 排序,選取前 $m$ 大的元素。

演算法的正確性在作者 2006 年的論文 Weighted random sampling with a reservoir 裡給了詳細的證明。論文中給出了演算法的兩個變種 A-Res 與 A-ExpJ,它們都能在一次掃描中得到 m 個樣本。非常適合在流處理的場合中。

A-Res 演算法

A-Res(Algorithm A With a Reservoir) 是 Algorithm 的“蓄水池”版本,即維護含有 m

個元素的結果集,對每個新元素嘗試去替換結果集中權重最小的元素。步驟如下:

  1. 將集合 $V$ 的前 $m$ 個元素放入結果集合 $R$。
  2. 對於結果集裡的每個元素,計算特徵值 $k_i = u_i^{(1/w_i)}$,其中 $u_i = rand(0, 1)$
  3. 對 $i = m+1, m+2, \dots, n$ 重複步驟 4 ~ 6
    1. 將結果集中最小的特徵 $k$ 作為當前的閾值 $T$
    2. 對於元素 $v_i$,計算特徵 $k_i = u_i^{(1/w_i)}$,其中 $u_i = rand(0, 1)$
    3. 如果 $k_i > T$ 則將 $R$ 中擁有最小 $k$ 值的元素替換成 $v_i$。

論文證明了如果權重 $w_i$ 是一般連續分佈上的隨機變數,則上面的演算法中插入 $R$ 的次數為 $O(m \log(\frac{n}{m}))$。該演算法用 Python 實現如下:

import heapq
import random

def a_res(samples, m):
"""
:samples: [(item, weight), ...]
:k: number of selected items
:returns: [(item, weight), ...]
"""

heap = [] # [(new_weight, item), ...]
for sample in samples:
wi = sample[1]
ui = random.uniform(0, 1)
ki = ui ** (1/wi)

if len(heap) < m:
heapq.heappush(heap, (ki, sample))
elif ki > heap[0][0]:
heapq.heappush(heap, (ki, sample))

if len(heap) > m:
heapq.heappop(heap)

return [item[1] for item in heap]

A-ExpJ 演算法

A-Res 需要對每個元素產生一個隨機數,而生成高質量的隨機數有可能會有較大的效能開銷,,所以論文中給出了 A-ExpJ 演算法,能將隨機數的生成量從 $O(n)$ 減少到 $O(m\log(\frac{n}{m})))$。從步驟上看,很像我們最開始提出的簡單版本,設定一個閾值並跳過一些元素。具體步驟如下:

  1. 將集合 $V$ 的前 $m$ 個元素放入結果集合 $R$。
  2. 對於結果集裡的每個元素,計算特徵值 $k_i = u_i^{(1/w_i)}$,其中 $u_i = rand(0, 1)$
  3. 將 $R$ 中小最的特徵值記為閾值 $T_w$
  4. 對剩下的元素重複步驟 5 ~ 10
    1. 令 $r = rand(0, 1)$ 且 $X_w = \log( r )/\log(T_w)$
    2. 從當前元素 $v_c$ 開始跳過元素,直到遇到元素 $v_i$,滿足
    3. $w_c + w_{c+1} + \dots + w_{i-1} \lt X_w \le w_c + w_{c+1} + \dots + w_{i-1} + w_{i}$
    4. 使用 $v_i$ 替換 $R$ 中特徵值最小的元素。
    5. 令 $t_w = T_w^{w_i}$, $r2 = rand(t_w, 1)$, $v_i$ 的特徵 $k_i = r_2^{(1/w_i)}$
    6. 令新的閾值 $T_w$ 為此時 $R$ 中的最小特徵值。

Python 實現如下:

def a_expj(samples, m):
"""
:samples: [(item, weight), ...]
:k: number of selected items
:returns: [(item, weight), ...]
"""

heap = [] # [(new_weight, item), ...]

Xw = None
Tw = 0
w_acc = 0
for sample in samples:
if len(heap) < m:
wi = sample[1]
ui = random.uniform(0, 1)
ki = ui ** (1/wi)
heapq.heappush(heap, (ki, sample))
continue

if w_acc == 0:
Tw = heap[0][0]
r = random.uniform(0, 1)
Xw = math.log(r)/math.log(Tw)

wi = sample[1]
if w_acc + wi < Xw:
w_acc += wi
continue
else:
w_acc = 0

tw = Tw ** wi
r2 = random.uniform(tw, 1)
ki = r2 ** (1/wi)
heapq.heappop(heap)
heapq.heappush(heap, (ki, sample))

return [item[1] for item in heap]

驗證

我們用多次取樣的方式來嘗試驗證演算法的正確性。下面程式碼1中為 abc 等元素賦予了不同的權重,取樣 10 萬次後計算被取樣的次數與元素 a 被取樣次數的比值。

overall = [('a', 10), ('b', 20), ('c', 50), ('d', 100), ('e', 200)]
def test_weighted_sampling(func, k):
stat = {}
for i in range(100000):
sampled = func(overall, k)
for item in sampled:
if item[0] not in stat:
stat[item[0]] = 0
stat[item[0]] += 1
total = stat['a']
for a in stat:
stat[a] = float(stat[a])/float(total)
print(stat)

首先驗證 A-Res 演算法:

test_weighted_sampling(a_res, 1)
test_weighted_sampling(a_res, 2)
test_weighted_sampling(a_res, 3)
test_weighted_sampling(a_res, 4)
test_weighted_sampling(a_res, 5)

# output
{'e': 19.54951600893522, 'd': 9.864110201042442, 'c': 4.842889054355919, 'a': 1.0, 'b': 1.973566641846612}
{'b': 2.0223285486443383, 'e': 12.17949833260838, 'd': 8.95287806292591, 'c': 4.843410178338408, 'a': 1.0}
{'a': 1.0, 'e': 6.166443722530097, 'd': 5.597171794381808, 'b': 1.9579591056755208, 'c': 4.387922797630423}
{'b': 1.8358898492044953, 'e': 2.5878688779880092, 'c': 2.4081341327311896, 'd': 2.549897479820395, 'a': 1.0}
{'a': 1.0, 'd': 1.0, 'c': 1.0, 'b': 1.0, 'e': 1.0}

看到,在取樣一個元素時,b 被取樣到的次數約為 a2 倍,而 e 則約為 20 倍,與overall 陣列中指定的權重一致。而取樣 5 個元素時,所有元素都會被選中。

同理驗證 A-ExpJ 演算法:

test_weighted_sampling(a_expj, 1)
test_weighted_sampling(a_expj, 2)
test_weighted_sampling(a_expj, 3)
test_weighted_sampling(a_expj, 4)
test_weighted_sampling(a_expj, 5)

# output
{'e': 19.78311444652908, 'c': 4.915572232645403, 'd': 9.840900562851782, 'a': 1.0, 'b': 1.9838649155722325}
{'e': 11.831543244771057, 'c': 4.709157716223856, 'b': 1.9720180893159978, 'd': 8.75183719615602, 'a': 1.0}
{'d': 5.496249062265567, 'c': 4.280007501875469, 'e': 6.046324081020255, 'b': 1.9321080270067517, 'a': 1.0}
{'a': 1.0, 'd': 2.5883654175335105, 'c': 2.440760540383957, 'e': 2.62591841571643, 'b': 1.8787559581808126}
{'a': 1.0, 'd': 1.0, 'c': 1.0, 'b': 1.0, 'e': 1.0}

與 A-Res 的結果類似。

小結

文章中介紹了 A-Res 與 A-ExpJ 兩種演算法,按照步驟用 Python 實現了一個簡單的版本,最後用取樣的方式驗證了演算法的正確性。

加權隨機取樣本身不難,但如果需要在一次掃描中完成就不容易了。難以想像上面的演算法直到 2006 年才提出。演算法本身如此之簡單,也讓不感嘆數學與概率的精妙。

參考

  1. 1.修改自 https://blog.xingwudao.me/2017/09/26/sampling/