1. 程式人生 > 實用技巧 >探尋從HDFS到Spark的高效資料通道:以小檔案輸入為案例(轉)

探尋從HDFS到Spark的高效資料通道:以小檔案輸入為案例(轉)

>>> hot3.png

為了保證高效的資料移動,locality是大資料棧以及分散式應用程式所必須保證的性質,這一點在Spark中尤為明顯。如果資料集大到不能保證完全放入記憶體,那就不能貿然使用cache()將資料固化到記憶體中。如果讀取資料不能保證較好的locality性質的話,不論是對即席查詢還是迭代計算都將面臨輸入瓶頸。而作為常用的分散式檔案系統,HDFS承擔著資料儲存、一致性保證等關鍵問題。HDFS自開發之初就與Google GFS一脈相承,因此也繼承了其無法較好的處理小檔案的問題,但大量小檔案輸入又是分散式計算中常見場景。本文以小檔案輸入為案例,看看從HDFS到Spark的資料通道中到底發生了什麼,並討論如何設計有效的小檔案輸入。瞭解了這些話題,可以更高效的使用Spark。


背景


MLlib進展如火如荼,近期最令人振奮的訊息莫過於MLlib對sparse vector的支援,以及隨之而來的一系列重構和改進工作。機器學習一般演算法的輸入是訓練集和測試集,通常來說是(label, key : value)這樣的序對。對於這種輸入,直接使用SparkContext提供的textFile()介面就好了,MLlib內部會轉換成LabeledPoint類。但MLlib還缺少圖模型演算法,如LDA。LDA (Latent Allocation Dirichlet)演算法常用來獲取文件集合的主題,是機器學習中廣泛應用的演算法,其輸入格式和核心元件與常見的機器學習分類、聚類演算法不同。


兩個月之前,筆者有一份差不多要完成的基於Spark的並行LDA演算法準備提交給Spark社群,同時也在準備醞釀已久的學術論文。當筆者完成了LDA演算法的核心模組Gibbs sampling之後,突然發覺要想實現一個“可用的“LDA演算法,不僅僅是一個核心功能這麼簡單,還牽扯到許多零碎的事情。所謂零碎的事情,其實並不簡單。機器學習演算法就是這樣,學起來難,但是真正懂了之後發現核心演算法特別簡單,預處理又非常難。總之,機器學習演算法學起來難的地方做起來簡單,但是學起來簡單的地方, 並不見得很快就能做好。


相關廠商內容


產品經理眼中的面向雲的業務流程管理 傳統企業軟體廠商眼中的的企業應用架構 《搜狗商業廣告平臺Java生態演化之路》——搜狗商用平臺架構師劉建 下廚房創始人王旭升《下廚房團隊成長記錄》 唯品會黎毅敏《唯品會運維架構和流程改造之路》

相關贊助商


QCon全球軟體開發大會(北京站)2014,4月25-27日,誠邀蒞臨。


大部分的零碎工作在語料庫的預處理和後續輸出的模型的使用上,這些零碎的工作機器學習者們都不怎麼注重,因為書本上很少會講到這些知識。就拿後者來說,模型後續使用這件事兒看起來小,其實不然,這關係到機器學習演算法的實際運用能力。我們做模型的最終目的除了發論文之外還是想讓它對現實生活產生影響。學術派關注模型多,但對學術和工業的結合看的相對少一點,線下模型如何輕鬆部署?模型可否增量訓練?模型的訓練和使用是否可以同步進行?是否可以做到對模型的線上查詢?這都是將機器學習“搬出實驗室”的關鍵問題。這類問題在Strata大會上有很多工業界人士做了很好的講解,比如這裡。


閒言莫談,回到語料庫的預處理工作。關於分詞的問題不多談,筆者學習ScalaNLP的做法,直接在Lucene的分詞實現上裹了一層Scala的介面。但是在語料集的輸入上花了很多時間。Spark目前所有的標準輸入介面是SparkContext類中提供的textFile(path, miniSplits)介面,但該介面不適合語料庫的直接輸入,因為這是一個文字行處理函式,每次只能操作其中的一行文字。而LDA更期待的輸入格式是Key-Value對,其中key是檔案的絕對路徑(便於分辨和去重),value是檔案的全部內容。由於Spark下層多使用HDFS作為輸入,因此筆者打算自己定製InputFormat。


LDA應用場景


首先得說明一些問題。LDA的實際使用場景有二:


第一種是在實驗室環境下使用,這是最直觀的情況。例如你有一堆小檔案存在本地磁碟上,即你的語料庫。可能你想直接把它們上傳到HDFS,或者在每臺機器的磁碟上仍一份,甚至直接放在當前機器的本地磁碟好了(這種情況下不是真正的分散式,所有的Spark executor只會在你當前機器上啟動),之後開啟Spark呼叫其中的LDA演算法。如果你只是打算做個實驗,這樣就足夠了。換言之,這是一種offline的訓練方式。


第二種情況是工業應用,你可能不會有一堆離線的語料庫,而是有一個流式管道,語料文字源源不斷地傳遞過來,如twitter/weibo feed等。或許你可以把這些資料放到HDFS或HBase上,也有可能直接處理流資料,而不管最終儲存。


這是完全不同的應用場景,針對不同的場景要有不同的處理方式。不論是實驗室環境下的嘗試,還是工業應用,兩者都很重要。本文只涉及offline的資料處理方法,因為offline的資料處理才更加需求讓資料經過HDFS。


離線LDA訓練

離線場景下或許我們不必理會語料集預處理的過程,直接交給終端使用者好了。使用者將語料集轉換成你指定的樣式,之後將預處理結果上載到HDFS,這樣你的LDA程式可以直接訪問這部分資料,而我們要做的只是規定好輸入的樣式,妙不可言。我們舒服了,使用者吃些苦頭。例如我們指定使用者輸入檔案的每一行是一個完整的檔案內容,開頭處以Tab分割作為檔名。這樣我們可以直接呼叫textFile()介面,自己切分一下就可以得到”檔名--檔案內容”這個KV對。值得一提的是,這種離線場景下看似不好用的方法,在工業界線上訓練過程中反倒可能有好的效果。比如一次記錄過來就是一個KV對,這樣就省去了這一步輸入的處理。


或許我們可以進一步幫幫使用者。咱們寫一個預處理程式,不論是序列的還是並行的,幫助使用者進行預處理,Mahout就是這麼做的。這種情況下,可能需要終端使用者寫一個ad-hoc的shell指令碼組織所有的工作流和資料流。Mahout中的dirTosequentialFile就是把本地磁碟或者HDFS上的目錄讀入,將其中的小檔案合併在一起轉換成一個sequential file。


但是,筆者覺得最好的方法還是將預處理過程與LDA訓練過程融合起來,不要讓使用者做這麼多工作就能呼叫Spark上的LDA,使用者只需要指定檔案路徑即可。這時我們必須提供函式將語料庫所有的文字和檔名讀入。CombineFileInputFormat比較適合處理小檔案,因此最初筆者實現了一個CombineFileInputFormat,一個CombineFileRecordReader,一個FileLineWritable以及一個類似於textFile()的介面。


Interface exposed to end-user - SparkContext.scala

def wholeTextFiles(path: String): RDD[(String, String)] = {

newAPIHadoopFile(

path,

classOf[WholeTextFileInputFormat],

classOf[String],

classOf[String])

}

要注意的是,雖然筆者這麼做了,但這並不代表小檔案輸入的最佳實踐。實際上,最佳實踐是不要使用小檔案。因為將大量的小檔案放到HDFS上是比較糟糕的,不僅將block用滿率降低,還會佔滿NameNode上面的索引。這裡只是討論一種可行的方案。


分析幾個問題。首先是HDFS中block的大小。我們在這裡稱之“小檔案”的檔案究竟有多“小”?是否會超過HDFS的block大小?答案是肯定的。在這種情況下,如果我們按照block位單位讀取資料,那麼我們就要自己處理同一個檔案的block拼接的問題,尤其是檔案由多位元組字元組成的時候,如UTF編碼的字元,很可能在一個字元中間被切斷。如果不能正確的拼接各個block,會出現亂碼的情況。


重點是,考慮到效能問題,我們不希望有額外的網路傳輸開銷存在,尤其是不必要的網路傳輸。我們希望同一個檔案的block都在同一個節點上,這樣在合併這些block的時候就完全不會出現機器之間資料的網路傳輸。HDFS裡面很討厭的一點是,這裡有兩套極為相同的API,分別在mapred和mapreduce兩個包下,稍不注意用錯了API就會有一種很抓狂的感覺。筆者最初為了相容Spark中HadoopRDD的介面,用了mapred的這套API,其中的CombineFileRecordReader中的isSplitable()函式是不起作用的,因為如果不修改CombineFileRecordReader本身的程式碼就無法阻止一個檔案的多個block分配到不同的split中的情況。一旦這件事情發生,那拼接一個檔案的時候就無法阻止shuffle的發生。


線上LDA訓練


現在來看看線上訓練。注意線上產品不應該使用上述方式執行。當然,如果不顧及線上模型訓練,認為模型可以線下訓練好的話另當別論。資料處理部門的人是不會把大量原始文字儲存到本地磁碟,之後再將資料上傳到伺服器處理的。在我看來,大資料就應該放到合適的地方去。這種場景下,原始文字或者網頁應該儲存到一種KV儲存中,例如HBase(Facebook在其論文Analysis of HDFS Under HBase: A Facebook Messages Case Study中詳述了HDFS之上的HBase效能問題,值得一看。)。除此之外,HDF5也是種不錯的選擇。


網路傳輸來自哪裡


筆者一直在討論避免網路傳輸開銷的問題,那麼網路傳輸到底出現在哪裡從而導致的不可避免呢?


首先,檔案大小超過單個block大小的檔案不免被切分,不論是ASCII編碼的檔案,還是UTF這種多位元組編碼的檔案,都需要一個join的過程。最好的情況下,我們期望同一個檔案所有的block都從同一個機器上讀取,這樣可以避免網路傳輸。


第二,出於效率的考量,mapred包中的CombineFileInputFormat不能保證這一點。這是因為每個block都會有副本的存在。為了保證資料讀取的高效,同一個檔案的不同block可能讀取不同機器上的副本。同時由於單個split大小的限制,同一個機器上的block也可能分到不同的split裡面。正是由於HDFS多副本容錯的特性,導致同個檔案不同的block甚至可能在任何一個位置被讀取。


自定義Partitioner怎麼樣


在筆者看來,Spark之所以好用的原因之一就是可以簡單地定製partition方法。使用自定製的partitioner來重新安排我們KV對的儲存。然而,定製化的partitioner最大的作用是在迭代地進行RDD join的時候,正如Spark PageRank所展示的那樣。如果是一次性的使用,有點得不償失,因為第一次的shuffle在所難免。


Hadoop locality全揭祕


為了更好的瞭解Hadoop I/O保持良好locality的祕密,我們要深入看一看mapred包中的InputFormat實現。我們選擇FileInputFormat作為突破口,因為這也是spark的“重點使用物件”。首先要記住這些後面會經常用到的概念:rack、node、file、block、replica。資料中心通常由一堆堆rack組成,rack是同一個機架中的機器。由於同個rack之間網路狀況通常都是非常好的,因此考慮本地性的時候通常也將同個rack中的資料算作本地資料。一個rack由多個node組成,這裡的node特指作為DataNode的機器。HDFS上每個檔案包含多個block,每個block有一些副本(通常是三個)。要注意的是Hadoop的worker可能包含所有的DataNode節點,當然也會出現不匹配的現象,即有些機器僅僅是DataNode節點而非Hadoop worker,反之亦然。同時也要注意,考慮到魯棒性,每個block的三副本通常都是當前node一個,本rack其他機器一個,其他rack上一個。


把Hadoop worker也考慮進來後,問題稍顯複雜。程式可能分佈在多個worker上,資料分佈在多個DataNode上。因此,問題抽象成如何在worker和DataNode之間做多對一的對映(一個worker會可能處理多個DataNode上的資料,而一份資料通常只要一個worker處理就好了),使得讀取HDFS檔案時造成的網路開銷最小?換句話說,讀取檔案整體耗時最少。


這件事兒不是很容易,因為應用程式和資料之間還隔著好多層。程式所直接接觸的就是檔名。一個檔案被分為多個block,每個block可能存在於每一個數據節點上,其副本存在於其他節點上,不同的節點還屬於不同的rack。我們先從程式開始看起。以Spark為例,我們呼叫hadoopRDD = sc.textFile(path)告訴Spark開始讀取path中的資料。這個path可能是一個本地檔案路徑,更常見的是HDFS路徑。為了分散式處理的要求,hadoopRDD通常情況下是被切分的。那麼,其partition的資訊來自何處呢?答案就是HDFS中的split,更確切的說是FileSplit,其在FileInputFormat中被用到。FileSplit就是這樣一種程式和block之間的對映。


每個FileSplit都是一個block集合,裡面的block會在同一個worker上的同一個程式讀出,因此也理所當然作為一個partition。為了保持同一個split中block的本地性,FileSplit花了大力氣把合適的block放到一起。例如貢獻度計算,以及node-block、rack-block之間的雙向連結串列等。現在我們把之前的程式-block對映問題退化成split-block的對映問題。


Node/Rack貢獻度計算


假設我們有一個split,其中有三個block,這三個block來自8個節點。8個節點屬於4個不同的rack,每個block有三個副本。假設這三個block的大小分別為100、150、75。這種情況下怎麼安排“最佳地點”?即該split應該在哪個worker上計算?

13210642_GiVk.png



首先,我們一致同意的一點是“最佳地點”應該是所有node的子集。在我們的例子中,這個集合是[h1 --- h8]這八個節點。怎樣對這個集合進行排序,依次找到“最佳地點”、“次佳地點”?


對節點集合進行排序有兩種方法,分別是考慮rack的資訊和不考慮rack的資訊。上文說過,可以將同一個rack中的block也算做本地的block。要想排序,先要確定準則,即什麼才是“好”。參照上圖,我們定義一個“effective size”的概念,這是說在本節點上,存在多少位元有效的資料可供讀取。同樣的,rack的effective size就是該rack上所有的有效讀取的位元。注意,並非是將該節點有的block以及位元組數加起來這麼簡單,這裡的“有效”是指的有區分度的位元組數。例如,Rack4有兩個block,每個block的大小都是75,但Rack4的effective size只有75,並非150,因為這兩個block具有相同的內容,他們互為副本。


考慮到rack的計算方式就是將rack看作跟node同等的位置,計算effective size之後,可得如下順序:


1. Rack 2 (250)


h4 (150)

h3 (100)

2. Rack 1 (175)


h1 (175)

h2 (100)

3. Rack 3 (150)


h5 (150)

h6 (150)

4. Rack 4 (75)


h7 (75)

h8 (75)

因此,優先順序是h4 > h3 > h1 > h2 > h5 > h6 > h7 > h8。


不考慮rack的方法更簡單,節點排序的結果為:


h1 (175)

h4 (150)

h5 (150)

h6 (150)

h2 (100)

h3 (100)

h7 (75)

h8 (75)

其優先順序為 h1 > h4 > h5 > h6 > h2 > h3 > h7 > h8。


更多細節參見Hadoop的測試用例: https://github.com/apache/hadoop-common/blob/release-1.0.4/src/test/org/apache/hadoop/mapred/TestGetSplitHosts.java


雙向連結串列


CombineFileInputFormat選擇了另外的方式保持locality的性質,它使用雙向連結串列將block串在一起,之後先是逐節點掃描block,再次是逐rack掃描node,最後剩下來的“殘片”丟到最後一堆處理,這樣最大限度的保證locality,並且維持partition的大小盡可能平衡。缺點就是出現跨block的檔案的情況下,同一個檔案的block很有可能落到不同的partition中。這裡的陷阱是,在Hadoop老API中,isSplitable()函式不能起到保持同一個檔案內容在同一個partition中的作用,而在新API中反倒有這個功能了。各位使用的時候可要睜大眼睛。順便說一句,新API雖然加入了這個功能,但是不切分檔案的情況下,在保持locality和partition均衡的性質上可就沒老API好了。無論如何,這些trade-off總是逃不掉的。


Double linked lists sweep for constructing split - CombineFileInputFormat.java

// mapping from a rack name to the list of blocks it has

HashMap<String, List<OneBlockInfo>> rackToBlocks =

new HashMap<String, List<OneBlockInfo>>();

// mapping from a block to the nodes on which it has replicas

HashMap<OneBlockInfo, String[]> blockToNodes =

new HashMap<OneBlockInfo, String[]>();

// mapping from a node to the list of blocks that it contains

HashMap<String, List<OneBlockInfo>> nodeToBlocks =

new HashMap<String, List<OneBlockInfo>>();

...

// process all nodes and create splits that are local

// to a node.

for (Iterator<Map.Entry<String,

List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();

iter.hasNext();) {

如何讀取


聊了這麼多,我們總算清楚MapReduce類的程式如何在組織split的時候保持良好的block的本地性了。我們很開心的獲得其中的“最佳地點”,並將這個資訊傳遞給spark的partition。下一步工作就是Spark根據“最佳地點”,如上例中的節點h4,啟動worker上的處理程序/執行緒開始讀取資料了。現在h4啟動了spark的executor開始處理split中的block。但是稍等,h4怎麼知道從哪個節點上獲得每個block呢?要知道,每個block有三個副本呢!具體讀取該block的哪個副本這個資訊並未傳遞給partition。


從RecordReader開始,我們再來一步步還原資料讀取的過程。有了上面的基礎,這次的旅程應該很快了。以筆者寫的BatchFileRecordReader為例:


Constructer of BatchFileRecoderReader - BatchFileRecorderReader.java

public BatchFileRecordReader(

CombineFileSplit split,

Configuration conf,

Reporter reporter,

Integer index)

throws IOException {

path = split.getPath(index);

startOffset = split.getOffset(index);

pos = startOffset;

end = startOffset + split.getLength(index);


FileSystem fs = path.getFileSystem(conf);

fileIn = fs.open(path);

fileIn.seek(startOffset);


totalMemory = Runtime.getRuntime().totalMemory();

}

在上面的程式碼中,我們從split中拿到了path,注意這裡的path是當前檔案路徑,可不是block路徑。有了它,我們可以拿到一個fileIn,其型別為FSDataInputStream。之後我們seek到這個block開始的位置,稱作startOffset。等一下,我們根本沒用到“最佳地點”的資訊,是不是很奇怪呢?我們之前花了大量力氣拿到的資訊,這裡沒有用到。


這裡需要記住的是,目前為止我們獲得的split資訊只是為每個block分配了計算節點,僅此而已。如何讀取由別的程式碼控制。那麼再來看看FSDataInputStream,這裡也沒有太多東西,只有一些看上去沒啥用的程式碼。


FSDataInputStream.java

public class FSDataInputStream extends DataInputStream

implements Seekable, PositionedReadable, Closeable {


public FSDataInputStream(InputStream in)

throws IOException {

super(in);

if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {

throw new IllegalArgumentException(

"In is not an instance of Seekable or PositionedReadable");

}

}


public synchronized void seek(long desired) throws IOException {

((Seekable)in).seek(desired);

}

}

好吧,我們另覓他途。注意之前fileIn是由fs.open()這個呼叫獲得的,在HDFS的場景下,這個fs其實是DistributedFileSystem,即常說的DFS。結果我們在DFS中找到了由DFSInputStream包裝成的FSDataInputStream,前者在DFSClient中實現。我們所期待的函式是blockSeekTo(),這個函式負責給定偏移量之後找到合適的block。之後它會找到最優的DataNode,並從中讀取資料。


Find an appropriate block and select a DataNode - DFSClient.java

DatanodeInfo chosenNode = null;

int refetchToken = 1; // only need to get a new access token once

while (true) {

//

// Compute desired block

//

LocatedBlock targetBlock = getBlockAt(target, true);

assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;

long offsetIntoBlock = target - targetBlock.getStartOffset();


DNAddrPair retval = chooseDataNode(targetBlock);

chosenNode = retval.info;

InetSocketAddress targetAddr = retval.addr;

}

這其中最重要的函式是chooseDataNode(),它非常簡單,只是從一個DataNode列表中選擇第一個node。如果第一個node連線不上,再找第二個,依次類推。bestNode()函式中的註釋說這裡的node列表已經按照優先規則排序好的了。很奇怪,這是在什麼時候排序的呢?


實際上,在這個檔案首次開啟的時候就已經排序好了。參見openInfo()函式,它呼叫callGetBlockLocations()函式進行排序。後者在NameNode中的getBlockLocations()中查詢資訊。可以看到它呼叫了clusterMap中的pseudoSortByDistance()進行排序。至此,我們獲得了Hadoop為應用保持資料本地性的全景。


Get block locations and sorted in the priority order - FSNamesystem.java

LocatedBlocks getBlockLocations(String clientMachine, String src,

long offset, long length) throws IOException {

LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);

if (blocks != null) {

//sort the blocks

DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(

clientMachine);

for (LocatedBlock b : blocks.getLocatedBlocks()) {

clusterMap.pseudoSortByDistance(client, b.getLocations());

}

}

return blocks;

}

結語


分散式資料並行環境下,保持資料的本地性是非常重要的內容,事關分散式系統性能高下。要想更好的瞭解Spark是怎麼運作的,輸入也許是很重要的一個環節。舉一個小例子,你或許有心情在一臺不錯的機器上使用Spark處理100GB的資料。按理說這不應算作多大的應用場景,但如果不仔細調整一下你的輸入的話,你會發現Spark甚至會在這臺機器上切分上千個partition來並行處理這份資料。而這上千個partition隨便來一個shuffle造成的百萬量級的shuffle資料交換會把Spark效能拖死。實際上,呼叫Hadoop的API訪問本地磁碟的預設塊大小為32MB,據其分塊策略,當然會產生上千個partition。另外,如果你本地是一堆小檔案,如LDA的語料庫,你會發現Spark甚至會為每個檔案分配一個或多個partition!所以,這下你應該知道為什麼有時簡單的Spark程式也會非常慢了吧。


本文為了解決LDA小檔案輸入的問題,一步步揭開HDFS與Spark的資料通道的故事。總結來看,為了分散式使用各個機器,HDFS讀取的時候將資料分成了各個分塊,為了防止straggler的產生,MapReduce的讀取模組會盡量保證各個分塊在每臺機器上的大小和個數均衡。為了保證較好的locality,Spark獲取preferredLocation資訊,儘量保證在臨近的機器上讀取所需的資料。為了合理讀取小檔案,CombineFileInputFormat合理安排小檔案分片,既要保證資料在各個分塊中均衡,又不能切斷單個檔案。為了保證HDFS與Spark之間的高效資料通道,正可謂”無所不用其極”。


作者簡介


尹緒森,Intel實習生,熟悉並熱愛機器學習相關內容,對自然語言處理、推薦系統等有所涉獵。目前致力於機器學習演算法並行、凸優化層面的演算法優化問題,以及大資料平臺效能調優。對Spark、Mahout、GraphLab等開源專案有所嘗試和理解,並希望從優化層向下,系統層向上對並行演算法及平臺做出貢獻。


感謝辛湜對本文的審校。


感謝包研對本文的策劃。


給InfoQ中文站投稿或者參與內容翻譯工作,請郵件至[email protected]。也歡迎大家通過新浪微博(@InfoQ)或者騰訊微博(@InfoQ)關注我們,並與我們的編輯和其他讀者朋友交流。


轉載於:https://my.oschina.net/qiyong/blog/221862