Flink從入門到入土(詳細教程)
和其他所有的計算框架一樣,flink也有一些基礎的開發步驟以及基礎,核心的API,從開發步驟的角度來講,主要分為四大部分
1.Environment
Flink Job在提交執行計算時,需要首先建立和Flink框架之間的聯絡,也就指的是當前的flink執行環境,只有獲取了環境資訊,才能將task排程到不同的taskManager執行。而這個環境物件的獲取方式相對比較簡單
// 批處理環境 val env = ExecutionEnvironment.getExecutionEnvironment // 流式資料處理環境 val env = StreamExecutionEnvironment.getExecutionEnvironment
2.Source
Flink框架可以從不同的來源獲取資料,將資料提交給框架進行處理, 我們將獲取資料的來源稱之為資料來源.
2.1.從集合讀取資料
一般情況下,可以將資料臨時儲存到記憶體中,形成特殊的資料結構後,作為資料來源使用。這裡的資料結構採用集合型別是比較普遍的
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從集合讀取資料 */ object SourceList { def main(args: Array[String]): Unit = { //1.建立執行的環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.從集合中讀取資料 val sensorDS: DataStream[WaterSensor] = env.fromCollection( // List(1,2,3,4,5) List( WaterSensor("ws_001", 1577844001, 45.0), WaterSensor("ws_002", 1577844015, 43.0), WaterSensor("ws_003", 1577844020, 42.0) ) ) //3.列印 sensorDS.print() //4.執行 env.execute("sensor") } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
2.2從檔案中讀取資料
通常情況下,我們會從儲存介質中獲取資料,比較常見的就是將日誌檔案作為資料來源
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從檔案讀取資料 */ object SourceFile { def main(args: Array[String]): Unit = { //1.建立執行的環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.從指定路徑獲取資料 val fileDS: DataStream[String] = env.readTextFile("input/data.log") //3.列印 fileDS.print() //4.執行 env.execute("sensor") } } /** * 在讀取檔案時,檔案路徑可以是目錄也可以是單一檔案。如果採用相對檔案路徑,會從當前系統引數user.dir中獲取路徑 * System.getProperty("user.dir") */ /** * 如果在IDEA中執行程式碼,那麼系統引數user.dir自動指向專案根目錄, * 如果是standalone叢集環境, 預設為叢集節點根目錄,當然除了相對路徑以外, * 也可以將路徑設定為分散式檔案系統路徑,如HDFS val fileDS: DataStream[String] = env.readTextFile( "hdfs://hadoop02:9000/test/1.txt") */
如果是standalone叢集環境, 預設為叢集節點根目錄,當然除了相對路徑以外,也可以將路徑設定為分散式檔案系統路徑,如HDFS
val fileDS: DataStream[String] = env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")
預設讀取時,flink的依賴關係中是不包含Hadoop依賴關係的,所以執行上面程式碼時,會出現錯誤。
解決方法就是增加相關依賴jar包就可以了
2.3 kafka讀取資料
Kafka作為訊息傳輸佇列,是一個分散式的,高吞吐量,易於擴充套件地基於主題釋出/訂閱的訊息系統。在現今企業級開發中,Kafka 和 Flink成為構建一個實時的資料處理系統的首選
2.3.1 引入kafka聯結器的依賴
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.10.0</version> </dependency>
2.3.2 程式碼實現參考
import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011 import org.apache.flink.streaming.util.serialization.SimpleStringSchema /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從kafka讀取資料 */ object SourceKafka { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "hadoop02:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") val kafkaDS: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]( "sensor", new SimpleStringSchema(), properties) ) kafkaDS.print() env.execute("sensor") } }
2.4 自定義資料來源
大多數情況下,前面的資料來源已經能夠滿足需要,但是難免會存在特殊情況的場合,所以flink也提供了能自定義資料來源的方式
2.4.1 建立自定義資料來源
import com.atyang.day01.Source.SourceList.WaterSensor import org.apache.flink.streaming.api.functions.source.SourceFunction import scala.util.Random /** * description: ss * date: 2020/8/28 20:36 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:自定義資料來源 */ class MySensorSource extends SourceFunction[WaterSensor] { var flg = true override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = { while ( flg ) { // 採集資料 ctx.collect( WaterSensor( "sensor_" +new Random().nextInt(3), 1577844001, new Random().nextInt(5)+40 ) ) Thread.sleep(100) } } override def cancel(): Unit = { flg = false; } }
3.Transform
在Spark中,運算元分為轉換運算元和行動運算元,轉換運算元的作用可以通過運算元方法的呼叫將一個RDD轉換另外一個RDD,Flink中也存在同樣的操作,可以將一個數據流轉換為其他的資料流。
轉換過程中,資料流的型別也會發生變化,那麼到底Flink支援什麼樣的資料型別呢,其實我們常用的資料型別,Flink都是支援的。比如:Long, String, Integer, Int, 元組,樣例類,List, Map等。
3.1 map
- 對映:將資料流中的資料進行轉換, 形成新的資料流,消費一個元素併產出一個元素
- 引數:Scala匿名函式或MapFunction
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從集合讀取資料 */ object Transfrom_map { def main(args: Array[String]): Unit = { //1.建立執行的環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.從集合中讀取資料 val sensorDS: DataStream[WaterSensor] = env.fromCollection( // List(1,2,3,4,5) List( WaterSensor("ws_001", 1577844001, 45.0), WaterSensor("ws_002", 1577844015, 43.0), WaterSensor("ws_003", 1577844020, 42.0) ) ) val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1)) //3.列印 sensorDSMap.print() //4.執行 env.execute("sensor") } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.1.1 MapFunction
Flink為每一個運算元的引數都至少提供了Scala匿名函式和函式類兩種的方式,其中如果使用函式類作為引數的話,需要讓自定義函式繼承指定的父類或實現特定的介面。例如:MapFunction
sensor-data.log 檔案資料
sensor_1,1549044122,10 sensor_1,1549044123,20 sensor_1,1549044124,30 sensor_2,1549044125,40 sensor_1,1549044126,50 sensor_2,1549044127,60 sensor_1,1549044128,70 sensor_3,1549044129,80 sensor_3,1549044130,90 sensor_3,1549044130,100 import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從檔案讀取資料 */ object SourceFileMap { def main(args: Array[String]): Unit = { //1.建立執行的環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.從指定路徑獲取資料 val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log") val MapDS = fileDS.map( lines => { //更加逗號切割 獲取每個元素 val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) //3.列印 MapDS.print() //4.執行 env.execute("map") } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從檔案讀取資料 */ object Transform_MapFunction { def main(args: Array[String]): Unit = { //1.建立執行的環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.從指定路徑獲取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") sensorDS.map() //3.列印 // MapDS.print() //4.執行 env.execute("map") } /** * 自定義繼承 MapFunction * MapFunction[T,O] * 自定義輸入和輸出 * */ class MyMapFunction extends MapFunction[String,WaterSensor]{ override def map(t: String): WaterSensor = { val datas: Array[String] = t.split(",") WaterSensor(datas(0),datas(1).toLong,datas(2).toInt) } } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.1.2 RichMapFunction
所有Flink函式類都有其Rich版本。它與常規函式的不同在於,可以獲取執行環境的上下文,並擁有一些生命週期方法,所以可以實現更復雜的功能。也有意味著提供了更多的,更豐富的功能。例如:RichMapFunction
sensor-data.log 檔案資料 同上一致
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:從檔案讀取資料 */ object Transform_RichMapFunction { def main(args: Array[String]): Unit = { //1.建立執行的環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //2.從指定路徑獲取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction) //3.列印 myMapDS.print() //4.執行 env.execute("map") } /** * 自定義繼承 MapFunction * MapFunction[T,O] * 自定義輸入和輸出 * */ class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{ override def map(value: String): WaterSensor = { val datas: Array[String] = value.split(",") // WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt) } // 富函式提供了生命週期方法 override def open(parameters: Configuration): Unit = {} override def close(): Unit = {} } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
Rich Function有一個生命週期的概念。典型的生命週期方法有:
- open()方法是rich function的初始化方法,當一個運算元例如map或者filter被調 用之前open()會被呼叫
- close()方法是生命週期中的最後一個呼叫的方法,做一些清理工作
- getRuntimeContext()方法提供了函式的RuntimeContext的一些資訊,例如函式執行 的並行度,任務的名字,以及state狀態
3.1.3 flatMap
- 扁平對映:將資料流中的整體拆分成一個一個的個體使用,消費一個元素併產生零到多個元素
- 引數:Scala匿名函式或FlatMapFunction
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_FlatMap { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val listDS: DataStream[List[Int]] = env.fromCollection( List( List(1, 2, 3, 4), List(5, 6, 7,1,1,1) ) ) val resultDS: DataStream[Int] = listDS.flatMap(list => list) resultDS.print() // 4. 執行 env.execute() } }
3.2. filter
- 過濾:根據指定的規則將滿足條件(true)的資料保留,不滿足條件(false)的資料丟棄
- 引數:Scala匿名函式或FilterFunction
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:Filter */ object Transform_Filter { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val listDS: DataStream[List[Int]] = env.fromCollection( List( List(1, 2, 3, 4,1, 2, 3, 4), List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4), List(1, 2, 3, 4), List(5, 6, 7,1,1,1), List(1, 2, 3, 4), List(5, 6, 7,1,1,1) ) ) // true就留下,false就拋棄 listDS.filter(num => { num.size>5 }) .print("filter") // 4. 執行 env.execute() } }
3.3 keyBy
在Spark中有一個GroupBy的運算元,用於根據指定的規則將資料進行分組,在flink中也有類似的功能,那就是keyBy,根據指定的key對資料進行分流
-
分流:根據指定的Key將元素髮送到不同的分割槽,相同的Key會被分到一個分割槽(這裡分割槽指的就是下游運算元多個並行節點的其中一個)。keyBy()是通過雜湊來分割槽的
-
引數:Scala匿名函式或POJO屬性或元組索引,不能使用陣列
-
返回:KeyedStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_KeyBy { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") //3.轉換為樣例類 val mapDS = sensorDS.map( lines => { val datas = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) // 4. 使用keyby進行分組 // TODO 關於返回的key的型別: // 1. 如果是位置索引 或 欄位名稱 ,程式無法推斷出key的型別,所以給一個java的Tuple型別 // 2. 如果是匿名函式 或 函式類 的方式,可以推斷出key的型別,比較推薦使用 // *** 分組的概念:分組只是邏輯上進行分組,打上了記號(標籤),跟並行度沒有絕對的關係 // 同一個分組的資料在一起(不離不棄) // 同一個分割槽裡可以有多個不同的組 // val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0) // val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id") val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id) // val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy( // new KeySelector[WaterSensor, String] { // override def getKey(value: WaterSensor): String = { // value.id // } // } // ) sensorKS.print().setParallelism(5) // 4. 執行 env.execute() } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.4 shuffle
- 打亂重組(洗牌):將資料按照均勻分佈打散到下游
- 引數:無
- 返回:DataStream
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_Shuffle { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") val shuffleDS = sensorDS.shuffle sensorDS.print("data") shuffleDS.print("shuffle") // 4. 執行 env.execute() } }
3.5. split
在某些情況下,我們需要將資料流根據某些特徵拆分成兩個或者多個數據流,給不同資料流增加標記以便於從流中取出。
需求:將水位感測器資料按照空高高低(以40cm,30cm為界),拆分成三個流
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_Split { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.轉換成樣例類 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) val splitSS: SplitStream[WaterSensor] = mapDS.split( sensor => { if (sensor.vc < 40) { Seq("normal") } else if (sensor.vc < 80) { Seq("Warn") } else { Seq("alarm") } } ) // 4. 執行 env.execute() } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.6 select
將資料流進行切分後,如何從流中將不同的標記取出呢,這時就需要使用select運算元了。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_Split { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.轉換成樣例類 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) val splitDS: SplitStream[WaterSensor] = mapDS.split( sensor => { if (sensor.vc < 40) { Seq("info") } else if (sensor.vc < 80) { Seq("warn") } else { Seq("error") } } ) val errorDS: DataStream[WaterSensor] = splitDS.select("error") val warnDS: DataStream[WaterSensor] = splitDS.select("warn") val infoDS: DataStream[WaterSensor] = splitDS.select("info") infoDS.print("info") warnDS.print("warn") errorDS.print("error") // 4. 執行 env.execute() } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.7 connect
在某些情況下,我們需要將兩個不同來源的資料流進行連線,實現資料匹配,比如訂單支付和第三方交易資訊,這兩個資訊的資料就來自於不同資料來源,連線後,將訂單支付和第三方交易資訊進行對賬,此時,才能算真正的支付完成。
Flink中的connect運算元可以連線兩個保持他們型別的資料流,兩個資料流被Connect之後,只是被放在了一個同一個流中,內部依然保持各自的資料和形式不發生任何變化,兩個流相互獨立。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_Connect { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.轉換成樣例類 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) // 4. 從集合中再讀取一條流 val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6)) val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS) // coMap表示連線流呼叫的map,各自都需要一個 function resultCS.map( sensor=>sensor.id, num=>num+1 ).print() // 4. 執行 env.execute() } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.8 union
對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream
connect與 union 區別:
- union之前兩個流的型別必須是一樣,connect可以不一樣
- connect只能操作兩個流,union可以操作多個。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:FlatMap */ object Transform_Union { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2. 從集合中讀取流 val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4)) val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10)) val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110)) // TODO union 真正將多條流合併成一條流 // 合併的流,型別必須一致 // 可以合併多條流,只要型別一致 num1DS.union(num2DS).union(num3DS) .print() // 4. 執行 env.execute() } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.9 Operator
Flink作為計算框架,主要應用於資料計算處理上, 所以在keyBy對資料進行分流後,可以對資料進行相應的統計分析
3.9.1 滾動聚合運算元(Rolling Aggregation)
這些運算元可以針對KeyedStream的每一個支流做聚合。執行完成後,會將聚合的結果合成一個流返回,所以結果都是DataStream
sum()
min()
max()
3.9.2 reduce
一個分組資料流的聚合操作,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
import org.apache.flink.streaming.api.scala._ /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:Reduce */ object Transform_Reduce { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.轉換成樣例類 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id) // 輸入的型別一樣,輸出型別和輸出型別也要一樣 // 組內的第一條資料,不進入reduce計算 val reduceDS: DataStream[WaterSensor] = sensorKS.reduce( (ws1, ws2) => { println(ws1 + "<===>" + ws2) WaterSensor(ws1.id, System.currentTimeMillis(), ws1.vc + ws2.vc) } ) reduceDS.print("reduce") // 4. 執行 env.execute() } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
3.9.3process
Flink在資料流通過keyBy進行分流處理後,如果想要處理過程中獲取環境相關資訊,可以採用process運算元自定義實現 1)繼承KeyedProcessFunction抽象類,並定義泛型:[KEY, IN, OUT]
class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String]{} 重寫方法 // 自定義KeyedProcessFunction,是一個特殊的富函式 // 1.實現KeyedProcessFunction,指定泛型:K - key的型別, I - 上游資料的型別, O - 輸出的資料型別 // 2.重寫 processElement方法,定義 每條資料來的時候 的 處理邏輯 /** * 處理邏輯:來一條處理一條 * * @param value 一條資料 * @param ctx 上下文物件 * @param out 採集器:收集資料,並輸出 */ override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = { out.collect("我來到process啦,分組的key是="+ctx.getCurrentKey+",資料=" + value) // 如果key是tuple,即keyby的時候,使用的是 位置索引 或 欄位名稱,那麼key獲取到是一個tuple // ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手動引入Java的Tuple }
完整程式碼:
import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector /** * description: SourceList * date: 2020/8/28 19:02 * version: 1.0 * * @author 陽斌 * 郵箱:[email protected] * 類的說明:Reduce */ object Transform_Process { def main(args: Array[String]): Unit = { // 1.建立執行環境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) // 2.讀取資料 val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log") // 3.轉換成樣例類 val mapDS: DataStream[WaterSensor] = sensorDS.map( lines => { val datas: Array[String] = lines.split(",") WaterSensor(datas(0), datas(1).toLong, datas(2).toInt) } ) //按照ID 進行分組 val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id) sensorKS.process(new MyKeyedProcessFunction) // 4. 執行 env.execute() } // 自定義KeyedProcessFunction,是一個特殊的富函式 // 1.實現KeyedProcessFunction,指定泛型:K - key的型別, I - 上游資料的型別, O - 輸出的資料型別 // 2.重寫 processElement方法,定義 每條資料來的時候 的 處理邏輯 class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] { /** * 處理邏輯:來一條處理一條 * * @param value 一條資料 * @param ctx 上下文物件 * @param out 採集器:收集資料,並輸出 */ override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = { out.collect("我來到process啦,分組的key是="+ctx.getCurrentKey+",資料=" + value) // 如果key是tuple,即keyby的時候,使用的是 位置索引 或 欄位名稱,那麼key獲取到是一個tuple // ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手動引入Java的Tuple } } /** * 定義樣例類:水位感測器:用於接收空高資料 * * @param id 感測器編號 * @param ts 時間戳 * @param vc 空高 */ case class WaterSensor(id: String, ts: Long, vc: Double) }
4.Sink
Sink有下沉的意思,在Flink中所謂的Sink其實可以表示為將資料儲存起來的意思,也可以將範圍擴大,表示將處理完的資料傳送到指定的儲存系統的輸出操作
之前我們一直在使用的print方法其實就是一種Sink。
@PublicEvolving public DataStreamSink<T> print(String sinkIdentifier) { PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false); return this.addSink(printFunction).name("Print to Std. Out"); }
官方提供了一部分的框架的sink。除此以外,需要使用者自定義實現sink
本文作者:Java知音@陽斌