[DB] Flink
概述
- 流式計算,本質上是增量計算,需要不斷查詢過去的狀態
概念
- Streams(流):分為有界流(固定大小,不隨時間增加而增長)和無界流(隨時間增加而增長),
- State(狀態):在進行流式計算過程中的資訊,用於容錯恢復和持久化
- Time(時間):支援Event time、Ingestion time、Processing time等,用來判斷業務狀態是否滯後或延遲
- API:分為SQL/Table API、DataStream API、ProcessFunction三層
叢集
- JobManager:叢集管理者,負責排程任務,協調checkpoints、協調故障恢復,收集Job狀態,管理TaskManager
- TaskManager:實際執行計算的Worker,在其上執行Flink Job 的一組Task,將所在節點的伺服器資訊如記憶體、磁碟、任務執行情況等向JobManager彙報
- Clinent:將任務提交到叢集,根據使用者引數選擇提交模式(yarn per job,stand-alone,yarn-session)
模型
- DataStream 的程式設計模型包括四個部分:Environment、DataSource、Transformation、Sink
- DataSource(資料來源):檔案、Collection、Socket、自定義
- Sink(資料目標):Kafka、Elasticsearch、RabbitMQ、Cassandra、Redis
- 每個資料流起始於一個或多個Source,並終止於一個或多個Sink
資源
- 一個TaskManager就是一個JVM程序,會用獨立的執行緒來執行Task
- 每個TaskManager為叢集提供Slot,每個task slot代表了TaskManager的一個固定大小的資源子集,slot數一般為每個節點的cpu核數
- 一個Flink程式由多個任務組成(source、transformation和 sink)
- 一個任務由多個並行的例項(執行緒)來執行,一個任務的並行例項 (執行緒) 數目就被稱為該任務的並行度
優點
- 架構:主從模式
- 容錯:基於兩階段提交,實現了精確的一次處理語義
- 反壓:當消費者速度低於生產者時,需要消費者將資訊反饋給生產者,使二者速度匹配,Flink使用分散式阻塞佇列實現
聯結器
- Kafka
- Redis
- ElasticSearch
運算元
- Map:接受一個元素作為輸入,根據開發者自定義的邏輯處理後輸出
1 class StreamingDemo { 2 public static void main(String[] args) throws Exception { 3 4 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 5 //獲取資料來源 6 DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 7 //Map 8 SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() { 9 @Override 10 public Object map(MyStreamingSource.Item item) throws Exception { 11 return item.getName(); 12 } 13 }); 14 //列印結果 15 mapItems.print().setParallelism(1); 16 String jobName = "user defined streaming source"; 17 env.execute(jobName); 18 } 19 }View Code
- FlatMap:接受一個元素,返回0到多個元素,和Map的區別是,當返回值是列表時,FlatMap會將列表平鋪,以單個元素的形式輸出
1 SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() { 2 @Override 3 public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception { 4 String name = item.getName(); 5 collector.collect(name); 6 } 7 });View Code
- Filter:過濾掉不需要的資料,每個元素都會被Filter處理,如果Filter函式返回true則保留,否則丟棄
1 SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() { 2 @Override 3 public boolean filter(MyStreamingSource.Item item) throws Exception { 4 5 return item.getId() % 2 == 0; 6 } 7 });View Code
- KeyBy:根據資料的某種屬性分組,然後對不同的組採取不同的處理方式
- Aggregations:聚合函式,常見的有sum、max、min等,需要指定一個key進行聚合
- Reduce:按照使用者自定義邏輯進行分組聚合
狀態
- Flink框架的計算是有狀態的
- 狀態即中間計算結果,是在流處理過程中需要記住的資料,包括業務資料和元資料
- 狀態儲存在JVM中
- Flink支援不同型別的狀態,對狀態的持久化提供專門機制和狀態管理器
- 對於任何一個狀態資料,可以設定過期時間(TTL)
- 基本型別:是否按照某個key進行分割槽
- Keyed State:每個key都有自己的狀態
- Operator State(Keyed State):每個運算元例項共享一個狀態
容錯
- Checkpoint
視窗
- 滾動視窗
- 滑動視窗
- 會話視窗
時間
- 生成時間
- 接入時間
- 處理時間
水位
- 由於網路延遲等因素,事件資料往往不能即使傳遞至Flink系統中,導致系統的不穩定或資料亂序
- 衡量資料處理進度,確保事件資料全部到達Flink系統,即使亂序或遲到,也能像預期一樣計算出正確和連續的結果
- 任何Event進入Flink系統,都會根據當前最大事件時間產生Watermarks時間戳
廣播變數
- 允許在每臺機器上保持一個只讀的快取變數,即一個公共的共享變數
- 可以把一個dataset資料集廣播出去,然後不同的task在節點上都能獲取到
案例
- 安裝flink
- tar -zxvf flink-1.9.2-bin-scala_2.11.tgz -C ~/training/
- 修改flink配置檔案
- vim flink-conf.yaml
- 啟動hadoop,zookeeper,flink
- bin/start-cluster.sh
- socket資料來源
- nc -lk 9999
- 在idea中建立maven工程,開發計數程式
FlinkStreaming.scala
1 package com.kaikeba.demo1 2 3 import org.apache.flink.runtime.state.filesystem.FsStateBackend 4 import org.apache.flink.streaming.api.CheckpointingMode 5 import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup 6 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 7 import org.apache.log4j.{Level, Logger} 8 9 //匯入隱式轉換的包 10 import org.apache.flink.api.scala._ 11 12 /** 13 * flink接受socket資料,進行單詞計數 14 */ 15 object FlinkStream { 16 Logger.getLogger("org").setLevel(Level.ERROR) 17 18 def main(args: Array[String]): Unit = { 19 //todo:1、構建流處理的環境 20 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 21 22 //todo:2、從socket獲取資料 23 val sourceStream: DataStream[String] = environment.socketTextStream("bigdata111",9999) 24 25 //todo:3、對資料進行處理 hadoop spark 26 val result: DataStream[(String, Int)] = sourceStream 27 .flatMap(x => x.split(" ")) //按照空格切分 28 .map(x => (x, 1)) //每個單詞計為1 29 .keyBy(0) //按照下標為0的單詞進行分組 30 .sum(1) //按照下標為1累加相同單詞出現的1 31 32 33 //todo: 4、對資料進行列印 sink 34 result.print() 35 36 37 //todo: 5、開啟任務 38 environment.execute("FlinkStream") 39 } 40 41 }View Code
FlinkWordCount.scala
1 package com.kaikeba.demo1 2 3 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 4 import org.apache.flink.streaming.api.windowing.time.Time 5 6 /** 7 * 使用滑動視窗 8 * 每隔1秒鐘統計最近2秒鐘的每個單詞出現的次數 9 */ 10 object FlinkStream { 11 12 def main(args: Array[String]): Unit = { 13 //構建流處理的環境 14 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 15 16 //從socket獲取資料 17 val sourceStream: DataStream[String] = env.socketTextStream("node01",9999) 18 19 //匯入隱式轉換的包 20 import org.apache.flink.api.scala._ 21 22 //對資料進行處理 23 val result: DataStream[(String, Int)] = sourceStream 24 .flatMap(x => x.split(" ")) //按照空格切分 25 .map(x => (x, 1)) //每個單詞計為1 26 .keyBy(0) //按照下標為0的單詞進行分組 27 .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s處理2s的資料 28 .sum(1) //按照下標為1累加相同單詞出現的次數 29 30 //對資料進行列印 31 result.print() 32 33 //開啟任務 34 env.execute("FlinkStream") 35 } 36 37 }View Code
- 打包jar檔案,提交到yarn
- ~/training/flink-1.9.2/bin/flink run -m yarn-cluster -yjm 1024 -c com.kaikeba.demo1.FlinkStream original-flask_demo-1.0-SNAPSHOT.jar
- 檢視結果
- http://bigdata111:8088
參考
Spark Streaming 和 Flink
https://blog.csdn.net/csdnnews/article/details/81518143
讀寫MySQL
https://blog.csdn.net/hyy1568786/article/details/105886518/
Flink 和 kafka
https://blog.csdn.net/SqrsCbrOnly1/article/details/100011933
State
https://blog.csdn.net/mhaiy24/article/details/102707958
Flink 廣播
https://blog.csdn.net/nazeniwaresakini/article/details/107404951
https://www.jianshu.com/p/520376ae837e
Flink 狀態
https://blog.csdn.net/mhaiy24/article/details/102707958
Flink入門到專案
https://blog.csdn.net/lp284558195/article/details/92798595
Flink 使用 broadcast 實現維表或配置的實時更新
https://blog.csdn.net/tzs_1041218129/article/details/105283325
flink+kafka實現wordcount實時計算+錯誤解決方案
https://blog.csdn.net/xiaoyutongxue6/article/details/88861087
flink流處理訪問mysql