1. 程式人生 > >Spark 實踐

Spark 實踐

1.1 避免使用 GroupByKey

  讓我們看一下使用兩種不同的方式去計算單詞的個數,第一種方式使用 reduceByKey, 另外一種方式使用 groupByKey

val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) //reduce val wordCountsWithReduce = wordPairsRDD .reduceByKey(_ + _) .collect() //group val wordCountsWithGroup = wordPairsRDD .groupByKey() .map(t => (t._1, t._2.sum)) .collect()

  雖然兩個函式都能得出正確的結果,reduceByKey 更適合使用在大資料集上。 這是因為 Spark 知道它可以在每個分割槽shuffle資料之前,聚合key值相同的資料。

  藉助下圖可以理解在 reduceByKey 裡發生了什麼。 注意在資料對被shuffle前同一機器上同樣 key的資料是怎樣被組合的(reduceByKey 中的 lamdba 函式)。然後 lamdba 函式在每個區上被再次呼叫來將所有值 reduce 成一個最終結果。

reduce_by

  但是,當呼叫 groupByKey 時,所有的鍵值對(key-value pair) 都會被shuffle。在網路上傳輸這些資料非常沒有必要。

  為了確定將資料對shuffle到哪臺主機,Spark 會對資料對的 key 呼叫一個分割槽函式。 當shuffle的資料量大於單臺執行機器記憶體總量時,Spark 會把資料儲存到磁碟上。 不過在儲存時每次只會處理一個 key 的資料,所以當單個 key 的鍵值對超過記憶體容量會存在記憶體溢位的可能。 我們應避免將資料儲存到磁碟上,這會嚴重影響效能。

group_by

  你可以想象一個非常大的資料集,在使用 reduceByKey 和 groupByKey 時他們的差別會被放大更多倍。

  以下函式應該優先於 groupByKey :

  • combineByKey 組合資料,但是組合之後的資料型別與輸入時值的型別不一樣。

  • foldByKey 合併每一個 key 的所有值,在級聯函式和“零值”中使用。

1.2 不要將大型 RDD 的所有元素拷貝到driver

  如果你的driver記憶體容量不能容納一個大型 RDD 裡面的所有資料,不要做以下操作:

val values = myVeryLargeRDD.collect()

  Collect 操作會試圖將 RDD 裡面的每一條資料複製到driver上,這時候會發生記憶體溢位和崩潰。相反,你可以呼叫 take 或者 takeSample 來確保資料大小的上限。或者在你的 RDD 中使用過濾或抽樣。 同樣,要謹慎使用下面的操作,除非你能確保資料集小到足以儲存在記憶體中:

  • countByKey

  • countByValue

  • collectAsMap

  如果你確實需要將 RDD 裡面的大量資料儲存在記憶體中,你可以將 RDD 寫成一個檔案或者把 RDD 匯出到一個容量足夠大的資料庫中。

1.3 優雅地處理壞的輸入資料

  當處理大量的資料的時候,一個常見的問題是有些資料格式不對或者內容有誤。使用filter方法可以很容易丟棄壞的輸入或者使用map方法可以修復可能修復的壞的資料。當你嘗試著修復壞的資料,但是丟棄無法被修復的資料時, flatMap函式是最好的選擇。讓我們考慮下面的輸入json串。

input_rdd = sc.parallelize(["{\"value\": 1}", # Good "bad_json", # Bad "{\"value\": 2}", # Good "{\"value\": 3" # Missing an ending brace. ])

  當我們嘗試著在SqlContext中使用這個輸入串時,很明顯它會因為格式不對而報錯。

sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.

  讓我媽用下面的python程式碼修復輸入資料。

def try_correct_json(json_string):
  try:
    # First check if the json is okay. json.loads(json_string) return [json_string] except ValueError: try: # If not, try correcting it by adding a ending brace. try_to_correct_json = json_string + "}" json.loads(try_to_correct_json) return [try_to_correct_json] except ValueError: # The malformed json input can't be recovered, drop this input. return []

  經過上面函式的處理之後,我們就可以使用這些資料了。

corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect() # Returns [Row(value=1), Row(value=2), Row(value=3)]

2 常規故障處理

2.1 Job aborted due to stage failure: Task not serializable

  如果你看到以下錯誤:

org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException: ...

  上述的錯誤在這種情況下會發生:當你在 master 上初始化一個變數,但是試圖在 worker 上使用。 在這個示例中, Spark Streaming 試圖將物件序列化之後傳送到 worker 上,如果這個物件不能被序列化就會失敗。思考下面的程式碼片段:

NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile"); rdd.map(s -> notSerializable.doSomething(s)).collect();

  這段程式碼會觸發上面的錯誤。這裡有一些建議修復這個錯誤:

  • 讓 class 實現序列化
  • 在作為引數傳遞給 map 方法的 lambda 表示式內部宣告例項
  • 在每一臺機器上建立一個 NotSerializable 的靜態例項
  • 呼叫 rdd.forEachPartition 並且像下面這樣建立 NotSerializable 物件:
rdd.forEachPartition(iter -> {
  NotSerializable notSerializable = new NotSerializable(); // ...Now process iter });

2.2 缺失依賴

  在預設狀態下,Maven 在 build 的時候不會包含所依賴的 jar 包。當執行一個 Spark 任務時,如果 Spark worker 機器上沒有包含所依賴的 jar 包會發生類無法找到的錯誤(ClassNotFoundException)。

  有一個簡單的方式,在 Maven 打包的時候建立 shaded 或 uber 任務可以讓那些依賴的 jar 包很好地打包進去。

  使用 <scope>provided</scope> 可以排除那些沒有必要打包進去的依賴,對 Spark 的依賴必須使用 provided 標記,因為這些依賴已經包含在 Spark cluster中。在你的 worker 機器上已經安裝的 jar 包你同樣需要排除掉它們。

  下面是一個 Maven pom.xml 的例子,工程了包含了一些需要的依賴,但是 Spark 的 libraries 不會被打包進去,因為它使用了 provided

<project>
    <groupId>com.databricks.apps.logs</groupId>
    <artifactId>log-analyzer</artifactId>
    <modelVersion>4.0.0</modelVersion> <name>Databricks Spark Logs Analyzer</name> <packaging>jar</packaging> <version>1.0</version> <repositories> <repository> <id>Akka repository</id> <url>http://repo.akka.io/releases</url> </repository> </repositories> <dependencies> <dependency> <!-- Spark --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <!-- Spark SQL --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <!-- Spark Streaming --> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <!-- Command Line Parsing --> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <finalName>uber-${project.artifactId}-${project.version}</finalName> </configuration> </plugin> </plugins> </build> </project>

2.3 執行 start-all.sh 錯誤: Connection refused

  如果是使用 Mac 作業系統執行 start-all.sh 發生下面錯誤時:

% sh start-all.sh
starting org.apache.spark.deploy.master.Master, logging to ...
localhost: ssh: connect to host localhost port 22: Connection refused

  你需要在你的電腦上開啟 “遠端登入” 功能。進入 系統偏好設定 ---> 共享 勾選開啟 遠端登入

2.4 Spark 元件之間的網路連線問題

  Spark 元件之間的網路連線問題會導致各式各樣的警告或錯誤:

  • SparkContext <-> Spark Standalone Master

  如果 SparkContext 不能連線到 Spark standalone master,會顯示下面的錯誤:

ERROR AppClient$ClientActor: All masters are unresponsive! Giving up. ERROR SparkDeploySchedulerBackend: Spark cluster looks dead, giving up. ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Spark cluster looks down

  如果 driver 能夠連線到 master 但是 master 不能回連到 driver,這時 Master 的日誌會記錄多次嘗試連線 driver失敗並且會報告不能連線:

INFO Master: Registering app SparkPi
INFO Master: Registered app SparkPi with ID app-XXX-0000
INFO: Master: Removing app app-app-XXX-0000
[...]
INFO Master: Registering app SparkPi
INFO Master: Registered app SparkPi with ID app-YYY-0000
INFO: Master: Removing app app-YYY-0000
[...]

  在這樣的情況下,master 報告應用已經被成功地註冊了。但是註冊成功的通知 driver 接收失敗了, 這時 driver 會自動嘗試幾次重新連線直到失敗的次數太多而放棄重試。 其結果是 Master web UI 會報告多個失敗的應用,即使只有一個 SparkContext 被建立。

  如果你遇到上述的錯誤,有兩條可以遵循的建議:

  • 檢查 workers 和 drivers 配置的 Spark master 的地址
  • 設定 driver,master,worker 的 SPARK_LOCAL_IP 為叢集的可尋地址主機名。

配置 hostname/port

  這節將描述我們如何繫結 Spark 元件的網路介面和埠。在每節裡,配置會按照優先順序降序的方式排列。如果前面所有配置沒有提供則使用最後一條作為預設配置。

SparkContext actor system:

Hostname:

  • spark.driver.host 屬性
  • 如果 SPARK_LOCAL_IP 環境變數的設定是主機名(hostname),就會使用設定時的主機名。如果 SPARK_LOCAL_IP 設定的是一個 IP 地址,這個 IP 地址會被解析為主機名。
  • 使用預設的 IP 地址,這個 IP 地址是Java 介面 InetAddress.getLocalHost 方法的返回值。

Port:

  • spark.driver.port 屬性。
  • 從作業系統(OS)選擇一個臨時埠。
Spark Standalone Master / Worker actor systems:

Hostname:

  • 當 Master 或 Worker 程序啟動時使用 --host 或 -h 選項(或是過期的選項 --ip 或 -i)。
  • SPARK_MASTER_HOST 環境變數(僅應用在 Master 上)。
  • 如果 SPARK_LOCAL_IP 環境變數的設定是主機名(hostname),就會使用設定時的主機名。如果 SPARK_LOCAL_IP 設定的是一個 IP 地址,這個 IP 地址會被解析為主機名。
  • 使用預設的 IP 地址,這個 IP 地址是Java 介面 InetAddress.getLocalHost 方法的返回值.

Port:

  • 當 Master 或 Worker 程序啟動時使用 --port 或 -p 選項。
  • SPARK_MASTER_PORT 或 SPARK_WORKER_PORT 環境變數(分別應用到 Master 和 Worker 上)。
  • 從作業系統(OS)選擇一個臨時埠。

3 效能和優化

3.1 一個 RDD 有多少分割槽

  在除錯和故障處理的時候,我們通常有必要知道 RDD 有多少個分割槽。這裡有幾個方法可以找到這些資訊:

使用 UI 檢視在分割槽上執行的任務數

  當 stage 執行的時候,你可以在 Spark UI 上看到這個 stage 上的分割槽數。 下面的例子中的簡單任務在 4 個分割槽上建立了共 100 個元素的 RDD ,然後在這些元素被收集到 driver 之前分發一個 map 任務:

scala> val someRDD = sc.parallelize(1 to 100, 4) someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12 scala> someRDD.map(x => x).collect res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

  在 Spark的應用 UI 裡,從下面截圖上看到的 "Total Tasks" 代表了分割槽數。

partitions-as-tasks

使用 UI 檢視分割槽快取

  持久化 RDD 時通常需要知道有多少個分割槽被儲存。下面的這個例子和之前的一樣,除了現在我們要對 RDD 做快取處理。操作完成之後,我們可以在 UI 上看到這個操作導致什麼被我們儲存了。

scala> someRDD.setName("toy").cache
res2: someRDD.type = toy ParallelCollectionRDD[0] at parallelize at <console>:12 scala> someRDD.map(x => x).collect res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

  注意:下面的截圖有 4 個分割槽被快取。

cached-partitions

程式設計檢視 RDD 分割槽

  在 Scala API 裡,RDD 持有一個分割槽陣列的引用, 你可以使用它找到有多少個分割槽:

scala> val someRDD = sc.parallelize(1 to 100, 30) someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12 scala> someRDD.partitions.size res0: Int = 30

  在 Python API 裡, 有一個方法可以明確地列出有多少個分割槽:

In [1]: someRDD = sc.parallelize(range(101),30)
In [2]: someRDD.getNumPartitions() Out[2]: 30

3.2 資料本地性

  Spark 是一個並行資料處理框架,這意味著任務應該在離資料儘可能近的地方執行(即最少的資料傳輸)。

檢查本地性

  檢查任務是否在本地執行的最好方式是在 Spark UI 上檢視 stage 資訊,注意下面截圖中的 Locality Level 列顯示任務執行在哪個地方。

locality

調整本地性配置

  你可以調整 Spark 在每個資料本地性level(data local --> process local --> node local --> rack local --> Any)上等待的時長。更多詳細的引數資訊請檢視程式配置文件的 Scheduling 章節裡類似於 spark.locality.* 的配置。

4 Spark Streaming

ERROR OneForOneStrategy

  如果你在 Spark Streaming 裡啟用 checkpointingforEachRDD 函式使用的物件都應該可以被序列化(Serializable)。否則會出現這樣的異常 "ERROR OneForOneStrategy: ... java.io.NotSerializableException:"

JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);

// This enables checkpointing. jssc.checkpoint("/tmp/checkpoint_test"); JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999); NotSerializable notSerializable = new NotSerializable(); dStream.foreachRDD(rdd -> { if (rdd.count() == 0) { return null; } String first = rdd.first(); notSerializable.doSomething(first); return null; } ); // This does not work!!!!

  按照下面的方式之一進行修改,上面的程式碼才能正常執行:

  • 在配置檔案裡面刪除 jssc.checkpoint 這一行關閉 checkpointing。
  • 讓物件能被序列化。
  • 在 forEachRDD 函式裡面宣告 NotSerializable,下面的示例程式碼是可以正常執行的:
JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);

jssc.checkpoint("/tmp/checkpoint_test"); JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999); dStream.foreachRDD(rdd -> { if (rdd.count() == 0) { return null; } String first = rdd.first(); NotSerializable notSerializable = new NotSerializable(); notSerializable.doSomething(first); return null; } ); // This code snippet is fine since the NotSerializable object // is declared and only used within the forEachRDD function.