CentOS6.5下spark分散式叢集的搭建
Spark是一個快速、通用的計算叢集框架,它的核心使用Scala語言編寫,它提供了Scala、Java和Python程式語言high-level API,使用這些API能夠非常容易地開發並行處理的應用程式。
下面,我們通過搭建Spark叢集計算環境,並進行簡單地驗證,來體驗一下使用Spark計算的特點。無論從安裝執行環境還是從編寫處理程式(用Scala,Spark預設提供的Shell環境可以直接輸入Scala程式碼進行資料處理),我們都會覺得比Hadoop MapReduce計算框架要簡單得多,而且,Spark可以很好地與HDFS進行互動(從HDFS讀取資料,以及寫資料到HDFS中)。
安裝配置
- 下載安裝配置Scala
tar -zxvf scala-2.10.3
mv scala-2.10.3 scala
- 在/etc/profile 中增加環境變數SCALA_HOME,並使之生效:
export SCALA_HOME=/usr/local/scala
export PATH=.:$PATH:$SCALA_HOME/bin
- 將scala的程式檔案和配置檔案拷貝分發到從節點機器上:
scp -r scala slaves1:/usr/local/
scp -r scala slaves2:/usr/local/
-
下載安裝配置Spark
-
我們首先在主節點master上配置Spark程式,然後將配置好的程式檔案複製分發到叢集的各個從結點上。下載解壓縮:
tar -zxvf spark-0.9.0-incubating-bin-hadoop1.tgz
-
在/etc/profile 中增加環境變數SPARK_HOME,並使之生效:
export SPARK_HOME=/usr/local/spark
export PATH=.:$PATH:$SCALA_HOME/bin:$SPARK_HOEM/bin
-
在master上配置Spark,修改spark-env.sh配置檔案:
cd /usr/local/spark
cp spark-env.sh.template spark-env.sh
在該指令碼檔案中,同時將SCALA_HOME和JAVA_HOEM配置為Unix環境下實際指向路徑,例如:
SCALA_HOME=/usr/local/scala
JAVA_HOEM=/usr/local/jdk
-
修改conf/slaves檔案,將計算節點的主機名新增到該檔案,一行一個,例如:
slaves1
salves2
-
最後,將Spark的程式檔案和配置檔案拷貝分發到從節點機器上:
scp -r spark slaves1:/usr/local/
scp -r spark slaves2:/usr/local/
-
啟動Spark叢集
我們會使用HDFS叢集上儲存的資料作為計算的輸入,所以首先要把Hadoop叢集安裝配置好,併成功啟動,我這裡使用的是Hadoop 1.2.1版本。啟動Spark計算叢集非常簡單,執行如下命令即可:
cd /usr/local/spark
sbin/start-all.sh
可以看到,在master上啟動了一個名稱為Master的程序,在slaves1和slaves2上啟動了一個名稱為Worker的程序,如下所示,我這裡也啟動了Hadoop叢集:
主節點msater上:
從節點slaves1上:
從節點slaves2上:
各個程序是否啟動成功,也可以檢視日誌來診斷,例如:
tail -100f $SPARK_HOME/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slaves1.out
tail -100f $SPARK_HOME/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-slaves2.out
-
Spark叢集計算驗證
我們使用我的網站的訪問日誌檔案來演示,示例如下:
統計該檔案裡面IP地址出現頻率,來驗證Spark叢集能夠正常計算。另外,我們需要從HDFS中讀取這個日誌檔案,然後統計IP地址頻率,最後將結果再儲存到HDFS中的 指定目錄。
首先,需要啟動用來提交計算任務的Spark Shell:
sbin/spark-shell
在Spark Shell上只能使用Scala語言寫程式碼來執行。
然後,執行統計IP地址頻率,在Spark Shell中執行如下程式碼來實現:
var file=sc.textFile("hdfs://master:9000/apachelog/ida_20141022_001.log")
val result = file.flatMap(line => line.split("\\s+.*")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
-
最後,我們想要將結果儲存到HDFS中,只要輸入如下程式碼:
result.saveAsTextFile("hdfs://master:9000/apachelog/ida_20141022_001.log.result")
-
檢視HDFS上的結果資料:
-
hadoop fs -cat /apachelog/ida_20141022_001.log_result/part-00000
(42.156.250.103,2)
(14.110.234.12,98)
(123.147.245.120,117)
(14.152.68.149,1)
(125.84.60.71,20)
(223.85.132.24,3)
(117.159.28.22,21)