Spark 實踐
1.1 避免使用 GroupByKey
讓我們看一下使用兩種不同的方式去計算單詞的個數,第一種方式使用 reduceByKey
, 另外一種方式使用 groupByKey
:
val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) //reduce val wordCountsWithReduce = wordPairsRDD .reduceByKey(_ + _) .collect() //group val wordCountsWithGroup = wordPairsRDD .groupByKey() .map(t => (t._1, t._2.sum)) .collect()
雖然兩個函式都能得出正確的結果,reduceByKey
更適合使用在大資料集上。 這是因為 Spark
知道它可以在每個分割槽shuffle
資料之前,聚合key
值相同的資料。
藉助下圖可以理解在 reduceByKey
裡發生了什麼。 注意在資料對被shuffle
前同一機器上同樣 key
的資料是怎樣被組合的(reduceByKey
中的 lamdba
函式)。然後 lamdba
函式在每個區上被再次呼叫來將所有值 reduce
成一個最終結果。
但是,當呼叫 groupByKey
時,所有的鍵值對(key-value pair
) 都會被shuffle
。在網路上傳輸這些資料非常沒有必要。
為了確定將資料對shuffle
到哪臺主機,Spark
會對資料對的 key
呼叫一個分割槽函式。 當shuffle
的資料量大於單臺執行機器記憶體總量時,Spark
會把資料儲存到磁碟上。 不過在儲存時每次只會處理一個 key
的資料,所以當單個 key
的鍵值對超過記憶體容量會存在記憶體溢位的可能。 我們應避免將資料儲存到磁碟上,這會嚴重影響效能。
你可以想象一個非常大的資料集,在使用 reduceByKey
和 groupByKey
時他們的差別會被放大更多倍。
以下函式應該優先於 groupByKey
:
-
combineByKey
組合資料,但是組合之後的資料型別與輸入時值的型別不一樣。 -
foldByKey
合併每一個key
的所有值,在級聯函式和“零值”中使用。
1.2 不要將大型 RDD 的所有元素拷貝到driver
如果你的driver
記憶體容量不能容納一個大型 RDD
裡面的所有資料,不要做以下操作:
val values = myVeryLargeRDD.collect()
Collect
操作會試圖將 RDD
裡面的每一條資料複製到driver
上,這時候會發生記憶體溢位和崩潰。相反,你可以呼叫 take
或者 takeSample
來確保資料大小的上限。或者在你的 RDD
中使用過濾或抽樣。 同樣,要謹慎使用下面的操作,除非你能確保資料集小到足以儲存在記憶體中:
-
countByKey
-
countByValue
-
collectAsMap
如果你確實需要將 RDD
裡面的大量資料儲存在記憶體中,你可以將 RDD
寫成一個檔案或者把 RDD
匯出到一個容量足夠大的資料庫中。
1.3 優雅地處理壞的輸入資料
當處理大量的資料的時候,一個常見的問題是有些資料格式不對或者內容有誤。使用filter
方法可以很容易丟棄壞的輸入或者使用map
方法可以修復可能修復的壞的資料。當你嘗試著修復壞的資料,但是丟棄無法被修復的資料時, flatMap
函式是最好的選擇。讓我們考慮下面的輸入json
串。
input_rdd = sc.parallelize(["{\"value\": 1}", # Good "bad_json", # Bad "{\"value\": 2}", # Good "{\"value\": 3" # Missing an ending brace. ])
當我們嘗試著在SqlContext
中使用這個輸入串時,很明顯它會因為格式不對而報錯。
sqlContext.jsonRDD(input_rdd).registerTempTable("valueTable")
# The above command will throw an error.
讓我媽用下面的python
程式碼修復輸入資料。
def try_correct_json(json_string):
try:
# First check if the json is okay. json.loads(json_string) return [json_string] except ValueError: try: # If not, try correcting it by adding a ending brace. try_to_correct_json = json_string + "}" json.loads(try_to_correct_json) return [try_to_correct_json] except ValueError: # The malformed json input can't be recovered, drop this input. return []
經過上面函式的處理之後,我們就可以使用這些資料了。
corrected_input_rdd = input_rdd.flatMap(try_correct_json)
sqlContext.jsonRDD(corrected_input_rdd).registerTempTable("valueTable")
sqlContext.sql("select * from valueTable").collect() # Returns [Row(value=1), Row(value=2), Row(value=3)]
2 常規故障處理
2.1 Job aborted due to stage failure: Task not serializable
如果你看到以下錯誤:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task not serializable: java.io.NotSerializableException: ...
上述的錯誤在這種情況下會發生:當你在 master
上初始化一個變數,但是試圖在 worker
上使用。 在這個示例中, Spark Streaming
試圖將物件序列化之後傳送到 worker
上,如果這個物件不能被序列化就會失敗。思考下面的程式碼片段:
NotSerializable notSerializable = new NotSerializable();
JavaRDD<String> rdd = sc.textFile("/tmp/myfile"); rdd.map(s -> notSerializable.doSomething(s)).collect();
這段程式碼會觸發上面的錯誤。這裡有一些建議修復這個錯誤:
- 讓
class
實現序列化 - 在作為引數傳遞給
map
方法的lambda
表示式內部宣告例項 - 在每一臺機器上建立一個
NotSerializable
的靜態例項 - 呼叫
rdd.forEachPartition
並且像下面這樣建立NotSerializable
物件:
rdd.forEachPartition(iter -> {
NotSerializable notSerializable = new NotSerializable(); // ...Now process iter });
2.2 缺失依賴
在預設狀態下,Maven
在 build
的時候不會包含所依賴的 jar
包。當執行一個 Spark
任務時,如果 Spark worker
機器上沒有包含所依賴的 jar
包會發生類無法找到的錯誤(ClassNotFoundException
)。
有一個簡單的方式,在 Maven
打包的時候建立 shaded
或 uber
任務可以讓那些依賴的 jar
包很好地打包進去。
使用 <scope>provided</scope>
可以排除那些沒有必要打包進去的依賴,對 Spark
的依賴必須使用 provided
標記,因為這些依賴已經包含在 Spark cluster
中。在你的 worker
機器上已經安裝的 jar
包你同樣需要排除掉它們。
下面是一個 Maven pom.xml
的例子,工程了包含了一些需要的依賴,但是 Spark
的 libraries
不會被打包進去,因為它使用了 provided
:
<project>
<groupId>com.databricks.apps.logs</groupId>
<artifactId>log-analyzer</artifactId>
<modelVersion>4.0.0</modelVersion> <name>Databricks Spark Logs Analyzer</name> <packaging>jar</packaging> <version>1.0</version> <repositories> <repository> <id>Akka repository</id> <url>http://repo.akka.io/releases</url> </repository> </repositories> <dependencies> <dependency> <!-- Spark --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <!-- Spark SQL --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <!-- Spark Streaming --> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <!-- Command Line Parsing --> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> <version>1.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <finalName>uber-${project.artifactId}-${project.version}</finalName> </configuration> </plugin> </plugins> </build> </project>
2.3 執行 start-all.sh 錯誤: Connection refused
如果是使用 Mac
作業系統執行 start-all.sh
發生下面錯誤時:
% sh start-all.sh starting org.apache.spark.deploy.master.Master, logging to ... localhost: ssh: connect to host localhost port 22: Connection refused
你需要在你的電腦上開啟 “遠端登入” 功能。進入 系統偏好設定 ---> 共享
勾選開啟 遠端登入
。
2.4 Spark 元件之間的網路連線問題
Spark
元件之間的網路連線問題會導致各式各樣的警告或錯誤:
- SparkContext <-> Spark Standalone Master
如果 SparkContext
不能連線到 Spark standalone master
,會顯示下面的錯誤:
ERROR AppClient$ClientActor: All masters are unresponsive! Giving up. ERROR SparkDeploySchedulerBackend: Spark cluster looks dead, giving up. ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: Spark cluster looks down
如果 driver
能夠連線到 master
但是 master
不能回連到 driver
,這時 Master
的日誌會記錄多次嘗試連線 driver
失敗並且會報告不能連線:
INFO Master: Registering app SparkPi
INFO Master: Registered app SparkPi with ID app-XXX-0000
INFO: Master: Removing app app-app-XXX-0000
[...]
INFO Master: Registering app SparkPi
INFO Master: Registered app SparkPi with ID app-YYY-0000
INFO: Master: Removing app app-YYY-0000
[...]
在這樣的情況下,master
報告應用已經被成功地註冊了。但是註冊成功的通知 driver
接收失敗了, 這時 driver
會自動嘗試幾次重新連線直到失敗的次數太多而放棄重試。 其結果是 Master web UI
會報告多個失敗的應用,即使只有一個 SparkContext
被建立。
如果你遇到上述的錯誤,有兩條可以遵循的建議:
- 檢查
workers
和drivers
配置的Spark master
的地址 - 設定 driver,master,worker 的 SPARK_LOCAL_IP 為叢集的可尋地址主機名。
配置 hostname/port
這節將描述我們如何繫結 Spark
元件的網路介面和埠。在每節裡,配置會按照優先順序降序的方式排列。如果前面所有配置沒有提供則使用最後一條作為預設配置。
SparkContext actor system:
Hostname:
- spark.driver.host 屬性
- 如果 SPARK_LOCAL_IP 環境變數的設定是主機名(hostname),就會使用設定時的主機名。如果 SPARK_LOCAL_IP 設定的是一個 IP 地址,這個 IP 地址會被解析為主機名。
- 使用預設的 IP 地址,這個 IP 地址是Java 介面 InetAddress.getLocalHost 方法的返回值。
Port:
- spark.driver.port 屬性。
- 從作業系統(OS)選擇一個臨時埠。
Spark Standalone Master / Worker actor systems:
Hostname:
- 當 Master 或 Worker 程序啟動時使用 --host 或 -h 選項(或是過期的選項 --ip 或 -i)。
- SPARK_MASTER_HOST 環境變數(僅應用在 Master 上)。
- 如果 SPARK_LOCAL_IP 環境變數的設定是主機名(hostname),就會使用設定時的主機名。如果 SPARK_LOCAL_IP 設定的是一個 IP 地址,這個 IP 地址會被解析為主機名。
- 使用預設的 IP 地址,這個 IP 地址是Java 介面 InetAddress.getLocalHost 方法的返回值.
Port:
- 當 Master 或 Worker 程序啟動時使用 --port 或 -p 選項。
- SPARK_MASTER_PORT 或 SPARK_WORKER_PORT 環境變數(分別應用到 Master 和 Worker 上)。
- 從作業系統(OS)選擇一個臨時埠。
3 效能和優化
3.1 一個 RDD 有多少分割槽
在除錯和故障處理的時候,我們通常有必要知道 RDD
有多少個分割槽。這裡有幾個方法可以找到這些資訊:
使用 UI 檢視在分割槽上執行的任務數
當 stage
執行的時候,你可以在 Spark UI
上看到這個 stage
上的分割槽數。 下面的例子中的簡單任務在 4 個分割槽上建立了共 100 個元素的 RDD
,然後在這些元素被收集到 driver
之前分發一個 map
任務:
scala> val someRDD = sc.parallelize(1 to 100, 4) someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12 scala> someRDD.map(x => x).collect res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
在 Spark
的應用 UI
裡,從下面截圖上看到的 "Total Tasks" 代表了分割槽數。
使用 UI 檢視分割槽快取
持久化 RDD
時通常需要知道有多少個分割槽被儲存。下面的這個例子和之前的一樣,除了現在我們要對 RDD
做快取處理。操作完成之後,我們可以在 UI
上看到這個操作導致什麼被我們儲存了。
scala> someRDD.setName("toy").cache
res2: someRDD.type = toy ParallelCollectionRDD[0] at parallelize at <console>:12 scala> someRDD.map(x => x).collect res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
注意:下面的截圖有 4 個分割槽被快取。
程式設計檢視 RDD 分割槽
在 Scala API
裡,RDD
持有一個分割槽陣列的引用, 你可以使用它找到有多少個分割槽:
scala> val someRDD = sc.parallelize(1 to 100, 30) someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12 scala> someRDD.partitions.size res0: Int = 30
在 Python API
裡, 有一個方法可以明確地列出有多少個分割槽:
In [1]: someRDD = sc.parallelize(range(101),30)
In [2]: someRDD.getNumPartitions() Out[2]: 30
3.2 資料本地性
Spark
是一個並行資料處理框架,這意味著任務應該在離資料儘可能近的地方執行(即最少的資料傳輸)。
檢查本地性
檢查任務是否在本地執行的最好方式是在 Spark UI
上檢視 stage
資訊,注意下面截圖中的 Locality Level
列顯示任務執行在哪個地方。
調整本地性配置
你可以調整 Spark
在每個資料本地性level
(data local --> process local --> node local --> rack local --> Any
)上等待的時長。更多詳細的引數資訊請檢視程式配置文件的 Scheduling
章節裡類似於 spark.locality.*
的配置。
4 Spark Streaming
ERROR OneForOneStrategy
如果你在 Spark Streaming
裡啟用 checkpointing
,forEachRDD
函式使用的物件都應該可以被序列化(Serializable
)。否則會出現這樣的異常 "ERROR OneForOneStrategy: ... java.io.NotSerializableException:"
JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);
// This enables checkpointing. jssc.checkpoint("/tmp/checkpoint_test"); JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999); NotSerializable notSerializable = new NotSerializable(); dStream.foreachRDD(rdd -> { if (rdd.count() == 0) { return null; } String first = rdd.first(); notSerializable.doSomething(first); return null; } ); // This does not work!!!!
按照下面的方式之一進行修改,上面的程式碼才能正常執行:
- 在配置檔案裡面刪除 jssc.checkpoint 這一行關閉 checkpointing。
- 讓物件能被序列化。
- 在 forEachRDD 函式裡面宣告 NotSerializable,下面的示例程式碼是可以正常執行的:
JavaStreamingContext jssc = new JavaStreamingContext(sc, INTERVAL);
jssc.checkpoint("/tmp/checkpoint_test"); JavaDStream<String> dStream = jssc.socketTextStream("localhost", 9999); dStream.foreachRDD(rdd -> { if (rdd.count() == 0) { return null; } String first = rdd.first(); NotSerializable notSerializable = new NotSerializable(); notSerializable.doSomething(first); return null; } ); // This code snippet is fine since the NotSerializable object // is declared and only used within the forEachRDD function.