Flink 引數配置和常見引數調優
阿新 • • 發佈:2020-08-11
1、Flink引數配置
- jobmanger.rpc.address:jobmanger的地址
- jobmanger.rpc.port:jobmanger的埠
- jobmanager.heap.mb:jobmanager的堆記憶體大小。不建議配的太大,1-2G足夠。
- taskmanager.heap.mb:taskmanager的堆記憶體大小。大小視任務量而定。需要儲存任務的中間值,網路快取,使用者資料等。
- taskmanager.numberOfTaskSlots:slot數量。
- 在yarn模式使用的時候會受到
yarn.scheduler.maximum-allocation-vcores
- 此處指定的slot數量如果超過yarn的maximum-allocation-vcores,flink啟動會報錯。
- 在yarn模式,flink啟動的task manager個數可以參照如下計算公式:
num_of_manager = ceil(parallelism / slot)
即並行度除以slot個數,結果向上取整。
- 在yarn模式使用的時候會受到
- parallelsm.default:任務預設並行度,如果任務未指定並行度,將採用此設定。
- web.port: Flink web ui的埠號。
- jobmanager.archive.fs.dir: 將已完成的任務歸檔儲存的目錄。
- history.web.port: 基於web的history server的埠號。
- historyserver.archive.fs.dir:history server的歸檔目錄。該配置必須包含jobmanager.archive.fs.dir配置的目錄,以便history server能夠讀取到已完成的任務資訊
- historyserver.archive.fs.refresh-interval: 重新整理存檔作業目錄時間間隔
- state.backend: 儲存和檢查點的後臺儲存。可選值為rocksdb、filesystem、hdfs。
- state.backend.fs.checkpointdir:檢查點資料檔案和元資料的預設目錄。
- state.checkpoints.dir:儲存檢查點的目錄
- state.savepoints.dir:save point的目錄
- state.checkpoints.num-retained:保留最近檢查點的數量
- state.backend.incremental:增量儲存
- akka.ask.timeout:jobmanager和task manager通訊連線的超時時間。如果網路擁擠經常出現超時錯誤,可以增大該配置
- akka.watch.heartbeat.interval:心跳傳送間隔,用來檢測task manager的狀態
- akka.watch.heartbeat.pause:如果超過該時間仍未收到task manager的心跳,該task manager會被認為已掛掉
- taskmanager.network.memory.max:網路快取區最大記憶體大小
- taskmanager.network.memory.min:網路快取區最小記憶體大小
- taskmanager.network.memory.fraction:網路緩衝區使用的記憶體佔用總JVM記憶體的比例。如果配置了taskmanager.network.memory.max和taskmanager.network.memory.min的配置會被覆蓋
- fs.hdfs.hadoopconf:hadoop配置檔案路徑(已被廢棄,建議使用HADOOP_CONF_DIR環境變數)
- yarn.application-attempts:job失敗嘗試次數,指jobmanager的重啟嘗試次數。該指不應該超過
yarn-site.xml
中的yarn.resourcemanager.am.max-attempts
的值
2、Flink HA(Job Manager)的配置
- high-availability:zookeeper
使用zookeeper負責HA實現
- high-availability.zookeeper.path.root:/flink
flink資訊在zookeeper儲存節點的名稱
- high-availability.zookeeper.quorum:hadoop100:2181,hadoop101:2181,hadoop103:2181
zk叢集節點的地址和埠
- high-availability.storageDir:hdfs://nameservice/flink/ha/
job manager元資料在檔案系統儲存的位置,zk僅儲存了指向該目錄的指標
3、Flink metrics監控相關配置
- metrics.reporters: prom
- metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
- metrics.reporter.prom.port: 9250-9260
4、Kafka相關調優配置
linger.ms/batch.size:這兩個配置項配合使用,可以在吞吐量和延遲中得到最佳的平衡點。
batch.size是kafka producer傳送資料的批量大小,當資料量達到batch size的時候,會將這批資料傳送出去,避免了資料一條一條的傳送,頻繁建立和斷開網路連線。但是如果資料量比較小,導致遲遲不能達到batch.size,為了保證延遲不會過大,kafka不能無限等待資料量達到batch.size的時候才傳送。為了解決這個問題,引入了linger.ms配置項。當資料在快取中的時間超過linger.ms時,無論快取中資料是否達到批量大小,都會被強制傳送出去
kafka topic分割槽數和Flink並行度的關係
Flink Kafka source的並行度需要和kafka topic的分割槽數一致。最大化利用kafka多分割槽topic的並行讀取能力
5、Yarn相關調優配置
- yarn.scheduler.maximum-allocation-vcores
- yarn.scheduler.minimum-allocation-vcores
Flink單個Task Manager的slot數量必須結餘這兩個值之間
Flink的Job Manager和Task Manager記憶體不得超過container最大分配記憶體大小
- yarn.nodemanager.resource.cpu-vcores
yarn的虛擬cpu核心數,設定為物理機CPU核心數的2-3倍。會導致CPU資源無法被充分利用,跑任務的時候CPU佔用率不高。