海量資料的儲存計算和查詢模型
海量資料(“Big Data”)是指那些足夠大的資料,以至於無法再使用傳統的方法進行處理。在過去,一直是Web搜尋引擎的建立者們首當其衝的面對這個問題。而今天,各種社交網路,移動應用以及各種感測器和科學領域每天建立著上PB的資料。 為了應對這種大規模資料處理的挑戰,google創造了MapReduce。Google的工作以及yahoo建立的Hadoop孵化出一個完整的海量資料處理工具的生態系統。
隨著MapReduce的流行,一個由資料儲存層,MapReduce和查詢(簡稱SMAQ)組成的海量資料處理的棧式模型也逐漸展現出來。SMAQ系統通常是開源的,分散式的,執行在普通硬體上。
就像由Linux, Apache, MySQL and PHP 組成的LAMP改變了網際網路應用開發領域一樣,SMAQ將會把海量資料處理帶入一個更廣闊的天地。正如LAMP成為Web2.0的關鍵推動者一樣,SMAQ系統將支撐起一個創新的以資料為驅動的產品和服務的新時代。
儘管基於Hadoop的架構佔據了主導地位,但是SMAQ模型也包含大量的其他系統,包括主流的NoSQL資料庫。這篇文章描述了SMAQ棧式模型以及今天那些可以包括在這個模型下的海量資料處理工具。
MapReduce
MapReduce是google為建立web網頁索引而建立的。MapReduce框架已成為今天大多數海量資料處理的廠房。MapReduce的關鍵在於,將在資料集合上的一個查詢進行劃分,然後在多個節點上並行執行。這種分散式模式解決了資料太大以至於無法存放在單獨一臺機器上的難題。
為了理解MapReduce是如何工作的,我們首先看它名字所體現出的兩個過程。首先在map階段,輸入資料被一項一項的處理,轉換成一箇中間結果集,然後在reduce階段,這些中間結果又被規約產生一個我們所期望得到的歸納結果。
說到MapReduce,通常要舉的一個例子就是查詢一篇文件中不同單詞的出現個數。在map階段單詞被抽出來,然後給個count值1,在reduce節點,將相同的單詞的count值累加起來。
看起來是不是將一個很簡單的工作搞地很複雜了,這就是MapReduce。為了讓MapReduce完成這項任務,map和reduce階段必須遵守一定的限制來使得工作可以並行化。將查詢請求轉換為一個或者多個MapReduce並不是一個直觀的過程,為了解決這個問題,一些更高階的抽象被提出來,我們將在下面關於查詢的那節裡進行討論。
使用MapReduce解決問題,通常需要三個操作:
資料載入—用資料倉庫的叫法,這個過程叫做抽取(extract),轉換(transform),載入(load){簡稱ETL}更合適些。為了利用MapReduce進行處理,資料必須從源資料裡抽取出來,進行必要的結構化,載入到MapReduce可以訪問的儲存層。
MapReduce—從儲存層訪問資料,進行處理,再將結果返回給儲存層
結果抽取—一旦處理完畢,為了讓結果對於人來說是可用的,還需要能夠將儲存層的結果資料進行查詢和展示。
很多SMAQ系統都具有自身的一些屬性,主要就是圍繞上述三個過程的簡化。
Hadoop MapReduce
Hadoop是主要的開源MapReduce實現。由yahoo資助,2006年由Doug Cutting建立,2008年達到了web規模的資料處理容量。
Hadoop專案現在由Apache管理。隨著不斷的努力,和多個子專案一起共同構成了完整的SMAQ模型。
由於是用java實現的,所以Hadoop的MapReduce實現可以通過java語言互動。建立MapReduce job通常需要寫一些函式用來實現map和reduce階段需要做的計算。處理資料必須能夠載入到Hadoop的分散式檔案系統中。
以wordcount為例,map函式如下(來源於Hadoop MapReduce文件,展示了其中關鍵的步驟)
public static class Map
extends Mapper {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
對應的reduce函式如下:
public static class Reduce
extends Reducer {
public void reduce(Text key, Iterable values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
使用Hadoop執行一個MapReduce job包括如下幾個步驟:
1. 用一個java程式定義MapReduce的各個階段
2. 將資料載入進檔案系統
3. 提交job進行執行
4. 從檔案系統獲取執行結果
直接通過java API,Hadoop MapReduce job寫起來可能很複雜,需要程式設計師很多方面的參與。為了讓資料載入和處理工作更加簡單直接,圍繞著Hadoop一個很大的生態系統已經形成。
其他實現
MapReduce已經在很多其他的程式語言和系統中實現,詳細的列表可以參考Wikipedia's entry for MapReduce.。尤其是幾個NoSQL資料庫已經集成了MapReduce,後面我們會對此進行描述。
Storage
從資料獲取到結果存放,MapReduce都需要與儲存打交道。與傳統資料庫不同,MapReduce的輸入資料並不是關係型的。輸入資料存放在不同的chunk上,能夠劃分給不同的節點,然後提供以key-value的形式提供給map階段。資料不需要一個schema,而且可能是無結構的。但是資料必須是可分佈的,能夠提供給不同的處理節點。
儲存層的設計和特點很重要不僅僅是因為它與MapReduce的介面,而且因為它們直接決定了資料載入和結果查詢和展示的方便性。
Hadoop分散式檔案系統
Hadoop使用的標準儲存機制是HDFS。作為Hadoop的核心部分,HDFS有如下特點,詳細參見HDFS design document.:
容錯 -- 假設失敗是常態允許HDFS執行在普通硬體上
流資料訪問 – HDFS實現時考慮的是批量處理,因此著重於高吞吐率而不是資料的隨機訪問
高度可擴充套件性 – HDFS可以擴充套件到PB級的資料,比如Facebook就有一個這樣的產品級使用
可移植性 – Hadoop是可以跨作業系統移植的
單次寫 – 假設檔案寫後不會改變,HDFS簡化了replication提高了資料吞吐率
計算本地化 – 考慮到資料量,通常將程式移到資料附近執行會更快,HDFS提供了這方面的支援
HDFS提供了一個類似於標準檔案系統的介面。與傳統資料庫不同,HDFS只能進行資料儲存和訪問,而不能為資料建立索引。無法對資料進行簡單的隨機訪問。但是一些更高階的抽象已經創建出來,用來提供對Hadoop的更細粒度的功能,比如HBase。
HBase,Hadoop資料庫
一種使HDFS更具可用性的方法是HBase。模仿谷歌的BigTable資料庫,HBase也是一個設計用來儲存海量資料的列存式資料庫。它也屬於NoSQL資料庫範疇,類似於Cassandra and Hypertable。
HBase使用HDFS作為底層儲存系統,因此也具有通過大量容錯分散式節點來儲存大量的資料的能力。與其他的列儲存資料庫類似,HBase也提供基於REST和Thrift的訪問API。
由於建立了索引,HBase可以為一些簡單的查詢提供對內容快速的隨機訪問。對於複雜的操作,HBase為Hadoop MapReduce提供資料來源和儲存目標。因此HBase允許系統以資料庫的方式與MapReduce進行互動,而不是通過底層的HDFS。
Hive
資料倉庫或者是使報告和分析更簡單的儲存方式是SMAQ系統的一個重要應用領域。最初在Facebook開發的Hive,是一個建立在Hadoop之上是資料倉庫框架。類似於HBase,Hive提供一個在HDFS上的基於表的抽象,簡化了結構化資料的載入。與HBase相比,Hive只能執行MapReduce job進行批量資料分析。如下面查詢那部分描述的,Hive提供了一個類SQL的查詢語言來執行MapReduce job。
Cassandra and Hypertable
Cassandra和 Hypertable都是具有BigTable模式的類似於HBase的列儲存資料庫。
作為Apache的一個專案,Cassandra最初是在Facebook產生的。現在應用在很多大規模的web站點,包括Twitter, Facebook, Reddit and Digg。Hypertable產生於Zvents,現在也是一個開源專案。
這兩個資料庫都提供與Hadoop MapReduce互動的介面,允許它們作為Hadoop MapReduce job的資料來源和目標。在更高層次上,Cassandra提供與Pig查詢語言的整合(參見查詢章節),而Hypertable已經與Hive整合。
NoSQL資料庫的MapReduce實現
目前為止我們提到的儲存解決方案都是依賴於Hadoop進行MapReduce。還有一些NoSQL資料庫為了對儲存資料進行平行計算本身具有內建的Mapreduce支援。與Hadoop系統的多元件SMAQ架構不同,它們提供一個由storage, MapReduce and query一體組成的自包含系統。
基於Hadoop的系統通常是面向批量處理分析,NoSQL儲存通常是面向實時應用。在這些資料庫裡,MapReduce通常只是一個附加功能,作為其他查詢機制的一個補充而存在。比如,在Riak裡,對MapReduce job通常有一個60秒的超時限制,而通常來說, Hadoop 認為一個job可能執行數分鐘或者數小時。
下面的這些NoSQL資料庫都具有MapReduce功能:
CouchDB,一個分散式資料庫,提供了半結構化的文件儲存功能。主要特點是提供很強的多副本支援,以及可以進行分散式更新。在CouchDB裡,查詢是通過使用javascript定義MapReduce的map和reduce階段實現的。
MongoDB,本身很類似於CouchDB,但是更注重效能,對於分散式更新,副本,版本的支援相對弱些。MapReduce也是通過javascript描述的。
Riak,與前面兩個資料庫也很類似。但是更關注高可用性。可以使用javascript或者Erlang描述MapReduce。
與關係型資料庫的整合
在很多應用中,主要的源資料儲存在關係型資料庫中,比如Mysql或者Oracle。MapReduce通常通過兩種方式使用這些資料:
使用關係型資料庫作為源(比如社交網路中的朋友列表)
將MapReduce結果重新注入到關係型資料庫(比如基於朋友的興趣產生的產品推薦列表)
理解MapReduce如何與關係型資料庫互動是很重要的。最簡單的,通過組合使用SQL匯出命令和HDFS操作,帶分隔符的文字檔案可以作為傳統關係型資料庫和Hadoop系統間的匯入匯出格式。更進一步的講,還存在一些更復雜的工具。
Sqoop工具是設計用來將資料從關係型資料庫匯入到Hadoop系統。它是由Cloudera開發的,一個專注於企業級應用的Hadoop平臺經銷商。Sqoop是與具體資料庫無關的,因為它使用了java的JDBC資料庫API。可以將整個表匯入,也可以使用查詢命令限制需要匯入的資料。
Sqoop也提供將MapReduce的結果從HDFS導回關係型資料庫的功能。因為HDFS是一個檔案系統,所以Sqoop需要以分隔符標識的文字為輸入,需要將它們轉換為相應的SQL命令才能將資料插入到資料庫。
與streaming資料來源的整合
關係型資料庫以及流式資料來源(比如web伺服器日誌,感測器輸出)組成了海量資料系統的最常見的資料來源。Cloudera的Flume專案就是旨在提供流式資料來源與Hadoop之間整合的方便工具。Flume收集來自於叢集機器上的資料,將它們不斷的注入到HDFS中。Facebook的Scribe伺服器也提供類似的功能。
商業性的SMAQ解決方案
一些MPP資料庫具有內建的MapReduce功能支援。MPP資料庫具有一個由並行執行的獨立節點組成的分散式架構。它們的主要功能是資料倉庫和分析,可以使用SQL。
Greenplum:基於開源的PostreSQL DBMS,執行在分散式硬體組成的叢集上。MapReduce作為SQL的補充,可以進行在Greenplum上的更快速更大規模的資料分析,減少了幾個數量級的查詢時間。Greenplum MapReduce允許使用由資料庫儲存和外部資料來源組成的混合資料。MapReduce操作可以使用Perl或者Python函式進行描述。
Aster Data 的nCluster資料倉庫系統也提供MapReduce支援。MapReduce操作可以通過使用Aster Data的SQL-MapReduce技術呼叫。SQL-MapReduce技術可以使SQL查詢和通過各種語言(C#, C++, Java, R or Python)的原始碼定義的MapReduce job組合在一塊。
其他的一些資料倉庫解決方案選擇提供與Hadoop的聯結器,而不是在內部整合MapReduce功能。
Vertica:是一個提供了Hadoop聯結器的列存式資料庫。
Netezza:最近由IBM收購。與Cloudera合作提高了它與Hadoop之間的互操作性。儘管它解決了類似的問題,但是實際上它已經不在我們的SMAQ模型定義之內,因為它既不開源也不執行在普通硬體上。
儘管可以全部使用開源軟體來建立一個基於Hadoop的系統,但是整合這樣的一個系統仍然需要一些努力。Cloudera的目的就是使得Hadoop更能適應用企業化的應用,而且在它們的Cloudera Distribution for Hadoop (CDH)中已經提供一個統一的Hadoop發行版。
查詢
通過上面的java程式碼可以看出使用程式語言定義MapReduce job的map和reduce過程並不是那麼的直觀和方便。為了解決這個問題,SMAQ系統引人了一個更高層的查詢層來簡化MapReduce操作和結果查詢。
很多使用Hadoop的組織為了使操作更加方便,已經對Hadoop的API進行了內部的封裝。有些已經成為開源專案或者商業性產品。
查詢層通常並不僅僅提供用於描述計算過程的特性,而且支援對資料的存取以及簡化在MapReduce叢集上的執行流程。
Pig
由yahoo開發,目前是Hadoop專案的一部分。Pig提供了一個稱為Pig Latin的高階查詢語言來描述和執行MapReduce job。它的目的是讓Hadoop更容易被那些熟悉SQL的開發人員訪問,除了一個Java API,它還提供一個互動式的介面。Pig目前已經整合在Cassandra 和HBase資料庫中。 下面是使用Pig寫的上面的wordcount的例子,包括了資料的載入和儲存過程($0代表記錄的第一個欄位)。
input = LOAD 'input/sentences.txt' USING TextLoader();
words = FOREACH input GENERATE FLATTEN(TOKENIZE($0));
grouped = GROUP words BY $0;
counts = FOREACH grouped GENERATE group, COUNT(words);
ordered = ORDER counts BY $0;
STORE ordered INTO 'output/wordCount' USING PigStorage();
Pig是非常具有表達力的,它允許開發者通過UDFs(User Defined Functions )書寫一些定製化的功能。這些UDF使用java語言書寫。儘管它比MapReduce API更容易理解和使用,但是它要求使用者去學習一門新的語言。某些程度上它與SQL有些類似,但是它又與SQL具有很大的不同,因為那些熟悉SQL的人們很難將它們的知識在這裡重用。
Hive
正如前面所述,Hive是一個建立在Hadoop之上的開源的資料倉庫。由Facebook建立,它提供了一個非常類似於SQL的查詢語言,而且提供一個支援簡單內建查詢的web介面。因此它很適合於那些熟悉SQL的非開發者使用者。
與Pig和Cascading的需要進行編譯相比,Hive的一個長處是提供即席查詢。對於那些已經成熟的商務智慧系統來說,Hive是一個更自然的起點,因為它提供了一個對於非技術使用者更加友好的介面。Cloudera的Hadoop發行版裡集成了Hive,而且通過HUE專案提供了一個更高階的使用者介面,使得使用者可以提交查詢並且監控MapReduce job的執行。
Cascading, the API Approach
Cascading提供了一個對Hadoop的MapReduce API的包裝以使它更容易被java應用程式使用。它只是一個為了讓MapReduce整合到更大的系統中時更簡單的一個包裝層。Cascading包括如下幾個特性:
旨在簡化MapReduce job定義的資料處理API
一個控制MapReduce job在Hadoop叢集上執行的API
訪問基於Jvm的指令碼語言,比如Jython, Groovy, or JRuby.
與HDFS之外的資料來源的整合,包括Amazon S3,web伺服器
提供MapReduce過程測試的驗證機制
Cascading的關鍵特性是它允許開發者將MapReduce job以流的形式進行組裝,通過將選定的一些pipes連線起來。因此很適用於將Hadoop整合到一個更大的系統中。 Cascading本身並不提供高階查詢語言,由它而衍生出的一個叫Cascalog的開源專案完成了這項工作。Cascalog通過使用Clojure JVM語言實現了一個類似於Datalog的查詢語言。儘管很強大,Cascalog仍然只是一個小範圍內使用的語言,因為它既不像Hive那樣提供一個類SQL的語言,也不像Pig那樣是過程性的。下面是使用Cascalog完成的wordcout的例子:
(defmapcatop split [sentence]
(seq (.split sentence "\\s+")))
(?<- (stdout) [?word ?count]
(sentence ?s) (split ?s :> ?word)
(c/count ?count))
使用Solr進行搜尋
大規模資料系統的一個重要元件就是資料查詢和摘要。資料庫層比如HBase提供了對資料的簡單訪問,但是並不具備複雜的搜尋能力。為了解決搜尋問題。開源的搜尋和索引平臺Solr通常與NoSQL資料庫組合使用。Solr使用Luence搜尋技術提供一個自包含的搜尋伺服器產品。比如,考慮一個社交網路資料庫,MapReduce可以使用一些合理的引數用來計算個人的影響力,這個數值會被寫回到資料庫。之後使用Solr進行索引,就允許在這個社交網路上進行一些操作,比如找到最有影響力的人。
最初在CENT開發,現在作為Apache專案的Solr,已經從一個單一的文字搜尋引擎演化為支援導航和結果聚類。此外,Solr還可以管理儲存在分散式伺服器上的海量資料。這使得它成為在海量資料上進行搜尋的理想解決方案,以及構建商業智慧系統的重要元件。
總結
MapReduce尤其是Hadoop實現提供了在普通伺服器上進行分散式計算的強有力的方式。再加上分散式儲存以及使用者友好的查詢機制,它們形成的SMAQ架構使得海量資料處理通過小型團隊甚至個人開發也能實現。
現在對資料進行深入的分析或者建立依賴於複雜計算的資料產品已經變得很廉價。其結果已經深遠的影響了資料分析和資料倉庫領域的格局,降低了該領域的進入門檻,培養了新一代的產品,服務和組織方式。這種趨勢在Mike Loukides的"What is Data Science?"報告中有更深入的詮釋。
Linux的出現僅僅通過一臺擺在桌面上的linux伺服器帶給那些創新的開發者們以力量。SMAQ擁有同樣大的潛力來提高資料中心的效率,促進組織邊緣的創新,開啟廉價建立資料驅動業務的新時代。