1. 程式人生 > >【Spark】Spark Quick Start(快速入門翻譯)

【Spark】Spark Quick Start(快速入門翻譯)

 本文主要是翻譯Spark官網Quick Start。只能保證大概意思,儘量保證細節。英文水平有限,如果有錯誤的地方請指正,輕噴

快速入門(Quick Start)

  使用 Spark Shell 互動式程式設計

    基本操作

    更多關於 Dataset 的操作

    快取

  獨立的應用程式

  下一步

這個指南提供了使用Spark的快速介紹。我們會首先介紹Spark 互動式程式設計(使用Python或者Scala)的 API, 然後展示如何用Java、Scala 和 Python來編寫應用程式。

為了使用這個指南,您需要先從 Spark 網頁 下載打包釋出的Spark安裝包。由於我們將不會(在指南中)使用HDFS, 您可以下載任意版本的Hadoop安裝包。

需要注意的是,Spark2.0 之前, Spark的主要程式設計介面是彈性分散式資料集(Resilient Distributed Dataset (RDD))。Spark2.0 之後, RDD 被 Dataset 取代,Dataset 和 RDD 一樣是強型別,但是在底層進行了更多的優化。Spark2.0 之後仍然支援 RDD 介面,並且您可以從RDD程式設計指南中 獲取更詳細的參考。當然,我們強烈建議您選擇使用Dataset, 因為它的效能比RDD更好。 檢視 SQL程式設計指南 以得到更多關於Dataset的資訊。

使用 Spark Shell 互動式程式設計

基本操作

Spark Shell 提供了一個簡單的方式去學習 API,同時也提供了一個強大的互動式資料分析工具。它可以基於 Scala(一種在java 虛擬機器上執行並因此可以很好地使用已有的java庫的程式語言)或 Python 使用。在 Spark 目錄下執行以下內容來開始(Sprk Shell):

  Scala 版

./bin/pyspark

Python 版

./bin/pyspark

如果你當前環境使用pip下載了 PySpark,可以使用如下下方式呼叫

pyspark

Spark 主要的抽象是一個被叫做 Dataset 的分散式集合。 Dataset 可以通過 Hadoop InputFormat(比如HDFS檔案)或者 轉換其他 Dataset 中建立。讓我們通過 Spark 源目錄下的 README 檔案內容建立一個新的 Dataset:

Scala 版

scala> val textFile = spark.read.textFile("
README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]

Python 版

>>> textFile = spark.read.text("README.md")

你可以直接從Dataset中, 通過呼叫一些操作或者轉化Dataset以獲得一個新的Dataset來獲取它的值。請閱讀 API 文件(Scala / Python)  以獲取更多細節

Scala 版

scala> textFile.count() // 該Dataset中的成員數量
res0: Long = 126 //  由於README.md 會隨著時間的推移不斷改變,所以結果可能會有所不同, 其他輸出也有類似情況

scala> textFile.first() // 該Dataset的第一個成員
res1: String = # Apache Spark

Python 版

>>> textFile.count()  # 該DataFrame中的行數
126

>>> textFile.first()  # 該DataFrame的第一行
Row(value=u'# Apache Spark')

現在讓我們使用該Dataset來轉換成一個新的Dataset。 我們呼叫 filter 來返回一個新的Dataset, 其中包含這個檔案內容的子集。

Scala 版

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

Python 版

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

我們可以將資料集轉換和資料集操作串接在一起

Scala 版

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

Python 版

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

更多關於Dataset的操作

Dataset操作和轉換可以用來做更復雜的計算。假設我們想要找到單詞數量最多的那行:

Scala 版

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

這首先將檔案中的一行對映成一個整數值,並建立一個新的Dataset。呼叫該 Dataset 的 reduce 方法以找到最大的單詞計數。map 和 reduce 的引數是 Scala 的函式字面量(閉包),並且可以使用任何語言的特性或者 Scala/Java 庫。 比如, 我們可以很榮譽地呼叫任何地方宣告地函式(方法)。我們將使用 Math.max() 方法以使這段程式碼易於理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

MapReduce是一種常見的資料流格式, 這是由Hadoop推廣的。Spark 可以很容易地實現MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

這裡,我們呼叫 flatMap 來將一個行級(以文字中的一行為一個成員(Item))的 Dataset 轉換成一個 單詞 級 的Dataset,然後串接呼叫 groupByKey 和 count 方法 來計算檔案中的每個單詞的數量作為(String, Long)資料對形式 的Dateset。 為了在我們的shell中統計出單詞的數量, 我們可以呼叫 collect 方法:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

Python 版

>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

這首先將檔案中的一行對映成一個整數值 並取一個為 “numWords” 的別名,同時建立一個新的DataFrame。呼叫該 Dataset 的 agg 方法以找到最大的單詞計數。select 和 agg 的引數都是 Colum,我們可以使用 df.colName 方法來從一個DataFrame中獲得一個 colum。我們同樣可以匯入 pyspark.sql.functions, 它提供了很多簡易的方法從一個已有的 Colum 構建一個新的 Colum。

MapReduce是一種常見的資料流格式, 這是由Hadoop推廣的。Spark 可以很容易地實現MapReduce流:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

這裡,我們在 select 方法中使用了 explode 方法來將一個行級(以文字中的一行為一個成員(Item))的 Dataset 轉換成一個 單詞 級 的Dataset。然後串接呼叫 groupByKey 和 count 方法 來計算檔案中的每個單詞的數量作為一個擁有兩個Colum:“word” 和 “count” 的DataFrame。 為了在我們的shell中統計出單詞的數量, 我們可以呼叫 collect 方法:

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

快取(Caching)

Spark同樣支援將資料集加入到一個叢集中的記憶體快取中。當資料被重複訪問時,這是非常有用的。比如查詢一個小的熱點資料集 或者 執行像PageRank 這樣的迭代演算法。讓我們標記我們的 linesWithSpark  作為快取資料 來作為一個例子:

Scala 版

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

Python 版

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

使用Spark來探索和快取一個100行的文字檔案看起來很蠢。有趣的是,這些方法同樣可以作用在非常大的資料集中,哪怕它們被分佈在數十個或上百個節點中。正如 RDD程式設計指南 中描述的那樣, 您可以通過連線 bin/spark-shell 到一個叢集中來進行以上互動式操作。

獨立的應用程式

假設我們希望使用 Spark API 編寫一個獨立的 應用程式。  我們將分別使用Scala(帶sbt),Java(帶Maven) 和 Python(pip) 編寫一個簡單的應用程式。

Scala

我們將在 Scala 中建立一個Spark 應用程式——非常簡單。 實際上,它被命名為 SimleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

注意,這個應用程式需要定義一個 main() 方法 而不是 繼承 scala.App. scala.App 的子類可能無法正常地工作。

這個程式只是統計 Spark README 檔案中包含 “a” 的行數和 包含"b" 的行數。 注意, 您需要使用 Spark 的安裝位置 來代替 YOUR_SPARK_HOME。與之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我們初始化一個SparkSeesion作為程式的一部分。

我們呼叫 SparkSession.builder 來構造一個 【SparkSession】,然後設定應用的名字, 最後呼叫 getOrCreate 方法獲取一個 【SparkSession】例項。

我們的應用程式取決於Spark API, 所以我們同樣需要一個 sbt 配置檔案, build.sbt, 這表示 Spark 是一個依賴元件。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"

為了使 sbt 能夠正常工作, 我們需要根據經典的目錄結構佈局 SimpleApp.scala 和 build.sbt。一旦完成這些,我們就可以建立一個包含這個應用程式原始碼的JAR包, 然後使用 spark-submit 指令碼執行我們的程式。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

Java

這個例子將會使用 Maven 編譯一個JAR 應用程式,但是很多類似的構建系統都可以工作。

我們將建立一個簡單的Spark應用程式, SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

這個程式只是統計 Spark README 檔案中包含 “a” 的行數和 包含"b" 的行數。 注意, 您需要使用 Spark 的安裝位置 來代替 YOUR_SPARK_HOME。與之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我們初始化一個SparkSeesion作為程式的一部分。

為了構建這個程式, 我們同樣要編寫一個 Maven pom.xml 檔案,這個檔案將 Spark 列為一個依賴元件。請注意,Spark 構件 被標記為Scala版本

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.3.1</version>
    </dependency>
  </dependencies>
</project>

我們根據規範的Maven目錄結構列出這些檔案

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

現在,我們可以使用 Maven 打包這個應用程式並且 通過 ./bin/spark-submit. 執行

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Python

這裡我們將展示如何使用Python API(PySpark)來編寫一個應用程式

如果你正構建一個打包的 PySpark應用程式或庫,你可以將它新增到你的 setup.py 檔案中, 如下:

install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

作為示例,我們將建立一個簡單的 Spark 應用程式, SimpleApp.py:

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

這個程式只是統計 Spark README 檔案中包含 “a” 的行數和 包含"b" 的行數。 注意, 您需要使用 Spark 的安裝位置 來代替 YOUR_SPARK_HOME。與之前Spark Shell中的例子不同的是,Spark Shell 初始化它自己的SparkSession, 而我們初始化一個SparkSeesion作為程式的一部分。和 Scala 和 Java 例子一樣, 我們使用 SparkSession 來建立 Dataset 。 對於使用自定義類或者第三方庫的應用程式, 我們同樣可以通過它的 --py-- files 引數將程式碼和依賴打包成zip檔案(使用 spark-submit --help 檢視細節)的形式 新增到 spark-submit。 SimpleApp 足夠簡單, 所以我們不用指定任何程式碼依賴元件。

我們使用 bin/spark-submit 指令碼執行這個程式

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果您將PySpark通過 pip 安裝到了您的環境中(eg. pip install pyspark),根據您的喜好,可以使用常規的Python直譯器 或者 使用 spark-submit 來執行您的程式

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

下一步

祝賀您運行了您的第一個 Spark 應用程式

  關於API的深入概述,請從 RDD 程式設計指南SQL 程式設計指南 開始, 或者 檢視程式設計指南選單 以瞭解其他元件

  關於使用叢集執行應用程式,請移步 部署概述

  最後, Spark 包含了幾個簡單的例子, 它們被儲存在 example 目錄下(Scala, Java, Python, R),你可以按照以下方式執行它們:

# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R