Hadoop分散式環境下的資料抽樣
1. 問題由來
Google曾經有一道非常經典的面試題:
給你一個長度為N的連結串列。N很大,但你不知道N有多大。你的任務是從這N個元素中隨機取出k個元素。你只能遍歷這個連結串列一次。你的演算法必須保證取出的元素恰好有k個,且它們是完全隨機的(出現概率均等)?
這道題的解法非常多,網上討論也非常熱烈。本文要討論的是,這個問題是從何而來,有什麼實用價值?
自從有了Hadoop之後,該問題便有了新的應用載體。隨著資料量的增多,很多資料探勘演算法被轉移到MapReduce上實現,而資料探勘中有個基本的問題是怎樣對資料進行抽樣。在Hadoop中,每個job會被分解成多個task平行計算,而資料的總量事先是不知道的(知道job執行結束才能獲取數總數,而資料量非常大時,掃描一遍資料的代價非常高),使用者知道的只是要獲取的樣本量,那怎樣在類似於Hadoop的分散式平臺上進行資料抽樣?
回過頭來看google的這道面試題,是不是正好時Hadoop平臺上海量資料抽樣問題?
2. 在Hadoop上編寫抽樣程式
2.1 解法一
(1) 設計思想
蓄水池抽樣:先儲存前k個元素, 從第k+1個元素開始, 以k/i (i=k+1, k+2,…,N) 的概率選中第i個元素,並隨機替換掉一個已儲存的記錄,這樣遍歷一次得到k個元素,可以保證完全隨機選取。
(2) MapReduce實現
要實現該抽樣演算法,只需編寫Mapper即可。在Map函式中,使用者定義一個vector儲存選中的k個元素,待掃描完所有元素後,在解構函式中將vector中的資料寫到磁碟中。
使用者執行job時,需指定每個map task的取樣量。比如,該job的map task個數為s,則每個map task需要採集k/s個元素。
(3) 優缺點分析
由於該job沒有reduce task,因而效率很高。然而,該方法選中某個元素的概率並不完全是k/N!
2.2 解法二
(1) 設計思想
依次掃描每個元素,為每個元素賦予一個隨機的整數值;然後使用Top K演算法(譬如最大K個整數)得到需要的K個元素。
(2) MapReduce實現
要實現該演算法,使用者需要編寫mapper和reducer,在map函式中,為每個元素賦予一個隨機數,將該隨機數作為key,並將key最大的前k個儲存下來(可使用小頂堆);在reduce函式中,唯一的reduce task輸出前k個元素。
(3) 優缺點分析
該演算法比第一種演算法低效,但由於整個過程自然流暢,實現起來非常簡單,不易出錯。
2.3 解法三
(1) 設計思想
考慮第一個元素,其以K/N的概率被選中;如果該節點被選中,則從剩下的(N-1)個元素中選出(K-1)個元素;如果沒有被選中,則從剩下的(N-1)個元素中選出K個元素,…,依次這樣下去,直到獲取K個元素。
(2) MapReduce實現
使用者只需編寫Mapper即可。首先要獲取每個map task輸入的資料量,這個可以在InputFormat中計算得到。然後,在每個map函式中,採集k/s(其中s為map task資料量)個元素。
(3) 優缺點分析
由於該演算法沒有reduce task,效率比較高,但需要在InputFormat中統計資料量,程式設計複雜度較高。
3. 延伸
這個問題與《程式設計珠璣》上討論的問題很相似:
輸入兩個整數m和n,其中m<n。輸出是0~n-1範圍內m個隨機整數的有序表,不允許重複。
對於該問題,大致存在四種演算法,他們有不同的優缺點。
(1) 第一種方法來自Knuth的《The art of Computer Programming, Volume 2: Seminumerical Algorithms》
虛擬碼是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
select = m
remaining = n
for
I = [0 n )
if (bigrand() % remaining) < select
print i
select—
remaining—
|
只要m<=n,程式選出來的整數就恰為m個。
C++的實現如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
void
genKnuth( int
m, int n) {
for ( int i = 0; i < n; i++) {
if (bigrand() % (n - i) < m) {
cout <<i << endl;
m--;
}
}
}
|
該演算法非常節省空間,但需要全部掃描n個數,當n很多時,效率不高。
(2)第二種方法的複雜度只與m有關,採用了set(實際上是紅黑樹)節省時間。程式碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
void
gensets( int m, int n) {
set< int > S;
while (S.size() < m) {
S.insert(bigrand() % n);
}
// print S
}
|
該方法每次插入均在O(log m)時間內完成,但需要的空間開銷很大。
(3)第三種方法克服了(2)的缺點,程式碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
void
genshuf( int m, int n) {
int i, j;
int *x = new
int [n];
for (i = 0; i < n; i++) {
x[i] = i;
}
for (i = 0; i < m; i++) {
j = randint(i, n-1);
int t = x[i]; x[i] = x[j]; x[j] = x;
}
sort(x, x+m);
//print result
}
|
該演算法需要n個元素的記憶體空間和O(n+mlogm)的時間,其效能通常不吐Knuth的演算法。
(4)當m接近n時,基於集合的演算法生成的很多隨機數都要丟掉,因為之前的數已經存在於集合中了,為了改進這一點,演算法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
void
genfloyd( int
m, int n)
{
set< int > S;
set< int >::iterator i;
for ( int j=n-m; j < n; j++) {
int t = bigrand()%(j+1);
if (S.find(t) == S.end()){
S.insert(t); // t not in S
} else {
S.insert(j); // t in S
}
}
//print results
}
|
4. 參考資料
(1) 《程式設計珠璣》第二版