[原始碼分析] 從FlatMap用法到Flink的內部實現
阿新 • • 發佈:2020-03-31
# [原始碼分析] 從FlatMap用法到Flink的內部實現
## 0x00 摘要
本文將從FlatMap概念和如何使用開始入手,深入到Flink是如何實現FlatMap。希望能讓大家對這個概念有更深入的理解。
## 0x01 Map vs FlatMap
首先我們先從概念入手。
自從*響應式程式設計*慢慢壯大以來,這兩個單詞現在越來越被大家熟悉了。前端能見到它們的身影,後臺也能見到;安卓裡面有,iOS也有。很多兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給大家簡單講解下。
### map
它把`陣列流`中的每一個值,使用所提供的函式執行一遍,一一對應。得到與元素個數相同的`陣列流`。然後返回這個新資料流。
### flatMap
flat是扁平的意思。所以這個操作是:先對映(map),再拍扁(join)。
flatMap輸入可能是多個`子陣列流`。所以flatMap先針對 每個`子陣列流`的每個元素進行對映操作。然後進行扁平化處理,最後彙集所有進行扁平化處理的結果集形成一個新的列表(扁平化簡而言之就是去除所有的修飾)。
flatMap與map另外一個不一樣的地方就是傳入的函式在處理完後返回值必須是List。
### 例項
比如拿到一個文字檔案之後,我們是按行讀取,按行處理。如果要對每一行的單詞數進行計數,那麼應該選擇Map方法,如果是統計詞頻,就應該選擇flatMap方法。
如果還不清楚,可以看看下面這個例子:
```scala
梁山新進一批好馬,準備給每個馬軍頭領配置一批。於是得到函式以及頭領名單如下:
函式 = ( 頭領 => 頭領 + 好馬 )
五虎將 = List(關勝、林沖、秦明、呼延灼、董平 )
八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 )
// Map函式的例子
利用map函式,我們可以得到 五虎將馬軍
五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 )
結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 )
// flatMap函式的例子
但是為了得到統一的馬軍,則可以用flatMap:
馬軍頭領 = List(五虎將,八驃騎)
馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 )
結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 )
```
現在大家應該清楚了吧。接下來看看幾個FlatMap的例項。
### Scala語言的實現
Scala本身對於List型別就有map和flatMap操作。舉例如下:
```scala
val names = List("Alice","James","Apple")
val strings = names.map(x => x.toUpperCase)
println(strings)
// 輸出 List(ALICE, JAMES, APPLE)
val chars = names.flatMap(x=> x.toUpperCase())
println(chars)
// 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E)
```
### Flink的例子
以上是scala語言層面的實現。下面我們看看Flink框架是如何使用FlatMap的。
網上常見的一個Flink應用的例子:
```scala
//載入資料來源
val source = env.fromElements("china is the best country","beijing is the capital of china")
//轉化處理資料
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
```
### Flink原始碼中的例子
```scala
case class WordWithCount(word: String, count: Long)
val text = env.socketTextStream(host, port, '\n')
val windowCounts = text.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count")
windowCounts.print()
```
## 0x02 自定義運算元(in Flink)
上面提到的都是簡單的使用,如果有複雜需求,在Flink中,我們可以通過繼承FlatMapFunction和RichFlatMapFunction來自定義運算元。
### 函式類`FlatMapFunction`
對於不涉及到狀態的使用,可以直接繼承 FlatMapFunction,其定義如下:
```java
@Public
@FunctionalInterface
public interface FlatMap