Spark 使用Python在pyspark中執行簡單wordcount
0.參考文章
1.pyspark練習
進入到spark目錄,
1.1 修改log4j.properties
Spark(和PySpark)的執行可以特別詳細,很多INFO日誌訊息都會列印到螢幕。開發過程中,這些非常惱人,因為可能丟失Python棧跟蹤或者print的輸出。為了減少Spark輸出 – 你可以設定$SPARK_HOME/conf
下的log4j。首先,拷貝一份$SPARK_HOME/conf/log4j.properties.template
檔案,去掉“.template”副檔名。
cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
編輯log4.properties,將INFO替換為WARN
替換後如下:
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger .org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN
現在執行pyspark,輸出訊息將會更簡略。
然後採用預設的設定執行pyspark
./bin/pyspark
配置master引數,使用4個Worker執行緒本地化執行Spark(local[k]應該根據執行機器的CPU核數確定)
./bin/pyspark –master local[4]
增加的–py-files,是將指定的檔案加到search path,以便之後import
./bin/pyspark –master local[4] –py-files code.py
MASTER_URL | 含義 |
---|---|
local | 使用一個Worker執行緒本地化執行Spark(預設) |
local[k] | 使用K個Worker執行緒本地化執行Spark |
local[*] | 使用K個Worker執行緒本地化執行Spark(這裡K自動設定為機器的CPU核數) |
spark://HOST:PORT | 連線到指定的Spark單機版叢集(Spark standalone cluster)master。必須使用master所配置的介面,預設介面7077.如spark://10.10.10.10:7077 |
mesos://HOST:PORT | 連線到指定的Mesos叢集。host引數是Moses master的hostname。必須使用master所配置的介面,預設介面是5050. |
yarn-client | 以客戶端模式連線到yarn叢集,叢集位置由環境變數HADOOP_CONF_DIR決定. |
yarn-cluster | 以叢集模式連線到yarn叢集,同樣由HADOOP_CONF_DIR決定連線到哪兒 |
接下來:
textFile = sc.textFile("file:///usr/local/cluster/spark/README.md")
如果使用本地檔案系統的路徑,那麼這個檔案在工作節點上也應該可以按照這個路徑讀到。即, 要麼把檔案copy到所有工作節點,或者使用網路共享之。
Spark預設從HDFS中讀取資料,如果輸入
textFile = sc.textFile(“./README.md”)
會報錯:
Input path does not exist: hdfs://manage02:9000/user/root/README.md
或者:
登入hadoop主節點 將要進行wordcount的檔案上傳到HDFS上,然後再從HDFS中讀入資料。
hadoop fs -put /user/root/README.md input
# 然後進入pyspark
textFile = sc.textFile("hdfs://manage02:9000/user/root/README.md")
Spark的檔案輸入方法,可以run在目錄,壓縮檔案上,支援萬用字元。
如:
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")
textFile.count() # textFile返回的每一行的記錄,相當於檔案行數
textFile.first()
u'# Apache Spark'
linesWithSpark = textFile.filter(lambda line:"Spark" in line)
# filter方法建立一個新的RDD資料集(包含有'Spark'的行)
linesWithSpark.count()
2.構建Python-Spark程式
1.加入所需的Spark模組
from pyspark import SparkContext, SparkConf
...
2.建立一個SparkContext物件(在pyspark shell裡是自動建立好的)
sc = SparkContext(master, appname)
sc = SparkContext(“local”,”Page Rank”)