1. 程式人生 > >[原始碼分析] 從FlatMap用法到Flink的內部實現

[原始碼分析] 從FlatMap用法到Flink的內部實現

# [原始碼分析] 從FlatMap用法到Flink的內部實現 ## 0x00 摘要 本文將從FlatMap概念和如何使用開始入手,深入到Flink是如何實現FlatMap。希望能讓大家對這個概念有更深入的理解。 ## 0x01 Map vs FlatMap 首先我們先從概念入手。 自從*響應式程式設計*慢慢壯大以來,這兩個單詞現在越來越被大家熟悉了。前端能見到它們的身影,後臺也能見到;安卓裡面有,iOS也有。很多兄弟剛遇到它們時候是懵圈的,搞不清楚之間的區別。下面我就給大家簡單講解下。 ### map 它把`陣列流`中的每一個值,使用所提供的函式執行一遍,一一對應。得到與元素個數相同的`陣列流`。然後返回這個新資料流。 ### flatMap flat是扁平的意思。所以這個操作是:先對映(map),再拍扁(join)。 flatMap輸入可能是多個`子陣列流`。所以flatMap先針對 每個`子陣列流`的每個元素進行對映操作。然後進行扁平化處理,最後彙集所有進行扁平化處理的結果集形成一個新的列表(扁平化簡而言之就是去除所有的修飾)。 flatMap與map另外一個不一樣的地方就是傳入的函式在處理完後返回值必須是List。 ### 例項 比如拿到一個文字檔案之後,我們是按行讀取,按行處理。如果要對每一行的單詞數進行計數,那麼應該選擇Map方法,如果是統計詞頻,就應該選擇flatMap方法。 如果還不清楚,可以看看下面這個例子: ```scala 梁山新進一批好馬,準備給每個馬軍頭領配置一批。於是得到函式以及頭領名單如下: 函式 = ( 頭領 => 頭領 + 好馬 ) 五虎將 = List(關勝、林沖、秦明、呼延灼、董平 ) 八驃騎 = List(花榮、徐寧、楊志、索超、張清、朱仝、史進、穆弘 ) // Map函式的例子 利用map函式,我們可以得到 五虎將馬軍 五虎將馬軍 = 五虎將.map( 頭領 => 頭領 + 好馬 ) 結果是 List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬 ) // flatMap函式的例子 但是為了得到統一的馬軍,則可以用flatMap: 馬軍頭領 = List(五虎將,八驃騎) 馬軍 = 馬軍頭領.flatMap( 頭領 => 頭領 + 好馬 ) 結果就是:List( 關勝 + 馬、林沖 + 馬、秦明 + 馬、呼延灼 + 馬、董平 + 馬,花榮 + 馬、徐寧 + 馬、楊志 + 馬、索超 + 馬、張清 + 馬、朱仝 + 馬、史進 + 馬、穆弘 + 馬 ) ``` 現在大家應該清楚了吧。接下來看看幾個FlatMap的例項。 ### Scala語言的實現 Scala本身對於List型別就有map和flatMap操作。舉例如下: ```scala val names = List("Alice","James","Apple") val strings = names.map(x => x.toUpperCase) println(strings) // 輸出 List(ALICE, JAMES, APPLE) val chars = names.flatMap(x=> x.toUpperCase()) println(chars) // 輸出 List(A, L, I, C, E, J, A, M, E, S, A, P, P, L, E) ``` ### Flink的例子 以上是scala語言層面的實現。下面我們看看Flink框架是如何使用FlatMap的。 網上常見的一個Flink應用的例子: ```scala //載入資料來源 val source = env.fromElements("china is the best country","beijing is the capital of china") //轉化處理資料 val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1) ``` ### Flink原始碼中的例子 ```scala case class WordWithCount(word: String, count: Long) val text = env.socketTextStream(host, port, '\n') val windowCounts = text.flatMap { w => w.split("\\s") } .map { w => WordWithCount(w, 1) } .keyBy("word") .timeWindow(Time.seconds(5)) .sum("count") windowCounts.print() ``` ## 0x02 自定義運算元(in Flink) 上面提到的都是簡單的使用,如果有複雜需求,在Flink中,我們可以通過繼承FlatMapFunction和RichFlatMapFunction來自定義運算元。 ### 函式類`FlatMapFunction` 對於不涉及到狀態的使用,可以直接繼承 FlatMapFunction,其定義如下: ```java @Public @FunctionalInterface public interface FlatMap