Flink開發中的問題
1. 流與批處理的區別
- 流處理系統
流處理系統,其節點間資料傳輸的標準模型是:當一條資料被處理完成後,序列化到快取中,然後立刻通過網路傳輸到下一個節點,由下一個節點繼續處理。
- 批處理系統
批處理系統,其節點間資料傳輸的標準模型是:當一條資料被處理完成後,序列化到快取中,並不會立刻通過網路傳輸到下一個節點,當快取寫滿,就持久化到本地硬碟上,當所有資料都被處理完成後,才開始將處理後的資料通過網路傳輸到下一個節點。
- flink的流處理和批處理
Flink的執行引擎採用了一種十分靈活的方式,同時支援了這兩種資料傳輸模型:
• Flink以固定的快取塊為單位進行網路資料傳輸,使用者可以通過設定快取塊超時值指定快取塊的傳輸時機。如果快取塊的超時值為0,則Flink的資料傳輸方式類似上文所提到流處理系統的標準模型,此時系統可以獲得最低的處理延遲
• 如果快取塊的超時值為無限大,則Flink的資料傳輸方式類似上文所提到批處理系統的標準模型,此時系統可以獲得最高的吞吐量
• 同時快取塊的超時值也可以設定為0到無限大之間的任意值。快取塊的超時閾值越小,則Flink流處理執行引擎的資料處理延遲越低,但吞吐量也會降低,反之亦然。通過調整快取塊的超時閾值,使用者可根據需求靈活地權衡系統延遲和吞吐量
原文連結:https://blog.csdn.net/shujuelin/article/details/89351157
2. 恢復作業 checkpoint
檢查點(checkpoint)的目錄是依賴JobID的,每次執行任務都是一個唯一的JobID(好像不能手動設定),所以要找到上一次任務的JobID才能找到檢查點。
儲存點(savepoint)需要手動觸發,而且在指定目錄下還生成一個唯一的子目錄。
# savepoint flink run -s /tmp/state.backend/s1/savepoint-17b840-2cfe3bd5bc0c -c flink.HelloWorld target/scala-flink-0.1.jar # checkpoint flink run -s /tmp/state.backend/17b840a3d2221b1400ec03f7e3949b17/chk-960 -c flink.HelloWorld target/scala-flink-0.1.jar
檢查點和儲存點的恢復方法一樣的
3. 用流處理批資料,最後一個視窗不計算
-
現象
用流處理,處理kafka裡面的資料時,最後一個視窗會不關閉.導致最後的資料會丟失.
-
原因
最後一個視窗的水位線還沒到 視窗關閉時間.
-
解決方案
自定義觸發器.以機器時鐘為準,5秒觸發一次.
5. flink 消費kafka的多個topic
-
傳入 List
topics , kafka 支援 多個topic. -
多個kafka消費,然後用union 連線.
8.Flink state 調優跟注意點
https://blog.csdn.net/qq_31866793/article/details/97272103
9 Flink1.8.0重大更新-Flink中State的自動清除詳解
https://blog.csdn.net/u013411339/article/details/90625604
10. 記憶體溢位
-
現象
yang gc 時間達到30秒,fullgc 很少發生.
11 linux 記憶體過多
執行sync將dirty的內容寫回硬碟
sync
通過修改proc系統的drop_caches清理free的cache
echo 1 > /proc/sys/vm/drop_caches
13 ask timeout
增加引數
akka.ask.timeout: 100s
web.timeout: 300000
參看:https://www.cnblogs.com/createweb/p/12027737.html
14 Container exited with a non-zero exit code 143
at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:343)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
16 Flink 清理過期 Checkpoint 目錄的正確姿勢
https://www.jianshu.com/p/165a1bf33e4a
17 flink 記憶體越來越大,越來越慢
將視窗滑動時間由1分鐘改為10分鐘
18. flink 與 kafka
consumer.setStartFromEarliest(); //從最早的資料開始消費
consumer.setStartFromLatest(); //從最新的資料開始消費
consumer.setStartFromTimestamp(...); //從根據指定的時間戳(ms)處開始消費
consumer.setStartFromGroupOffsets(); //預設從提交的 offset 開始消費
反序列化用 KafkaDeserializationSchema 可以獲取到topic的資訊
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private final K key;
private final V value;
}
21 叢集啟動
./hadoop-daemon.sh start journalnode
./hadoop-daemon.sh start zkfc
./hadoop-daemon.sh start datanode
./hadoop-daemon.sh start namenode
./yarn-daemon.sh start nodemanager
./yarn-daemon.sh start resourcemanager
./bin/kafka-server-start.sh -daemon ./config/server.properties
./zkServer.sh start
21 flink Reduce、GroupReduce、GroupCombine筆記
reduce
應用於分組DataSet的Reduce轉換使用使用者定義的reduce函式將每個組減少為單個元素。對於每組輸入元素,reduce函式連續地將元素對組合成一個元素,直到每個組只剩下一個元素。
注意,對於ReduceFunction,返回物件的key欄位應與輸入值匹配。這是因為reduce是可隱式組合(combine)的,並且從combine運算子發出的物件在傳遞給reduce運算子時再次按key分組。
GroupReduce
應用於分組DataSet的GroupReduce呼叫使用者定義的group-reduce函式轉換每個分組。
這與Reduce的區別在於使用者定義的函式會立即獲得整個組。在組的所有元素上使用Iterable呼叫該函式,並且可以返回任意數量的結果元素
GroupCombine 分組連線 (少用)
該策略可能不會一次處理所有資料,而是以多個步驟處理
GroupCombine轉換是可組合GroupReduceFunction中組合步驟的通用形式。它在某種意義上被概括為允許將輸入型別I組合到任意輸出型別O.
相反,GroupReduce中的組合步驟僅允許從輸入型別I到輸出型別I的組合。這是因為reduce步驟中,GroupReduceFunction期望輸入型別為I.
在一些應用中,期望在執行附加變換(例如,減小資料大小)之前將DataSet組合成中間格式。這可以通過CombineGroup轉換能以非常低的成本實現。
注意:分組資料集上的GroupCombine在記憶體中使用貪婪策略執行,該策略可能不會一次處理所有資料,而是以多個步驟處理。
它也可以在各個分割槽上執行,而無需像GroupReduce轉換那樣進行資料交換。這可能會導致輸出的是部分結果,
所以GroupCombine是不能替代GroupReduce操作的,儘管它們的操作內容可能看起來都一樣。
22flink 歷史伺服器
修改歷史伺服器配置
org.apache.flink.configuration.HistoryServerOptions
historyserver.web.tmpdir 檔案地址.
23 Could not deploy Yarn job cluster
新增:
flink-conf.yaml:rest.port: 8082
24 Flink:Could not forward element to next operator
前後時間視窗不一致導致的.
25flink報錯org.apache.commons.cli.Option.builder
刪除$FLINK_HOME/lib下面的/commons-cli-1.4.jar
26 Flink中的序列化失敗問題
宣告為@transent
27 Line could not be encoded
Caused by: java.lang.RuntimeException: Line could not be encoded: [49, 56, 49, 49, 90, 77, 119, 66, 54, 48, 54, 71, 48, 53, 55, 50, 48, 49, 53, 48, 56, 48, 53, 49, 56, 52, 52, 48, 56, 109, 49, 106, 124, -26, -84, -94, -24, -65, -114, -28, -67, -65, -25]
at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:127)
at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:38)
at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:195)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:816)
at org.apache.flink.api.java.io.TextValueInputFormat.readRecord(TextValueInputFormat.java:117)
... 6 more
解決方案:
Configuration conf = new Configuration();
conf.setBoolean("recursive.file.enumeration", true);
TextValueInputFormat inputFormat = new TextValueInputFormat(new Path(path));
inputFormat.setSkipInvalidLines(true);
28 Embedded metastore is not allowed
解決方案:flink 整合 hive 時 不支援embedded metastore的,配置hive時 需要起一個hive metastore 並在conf檔案配置 hive.metastore.uris
29 flink實戰--開發中常見的錯誤與問題
https://blog.csdn.net/aa518189/article/details/103622261
30 Exceeded checkpoint tolerable failure threshold.
重啟