【Spark】Spark Quick Start(快速入門翻譯)
本文主要是翻譯Spark官網Quick Start。只能保證大概意思,儘量保證細節。英文水平有限,如果有錯誤的地方請指正,輕噴
快速入門(Quick Start)
使用 Spark Shell 互動式程式設計
基本操作
更多關於 Dataset 的操作
快取
獨立的應用程式
下一步
這個指南提供了使用Spark的快速介紹。我們會首先介紹Spark 互動式程式設計(使用Python或者Scala)的 API, 然後展示如何用Java、Scala 和 Python來編寫應用程式。
為了使用這個指南,您需要先從 Spark 網頁 下載打包釋出的Spark安裝包。由於我們將不會(在指南中)使用HDFS, 您可以下載任意版本的Hadoop安裝包。
需要注意的是,Spark2.0 之前, Spark的主要程式設計介面是彈性分散式資料集(Resilient Distributed Dataset (RDD))。Spark2.0 之後, RDD 被 Dataset 取代,Dataset 和 RDD 一樣是強型別,但是在底層進行了更多的優化。Spark2.0 之後仍然支援 RDD 介面,並且您可以從RDD程式設計指南中 獲取更詳細的參考。當然,我們強烈建議您選擇使用Dataset, 因為它的效能比RDD更好。 檢視 SQL程式設計指南 以得到更多關於Dataset的資訊。
使用 Spark Shell 互動式程式設計
基本操作
Spark Shell 提供了一個簡單的方式去學習 API,同時也提供了一個強大的互動式資料分析工具。它可以基於 Scala(一種在java 虛擬機器上執行並因此可以很好地使用已有的java庫的程式語言)或 Python 使用。在 Spark 目錄下執行以下內容來開始(Sprk Shell):
Scala 版
./bin/pyspark
Python 版
./bin/pyspark
如果你當前環境使用pip下載了 PySpark,可以使用如下下方式呼叫
pyspark
Spark 主要的抽象是一個被叫做 Dataset 的分散式集合。 Dataset 可以通過 Hadoop InputFormat(比如HDFS檔案)或者 轉換其他 Dataset 中建立。讓我們通過 Spark 源目錄下的 README 檔案內容建立一個新的 Dataset:
Scala 版
scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]
Python 版
>>> textFile = spark.read.text("README.md")
你可以直接從Dataset中, 通過呼叫一些操作或者轉化Dataset以獲得一個新的Dataset來獲取它的值。請閱讀 API 文件(Scala / Python) 以獲取更多細節
Scala 版
scala> textFile.count() // 該Dataset中的成員數量 res0: Long = 126 // 由於README.md 會隨著時間的推移不斷改變,所以結果可能會有所不同, 其他輸出也有類似情況 scala> textFile.first() // 該Dataset的第一個成員 res1: String = # Apache Spark
Python 版
>>> textFile.count() # 該DataFrame中的行數 126 >>> textFile.first() # 該DataFrame的第一行 Row(value=u'# Apache Spark')
現在讓我們使用該Dataset來轉換成一個新的Dataset。 我們呼叫 filter 來返回一個新的Dataset, 其中包含這個檔案內容的子集。
Scala 版
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
Python 版
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我們可以將資料集轉換和資料集操作串接在一起
Scala 版
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
Python 版
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"? 15
更多關於Dataset的操作
Dataset操作和轉換可以用來做更復雜的計算。假設我們想要找到單詞數量最多的那行:
Scala 版
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
這首先將檔案中的一行對映成一個整數值,並建立一個新的Dataset。呼叫該 Dataset 的 reduce 方法以找到最大的單詞計數。map 和 reduce 的引數是 Scala 的函式字面量(閉包),並且可以使用任何語言的特性或者 Scala/Java 庫。 比如, 我們可以很榮譽地呼叫任何地方宣告地函式(方法)。我們將使用 Math.max() 方法以使這段程式碼易於理解:
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
MapReduce是一種常見的資料流格式, 這是由Hadoop推廣的。Spark 可以很容易地實現MapReduce流:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
這裡,我們呼叫 flatMap 來將一個行級(以文字中的一行為一個成員(Item))的 Dataset 轉換成一個 單詞 級 的Dataset,然後串接呼叫 groupByKey 和 count 方法 來計算檔案中的每個單詞的數量作為(String, Long)資料對形式 的Dateset。 為了在我們的shell中統計出單詞的數量, 我們可以呼叫 collect 方法:
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Python 版
>>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
這首先將檔案中的一行對映成一個整數值 並取一個為 “numWords” 的別名,同時建立一個新的DataFrame。呼叫該 Dataset 的 agg 方法以找到最大的單詞計數。select 和 agg 的引數都是 Colum,我們可以使用 df.colName 方法來從一個DataFrame中獲得一個 colum。我們同樣可以匯入 pyspark.sql.functions, 它提供了很多簡易的方法從一個已有的 Colum 構建一個新的 Colum。
MapReduce是一種常見的資料流格式, 這是由Hadoop推廣的。Spark 可以很容易地實現MapReduce流:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
這裡,我們在 select 方法中使用了 explode 方法來將一個行級(以文字中的一行為一個成員(Item))的 Dataset 轉換成一個 單詞 級 的Dataset。然後串接呼叫 groupByKey 和 count 方法 來計算檔案中的每個單詞的數量作為一個擁有兩個Colum:“word” 和 “count” 的DataFrame。 為了在我們的shell中統計出單詞的數量, 我們可以呼叫 collect 方法:
>>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
快取(Caching)
Spark同樣支援將資料集加入到一個叢集中的記憶體快取中。當資料被重複訪問時,這是非常有用的。比如查詢一個小的熱點資料集 或者 執行像PageRank 這樣的迭代演算法。讓我們標記我們的 linesWithSpark
作為快取資料 來作為一個例子:
Scala 版
scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string] scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15
Python 版
>>> linesWithSpark.cache() >>> linesWithSpark.count() 15 >>> linesWithSpark.count() 15
使用Spark來探索和快取一個100行的文字檔案看起來很蠢。有趣的是,這些方法同樣可以作用在非常大的資料集中,哪怕它們被分佈在數十個或上百個節點中。正如 RDD程式設計指南 中描述的那樣, 您可以通過連線 bin/spark-shell 到一個叢集中來進行以上互動式操作。
獨立的應用程式
假設我們希望使用 Spark API 編寫一個獨立的 應用程式。 我們將分別使用Scala(帶sbt),Java(帶Maven) 和 Python(pip) 編寫一個簡單的應用程式。
Scala
我們將在 Scala 中建立一個Spark 應用程式——非常簡單。 實際上,它被命名為 SimleApp.scala
/* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } }
注意,這個應用程式需要定義一個 main() 方法 而不是 繼承 scala.App. scala.App 的子類可能無法正常地工作。
這個程式只是統計 Spark README 檔案中包含 “a” 的行數和 包含"b" 的行數。 注意, 您需要使用 Spark 的安裝位置 來代替 YOUR_SPARK_HOME。與之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我們初始化一個SparkSeesion作為程式的一部分。
我們呼叫 SparkSession.builder 來構造一個 【SparkSession】,然後設定應用的名字, 最後呼叫 getOrCreate 方法獲取一個 【SparkSession】例項。
我們的應用程式取決於Spark API, 所以我們同樣需要一個 sbt 配置檔案, build.sbt, 這表示 Spark 是一個依賴元件。
name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
為了使 sbt 能夠正常工作, 我們需要根據經典的目錄結構佈局 SimpleApp.scala 和 build.sbt。一旦完成這些,我們就可以建立一個包含這個應用程式原始碼的JAR包, 然後使用 spark-submit 指令碼執行我們的程式。
# Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23
Java
這個例子將會使用 Maven 編譯一個JAR 應用程式,但是很多類似的構建系統都可以工作。
我們將建立一個簡單的Spark應用程式, SimpleApp.java
/* SimpleApp.java */ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.Dataset; public class SimpleApp { public static void main(String[] args) { String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate(); Dataset<String> logData = spark.read().textFile(logFile).cache(); long numAs = logData.filter(s -> s.contains("a")).count(); long numBs = logData.filter(s -> s.contains("b")).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); spark.stop(); } }
這個程式只是統計 Spark README 檔案中包含 “a” 的行數和 包含"b" 的行數。 注意, 您需要使用 Spark 的安裝位置 來代替 YOUR_SPARK_HOME。與之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我們初始化一個SparkSeesion作為程式的一部分。
為了構建這個程式, 我們同樣要編寫一個 Maven pom.xml 檔案,這個檔案將 Spark 列為一個依賴元件。請注意,Spark 構件 被標記為Scala版本
<project> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.1</version> </dependency> </dependencies> </project>
我們根據規範的Maven目錄結構列出這些檔案
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
現在,我們可以使用 Maven 打包這個應用程式並且 通過 ./bin/spark-submit
. 執行
# Package a JAR containing your application $ mvn package ... [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/simple-project-1.0.jar ... Lines with a: 46, Lines with b: 23
Python
這裡我們將展示如何使用Python API(PySpark)來編寫一個應用程式
如果你正構建一個打包的 PySpark應用程式或庫,你可以將它新增到你的 setup.py 檔案中, 如下:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
作為示例,我們將建立一個簡單的 Spark 應用程式, SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
這個程式只是統計 Spark README 檔案中包含 “a” 的行數和 包含"b" 的行數。 注意, 您需要使用 Spark 的安裝位置 來代替 YOUR_SPARK_HOME。與之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我們初始化一個SparkSeesion作為程式的一部分。和 Scala 和 Java 例子一樣, 我們使用 SparkSession 來建立 Dataset 。 對於使用自定義類或者第三方庫的應用程式, 我們同樣可以通過它的 --py-- files 引數將程式碼和依賴打包成zip檔案(使用 spark-submit --help 檢視細節)的形式 新增到 spark-submit。 SimpleApp 足夠簡單, 所以我們不用指定任何程式碼依賴元件。
我們使用 bin/spark-submit 指令碼執行這個程式
# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23
如果您將PySpark通過 pip 安裝到了您的環境中(eg. pip install pyspark),根據您的喜好,可以使用常規的Python直譯器 或者 使用 spark-submit 來執行您的程式
# Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23
下一步
祝賀您運行了您的第一個 Spark 應用程式
關於API的深入概述,請從 RDD 程式設計指南 和 SQL 程式設計指南 開始, 或者 檢視程式設計指南選單 以瞭解其他元件
關於使用叢集執行應用程式,請移步 部署概述
最後, Spark 包含了幾個簡單的例子, 它們被儲存在 example 目錄下(Scala, Java, Python, R),你可以按照以下方式執行它們:
# For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R