spark原始碼action系列-foreach與foreachPartition
RDD.foreachPartition/foreach的操作
在這個action的操作中:
這兩個action主要用於對每個partition中的iterator時行迭代的處理.通過使用者傳入的function對iterator進行內容的處理.
首先我們先看看foreach的操作:
在fureach中,傳入一個function,這個函式的傳入引數就是每個partition中,每次的foreach得到的一個rdd的kv例項,也就是具體的內容,這種處理你並不知道這個iterator的foreach什麼時候結果,只能是foreach的過程中,你得到一條資料,就處理一條資料.
由下面的紅色部分可以看出,foreach
defforeach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}
示例說明:
val list = new ArrayBuffer()
Rdd.foreach(record => {
list += record
If (list.size >= 10000) {
list.flush....
}
})
上面這段示例程式碼中,如果這麼使用就會存在一個問題,
迭代的最後,list的結果可能還沒有達到10000條,這個時候,你在內部的處理的flush部分就不會執行,也就是迭代的最後如果沒有達到10000的資料就會丟失.
所以在foreach中,一般就是拿到一條資料進行下處理Rdd.foreach(record => {record._1 == a return})
然後接下來看看foreachPartition:
這個函式也是根據傳入的function進行處理,但不同處在於,這裡function的傳入引數是一個partition對應資料的iterator.而不是直接使用iterator的foreach,
這種情況下,如果是上面foreach的示例程式碼中list這個片段在這個action中就能夠正常的去處理.
defforeachPartition(f: Iterator[T] => Unit): Unit = withScope {val cleanF = sc.clean(f) sc.runJob(this,(iter: Iterator[T]) => cleanF(iter))}
示例程式碼:
Val list = new ArrayBuffer
rdd.foreachPartition(it => {
It.foreach(r => {
List += r
If (list.size > 10000) flush
})
If (list.size > 0) flush
})
最後說下這兩個action的區別:
Foreach與foreachPartition都是在每個partition中對iterator進行操作,
不同的是,foreach是直接在每個partition中直接對iterator執行foreach操作,而傳入的function只是在foreach內部使用,
而foreachPartition是在每個partition中把iterator給傳入的function,讓function自己對iterator進行處理.