1. 程式人生 > >Spark 使用Python在pyspark中執行簡單wordcount

Spark 使用Python在pyspark中執行簡單wordcount

0.參考文章

1.pyspark練習

進入到spark目錄,

1.1 修改log4j.properties

Spark(和PySpark)的執行可以特別詳細,很多INFO日誌訊息都會列印到螢幕。開發過程中,這些非常惱人,因為可能丟失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設定$SPARK_HOME/conf下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template檔案,去掉“.template”副檔名。

cp $SPARK_HOME/conf/log4j.properties.template
$SPARK_HOME/conf/log4j.properties

編輯log4.properties,將INFO替換為WARN
替換後如下:

# Set everything to be logged to the console
 log4j.rootCategory=WARN, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n # Settings to quiet third party logs that are too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN log4j.logger
.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

現在執行pyspark,輸出訊息將會更簡略。
然後採用預設的設定執行pyspark

./bin/pyspark

配置master引數,使用4個Worker執行緒本地化執行Spark(local[k]應該根據執行機器的CPU核數確定)

./bin/pyspark –master local[4]

增加的–py-files,是將指定的檔案加到search path,以便之後import

./bin/pyspark –master local[4] –py-files code.py

MASTER_URL 含義
local 使用一個Worker執行緒本地化執行Spark(預設
local[k] 使用K個Worker執行緒本地化執行Spark
local[*] 使用K個Worker執行緒本地化執行Spark(這裡K自動設定為機器的CPU核數)
spark://HOST:PORT 連線到指定的Spark單機版叢集(Spark standalone cluster)master。必須使用master所配置的介面,預設介面7077.如spark://10.10.10.10:7077
mesos://HOST:PORT 連線到指定的Mesos叢集。host引數是Moses master的hostname。必須使用master所配置的介面,預設介面是5050.
yarn-client 以客戶端模式連線到yarn叢集,叢集位置由環境變數HADOOP_CONF_DIR決定.
yarn-cluster 以叢集模式連線到yarn叢集,同樣由HADOOP_CONF_DIR決定連線到哪兒

接下來:

textFile = sc.textFile("file:///usr/local/cluster/spark/README.md")

如果使用本地檔案系統的路徑,那麼這個檔案在工作節點上也應該可以按照這個路徑讀到。即, 要麼把檔案copy到所有工作節點,或者使用網路共享之。

Spark預設從HDFS中讀取資料,如果輸入

textFile = sc.textFile(“./README.md”)

會報錯:

Input path does not exist: hdfs://manage02:9000/user/root/README.md

或者:
登入hadoop主節點 將要進行wordcount的檔案上傳到HDFS上,然後再從HDFS中讀入資料。

hadoop fs -put  /user/root/README.md input
# 然後進入pyspark
textFile = sc.textFile("hdfs://manage02:9000/user/root/README.md")

Spark的檔案輸入方法,可以run在目錄,壓縮檔案上,支援萬用字元。
如:

textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
textFile.count()  # textFile返回的每一行的記錄,相當於檔案行數
textFile.first()
u'# Apache Spark'
linesWithSpark = textFile.filter(lambda line:"Spark" in line) 
# filter方法建立一個新的RDD資料集(包含有'Spark'的行) 
linesWithSpark.count()

2.構建Python-Spark程式

1.加入所需的Spark模組

from pyspark import SparkContext, SparkConf
...

2.建立一個SparkContext物件(在pyspark shell裡是自動建立好的)

sc = SparkContext(master, appname)
sc = SparkContext(“local”,”Page Rank”)