1. 程式人生 > 實用技巧 >Flink 引數配置和常見引數調優

Flink 引數配置和常見引數調優

1、Flink引數配置

  • jobmanger.rpc.address:jobmanger的地址
  • jobmanger.rpc.port:jobmanger的埠
  • jobmanager.heap.mb:jobmanager的堆記憶體大小。不建議配的太大,1-2G足夠。
  • taskmanager.heap.mb:taskmanager的堆記憶體大小。大小視任務量而定。需要儲存任務的中間值,網路快取,使用者資料等。
  • taskmanager.numberOfTaskSlots:slot數量。
    • 在yarn模式使用的時候會受到yarn.scheduler.maximum-allocation-vcores
      值的影響。
    • 此處指定的slot數量如果超過yarn的maximum-allocation-vcores,flink啟動會報錯。
    • 在yarn模式,flink啟動的task manager個數可以參照如下計算公式:num_of_manager = ceil(parallelism / slot) 即並行度除以slot個數,結果向上取整。
  • parallelsm.default:任務預設並行度,如果任務未指定並行度,將採用此設定。
  • web.port: Flink web ui的埠號。
  • jobmanager.archive.fs.dir: 將已完成的任務歸檔儲存的目錄。
  • history.web.port: 基於web的history server的埠號。
  • historyserver.archive.fs.dir:history server的歸檔目錄。該配置必須包含jobmanager.archive.fs.dir配置的目錄,以便history server能夠讀取到已完成的任務資訊
  • historyserver.archive.fs.refresh-interval: 重新整理存檔作業目錄時間間隔
  • state.backend: 儲存和檢查點的後臺儲存。可選值為rocksdb、filesystem、hdfs。
  • state.backend.fs.checkpointdir:檢查點資料檔案和元資料的預設目錄。
  • state.checkpoints.dir:儲存檢查點的目錄
  • state.savepoints.dir:save point的目錄
  • state.checkpoints.num-retained:保留最近檢查點的數量
  • state.backend.incremental:增量儲存
  • akka.ask.timeout:jobmanager和task manager通訊連線的超時時間。如果網路擁擠經常出現超時錯誤,可以增大該配置
  • akka.watch.heartbeat.interval:心跳傳送間隔,用來檢測task manager的狀態
  • akka.watch.heartbeat.pause:如果超過該時間仍未收到task manager的心跳,該task manager會被認為已掛掉
  • taskmanager.network.memory.max:網路快取區最大記憶體大小
  • taskmanager.network.memory.min:網路快取區最小記憶體大小
  • taskmanager.network.memory.fraction:網路緩衝區使用的記憶體佔用總JVM記憶體的比例。如果配置了taskmanager.network.memory.max和taskmanager.network.memory.min的配置會被覆蓋
  • fs.hdfs.hadoopconf:hadoop配置檔案路徑(已被廢棄,建議使用HADOOP_CONF_DIR環境變數)
  • yarn.application-attempts:job失敗嘗試次數,指jobmanager的重啟嘗試次數。該指不應該超過yarn-site.xml中的yarn.resourcemanager.am.max-attempts的值

2、Flink HA(Job Manager)的配置

  • high-availability:zookeeper

使用zookeeper負責HA實現

  • high-availability.zookeeper.path.root:/flink

flink資訊在zookeeper儲存節點的名稱

  • high-availability.zookeeper.quorum:hadoop100:2181,hadoop101:2181,hadoop103:2181

zk叢集節點的地址和埠

  • high-availability.storageDir:hdfs://nameservice/flink/ha/

job manager元資料在檔案系統儲存的位置,zk僅儲存了指向該目錄的指標

3、Flink metrics監控相關配置

  • metrics.reporters: prom
  • metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  • metrics.reporter.prom.port: 9250-9260

4、Kafka相關調優配置

linger.ms/batch.size:這兩個配置項配合使用,可以在吞吐量和延遲中得到最佳的平衡點。

batch.size是kafka producer傳送資料的批量大小,當資料量達到batch size的時候,會將這批資料傳送出去,避免了資料一條一條的傳送,頻繁建立和斷開網路連線。但是如果資料量比較小,導致遲遲不能達到batch.size,為了保證延遲不會過大,kafka不能無限等待資料量達到batch.size的時候才傳送。為了解決這個問題,引入了linger.ms配置項。當資料在快取中的時間超過linger.ms時,無論快取中資料是否達到批量大小,都會被強制傳送出去

kafka topic分割槽數和Flink並行度的關係

Flink Kafka source的並行度需要和kafka topic的分割槽數一致。最大化利用kafka多分割槽topic的並行讀取能力

5、Yarn相關調優配置

  • yarn.scheduler.maximum-allocation-vcores
  • yarn.scheduler.minimum-allocation-vcores

Flink單個Task Manager的slot數量必須結餘這兩個值之間

Flink的Job Manager和Task Manager記憶體不得超過container最大分配記憶體大小

  • yarn.nodemanager.resource.cpu-vcores

yarn的虛擬cpu核心數,設定為物理機CPU核心數的2-3倍。會導致CPU資源無法被充分利用,跑任務的時候CPU佔用率不高。