1. 程式人生 > 其它 >[DB] Flink

[DB] Flink

概述

  • 流式計算,本質上是增量計算,需要不斷查詢過去的狀態

概念

  • Streams(流):分為有界流(固定大小,不隨時間增加而增長)和無界流(隨時間增加而增長),
  • State(狀態):在進行流式計算過程中的資訊,用於容錯恢復和持久化
  • Time(時間):支援Event time、Ingestion time、Processing time等,用來判斷業務狀態是否滯後或延遲
  • API:分為SQL/Table API、DataStream API、ProcessFunction三層

叢集

  • JobManager:叢集管理者,負責排程任務,協調checkpoints、協調故障恢復,收集Job狀態,管理TaskManager
  • TaskManager:實際執行計算的Worker,在其上執行Flink Job 的一組Task,將所在節點的伺服器資訊如記憶體、磁碟、任務執行情況等向JobManager彙報
  • Clinent:將任務提交到叢集,根據使用者引數選擇提交模式(yarn per job,stand-alone,yarn-session)

模型

  • DataStream 的程式設計模型包括四個部分:Environment、DataSource、Transformation、Sink
  • DataSource(資料來源):檔案、Collection、Socket、自定義
  • Sink(資料目標):Kafka、Elasticsearch、RabbitMQ、Cassandra、Redis
  • 每個資料流起始於一個或多個Source,並終止於一個或多個Sink

資源

  • 一個TaskManager就是一個JVM程序,會用獨立的執行緒來執行Task
  • 每個TaskManager為叢集提供Slot,每個task slot代表了TaskManager的一個固定大小的資源子集,slot數一般為每個節點的cpu核數
  • 一個Flink程式由多個任務組成(source、transformation和 sink)
  • 一個任務由多個並行的例項(執行緒)來執行,一個任務的並行例項 (執行緒) 數目就被稱為該任務的並行度

優點

  • 架構:主從模式
  • 容錯:基於兩階段提交,實現了精確的一次處理語義
  • 反壓:當消費者速度低於生產者時,需要消費者將資訊反饋給生產者,使二者速度匹配,Flink使用分散式阻塞佇列實現

聯結器

  • Kafka
  • Redis
  • ElasticSearch

運算元

  • Map:接受一個元素作為輸入,根據開發者自定義的邏輯處理後輸出
 1 class StreamingDemo {
 2     public static void main(String[] args) throws Exception {
 3 
 4         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 5         //獲取資料來源
 6         DataStreamSource<MyStreamingSource.Item> items = env.addSource(new MyStreamingSource()).setParallelism(1); 
 7         //Map
 8         SingleOutputStreamOperator<Object> mapItems = items.map(new MapFunction<MyStreamingSource.Item, Object>() {
 9             @Override
10             public Object map(MyStreamingSource.Item item) throws Exception {
11                 return item.getName();
12             }
13         });
14         //列印結果
15         mapItems.print().setParallelism(1);
16         String jobName = "user defined streaming source";
17         env.execute(jobName);
18     }
19 }
View Code
  • FlatMap:接受一個元素,返回0到多個元素,和Map的區別是,當返回值是列表時,FlatMap會將列表平鋪,以單個元素的形式輸出
1 SingleOutputStreamOperator<Object> flatMapItems = items.flatMap(new FlatMapFunction<MyStreamingSource.Item, Object>() {
2     @Override
3     public void flatMap(MyStreamingSource.Item item, Collector<Object> collector) throws Exception {
4         String name = item.getName();
5         collector.collect(name);
6     }
7 });
View Code
  • Filter:過濾掉不需要的資料,每個元素都會被Filter處理,如果Filter函式返回true則保留,否則丟棄
1 SingleOutputStreamOperator<MyStreamingSource.Item> filterItems = items.filter(new FilterFunction<MyStreamingSource.Item>() {
2     @Override
3     public boolean filter(MyStreamingSource.Item item) throws Exception {
4 
5         return item.getId() % 2 == 0;
6     }
7 });
View Code
  • KeyBy:根據資料的某種屬性分組,然後對不同的組採取不同的處理方式
  • Aggregations:聚合函式,常見的有sum、max、min等,需要指定一個key進行聚合
  • Reduce:按照使用者自定義邏輯進行分組聚合

狀態

  • Flink框架的計算是有狀態的
  • 狀態即中間計算結果,是在流處理過程中需要記住的資料,包括業務資料和元資料
  • 狀態儲存在JVM中
  • Flink支援不同型別的狀態,對狀態的持久化提供專門機制和狀態管理器
  • 對於任何一個狀態資料,可以設定過期時間(TTL)
  • 基本型別:是否按照某個key進行分割槽
    • Keyed State:每個key都有自己的狀態
    • Operator State(Keyed State):每個運算元例項共享一個狀態

容錯

  • Checkpoint

視窗

  • 滾動視窗
  • 滑動視窗
  • 會話視窗

時間

  • 生成時間
  • 接入時間
  • 處理時間

水位

  • 由於網路延遲等因素,事件資料往往不能即使傳遞至Flink系統中,導致系統的不穩定或資料亂序
  • 衡量資料處理進度,確保事件資料全部到達Flink系統,即使亂序或遲到,也能像預期一樣計算出正確和連續的結果
  • 任何Event進入Flink系統,都會根據當前最大事件時間產生Watermarks時間戳

廣播變數

  • 允許在每臺機器上保持一個只讀的快取變數,即一個公共的共享變數
  • 可以把一個dataset資料集廣播出去,然後不同的task在節點上都能獲取到

案例

  • 安裝flink
    • tar -zxvf flink-1.9.2-bin-scala_2.11.tgz -C ~/training/
  • 修改flink配置檔案
    • vim flink-conf.yaml
  • 啟動hadoop,zookeeper,flink
    • bin/start-cluster.sh
  • socket資料來源
    • nc -lk 9999
  • 在idea中建立maven工程,開發計數程式

FlinkStreaming.scala

 1 package com.kaikeba.demo1
 2 
 3 import org.apache.flink.runtime.state.filesystem.FsStateBackend
 4 import org.apache.flink.streaming.api.CheckpointingMode
 5 import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
 6 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 7 import org.apache.log4j.{Level, Logger}
 8 
 9 //匯入隱式轉換的包
10 import org.apache.flink.api.scala._
11 
12 /**
13   * flink接受socket資料,進行單詞計數
14   */
15 object FlinkStream {
16   Logger.getLogger("org").setLevel(Level.ERROR)
17 
18   def main(args: Array[String]): Unit = {
19       //todo:1、構建流處理的環境
20       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
21 
22      //todo:2、從socket獲取資料
23        val sourceStream: DataStream[String] = environment.socketTextStream("bigdata111",9999)
24 
25     //todo:3、對資料進行處理     hadoop spark
26     val result: DataStream[(String, Int)] = sourceStream
27                                                         .flatMap(x => x.split(" ")) //按照空格切分
28                                                         .map(x => (x, 1))   //每個單詞計為1
29                                                         .keyBy(0)         //按照下標為0的單詞進行分組
30                                                         .sum(1)           //按照下標為1累加相同單詞出現的1
31 
32 
33     //todo: 4、對資料進行列印  sink
34     result.print()
35 
36 
37     //todo: 5、開啟任務
38     environment.execute("FlinkStream")
39   }
40 
41 }
View Code

FlinkWordCount.scala

 1 package com.kaikeba.demo1
 2 
 3 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 4 import org.apache.flink.streaming.api.windowing.time.Time
 5 
 6 /**
 7   * 使用滑動視窗
 8   * 每隔1秒鐘統計最近2秒鐘的每個單詞出現的次數
 9   */
10 object FlinkStream {
11 
12   def main(args: Array[String]): Unit = {
13       //構建流處理的環境
14         val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
15 
16      //從socket獲取資料
17        val sourceStream: DataStream[String] = env.socketTextStream("node01",9999)
18 
19      //匯入隱式轉換的包
20       import org.apache.flink.api.scala._
21 
22      //對資料進行處理
23      val result: DataStream[(String, Int)] = sourceStream
24           .flatMap(x => x.split(" ")) //按照空格切分
25           .map(x => (x, 1))           //每個單詞計為1
26           .keyBy(0)                   //按照下標為0的單詞進行分組      
27           .timeWindow(Time.seconds(2),Time.seconds(1)) //每隔1s處理2s的資料
28           .sum(1)            //按照下標為1累加相同單詞出現的次數
29 
30         //對資料進行列印
31         result.print()
32 
33         //開啟任務
34          env.execute("FlinkStream")
35       }
36 
37 }
View Code
  • 打包jar檔案,提交到yarn  
    • ~/training/flink-1.9.2/bin/flink run -m yarn-cluster -yjm 1024 -c com.kaikeba.demo1.FlinkStream original-flask_demo-1.0-SNAPSHOT.jar
  • 檢視結果
    • http://bigdata111:8088

參考

Spark Streaming 和 Flink

https://blog.csdn.net/csdnnews/article/details/81518143

讀寫MySQL

https://blog.csdn.net/hyy1568786/article/details/105886518/

Flink 和 kafka

https://blog.csdn.net/SqrsCbrOnly1/article/details/100011933

State

https://blog.csdn.net/mhaiy24/article/details/102707958

Flink 廣播

https://blog.csdn.net/nazeniwaresakini/article/details/107404951

https://www.jianshu.com/p/520376ae837e

Flink 狀態

https://blog.csdn.net/mhaiy24/article/details/102707958

Flink入門到專案

https://blog.csdn.net/lp284558195/article/details/92798595

Flink 使用 broadcast 實現維表或配置的實時更新

https://blog.csdn.net/tzs_1041218129/article/details/105283325

flink+kafka實現wordcount實時計算+錯誤解決方案

https://blog.csdn.net/xiaoyutongxue6/article/details/88861087

flink流處理訪問mysql

https://blog.csdn.net/u012447842/article/details/89175772