1. 程式人生 > 實用技巧 >Flink開發中的問題

Flink開發中的問題

1. 流與批處理的區別

  • 流處理系統

流處理系統,其節點間資料傳輸的標準模型是:當一條資料被處理完成後,序列化到快取中,然後立刻通過網路傳輸到下一個節點,由下一個節點繼續處理。

  • 批處理系統

批處理系統,其節點間資料傳輸的標準模型是:當一條資料被處理完成後,序列化到快取中,並不會立刻通過網路傳輸到下一個節點,當快取寫滿,就持久化到本地硬碟上,當所有資料都被處理完成後,才開始將處理後的資料通過網路傳輸到下一個節點。

  • flink的流處理和批處理

Flink的執行引擎採用了一種十分靈活的方式,同時支援了這兩種資料傳輸模型:
• Flink以固定的快取塊為單位進行網路資料傳輸,使用者可以通過設定快取塊超時值指定快取塊的傳輸時機。如果快取塊的超時值為0,則Flink的資料傳輸方式類似上文所提到流處理系統的標準模型,此時系統可以獲得最低的處理延遲
• 如果快取塊的超時值為無限大,則Flink的資料傳輸方式類似上文所提到批處理系統的標準模型,此時系統可以獲得最高的吞吐量
• 同時快取塊的超時值也可以設定為0到無限大之間的任意值。快取塊的超時閾值越小,則Flink流處理執行引擎的資料處理延遲越低,但吞吐量也會降低,反之亦然。通過調整快取塊的超時閾值,使用者可根據需求靈活地權衡系統延遲和吞吐量

原文連結:https://blog.csdn.net/shujuelin/article/details/89351157

2. 恢復作業 checkpoint

檢查點(checkpoint)的目錄是依賴JobID的,每次執行任務都是一個唯一的JobID(好像不能手動設定),所以要找到上一次任務的JobID才能找到檢查點。
儲存點(savepoint)需要手動觸發,而且在指定目錄下還生成一個唯一的子目錄。

# savepoint
flink run -s /tmp/state.backend/s1/savepoint-17b840-2cfe3bd5bc0c -c flink.HelloWorld target/scala-flink-0.1.jar

# checkpoint
flink run -s /tmp/state.backend/17b840a3d2221b1400ec03f7e3949b17/chk-960 -c flink.HelloWorld target/scala-flink-0.1.jar

檢查點和儲存點的恢復方法一樣的

3. 用流處理批資料,最後一個視窗不計算

  • 現象

    用流處理,處理kafka裡面的資料時,最後一個視窗會不關閉.導致最後的資料會丟失.

  • 原因

    最後一個視窗的水位線還沒到 視窗關閉時間.

  • 解決方案

    自定義觸發器.以機器時鐘為準,5秒觸發一次.

5. flink 消費kafka的多個topic

  1. 傳入 List topics , kafka 支援 多個topic.

  2. 多個kafka消費,然後用union 連線.

8.Flink state 調優跟注意點

https://blog.csdn.net/qq_31866793/article/details/97272103

9 Flink1.8.0重大更新-Flink中State的自動清除詳解

https://blog.csdn.net/u013411339/article/details/90625604

10. 記憶體溢位

  • 現象

    yang gc 時間達到30秒,fullgc 很少發生.

11 linux 記憶體過多

執行sync將dirty的內容寫回硬碟
sync

通過修改proc系統的drop_caches清理free的cache
echo 1 > /proc/sys/vm/drop_caches

13 ask timeout

增加引數

akka.ask.timeout: 100s
web.timeout: 300000

參看:https://www.cnblogs.com/createweb/p/12027737.html

14 Container exited with a non-zero exit code 143

at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

16 Flink 清理過期 Checkpoint 目錄的正確姿勢

https://www.jianshu.com/p/165a1bf33e4a

17 flink 記憶體越來越大,越來越慢

將視窗滑動時間由1分鐘改為10分鐘

18. flink 與 kafka

consumer.setStartFromEarliest();     //從最早的資料開始消費
consumer.setStartFromLatest();       //從最新的資料開始消費
consumer.setStartFromTimestamp(...); //從根據指定的時間戳(ms)處開始消費
consumer.setStartFromGroupOffsets(); //預設從提交的 offset 開始消費

反序列化用 KafkaDeserializationSchema 可以獲取到topic的資訊

public class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final long checksum;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;
}

21 叢集啟動

./hadoop-daemon.sh start journalnode
./hadoop-daemon.sh start zkfc
./hadoop-daemon.sh start datanode
./hadoop-daemon.sh start namenode
./yarn-daemon.sh start nodemanager
./yarn-daemon.sh start resourcemanager

./bin/kafka-server-start.sh -daemon ./config/server.properties
./zkServer.sh start




21 flink Reduce、GroupReduce、GroupCombine筆記

reduce

應用於分組DataSet的Reduce轉換使用使用者定義的reduce函式將每個組減少為單個元素。對於每組輸入元素,reduce函式連續地將元素對組合成一個元素,直到每個組只剩下一個元素。

注意,對於ReduceFunction,返回物件的key欄位應與輸入值匹配。這是因為reduce是可隱式組合(combine)的,並且從combine運算子發出的物件在傳遞給reduce運算子時再次按key分組。

GroupReduce

應用於分組DataSet的GroupReduce呼叫使用者定義的group-reduce函式轉換每個分組。
這與Reduce的區別在於使用者定義的函式會立即獲得整個組。在組的所有元素上使用Iterable呼叫該函式,並且可以返回任意數量的結果元素

GroupCombine 分組連線 (少用)

該策略可能不會一次處理所有資料,而是以多個步驟處理

GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入型別I組合到任意輸出型別O.
相反,GroupReduce中的組合步驟僅允許從輸入型別I到輸出型別I的組合。這是因為reduce步驟中,GroupReduceFunction期望輸入型別為I.

在一些應用中,期望在執行附加變換(例如,減小資料大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。

注意:分組資料集上的GroupCombine在記憶體中使用貪婪策略執行,該策略可能不會一次處理所有資料,而是以多個步驟處理。
它也可以在各個分割槽上執行,而無需像GroupReduce轉換那樣進行資料交換。這可能會導致輸出的是部分結果,
所以GroupCombine是不能替代GroupReduce操作的,儘管它們的操作內容可能看起來都一樣。

22flink 歷史伺服器

修改歷史伺服器配置

org.apache.flink.configuration.HistoryServerOptions

historyserver.web.tmpdir  檔案地址.

23 Could not deploy Yarn job cluster

新增:

flink-conf.yaml:rest.port: 8082

24 Flink:Could not forward element to next operator

前後時間視窗不一致導致的.

25flink報錯org.apache.commons.cli.Option.builder

刪除$FLINK_HOME/lib下面的/commons-cli-1.4.jar

26 Flink中的序列化失敗問題

宣告為@transent

27 Line could not be encoded

Caused by: java.lang.RuntimeException: Line could not be encoded: [49, 56, 49, 49, 90, 77, 119, 66, 54, 48, 54, 71, 48, 53, 55, 50, 48, 49, 53, 48, 56, 48, 53, 49, 56, 52, 52, 48, 56, 109, 49, 106, 124, -26, -84, -94, -24, -65, -114, -28, -67, -65, -25]
	at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:127)
	at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:38)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.charset.MalformedInputException: Input length = 1
	at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
	at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:816)
	at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:117)
	... 6 more

解決方案:

Configuration conf = new Configuration();
conf.setBoolean("recursive.file.enumeration", true);
TextValueInputFormat inputFormat = new TextValueInputFormat(new Path(path));
inputFormat.setSkipInvalidLines(true);

28 Embedded metastore is not allowed

解決方案:flink 整合 hive 時 不支援embedded metastore的,配置hive時 需要起一個hive metastore 並在conf檔案配置 hive.metastore.uris

29 flink實戰--開發中常見的錯誤與問題

https://blog.csdn.net/aa518189/article/details/103622261

30 Exceeded checkpoint tolerable failure threshold.

重啟