1. 程式人生 > 其它 >Stream Processing with Apache Flink中文版-- 第10章 操作Flink和流式應用程式

Stream Processing with Apache Flink中文版-- 第10章 操作Flink和流式應用程式

流處理應用程式是長時間執行的,它們的工作負載通常是不可預測的。連續執行數月的流作業並不少見,因此其操作需求與短期批處理作業的操作需求非常不同。考慮這樣一個場景:您在部署的應用程式中檢測到一個bug。如果您的應用程式是批處理作業,那麼您可以輕鬆地在離線狀態下修復錯誤,然後在當前作業例項完成後重新部署新的應用程式程式碼。但是,如果您的作業是長時間執行的流作業呢?如何在保證正確性的同時,輕鬆地應用重構?

如果您正在使用Flink,就沒有什麼好擔心的。Flink將完成所有繁重的工作,因此您可以輕鬆地監視、操作和重新配置您的作業,而只需付出最小的努力,同時保留精確一次狀態語義。在本章中,我們將介紹Flink提供的用於操作和維護連續執行的流應用程式的工具。我們將向您展示如何收集指標和監控您的應用程式,以及當您想要更新應用程式程式碼或調整應用程式的資源時,如何保持結果一致性。

執行和管理流應用程式

如您所料,維護流應用程式比維護批處理應用程式更具挑戰性。雖然流應用程式是有狀態的並持續執行,但批處理應用程式是定期執行的。可以在執行之間對批處理應用程式進行重新配置、擴充套件或更新,這比升級不斷讀取、處理和傳送資料的應用程式要容易得多。

但是,Apache Flink有許多特性可以顯著地簡化流應用程式的維護。這些特性中的大多數都是基於儲存點的。Flink公開以下介面來監視和控制其master程序和worker程序以及應用程式:

  1. 命令列客戶端是用於提交和控制應用程式的工具。

  2. REST API是命令列客戶端和Web UI使用的底層介面。它可以被使用者和指令碼訪問,並提供對所有系統和應用程式指標以及提交和管理應用程式的端點的訪問。

  3. Web UI是一個Web介面,它提供關於Flink叢集和正在執行的應用程式的詳細資訊和指標。它還提供了提交和管理應用程式的基本功能。Web UI在“Flink Web UI”中進行了描述。

在本節中,我們將解釋儲存點的實踐層面,並討論如何使用Flink的命令列客戶端和Flink的REST API啟動、停止、暫停和恢復、擴充套件和升級有狀態流應用程式。

Savepoints(儲存點)

儲存點(savepoint)基本上與檢查點(checkpoint)相同——它是應用程式狀態的一致且完整的快照。然而,檢查點和儲存點的生命週期是不同的。檢查點會自動建立,在出現故障時載入,並通過Flink自動刪除(取決於應用程式的配置)。此外,當應用程式被取消時,檢查點將被自動刪除,除非應用程式顯式地啟用檢查點保留。相反,儲存點必須由使用者或外部服務手動觸發,並且不會被Flink自動刪除。

儲存點是持久化資料儲存中的目錄。它由一個子目錄和一個二進位制元資料檔案組成,子目錄儲存包含所有任務狀態的資料檔案,而二進位制元資料檔案包含所有資料檔案的絕對路徑。因為元資料檔案中的路徑是絕對的,所以將儲存點移動到不同的路徑將使其不可用。下面是儲存點的結構:

# Savepoint root path/savepoints/# Path of a particular savepoint/savepoints/savepoint-:shortjobid-:savepointid/# Binary metadata file of a savepoint/savepoints/savepoint-:shortjobid-:savepointid/_metadata# Checkpointed operator states/savepoints/savepoint-:shortjobid-:savepointid/:xxx

使用命令列客戶端管理應用程式

Flink的命令列客戶端提供啟動、停止和管理Flink應用程式的功能。它從./conf/flink-conf.yaml檔案中讀取配置(參見“系統配置”章節)。可以從Flink安裝目錄中呼叫命令./bin/flink。

在沒有附加引數的情況下執行時,客戶端列印一條幫助訊息。

                WINDOWS上的命令列客戶端
命令列客戶端基於bash指令碼。因此,它不能與Windows命令列一起工作。用於Windows命令列的./bin/flink.bat指令碼只提供非常有限的功能。如果您是Windows使用者,我們建議您使用常規的命令列客戶端並在WSL或Cygwin上執行它。

啟動一個應用程式

你可以使用命令列客戶端的run命令,啟動一個應用程式:

./bin/flink run ~/myApp.jar

上面的命令從類的main()方法啟動應用程式,這個方法在JAR檔案的META-INF/MANIFEST的program-class屬性中引用,不向應用程式傳遞任何引數。客戶端將JAR檔案提交給master程序,master程序將其分發給worker節點。

你可以把引數傳遞給一個應用程式的main()方法,方法是在命令後面附加引數:

./bin/flink run ~/myApp.jar my-arg1 my-arg2 my-arg3

預設情況下,客戶端在提交應用程式後不返回,而是等待應用程式終止。你可以使用-d標誌以分離(detached)模式提交申請,如下圖所示:

./bin/flink run -d ~/myApp.jar

客戶端返回並列印提交的作業的JobID,而不是等待應用程式終止。JobID用於在採取儲存點、取消或重新調整應用程式時指定job。你可以使用-p標誌指定一個應用程式的預設並行度。

./bin/flink run -p 16 ~/myApp.jar

上面的命令將執行環境的預設並行度設定為16。應用程式的原始碼顯式指定的並行度設定會覆蓋執行環境的預設並行度--—通過呼叫StreamExecutionEnvironment上的setParallelism()或操作符定義的並行性優先於預設值。

如果您的應用程式JAR檔案的清單檔案沒有指定一個入口類,您可以使用-c引數指定這個類:

./bin/flink run -c my.app.MainClass ~/myApp.jar

此時客戶端會啟動my.app.MainClass類的main方法。

預設情況下,客戶端向./conf/flink-conf.yaml檔案指定的Flink master伺服器提交一個應用程式(參見“系統配置”中的不同設定)。您可以使用-m標誌向特定的master程序提交應用程式:

./bin/flink run -m myMasterHost:9876 ~/myApp.jar

此命令將應用程式提交給執行在主機myMasterHost,埠9876上的master。

注意,如果您第一次啟動應用程式,或者沒有提供儲存點或檢查點來初始化狀態,則應用程式的狀態將為空。在這種情況下,一些有狀態操作符執行特殊的邏輯來初始化它們的狀態。例如,如果沒有可用的恢復讀位置,Kafka源需要選擇所消費Topic的分割槽偏移量。

列出執行中的應用程式

對於您希望應用於正在執行的作業的所有操作,您需要提供標識應用程式的JobID。JobID可以從Web UI、REST API或使用命令列客戶端獲得。客戶端可以打印出一個所有執行作業的列表,包括他們的jobid,執行以下命令:

./bin/flink list -rWaiting for response...------------------ Running/Restarting Jobs ------------------- 17.10.2018 21:13:14 :bc0b2ad61ecd4a615d92ce25390f61ad :Socket Window WordCount (RUNNING)-------------------------------------------------------------

在本例中,JobID是bc0b2ad61ecd4a615d92ce25390f61ad。

設定和取消儲存點

使用命令列客戶端,可以為一個執行的應用程式,設定一個儲存點,命令如下:

./bin/flink savepoint <jobId> [savepointPath]

該命令使用提供的JobID為作業觸發儲存點。如果顯式指定儲存點路徑,則將其儲存在提供的目錄中。否則,使用flink-conf.yaml檔案中配置的預設儲存點目錄。

觸發作業bc0b2ad61ecd4a615d92ce25390f61ad的儲存點,並將其儲存在目錄hdfs:///xxx:50070/savepoints中,我們使用命令列客戶端:

./bin/flink savepoint bc0b2ad61ecd4a615d92ce25390f61ad hdfs:///xxx:50070/savepointsTriggering savepoint for jobbc0b2ad61ecd4a615d92ce25390f61ad.Waiting for response...Savepoint completed.Path: hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8You can resume your program from this savepoint with the run command.

儲存點會佔用大量空間,不會被Flink自動刪除。您需要手動刪除它們以釋放消耗的儲存空間。使用以下命令刪除儲存點:

./bin/flink savepoint -d <savepointPath>

例如,刪除我們前面建立的儲存點,呼叫命令:

./bin/flink savepoint -d hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8Disposing savepoint 'hdfs:///xxx:50070/savepoints/savepointbc0b2a-63cf5d5ccef8'.Waiting for response...Savepoint 'hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8' disposed.

刪除儲存點

在下一個檢查點或儲存點完成之前,你不能刪除當前儲存點。因為儲存點是由系統處理,類似於普通檢查點,操作符也接收檢查點完成通知,完成儲存點,並在儲存點上採取操作。例如,事務sinks 在儲存點完成時,提交修改到外部系統。為了保證精確一次輸出,Flink必須從最新完成檢查點或儲存點恢復。如果Flink試圖從一個被刪除的儲存點恢復,故障恢復將會失敗。一旦另一個檢查點(或儲存點)完成後,您可以安全地刪除一個儲存點。

取消執行應用程式

可以通過兩種方式取消應用程式:以採用儲存點的方式取消,或以不採用儲存點的方式取消。要取消正在執行的應用程式而不採用儲存點,執行以下命令:

./bin/flink cancel <jobId>

為了在取消正在執行的應用程式之前採取一個儲存點,將-s標誌新增到cancel命令:

./bin/flink cancel -s [savepointPath] <jobId>

如果不指定儲存點路徑,則使用./conf/flink-conf.yaml檔案中配置的預設儲存點目錄(參見“系統配置”)。如果在命令中未顯式指定儲存點資料夾,也無法從配置中獲得該資料夾,則該命令將失敗。若要使用JobID bc0b2ad61ecd4a615d92ce25390f61ad取消應用程式,並將儲存點儲存在hdfs:///xxx:50070/savepoints,請執行以下命令:

./bin/flink cancel -s hdfs:///xxx:50070/savepoints d5fdaff43022954f5f02fcd8f25ef855Cancelling job bc0b2ad61ecd4a615d92ce25390f61adwith savepoint to hdfs:///xxx:50070/savepoints.Cancelled job bc0b2ad61ecd4a615d92ce25390f61ad.Savepoint stored in hdfs:///xxx:50070/savepoints/savepointbc0b2a-d08de07fbb10.

取消應用程式可能會失敗

注意,如果採用儲存點失敗,作業將繼續執行。你需要再試一次取消應用程式。

從儲存點啟動應用程式

從儲存點啟動應用程式相當簡單。你所要做的就是用run命令啟動一個應用程式,另外用-s選項提供一個儲存點的路徑:

./bin/flink run -s <savepointPath> [options] <jobJar> [arguments]

當作業啟動時,Flink將儲存點的各個狀態快照與啟動的應用程式的所有狀態匹配。這種匹配分兩個步驟完成。首先,Flink比較儲存點的唯一操作符識別符號和應用程式的操作符。其次,它為每個操作符匹配儲存點和應用程式的狀態識別符號(有關詳細資訊,請參閱“儲存點”)。

應該定義唯一的操作符ids

如果您沒有使用uid()方法為操作符分配惟一的id,那麼Flink將分配預設識別符號,這些識別符號是依賴於操作符的型別和所有先前操作符的雜湊值。由於不可能更改儲存點中的識別符號,所以如果不使用uid()手動分配操作符識別符號,那麼更新和迭代應用程式的可用選項就會減少。

如前所述,應用程式只能在與儲存點相容的情況下從儲存點啟動。未修改的應用程式始終可以從其儲存點重新啟動。但是,如果重新啟動的應用程式與執行儲存點的應用程式不相同,則需要考慮以下三種情況:

  • 如果嚮應用程式新增新狀態或更改有狀態操作符的唯一識別符號,則Flink將在儲存點中找不到相應的狀態快照。在這種情況下,新狀態初始化為空。

  • 如果從應用程式中刪除狀態或更改有狀態操作符的唯一識別符號,則儲存點中存在無法與應用程式匹配的狀態。在這種情況下,Flink不會啟動應用程式以避免丟失儲存點中的狀態。您可以通過在run命令中新增-n選項來禁用此安全檢查。

  • 如果在應用程式中更改了狀態—更改了狀態原語或修改了狀態的資料型別—則應用程式無法啟動。這意味著您不能輕鬆地在應用程式中演進狀態的資料型別,除非您在設計應用程式時從一開始就考慮了狀態演進。Flink社群目前正在改進對狀態演化的支援。(參見“修改操作符的狀態”。)

縮放應用程式

減少或增加應用程式的並行性並不困難。您需要生成一個儲存點,取消應用程式,然後從儲存點用調整後的並行度重新啟動它。應用程式的狀態會自動重新分配給更多或更少的並行操作符任務。有關如何縮放不同型別的操作符狀態和鍵控狀態的詳細資訊,請參閱“縮放有狀態操作符”。然而,有一些事情需要考慮。

如果您需要精確一次的結果,您應該使用儲存點,並使用savepoint和cancel命令停止應用程式。這將阻止另一個檢查點在儲存點之後完成,儲存點將觸發精確一次的sinks輸出儲存點之後的資料。

正如在“設定並行度”中討論的,可以通過不同的方式指定應用程式及其操作符的並行度。預設情況下,操作符以相關的StreamExecutionEnvironment的預設並行度執行。可以在啟動應用程式時指定預設的並行度(例如,使用CLI客戶端中的-p引數)。如果應用程式中的操作符的並行性依賴於預設的環境並行度,那麼可以通過從相同的JAR檔案啟動應用程式並指定新的並行性來擴充套件它。但是,如果您在StreamExecutionEnvironment或某些操作符上對並行度進行了硬編碼,那麼在提交應用程式執行之前,可能需要調整原始碼並重新編譯和重新打包應用程式。

如果應用程式的並行度取決於環境的預設並行度,那麼Flink提供了一個原子rescale命令,該命令接受一個儲存點,取消應用程式,並使用新的預設並行度重新啟動它:

./bin/flink modify <jobId> -p <newParallelism>

若要使jobId為bc0b2ad61ecd4a615d92ce25390f61ad的應用程式,並行度擴充套件為16,請執行以下命令:

./bin/flink modify bc0b2ad61ecd4a615d92ce25390f61ad -p 16Modify job bc0b2ad61ecd4a615d92ce25390f61ad.Rescaled job bc0b2ad61ecd4a615d92ce25390f61ad. Its new parallelism is 16.

正如在“縮放有狀態操作符”中所描述的,Flink將鍵控狀態分佈在所謂的key組的粒度上。因此,有狀態操作符的key組的數量決定了它的最大並行度。每個操作符使用setMaxParallelism()方法配置key組的數量。(參見“定義鍵控狀態操作符的最大並行度”。)

使用REST API管理應用程式

使用者或指令碼可以直接訪問REST API,檢視關於Flink叢集及其應用程式的資訊,包括提交的應用程式的指標資訊,和控制應用程式。Flink從相同的Web伺服器提供REST API和Web UI,Web伺服器作為Dispatcher程序的一部分執行。預設情況下,兩者都在埠8081上公開。您可以在./conf/flinkconf. yaml檔案中配置一個不同的埠。值-1禁用REST API和Web UI。

與REST API互動的一個常用命令列工具是curl。典型的curl REST命令如下:

curl -X <HTTP-Method> [-d <parameters>] http://hostname:port/v1/<REST-point>

v1表示REST API的版本。Flink 1.7公開了API的第一個版本(v1)。假設您正在執行一個本地Flink,它在埠8081上公開了它的REST API,下面的curl命令向/overview REST提交一個GET請求:

curl -X GET http://localhost:8081/v1/overview

該命令返回關於叢集的一些基本資訊,如Flink版本、正在執行、完成、取消或失敗的TaskManagers、slots和jobs的數量:

{   "taskmanagers":2,   "slots-total":8,   "slots-available":6,   "jobs-running":1,   "jobs-finished":2,   "jobs-cancelled":1,   "jobs-failed":0,   "flink-version":"1.7.1",   "flink-commit":"89eafb4"}

在下面,我們列出並簡要描述最重要的REST呼叫。有關受支援的REST呼叫的完整列表,請參閱Apache Flink的官方文件。“使用命令列客戶端管理應用程式”提供了關於某些操作的更多細節,比如升級或擴充套件應用程式。

管理和監控FLINK叢集

REST API公開端點來查詢關於正在執行的叢集的資訊並將其關閉。表10-1、10-2和10-3顯示了獲取關於Flink叢集的資訊的REST請求,比如任slots的數量、執行和完成的作業、JobManager的配置或所有可連線的TaskManagers列表。

表10-1 獲取基本的叢集資訊的REST請求

RequestGET /overview
Response 關於叢集的基本資訊如前所示

表10-2 獲取JobManager配置的REST請求

RequestGET /jobmanager/config
Response 返回./conf/flinkconf.yaml檔案中定義的JobManager配置

表10-3 列出所有可連線的工作管理員的REST請求

RequestGET /taskmanagers
Response 返回所有TaskManagers的列表,包括它們的IDs和基本資訊(記憶體統計資訊和連線埠)

表10-4顯示的REST請求,列出了為JobManager收集的所有指標。

表10-4 列出可用的JobManager指標的REST請求

RequestGET /jobmanager/metrics
Response 返回JobManager可用的指標列表

為了檢索一個或多個JobManager指標,將帶有所有請求指標的get查詢引數新增到請求中:

curl -X GET http://hostname:port/v1/jobmanager/metrics?get=metric1,metric2

表10-5顯示了REST請求,以列出為TaskManagers收集的所有指標。

表10-5 列出可用的TaskManager指標的REST請求

RequestGET /taskmanagers/<tmId>/metrics
Parameters tmId:可連線的TaskManager ID
Response 返回可用於所選TaskManager的指標列表

要檢索TaskManager的一個或多個指標,請將get查詢引數與所有請求的指標一起新增到請求中:

curl -X GET http://hostname:port/v1/taskmanagers/<tmId>/metrics?get=metric1

還可以使用表10-6所示的REST呼叫來關閉叢集

表10-6 關閉叢集的REST請求

RequestDELETE /cluster
Action

關閉Flink叢集。注意,在standalone模式下,只有master程序將被終止,worker程序將繼續執行。

管理和監控FLINK應用程式

REST API還可以用於管理和監視Flink應用程式。要啟動應用程式,首先需要將應用程式的JAR檔案上傳到叢集。表10-7、10-8和10-9顯示了管理這些JAR檔案的REST請求。

表10-7 上傳JAR檔案的REST請求

RequestPOST /jars/upload
Parameters 檔案必須作為multipart資料傳送
Action 將JAR檔案上載到叢集
Response 上傳的JAR檔案的儲存位置

curl命令上傳一個JAR檔案:

curl -X POST -H "Expect:" -F "jarfile=@path/to/flink-job.jar"  http://hostname:port/v1/jars/upload

表10-8 列出所有上傳的JAR檔案的REST請求

RequestGET /jars
Response 所有上傳的JAR檔案的列表。該列表包括JAR檔案的內部ID、原始名稱和上傳時間。

表10-9 刪除JAR檔案的REST請求

RequestDELETE /jars/<jarId>
Parameters jarId: JAR檔案的內部ID(見表10-8命令)
Action 根據ID刪除JAR檔案

使用REST請求從上傳的JAR檔案啟動應用程式,如表10-10所示。

表10-10 啟動應用程式的REST請求

RequestPOST /jars/<jarId>/run
Parameters jarId:啟動應用程式的JAR檔案的ID。可以將其他引數(如作業引數、入口類、預設並行性、儲存點路徑和allow-nonrestored-state標記)作為JSON物件傳遞。
Action 使用提供的引數啟動JAR檔案(和入口類)定義的應用程式。如果提供了儲存點路徑,則從儲存點初始化應用程式狀態。
Response 啟動的應用程式的jobID

curl命令啟動一個預設並行度為4的應用程式:

curl -d '{"parallelism":"4"}' -X POST http://localhost:8081/v1/jars/43e844ef-382f-45c3-aa2f-00549acd961e_App.jar/run

表10-11、10-12和10-13展示瞭如何使用REST API管理正在執行的應用程式。

表10-11 列出所有應用程式的REST請求

RequestGET /jobs
Response 列出所有正在執行的應用程式的作業id,以及最近失敗、取消和完成的應用程式的作業id。

表10-12 顯示應用程式細節的REST請求

RequestGET /jobs/<jobId>
Parameters jobId: list application命令(表10-11)提供的作業ID
Response 基本統計資訊,如應用程式的名稱、開始時間(和結束時間),以及關於執行任務的資訊,包括接收和輸出的記錄和位元組的數量

REST API還提供了關於應用程式的以下方面的更詳細的資訊:

  • 應用程式的操作計劃

  • 應用程式的配置

  • 在不同層次收集應用程式的度量標準

  • 檢查點指標

  • 反壓指標

  • 導致應用程式失敗的異常

有關如何訪問這些資訊的詳細資訊,請參閱官方文件。

表10-13 取消應用程式的REST請求

RequestPATCH /jobs/<jobId>
Parameters jobId: list application命令(表10-11)提供的作業ID
Action 取消應用程式

您還可以通過表10-14中所示的REST請求對正在執行的應用程式生成儲存點。

表10-14 生成應用程式儲存點的REST請求

RequestPOST /jobs/<jobId>/savepoints
Parameters list應用程式命令提供的作業的ID。此外,您需要提供一個JSON物件,其中包含儲存點資料夾的路徑和一個標誌,該標誌指示是否使用儲存點終止應用程式。
Action 生成應用程式的儲存點
Response 一個請求ID,用於檢查儲存點觸發操作是否成功完成。

curl命令在不取消應用程式的情況下觸發儲存點:

curl -d '{"target-directory":"file:///savepoints", "canceljob":"false"}' -X POSThttp://localhost:8081/v1/jobs/e99cdb41b422631c8ee2218caa6af1cc/savepoints{"request-id":"ebde90836b8b9dc2da90e9e7655f4179"}
取消具有儲存點的應用程式可能會失敗
只有成功地生成了儲存點,取消應用程式的請求才會成功。如果savepoint命令失敗,應用程式將繼續執行。

檢查ID為ebde90836b8b9dc2da90e9e7655f4179的請求是否成功,並檢索儲存點路徑:

curl -X GEThttp://localhost:8081/v1/jobs/e99cdb41b422631c8ee2218caa6af1cc/savepoints/ebde90836b8b9dc2da90e9e7655f4179{"status":{"id":"COMPLETED"}"operation":{"location":"file:///savepoints/savepoint-e99cdb-34410597dec0"}}

要釋放儲存點,請使用表10-15中所示的REST請求

表10-15釋放一個儲存點的REST請求

RequestPOST /savepoint-disposal
Parameters 需要在JSON物件中以引數的形式提供要釋放儲存點的路徑
Action 釋放儲存點
Response 一個請求ID,用於檢查儲存點是否已成功釋放

要使用curl釋放儲存點,請執行:

curl -d '{"savepoint-path":"file:///savepoints/savepointe99cdb-34410597"}'-X POST http://localhost:8081/v1/savepoint-disposal{"request-id":"217a4ffe935ceac2c281bdded76729d6"}

表10-16顯示了擴充套件應用程式的REST請求

表10-16 擴充套件應用程式的REST請求

RequestPATCH /jobs/<jobID>/rescaling
Parameters jobID: list application命令提供的作業ID。此外,還需要將應用程式的新並行度作為URL引數提供。
Action 生成一個儲存點,取消應用程式,並從儲存點以新的預設並行度重新啟動應用程式。
Response 一個請求ID,用於檢查擴充套件應用程式請求是否成功

擴充套件一個應用程式的curl到一個新的預設並行度為16,執行:

curl -X PATCHhttp://localhost:8081/v1/jobs/129ced9aacf1618ebca0ba81a4b222c6/rescaling?parallelism=16{"request-id":"39584c2f742c3594776653f27833e3eb"}

                應用程式可能不會被擴充套件
如果觸發的儲存點失敗,應用程式將繼續以原來的並行度執行。您可以使用請求ID檢查擴充套件請求的狀態。

在容器中繫結和部署應用程式

到目前為止,我們已經解釋瞭如何在執行中的Flink叢集上啟動應用程式。這就是我們所說的部署應用程式的REST請求風格。在“應用程式部署”一節中,我們簡要介紹了另一種選擇—庫模式,它不需要執行中的Flink叢集來提交作業。

在庫(library)模式下,應用程式被繫結到一個Docker映像中,該映像還包含所需的Flink二進位制檔案。映像可以通過兩種方式啟動——作為JobMaster容器或TaskManager容器。當映像作為JobMaster部署時,容器啟動一個Flink master程序,該程序立即獲取繫結的應用程式以啟動它。TaskManager容器在JobMaster上註冊自己,並提供它的處理槽。一旦有足夠的插槽可用,JobMaster容器就會部署應用程式以供執行。

略。

控制任務排程

Flink應用程式是通過將操作符並行化為任務,並將這些任務分佈到叢集中的worker程序中來並行執行的。與許多其他分散式系統一樣,Flink應用程式的效能在很大程度上取決於如何排程任務。分配任務的worker程序、任務之間的協同以及分配給worker程序的任務數量,可能會對應用程式的效能產生重大影響。

在“任務執行”中,我們描述了Flink如何將任務分配給slots,以及如何利用任務鏈來降低本地資料交換的成本。在本節中,我們將討論如何調整預設行為和控制任務鏈,以及如何將任務分配到slots,以提高應用程式的效能。

控制任務連結

任務鏈將兩個或多個操作符的並行任務合併到一個由單個執行緒執行的任務中。合併的任務通過方法呼叫交換記錄,因此基本上沒有通訊成本。由於任務鏈提高了大多數應用程式的效能,所以在Flink中預設啟用了任務鏈。

然而,某些應用程式可能無法從任務連結中獲益。一個原因是為了在不同的處理slots上執行函式,而打破了昂貴的函式鏈。你可以通過StreamExecutionEnvironment完全禁用應用程式的任務鏈:

StreamExecutionEnvironment.disableOperatorChaining()

除了為整個應用程式禁用任務鏈之外,還可以控制各個操作符的任務鏈行為。要禁用特定操作符的連結,可以呼叫它的disableChaining()方法。這將防止操作符的任務被連結到前面和後面的任務(例如10-1)。

val input: DataStream[X] = ...val result: DataStream[Y] = input.filter(new Filter1()).map(new Map1())// disable chaining for Map2.map(new Map2()).disableChaining().filter(new Filter2())

示例10-1中的程式碼產生三個任務—Filter1和Map1的一個連結任務,Map2的一個單獨任務,以及一個不允許連結到Map2的Filter2任務。

也可以通過呼叫它的startNewChain()方法(例如10-2)來使用操作符啟動一個新的鏈。操作符的任務不會連結到前面的任務,但是如果滿足連結的要求,則會連結到後面的任務。

val input: DataStream[X] = ...val result: DataStream[Y] = input  .filter(new Filter1())  .map(new Map1())   // start a new chain for Map2 and Filter2  .map(new Map2()).startNewChain()  .filter(new Filter2())

在示例10-2中,建立了兩個連結的任務:一個任務用於Filter1和Map1,另一個任務用於Map2和Filter2。注意,新連結的任務以操作符開始,在我們的示例中startNewChain()方法在該操作符上被稱為- map2。

定義Slot-Sharing組

Flink的預設任務排程策略將程式的一個完整分片分配給應用程式的每個操作符的一個任務,並將其分配給單個處理slot。根據應用程式的複雜性和操作符的計算成本,這種預設策略可能會使處理slot超載。Flink手動控制任務分配到slot的機制是slot-sharing groups(slot共享組)。

每個操作符都是一個slot共享組的成員。屬於同一slot共享組的操作符的所有任務都由相同的slots處理。在一個slot共享組中,任務被分配到“任務執行”章節中描述的slot中——每個slot最多處理一個成員操作符的任務。因此,一個slot共享組需要儘可能多的處理slot來滿足其操作符的最大並行度。不同slotsharing組中的操作符任務不會由相同的slot執行。

預設情況下,每個操作符都位於“預設”slot共享組中。對於每個操作符,您可以使用slotSharingGroup(String)方法顯式地指定其slotsharing組。如果輸入操作符的所有成員都屬於同一組,則操作符將繼承其輸入操作符的slot共享組。如果輸入操作符在不同的組中,則操作符在“預設”組中。示例10-3展示瞭如何在Flink DataStream應用程式中指定slot-sharing組。

// slot-sharing group "green"val a: DataStream[A] = env.createInput(...)  .slotSharingGroup("green")  .setParallelism(4)val b: DataStream[B] = a.map(...)   // slot-sharing group "green" is inherited from a  .setParallelism(4)// slot-sharing group "yellow"val c: DataStream[C] = env.createInput(...)  .slotSharingGroup("yellow")  .setParallelism(2)// slot-sharing group "blue"val d: DataStream[D] = b.connect(c.broadcast(...)).process(...)  .slotSharingGroup("blue")  .setParallelism(4)val e = d.addSink()   // slot-sharing group "blue" is inherited from d  .setParallelism(2)

示例10-3中的應用程式由五個操作符、兩個源、兩個中間操作符和一個sink操作符組成。操作符被分配到三個共享位置組:綠色、黃色和藍色。圖10-1顯示了應用程式的JobGraph,以及如何將其任務對映到處理slot。

該應用程式需要10個處理slot。由於分配的操作符的最大並行度,藍色和綠色的slotsharing組需要各4個插槽。黃色slot共享組需要兩個slot。

調優檢查點和恢復

執行時,啟用容錯功能的Flink應用程式定期生成狀態的檢查點。檢查點有可能是一項昂貴的操作,因為需要複製到持久儲存中的資料量可能非常大。增加檢查點間隔可以減少常規處理期間的容錯開銷。然而,它還增加了作業從失敗中恢復時,需要重新處理的資料量。

Flink提供了兩個引數來調優檢查點和狀態後端。配置這些選項對於確保生產中流式應用程式的可靠和平穩執行非常重要。例如,減少每個檢查點的開銷可以提高檢查點頻率,從而加快恢復週期。在本節中,我們將描述用於控制檢查點和應用程式恢復的引數。

配置檢查點

當為應用程式啟用檢查點時,必須指定檢查點間隔—JobManager將在應用程式源(sources)上啟動檢查點的間隔。

在StreamExecutionEnvironment上啟用檢查點:

val env: StreamExecutionEnvironment = ???// enable checkpointing with an interval of 10 seconds.env.enableCheckpointing(10000);CheckpointConfig提供了配置檢查點行為的更多選項,可以從StreamExecutionEnvironment獲得:
// get the CheckpointConfig from the StreamExecutionEnvironmentval cpConfig: CheckpointConfig = env.getCheckpointConfig

預設情況下,Flink建立檢查點來確保精確一次的狀態一致性。不過,它也可以配置為至少一次狀態一致性:

// set mode to at-least-oncecpConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

根據應用程式的特徵、狀態的大小、狀態後端及其配置,檢查點可能需要幾分鐘。此外,隨著時間的推移,狀態的大小可能會增加或減少,這可能是由於長時間執行的視窗造成的。因此,檢查點花費的時間通常比配置的檢查點間隔長。預設情況下,Flink一次只允許一個檢查點處於程序中,以避免檢查點佔用常規處理所需的太多資源。如果——根據配置的檢查點間隔——需要啟動一個檢查點,但是有另一個檢查點正在進行中,第二個檢查點將被擱置,直到第一個檢查點完成。

如果大部分檢查點耗時比檢查點間隔更長,這種行為可能不是最優的,原因有兩個。首先,這意味著應用程式的常規資料處理與並行檢查點總是爭奪資源。因此,其處理減緩,它可能無法取得足夠的速度跟上傳入的資料。第二,一個檢查點可能被推遲,因為我們需要等待另一個檢查點完成,導致較低的檢查點間隔,導致更長的恢復過程。Flink提供引數來解決這些情況。

為了確保應用程式可以取得足夠的處理速度,可以配置檢查點之間的最小暫停。如果將最小暫停配置為30秒,則在完成檢查點後的前30秒內不會啟動新的檢查點。這也意味著有效的檢查點間隔至少為30秒,同時最多有一個檢查點發生。

// make sure we process at least 30s without checkpointingcpConfig.setMinPauseBetweenCheckpoints(30000);

在某些情況下,您可能想要確保檢查點是在配置的檢查點間隔內進行的,即使檢查點花費的時間比間隔長。一個例子是檢查點花費很長時間,但不消耗很多資源;例如,由於對外部系統的呼叫具有高延遲的操作。在這種情況下,您可以配置併發檢查點的最大數量。

// allow three checkpoint to be in progress at the same timecpConfig.setMaxConcurrentCheckpoints(3);

請注意

儲存點與檢查點同時執行。由於檢查點操作,Flink不會延遲顯式觸發儲存點。無論有多少檢查點正在進行中,都會啟動一個儲存點。

為了避免長時間執行的檢查點,您可以配置一個超時間隔,在此間隔之後,檢查點將被取消。預設情況下,檢查點在10分鐘後取消。

// checkpoints have to complete within five minutes, or are discardedcpConfig.setCheckpointTimeout(300000);

最後,您可能還需要配置檢查點失敗時的情況。預設情況下,失敗的檢查點會導致異常,從而導致應用程式重新啟動。您可以禁用此行為,並讓應用程式在檢查點發生錯誤時,繼續執行。

// do not fail the job on a checkpointing errorcpConfig.setFailOnCheckpointingErrors(false);

啟用檢查點壓縮

Flink支援壓縮的檢查點和儲存點。在Flink 1.7之前,唯一支援的壓縮演算法是Snappy。你可以啟用壓縮的檢查點和儲存點如下:

請注意
注意,增量式RocksDB檢查點不支援檢查點壓縮。

在應用程式停止後保留檢查點

檢查點的目的是在應用程式失敗後恢復它。因此,它們在作業停止執行時進行清理,原因可能是應用程式失敗,也可能是顯式取消。但是,您還可以啟用一個稱為外部化檢查點的功能,以便在應用程式停止後保留檢查點。

// Enable externalized checkpointscpConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

外部化檢查點有兩種選擇:

  • RETAIN_ON_CANCELLATION 在應用程式完全失敗和顯式取消後保留檢查點。

  • DELETE_ON_CANCELLATION 只在應用程式完全失敗之後才保留檢查點。如果應用程式被顯式取消,檢查點將被刪除。

外部化的檢查點不替換儲存點。它們使用一種狀態後端特定的儲存格式,並且不支援擴充套件。因此,它們足以在應用程式失敗後重新啟動應用程式,但提供的靈活性不如儲存點。一旦應用程式再次執行,您可以生成一個儲存點。

配置狀態後端

應用程式的狀態後端負責維護本地狀態、執行檢查點和儲存點,以及在發生故障後恢復應用程式狀態。因此,應用程式狀態後端的選擇和配置對檢查點的效能有很大的影響。在“選擇狀態後端”章節中更詳細地描述了各個狀態後端。

應用程式的預設狀態後端是MemoryStateBackend。由於它將所有狀態儲存在記憶體中,並且檢查點完全儲存在volatile和jvm大小有限的JobManager堆儲存中,因此不建議用於生產環境。但是,它對於本地開發的Flink應用程式非常有用。“檢查點和狀態後端”章節描述瞭如何配置Flink叢集的預設狀態後端。

你也可以顯式地選擇一個應用程式的狀態後端:

val env: StreamExecutionEnvironment = ???// create and configure state backend of your choiceval stateBackend: StateBackend = ???// set state backendenv.setStateBackend(stateBackend)

可以使用如下所示的最小設定建立不同的狀態後端。MemoryStateBackend不需要任何引數。但是,有一些建構函式使用引數來啟用或禁用非同步檢查點(預設啟用)並限制狀態大小(預設為5 MB):

// create a MemoryStateBackendval memBackend = new MemoryStateBackend()

FsStateBackend只需要一個路徑來定義檢查點的儲存位置。也有建構函式變數來啟用或禁用非同步檢查點(預設啟用):

// create a FsStateBackend that checkpoints to the /tmp/ckpfolderval fsBackend = new FsStateBackend("file:///tmp/ckp", true)

RocksDBStateBackend只需要一個路徑來定義檢查點的儲存位置,並使用一個可選引數來啟用增量檢查點(預設情況下禁用)。RocksDBStateBackend總是非同步寫入檢查點:

// create a RocksDBStateBackend that writes incremental checkpoints// to the /tmp/ckp folderval rocksBackend = new RocksDBStateBackend("file:///tmp/ckp",true)

在“檢查點和狀態後端”章節中,我們討論了狀態後端的配置選項。當然,您也可以在應用程式中配置狀態後端,覆蓋預設值或叢集範圍的配置。為此,您必須通過將配置物件傳遞到狀態後端來建立新的後端物件。有關可用配置選項的描述,請參閱“檢查點和狀態後端”:

// all of Flink's built-in backends are configurableval backend: ConfigurableStateBackend = ???// create configuration and set optionsval sbConfig = new Configuration()sbConfig.setBoolean("state.backend.async", true)sbConfig.setString("state.savepoints.dir", "file:///tmp/svp")// create a configured copy of the backendval configuredBackend = backend.configure(sbConfig)

由於RocksDB是一個外部元件,它帶來了自己的一組調優引數,也可以針對您的應用程式進行調整。預設情況下,RocksDB針對SSD儲存進行了優化,如果狀態儲存在旋轉磁碟上,則不會提供很好的效能。Flink提供了一些預定義的設定,用於改善常見硬體設定的效能。有關可用設定的更多資訊,請參見文件。你可以應用預定義的選項RocksDBStateBackend如下:

val backend: RocksDBStateBackend = ???// set predefined options for spinning disk storagebackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)

配置恢復

當檢查點應用程式失敗時,它將通過調出其任務、恢復其狀態(包括源任務的讀取偏移量)並繼續處理來重新啟動。在應用程式重新啟動之後,它就處於一個追趕階段。由於應用程式的源任務被重置為更早的輸入位置,所以它將處理在故障之前處理的資料和在應用程式宕機期間積累的資料。

為了能夠趕上流,到達其尾部—應用程式必須以比新資料到達更高的速度處理累積的資料。當應用程式追趕資料過程中,處理延遲(輸入在實際處理之前可用的時間)會增加。

因此,在重新啟動應用程式後,應用程式需要足夠的備用資源,以便成功地恢復其常規處理。這意味著應用程式在常規處理期間的資源消耗不應該接近100%。可用於恢復的資源越多,追趕階段完成的速度就越快,處理延遲恢復正常的速度也就越快。

除了恢復的資源考慮之外,我們還將討論另外兩個與恢復相關的主題:重新啟動策略和本地恢復。

重新啟動策略

根據導致應用程式崩潰的故障,應用程式可能再次被相同的故障殺死。一個常見的例子是應用程式無法處理無效或損壞的輸入資料。在這種情況下,應用程式最終會陷入無限的恢復週期,消耗大量資源,而沒有機會恢復到常規處理。Flink提供了三種重啟策略來解決這個問題:

  • 固定延遲重新啟動策略(fixed-delay restart strategy):以固定的次數重新啟動應用程式,並在重新啟動嘗試之前等待已配置的時間。

  • 故障率重新啟動策略(failure-rate restart strategy):只要不超過可配置的故障率,故障率重新啟動策略就會重新啟動應用程式。故障率被指定為在一個時間間隔內的最大故障數。例如,您可以配置一個應用程式,只要它在過去十分鐘內失敗不超過三次,就可以重新啟動。

  • 不重啟策略(no-restart strategy):不重啟應用程式,但會立即使其失敗。

應用程式的重啟策略是通過StreamExecutionEnvironment配置的,如示例10-4所示。

val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setRestartStrategy(   RestartStrategies.fixedDelayRestart(       5, // number of restart attempts       Time.of(30, TimeUnit.SECONDS) // delay between attempts  ))

如果沒有顯式定義重啟策略,則使用的預設重啟策略:fixed-delay restart strategy,使用Integer.MAX_VALUE重新啟動嘗試和10秒延遲。

本地恢復

Flink的狀態後端(MemoryStateBackend除外)將檢查點儲存在遠端檔案系統中。這首先確保了狀態被儲存和持久化,其次確保在worker節點丟失或應用程式擴充套件時可以重新分配狀態。但是,在恢復期間從遠端儲存讀取狀態的效率不是很高。此外,在恢復時,可以在發生故障之前,可以在當前worker上重新啟動應用程式。

Flink支援一個稱為本地恢復的特性,如果應用程式可以在相同的機器上重新啟動,它可以顯著加快恢復速度。啟用狀態後端時,除了將資料寫入遠端儲存系統外,還將檢查點資料的副本儲存在worker節點的本地磁碟上。當應用程式重新啟動時,Flink嘗試將相同的任務排程到相同的worker節點。如果成功,任務首先嚐試從本地磁碟載入檢查點資料。如果出現任何問題,它們將退回到遠端儲存。

實現了本地恢復,使遠端系統中的狀態副本成為備用來源。任務只有在遠端寫入成功時才確認生成檢查點。而且,檢查點不會因為本地狀態副本失敗而失敗。由於檢查點資料被寫入兩次,所以本地恢復增加了檢查點的開銷。

可以在flink-conf.yaml檔案中啟用和配置叢集的本地恢復特性,也可以為每個應用程式設定不同的如下狀態後端配置:

  • state.backend.local-recovery:此標誌啟用或禁用本地恢復。預設情況下,本地恢復被禁用。

  • taskmanager.state.local.root-dirs:此引數指定儲存本地狀態副本的一個或多個本地路徑。

                    請注意
本地恢復隻影響鍵控狀態,鍵控狀態總是分割槽的,通常佔狀態大小的大部分。操作符狀態不會儲存在本地,需要從遠端儲存系統檢索。但是,它通常比鍵控狀態小得多。而且,MemoryStateBackend不支援本地恢復,因為它不支援大狀態。

監視Flink叢集和應用程式

監控流作業對於確保其健康執行和及早發現錯誤配置、資源不足或意外行為的潛在症狀至關重要。特別是當流作業是面向使用者的應用程式中的大型資料處理管道或事件驅動服務的一部分時,您可能希望儘可能精確地監視它的效能,並確保它滿足延遲、吞吐量、資源利用率等特定目標。

Flink在執行時收集一組預定義的指標,並提供一個框架,允許定義和跟蹤自己的指標。

Flink Web UI

要獲得Flink叢集的概況以及對作業在內部執行情況的瞭解,最簡單的方法是使用Flink的Web UI。您可以通過以下地址訪問訪問儀表板:

http://<jobmanager-hostname>:8081

在主介面上,您將看到叢集配置的概述,包括工作管理員的數量、已配置和可用的任務slots的數量,以及正在執行和已完成的作業。圖10-2顯示了dashboard主螢幕的一個例項。左邊的菜單鏈接到關於作業和配置引數的更詳細的資訊,還允許通過上傳JAR提交作業。

如果單擊正在執行的作業,可以快速檢視每個任務或子任務的執行統計資訊,如圖10-3所示。您可以檢查持續時間、處理的位元組數量和交換的資料記錄,並根據需要聚合每個TaskManager的記錄。

如果您單擊Task Metrics選項卡,您可以從下拉選單中選擇更多的Metrics,如圖10-4所示。其中包括關於任務的更細粒度的統計資訊,如緩衝區使用情況、水印和輸入/輸出速率。

圖10-5顯示瞭如何將選擇的度量顯示為連續更新的圖表。

Checkpoints選項卡(圖10-3)顯示關於以前和當前檢查點的統計資訊。在概述中,您可以看到有多少檢查點被觸發、正在進行中、已成功完成或失敗。如果單擊History檢視,可以檢索更細粒度的資訊,比如狀態、觸發時間、狀態大小和檢查點對齊階段緩衝了多少位元組。Summary檢視聚合檢查點統計資訊,並提供所有已完成檢查點的最小值、最大值和平均值。最後,在配置下,可以檢查檢查點的配置屬性,比如設定的間隔和超時值。

類似地,Back Pressure選項卡顯示每個操作符和子任務的背壓統計資訊。如果單擊一行,將觸發回壓取樣,將看到正在進行的訊息:Sampling in progress...大約五秒鐘。一旦取樣完成,您將在第二列中看到反壓力狀態。背壓任務會顯示出HIGH訊號;否則,您將看到一個漂亮的綠色OK訊息顯示。

統計系統

在生產環境中執行Flink等資料處理系統時,必須監視其行為,以便能夠發現和診斷效能下降的原因。預設情況下,Flink收集多個系統和應用程式指標。收集每個操作符、TaskManager或JobManager相關指標。在這裡,我們將描述一些最常用的度量標準,並向您推薦Flink的文件以獲得可用度量標準的完整列表。

類別包括CPU利用率、記憶體使用,活動執行緒的數量,垃圾收集統計資料,網路指標,如輸入/輸出緩衝區排隊的數量,整個叢集範圍的指標或執行jobs數量和可用資源等,job指標包括執行時,重試的次數和檢查點資訊,I / O統計資料,包括在本地和遠端互動的記錄數量,水印資訊,聯結器特有指標等。

註冊和使用度量標準

要註冊指標,您必須通過呼叫RuntimeContext上的getMetrics()方法來得到MetricGroup,如示例10-5所示。

class PositiveFilter extends RichFilterFunction[Int] {   @transient private var counter: Counter = _   override def open(parameters: Configuration): Unit = {       counter = getRuntimeContext      .getMetricGroup      .counter("droppedElements")  }   override def filter(value: Int): Boolean = {       if (value > 0) {      true      }       else {           counter.inc()           false  }  }}

METRIC GROUPS

Flink指標是通過MetricGroup介面註冊和訪問的。MetricGroup提供了建立巢狀的、命名的度量層次結構的方法,並提供了註冊以下度量型別的方法:

Counter(計數器):

一個org.apache.flink.metrics.Counter度量計數並提供遞增和遞減的方法。可以在MetricGroup上使用counter(String name, Counter counter)方法註冊計數器度量。

Gauge(度量):

度量標準在某個時間點計算任意型別的值。要使用度量(Gauge),您需要實現org.apache.flink.metrics.Gauge介面,在MetricGroup上使用gauge(String name, Gauge gauge)方法註冊它。例10-6中的程式碼展示了WatermarkGauge度量的實現,它公開了當前的水印。

public class WatermarkGauge implements Gauge<Long> {   private long currentWatermark = Long.MIN_VALUE;   public void setCurrentWatermark(long watermark) {  this.currentWatermark = watermark;  }   @Override   public Long getValue() {  return currentWatermark;  }}

以字串形式報告的指標

度量報告程式將把測量值轉換成字串,因此,如果您使用的型別沒有提供toString()實現,那麼請確保提供了有意義的toString()實現。

Histogram(直方圖):

您可以使用直方圖來表示數值資料的分佈。Flink的直方圖特別適用於報告long型別值的度量。org.apache.flink.metrics.Histogram介面允許收集值,獲取收集值的當前計數,併為到目前為止看到的值建立統計資訊,例如最小值、最大值、標準差和平均值。

除了建立你自己的直方圖實現,Flink還允許你使用DropWizard直方圖,通過新增依賴。如下:

<dependency>   <groupId>org.apache.flink</groupId>   <artifactId>flink-metrics-dropwizard</artifactId>   <version>flink-version</version></dependency>

然後,您可以使用DropwizardHistogramWrapper類在Flink程式中註冊DropWizard直方圖,如示例10-7所示。

// create and register histogramDropwizardHistogramWrapper histogramWrapper =   new DropwizardHistogramWrapper(  new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))  )metricGroup.histogram("myHistogram", histogramWrapper)// update histogramhistogramWrapper.update(value)

Meter:

您可以使用Meter度量來度量特定事件發生的速率(以事件/秒為單位)。org.apache.flink.metrics.Meter介面提供了一些方法來標記一個或多個事件的發生,獲取每秒事件的當前速率,以及在Meter上標記的當前事件數。

與直方圖一樣,您可以通過在pom.xml中新增flink-metrics-dropwizard依賴項並在DropwizardMeterWrapper類中包裝meter來使用DropWizard meters。

範圍和格式度量

Flink指標屬於某一個範圍,可以是系統範圍(用於系統提供的指標),也可以是使用者範圍(用於自定義的使用者定義指標)。指標由一個包含三部分的唯一識別符號引用:

  1. 使用者在註冊度量時指定的名稱

  2. 一個可選的使用者範圍

  3. 一個系統範圍

例如,指定的度量名稱:“myCounter”、使用者範圍:“MyMetrics”和系統範圍“localhost.taskmanager.512”。此時的識別符號為:" localhost.taskmanager.512.MyMetrics.myCounter‘。可以更改預設分隔符".",通過設定metrics.scope.delimiter配置選項。

系統範圍聲明瞭度量所引用的系統元件以及它應該包含的上下文資訊。度量可以限定為JobManager、TaskManager、作業、操作符或任務。通過在flink-conf.yaml中設定相應的度量選項,可以配置度量應該包含哪些上下文資訊。我們在表10-17中列出了一些配置選項及其預設值。

範圍用於配置的key預設值
JobManager metrics.scope.jm <host>.jobmanager
JobManager和job metrics.scope.jm.job <host>.jobmanager.<job_name>
TaskManager metrics.scope.tm <host>.taskmanager.<tm_id>
TaskManager和job metrics.scope.tm.job <host>.taskmana ger.<tm_id>.<job_name>
Task metrics.scope.task <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
Operator metrics.scope.operator <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

配置key值包含常量字串,如“taskmanager”和尖括號中顯示的變數。後者將在執行時用實際值替換。例如,TaskManager指標的預設範圍可能建立範圍“localhost.taskmanager.512”。其中“localhost”和“512”是引數值。表10-18顯示了所有可用來配置度量範圍的變數。

範圍可用變數
JobManager: <host>
TaskManager: <host>, <tm_id>
Job: <job_id>, <job_name>
Task: <task_id>, <task_name>, <task_attempt_id>,<task_attempt_num>, <subtask_index>
Operator: <operator_id>, <operator_name>, <subtask_index>

每個作業的範圍識別符號必須是唯一的

如果同時運行同一作業的多個副本,則由於字串衝突,度量可能會變得不準確。為了避免這種風險,您應該確保每個作業的範圍識別符號是惟一的。這可以通過包含<job_id>來輕鬆處理。

您還可以通過呼叫MetricGroup的addGroup()方法來定義度量的使用者範圍,如示例10-8所示。

counter = getRuntimeContext.getMetricGroup.addGroup("MyMetrics").counter("myCounter")

訪問指標

既然您已經瞭解瞭如何註冊、定義和分組度量標準,您可能想知道如何從外部系統訪問它們。畢竟,您可能需要收集度量資料,因為您需要建立一個實時儀表板,或者將度量資料提供給另一個應用程式。您可以通過報告(Reporter)向外部後端公開度量標準,而Flink為其中幾個提供了實現(見表10-19)。

ReporterImplementation
JMX org.apache.flink.metrics.jmx.JMXReporter
Graphite org.apache.flink.metrics.graphite.GraphiteReporter
Prometheus org.apache.flink.metrics.prometheus.PrometheusReporter
PrometheusPushGateway org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
StatsD org.apache.flink.metrics.statsd.StatsDReporter
Datadog org.apache.flink.metrics.datadog.DatadogHttpReporter
Slf4j org.apache.flink.metrics.slf4j.Slf4jReporter

如果您想使用未包含在上述列表中的度量後端,您還可以通過實現org.apache.flink.metrics.reporter.MetricReporter介面來定義您自己的報告程式。

Reporters需要在flink-conf.yaml中配置。向配置中新增以下程式碼行,將定義一個JMX報告器“my_reporter”,它監聽埠9020-9040:

metrics.reporters: my_reporterMetrics.reporter.my_jmx_reporter.class:org.apache.flink.metrics.jmx.JMXReportermetrics.reporter.my_jmx_reporter.port: 9020-9040

有關每個受支援的Reporter程式的配置選項的完整列表,請參閱Flink文件。

監控延遲

延遲可能是您想要監控的第一個指標之一,以評估流作業的效能特徵。同時,它也是在具有豐富語義(如Flink)的分散式流引擎中定義的最棘手的指標之一。在“Latency”中,我們將延遲大致定義為處理事件所需的時間。您可以想象,如果我們試圖在具有複雜資料流的高速率流作業中跟蹤每個事件的延遲,那麼這個定義的精確實現在實踐中可能會出現問題。考慮到視窗操作符使延遲跟蹤更加複雜,如果一個事件貢獻了幾個視窗,我們需要報告第一次呼叫的延遲,還是需要等待,直到計算一個事件可能屬於的所有視窗?如果一個視窗多次觸發該怎麼辦?

Flink遵循一種簡單的、低開銷的方法來提供有用的延遲度量。它不是試圖嚴格測量每個事件的延遲,而是通過定期在源上發出一條特殊記錄,並允許使用者跟蹤該記錄到達sink所需的時間來近似地度量延遲。這個特殊的記錄稱為延遲標記,它帶有一個時間戳,指示它是何時發出的。

要啟用延遲跟蹤,您需要配置延遲標記從源發出的頻率。你可以在ExecutionConfig中設定latencyTrackingInterval,如下所示:

env.getConfig.setLatencyTrackingInterval(500L)

間隔以毫秒為單位指定。當接收到延遲標記時,除了sink操作符外,所有的操作符都將它向下遊傳送。延遲標記使用與正常流記錄相同的資料流通道和佇列,因此其跟蹤的延遲反映了等待處理的時間記錄。但是,它們不度量處理記錄所需的時間,或者記錄在狀態中等待直到處理它們的時間。

操作符將延遲統計資訊儲存在一個延遲指示器(gauge)中,其中包含最小值、最大值和平均值,以及50、95和99個百分點值。接收操作符對每個並行源例項接收的延遲標記進行統計,因此可以使用在接收處檢查延遲標記來估算記錄遍歷資料流所需的時間。如果希望自定義操作符處的處理延遲標記,可以覆蓋processLatencyMarker()方法,並使用LatencyMarker的方法getMarkedTime()、getVertexId()和getSubTaskIndex()檢索相關資訊。

注意時鐘傾斜

如果您沒有使用諸如NTP之類的自動時鐘同步服務,叢集機器的時鐘可能會受到時鐘歪斜的影響。在這種情況下,延遲跟蹤估計是不可靠的,因為其當前實現預設時鐘是同步的。

配置日誌行為

日誌記錄是除錯和理解應用程式行為的另一個重要工具。預設情況下,Flink使用SLF4J日誌抽象和log4j日誌框架。示例10-9顯示了一個MapFunction,它記錄每個輸入記錄的轉換。

import org.apache.flink.api.common.functions.MapFunctionimport org.slf4j.LoggerFactoryimport org.slf4j.Loggerclass MyMapFunction extends MapFunction[Int, String] {   Logger LOG = LoggerFactory.getLogger(MyMapFunction.class)   override def map(value: Int): String = {       LOG.info("Converting value {} to string.", value)       value.toString  }}

要更改log4j記錄器的屬性,請修改conf/資料夾中的log4j.properties檔案。例如,下面的行將根日誌級別設定為“warning”:

log4j.rootLogger=WARN

要設定此檔案的自定義檔名和位置,請傳遞-Dlog4j.configuration=引數到JVM。Flink還提供了log4j-cli.properties檔案,用於命令列客戶機和log4j-yarn-session.properties檔案,用於YARN session環境中的命令列客戶端。

log4j的另一種替代方法是logback,而且Flink還為此後端提供了預設的配置檔案。要使用logback而不是log4j,您需要從lib/資料夾中刪除log4j。有關如何設定和配置後端,請參閱Flink的文件和logback手冊。

結束語

在本章中,我們討論瞭如何在生產環境中執行、管理和監視Flink應用程式。我們解釋了收集和訪問系統和應用程式指標的Flink元件,如何配置日誌系統,以及如何使用命令列客戶機和REST API啟動、停止、恢復和重新部署應用程式。