Python海量資料處理之_Hadoop&Spark
1. 說明
前篇介紹了安裝和使用Hadoop,本篇將介紹Hadoop+Spark的安裝配置及如何用Python呼叫Spark。
當資料以TB,PB計量時,用單機處理資料變得非常困難,於是使用Hadoop建立計算叢集處理海量資料,Hadoop分為兩部分,一部分是資料儲存HDFS,另一部分是資料計算MapReduce。MapReduce框架將資料處理分成map,reduce兩段,使用起來比較麻煩,並且有一些限制,如:資料都是流式的,且必須所有Map結束後才能開始Reduce。我們可以引入Spark加以改進。
Spark的優點在於它的中間結果儲存在記憶體中,而非HDFS檔案系統中,所以速度很快。用Scala 語言可以像操作本地集合物件一樣輕鬆地操作分散式資料集。雖然它支援中間結果儲存在記憶體,但叢集中的多臺機器仍然需要讀寫資料集,所以它經常與HDFS共同使用。因此,它並非完全替代Hadoop。
Spark的框架是使用Scala語言編寫的,Spark的開發可以使用語言有:Scala、R語言、Java、Python。
2. Scala
Scala是一種類似java的程式語言,使用Scala語言相對來說程式碼量更少,呼叫spark更方便,也可以將它和其它程式混用。
在不安裝scala的情況下,啟動hadoop和spark,python的基本例程也可以正常執行。但出於進一步開發的需要,最好安裝scala。
(1) 下載scala
(2) 安裝
$ cd /home/hadoop #使用者可選擇安裝的資料夾 $ tar xvzf tgz/scala-2.11.12.tgz $ ln -s scala-2.11.12/ scala 在.bashrc中加入 export PATH=/home/hadoop/scala/bin:$PATH
3. 下載安裝Spark
(1) 下載spark
(2) 安裝spark
$ cd /home/hadoop #使用者可選擇安裝的資料夾
$ tar xvzf spark-2.2.1-bin-hadoop2.7.tgz
$ ln -s spark-2.2.1-bin-hadoop2.7/ spark
在.bashrc中加入
export SPARK_HOME=/home/hadoop/spark
export PATH=$SPARK_HOME/bin:$PATH
(3) 配置檔案
不做配置,pyspark可以在本機上執行,但不能使用叢集中其它機器。配置檔案在$SPARK_HOME/conf/目錄下。
i. 配置spark-env.sh
$ cd $SPARK_HOME/conf/
$ cp spark-env.sh.template spark-env.sh
按具體配置填寫內容
export SCALA_HOME=/home/hadoop/scala
export JAVA_HOME=/exports/android/jdk/jdk1.8.0_91/
export SPARK_MASTER_IP=master
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/hadoop/hadoop/etc/hadoop/
ii. 設定主從伺服器slave
$ cp slaves.template slaves
在其中列出從伺服器地址,單機不用設
iii. 設定spark-defaults.conf
$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
按具體配置填寫內容
spark.master spark://master:7077
spark.eventLog.enabled false
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 1g
spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
(4) 啟動
執行spark之前,需要執行hadoop,具體見之前的Hadoop文件
$ $SPARK_HOME/sbin/start-all.sh
該指令碼啟動了所有master和workers,在本機用jps檢視,增加了Worker和Master,
4. 命令列呼叫
下面我們來看看從程式層面如何使用Spark
(1) 準備工作
在使用相對路徑時,系統預設是從hdfs://localhost:9000/中讀資料,因此需要先把待處理的本地檔案複製到HDFS上,常用命令見之前的Hadoop有意思。
$ hadoop fs -mkdir -p /usr/hadoop
$ hadoop fs -copyFromLocal README.md /user/hadoop/
(2) Spark命令列
$ pyspark
>>> textFile = spark.read.text("README.md")
>>> textFile.count() # 返回行數
>>> textFile.first() # 返回第一行
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) # 返回所有含Spark行的資料集
5. 程式
(1) 實現功能
統計檔案中的詞頻
(2) 程式碼
這裡使用了spark自帶的例程 /home/hadoop/spark/examples/src/main/python/wordcount.py,和之前介紹過的hadoop程式一樣,同樣是實現的針對key,value的map,reduce,一個檔案就完成了,看起來更簡徢更靈活,像是hadoop自帶MapReduce的加強版。具體內容如下:
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: wordcount <file>", file=sys.stderr)
exit(-1)
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect() # 收集結果
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()
(3) 執行
spark-submit命令在$HOME_SPARK/bin目錄下,之前設定了PATH,可以直接使用
$ spark-submit $SPARK_HOME/examples/src/main/python/wordcount.py /user/hadoop/README.md
引數是hdfs中的檔案路徑。
此時訪問$SPARK_IP:8080埠,可以看到程式PythonWordCount正在hadoop中執行。
6. 多臺機器上安裝Spark以建立叢集
和hadoop的叢集設定類似,同樣是把整個spark目錄複製叢集中其它的伺服器上,用slaves檔案設定主從關係,然後啟動$SPARK_HOME/sbin/start-all.sh。正常開啟後可以通過網頁檢視狀態:SparkMaster_IP:8080