Spark-Programming Guides官網一
Quick Start
Spark2.0之前是RDD(彈性分散式資料集)。Spark2.0後RDD被Dataset取代
Interactive Analysis with the Spark Shell
-
讀檔案得到資料集 textFile=spaek.read.text(“README.md”) 注意從本地讀使用絕對路徑 file:/wdd/app/spark/README.md 從hdfs上讀 用絕對路徑 hdfs:/wdd/…
-
計數textFile.count() 顯示第一個textFile.first()
-
將這個資料集轉成一個新的資料集 linesWithSpark = textFile.filter(textFile.value.contains(“Spark”)
More on Dataset Operations
- \s表示 空格,回車,換行等空白符,+號表示一個或多個的意思,所以 split("\s+") 這個就能實現你的 多個空格切割的效果
from pyspark.sql.functions import * textFile.select(size(split(textFile.value,"\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
- wordCounts = textFile.select(explode(split(textFile.value, “\s+”)).alias(“word”)).groupBy(“word”).count() explode:我們在select中使用爆炸式函式,將行資料集轉換為詞資料集
Caching
- 如果某個詞反覆用到我們可以把它放到叢集的記憶體中去 linesWithSpark.cache()
Self-Contained Applications
簡單的應用程式
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
使用SparkSession建立資料集 這個程式只計算文字檔案中包含“a”的行數和包含“b”的行數。注意,您需要將您的_spark_home替換為Spark安裝位置。與Scala和Java示例一樣,我們使用SparkSession建立資料集。對於使用自定義類或第三方庫的應用程式,我們還可以通過將程式碼打包為.zip檔案(請參閱spark-submit—help獲取詳細資訊),通過它的-py-files引數向spark-submit新增程式碼依賴項。SimpleApp非常簡單,我們不需要指定任何程式碼依賴關係。
Where to Go from Here
1.For Scala and Java, use run-example: ./bin/run-example SparkPi
2.For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py
3.For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R
踩坑
worker記憶體不能低於1G local[n] n個執行緒必須與你虛擬機器上設定的邏輯執行緒個數一致