Flink初體驗 -- Word Count
阿新 • • 發佈:2018-12-20
Flink初體驗 – Word Count
/**
 * Streaming word count over a text socket, built on the Flink DataStream API.
 *
 * The job follows Flink's three-stage execution model:
 *   1. DataSource     - read lines from a socket
 *   2. Transformation - split into words, pair each with 1, key by word, sum
 *   3. DataSink       - print the results
 *
 * Two results are printed from the same word stream: a continuously updated
 * running count per word, and a count per word for each 5-second window.
 */
object WordCount {

  /** Host used when none is given on the command line (redacted placeholder). */
  private val DefaultHost = "***.***.***.***"

  /** Port used when none is given on the command line. */
  private val DefaultPort = 9000

  /**
   * Builds and runs the word-count job.
   *
   * @param args optional overrides: `args(0)` is the socket host and `args(1)`
   *             the socket port; missing values fall back to the defaults above
   * @throws IllegalArgumentException if `args(1)` is present but is not an integer
   */
  def main(args: Array[String]): Unit = {
    val host = args.headOption.getOrElse(DefaultHost)
    val port = args.lift(1) match {
      case Some(raw) =>
        scala.util.Try(raw.trim.toInt).getOrElse(
          throw new IllegalArgumentException(s"Invalid port: '$raw'")
        )
      case None => DefaultPort
    }

    // Obtain the execution environment (plays a role similar to a Spark context).
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // DataSource: create a DataStream of text lines from the socket.
    val socketStream = env.socketTextStream(host, port)

    // Transformation: split each line on whitespace and pair every word with 1.
    // split("\\s+") yields an empty string for blank lines and for lines that
    // start with whitespace, so empty tokens are dropped before counting.
    val wordsStream = socketStream
      .flatMap(line => line.split("\\s+"))
      .filter(word => word.nonEmpty)
      .map(word => (word, 1))

    // Running count per word; keyed by the word itself via a typed key selector
    // rather than a positional tuple index.
    val countPair = wordsStream.keyBy(_._1).sum(1)

    // DataSink: print the running counts.
    countPair.print()

    // NOTE: this banner is printed once while the job graph is being built,
    // before env.execute() starts the job; it does not separate the two outputs
    // at runtime.
    println("==================== 視窗計算 ==========================")

    // Windowed computation: count per word over 5-second time windows.
    val countStream = wordsStream
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .sum(1)

    countStream.print()

    // Nothing runs until execute() is called; this submits the job.
    env.execute(" Word Count ")
  }
}