Flink 原始碼解析 —— 深度解析 Flink 是如何管理好記憶體的?
前言
如今,許多用於分析大型資料集的開源系統都是用 Java 或者是基於 JVM 的程式語言實現的。最著名的例子是 Apache Hadoop,還有較新的框架,如 Apache Spark、Apache Drill、Apache Flink。基於 JVM 的資料分析引擎面臨的一個常見挑戰就是如何在記憶體中儲存大量的資料(包括快取和高效處理)。合理的管理好 JVM 記憶體可以將 難以配置且不可預測的系統 與 少量配置且穩定執行的系統區分開來。
在這篇文章中,我們將討論 Apache Flink 如何管理記憶體,討論其自定義序列化與反序列化機制,以及它是如何操作二進位制資料的。
資料物件直接放在堆記憶體中
在 JVM 中處理大量資料最直接的方式就是將這些資料做為物件儲存在堆記憶體中,然後直接在記憶體中操作這些資料,如果想進行排序則就是對物件列表進行排序。然而這種方法有一些明顯的缺點,首先,在頻繁的建立和銷燬大量物件的時候,監視和控制堆記憶體的使用並不是一件很簡單的事情。如果物件分配過多的話,那麼會導致記憶體過度使用,從而觸發 OutOfMemoryError,導致 JVM 程序直接被殺死。另一個方面就是因為這些物件大都是生存在新生代,當 JVM 進行垃圾回收時,垃圾收集的開銷很容易達到 50% 甚至更多。最後就是 Java 物件具有一定的空間開銷(具體取決於 JVM 和平臺)。對於具有許多小物件的資料集,這可以顯著減少有效可用的記憶體量。如果你精通系統設計和系統調優,你可以根據系統進行特定的引數調整,可以或多或少的控制出現 OutOfMemoryError 的次數和避免堆記憶體的過多使用,但是這種設定和調優的作用有限,尤其是在資料量較大和執行環境發生變化的情況下。
Flink 是怎麼做的?
Apache Flink 起源於一個研究專案,該專案旨在結合基於 MapReduce 的系統和並行資料庫系統的最佳技術。在此背景下,Flink 一直有自己的記憶體資料處理方法。Flink 將物件序列化為固定數量的預先分配的記憶體段,而不是直接把物件放在堆記憶體上。它的 DBMS 風格的排序和連線演算法儘可能多地對這個二進位制資料進行操作,以此將序列化和反序列化開銷降到最低。如果需要處理的資料多於可以儲存在記憶體中的資料,Flink 的運算子會將部分資料溢位到磁碟。事實上,很多Flink 的內部實現看起來更像是 C / C ++,而不是普通的 Java。下圖概述了 Flink 如何在記憶體段中儲存序列化資料並在必要時溢位到磁碟:
Flink 的主動記憶體管理和操作二進位制資料有幾個好處:
1、記憶體安全執行和高效的核外演算法 由於分配的記憶體段的數量是固定的,因此監控剩餘的記憶體資源是非常簡單的。在記憶體不足的情況下,處理操作符可以有效地將更大批的記憶體段寫入磁碟,後面再將它們讀回到記憶體。因此,OutOfMemoryError 就有效的防止了。
2、減少垃圾收集壓力 因為所有長生命週期的資料都是在 Flink 的管理記憶體中以二進位制表示的,所以所有資料物件都是短暫的,甚至是可變的,並且可以重用。短生命週期的物件可以更有效地進行垃圾收集,這大大降低了垃圾收集的壓力。現在,預先分配的記憶體段是 JVM 堆上的長期存在的物件,為了降低垃圾收集的壓力,Flink 社群正在積極地將其分配到堆外記憶體。這種努力將使得 JVM 堆變得更小,垃圾收集所消耗的時間將更少。
3、節省空間的資料儲存 Java 物件具有儲存開銷,如果資料以二進位制的形式儲存,則可以避免這種開銷。
4、高效的二進位制操作和快取敏感性 在給定合適的二進位制表示的情況下,可以有效地比較和操作二進位制資料。此外,二進位制表示可以將相關值、雜湊碼、鍵和指標等相鄰地儲存在記憶體中。這使得資料結構通常具有更高效的快取訪問模式。
主動記憶體管理的這些特性在用於大規模資料分析的資料處理系統中是非常可取的,但是要實現這些功能的代價也是高昂的。要實現對二進位制資料的自動記憶體管理和操作並非易事,使用 java.util.HashMap
比實現一個可溢位的 hash-table
(由位元組陣列和自定義序列化支援)。當然,Apache Flink 並不是唯一一個基於 JVM 且對二進位制資料進行操作的資料處理系統。例如 Apache Drill、Apache Ignite、Apache Geode 也有應用類似技術,最近 Apache Spark 也宣佈將向這個方向演進。
下面我們將詳細討論 Flink 如何分配記憶體、如果對物件進行序列化和反序列化以及如果對二進位制資料進行操作。我們還將通過一些效能表現資料來比較處理堆記憶體上的物件和對二進位制資料的操作。
Flink 如何分配記憶體?
Flink TaskManager 是由幾個內部元件組成的:actor 系統(負責與 Flink master 協調)、IOManager(負責將資料溢位到磁碟並將其讀取回來)、MemoryManager(負責協調記憶體使用)。在本篇文章中,我們主要講解 MemoryManager。
MemoryManager 負責將 MemorySegments 分配、計算和分發給資料處理操作符,例如 sort 和 join 等操作符。MemorySegment 是 Flink 的記憶體分配單元,由常規 Java 位元組陣列支援(預設大小為 32 KB)。MemorySegment 通過使用 Java 的 unsafe 方法對其支援的位元組陣列提供非常有效的讀寫訪問。你可以將 MemorySegment 看作是 Java 的 NIO ByteBuffer 的定製版本。為了在更大的連續記憶體塊上操作多個 MemorySegment,Flink 使用了實現 Java 的 java.io.DataOutput 和 java.io.DataInput 介面的邏輯檢視。
MemorySegments 在 TaskManager 啟動時分配一次,並在 TaskManager 關閉時銷燬。因此,在 TaskManager 的整個生命週期中,MemorySegment 是重用的,而不會被垃圾收集的。在初始化 TaskManager 的所有內部資料結構並且已啟動所有核心服務之後,MemoryManager 開始建立 MemorySegments。預設情況下,服務初始化後,70% 可用的 JVM 堆記憶體由 MemoryManager 分配(也可以配置全部)。剩餘的 JVM 堆記憶體用於在任務處理期間例項化的物件,包括由使用者定義的函式建立的物件。下圖顯示了啟動後 TaskManager JVM 中的記憶體分佈:
Flink 如何序列化物件?
Java 生態系統提供了幾個庫,可以將物件轉換為二進位制表示形式並返回。常見的替代方案是標準 Java 序列化,Kryo,Apache Avro,Apache Thrift 或 Google 的 Protobuf。Flink 包含自己的自定義序列化框架,以便控制資料的二進位制表示。這一點很重要,因為對二進位制資料進行操作需要對序列化佈局有準確的瞭解。此外,根據在二進位制資料上執行的操作配置序列化佈局可以顯著提升效能。Flink 的序列化機制利用了這一特性,即在執行程式之前,要序列化和反序列化的物件的型別是完全已知的。
Flink 程式可以處理表示為任意 Java 或 Scala 物件的資料。在優化程式之前,需要識別程式資料流的每個處理步驟中的資料型別。對於 Java 程式,Flink 提供了一個基於反射的型別提取元件,用於分析使用者定義函式的返回型別。Scala 程式可以在 Scala 編譯器的幫助下進行分析。Flink 使用 TypeInformation 表示每種資料型別。
注:該圖選自董偉柯的文章《Apache Flink 型別和序列化機制簡介》,侵刪
Flink 有如下幾種資料型別的 TypeInformations:
-
BasicTypeInfo:所有 Java 的基礎型別或 java.lang.String
-
BasicArrayTypeInfo:Java 基本型別構成的陣列或 java.lang.String
-
WritableTypeInfo:Hadoop 的 Writable 介面的任何實現
-
TupleTypeInfo:任何 Flink tuple(Tuple1 到 Tuple25)。Flink tuples 是具有型別化欄位的固定長度元組的 Java 表示
-
CaseClassTypeInfo:任何 Scala CaseClass(包括 Scala tuples)
-
PojoTypeInfo:任何 POJO(Java 或 Scala),即所有欄位都是 public 的或通過 getter 和 setter 訪問的物件,遵循通用命名約定
-
GenericTypeInfo:不能標識為其他型別的任何資料型別
注:該圖選自董偉柯的文章《Apache Flink 型別和序列化機制簡介》,侵刪
每個 TypeInformation 都為它所代表的資料型別提供了一個序列化器。例如,BasicTypeInfo 返回一個序列化器,該序列化器寫入相應的基本型別;WritableTypeInfo 的序列化器將序列化和反序列化委託給實現 Hadoop 的 Writable 介面的物件的 write() 和 readFields() 方法;GenericTypeInfo 返回一個序列化器,該序列化器將序列化委託給 Kryo。物件將自動通過 Java 中高效的 Unsafe 方法來序列化到 Flink MemorySegments 支援的 DataOutput。對於可用作鍵的資料型別,例如雜湊值,TypeInformation 提供了 TypeComparators,TypeComparators 比較和雜湊物件,並且可以根據具體的資料型別有效的比較二進位制並提取固定長度的二進位制 key 字首。
Tuple,Pojo 和 CaseClass 型別是複合型別,它們可能巢狀一個或者多個數據型別。因此,它們的序列化和比較也都比較複雜,一般將其成員資料型別的序列化和比較都交給各自的 Serializers(序列化器) 和 Comparators(比較器)。下圖說明了 Tuple3<Integer, Double, Person>
物件的序列化,其中Person
是 POJO 並定義如下:
public class Person {
public int id;
public String name;
}
通過提供定製的 TypeInformations、Serializers(序列化器) 和 Comparators(比較器),可以方便地擴充套件 Flink 的型別系統,從而提高序列化和比較自定義資料型別的效能。
Flink 如何對二進位制資料進行操作?
與其他的資料處理框架的 API(包括 SQL)類似,Flink 的 API 也提供了對資料集進行分組、排序和連線等轉換操作。這些轉換操作的資料集可能非常大。關係資料庫系統具有非常高效的演算法,比如 merge-sort、merge-join 和 hash-join。Flink 建立在這種技術的基礎上,但是主要分為使用自定義序列化和自定義比較器來處理任意物件。在下面文章中我們將通過 Flink 的記憶體排序演算法示例演示 Flink 如何使用二進位制資料進行操作。
Flink 為其資料處理操作符預先分配記憶體,初始化時,排序演算法從 MemoryManager 請求記憶體預算,並接收一組相應的 MemorySegments。這些 MemorySegments 變成了緩衝區的記憶體池,緩衝區中收集要排序的資料。下圖說明了如何將資料物件序列化到排序緩衝區中:
排序緩衝區在內部分為兩個記憶體區域:第一個區域儲存所有物件的完整二進位制資料,第二個區域包含指向完整二進位制物件資料的指標(取決於 key 的資料型別)。將物件新增到排序緩衝區時,它的二進位制資料會追加到第一個區域,指標(可能還有一個 key)被追加到第二個區域。分離實際資料和指標以及固定長度的 key 有兩個目的:它可以有效的交換固定長度的 entries(key 和指標),還可以減少排序時需要移動的資料。如果排序的 key 是可變長度的資料型別(比如 String),則固定長度的排序 key 必須是字首 key,比如字串的前 n 個字元。請注意:並非所有資料型別都提供固定長度的字首排序 key。將物件序列化到排序緩衝區時,兩個記憶體區域都使用記憶體池中的 MemorySegments 進行擴充套件。一旦記憶體池為空且不能再新增物件時,則排序緩衝區將會被完全填充並可以進行排序。Flink 的排序緩衝區提供了比較和交換元素的方法,這使得實際的排序演算法是可插拔的。預設情況下, Flink 使用了 Quicksort(快速排序)實現,可以使用 HeapSort(堆排序)。下圖顯示瞭如何比較兩個物件:
排序緩衝區通過比較它們的二進位制固定長度排序 key 來比較兩個元素。如果元素的完整 key(不是字首 key) 或者二進位制字首 key 不相等,則代表比較成功。如果字首 key 相等(或者排序 key 的資料型別不提供二進位制字首 key),則排序緩衝區遵循指向實際物件資料的指標,對兩個物件進行反序列化並比較物件。根據比較結果,排序演算法決定是否交換比較的元素。排序緩衝區通過移動其固定長度 key 和指標來交換兩個元素,實際資料不會移動,排序演算法完成後,排序緩衝區中的指標被正確排序。下圖演示瞭如何從排序緩衝區返回已排序的資料:
通過順序讀取排序緩衝區的指標區域,跳過排序 key 並按照實際資料的排序指標返回排序資料。此資料要麼反序列化並作為物件返回,要麼在外部合併排序的情況下複製二進位制資料並將其寫入磁碟。
基準測試資料
那麼,對二進位制資料進行操作對效能意味著什麼?我們將執行一個基準測試,對 1000 萬個Tuple2<Integer, String>
物件進行排序以找出答案。整數字段的值從均勻分佈中取樣。String 欄位值的長度為 12 個字元,並從長尾分佈中進行取樣。輸入資料由返回可變物件的迭代器提供,即返回具有不同欄位值的相同 Tuple 物件例項。Flink 在從記憶體,網路或磁碟讀取資料時使用此技術,以避免不必要的物件例項化。基準測試在具有 900 MB 堆大小的 JVM 中執行,在堆上儲存和排序 1000 萬個 Tuple 物件並且不會導致觸發 OutOfMemoryError 大約需要這麼大的記憶體。我們使用三種排序方法在Integer 欄位和 String 欄位上對 Tuple 物件進行排序:
1、物件存在堆中:Tuple 物件儲存在常用的 java.util.ArrayList
中,初始容量設定為 1000 萬,並使用 Java 中常用的集合排序進行排序。
- Flink 序列化:使用 Flink 的自定義序列化程式將 Tuple 欄位序列化為 600 MB 大小的排序緩衝區,如上所述排序,最後再次反序列化。在 Integer 欄位上進行排序時,完整的 Integer 用作排序 key,以便排序完全發生在二進位制資料上(不需要物件的反序列化)。對於 String 欄位的排序,使用 8 位元組字首 key,如果字首 key 相等,則對 Tuple 物件進行反序列化。
3、Kryo 序列化:使用 Kryo 序列化將 Tuple 欄位序列化為 600 MB 大小的排序緩衝區,並在沒有二進位制排序 key 的情況下進行排序。這意味著每次比較需要對兩個物件進行反序列化。
所有排序方法都使用單執行緒實現。結果的時間是十次執行結果的平均值。在每次執行之後,我們呼叫System.gc()
請求垃圾收集執行,該執行不會進入測量的執行時間。下圖顯示了將輸入資料儲存在記憶體中,對其進行排序並將其作為物件讀回的時間。
我們看到 Flink 使用自己的序列化器對二進位制資料進行排序明顯優於其他兩種方法。與儲存在堆記憶體上相比,我們看到將資料載入到記憶體中要快得多。因為我們實際上是在收集物件,沒有機會重用物件例項,但必須重新建立每個 Tuple。這比 Flink 的序列化器(或Kryo序列化)效率低。另一方面,與反序列化相比,從堆中讀取物件是無效能消耗的。在我們的基準測試中,物件克隆比序列化和反序列化組合更耗效能。檢視排序時間,我們看到對二進位制資料的排序也比 Java 的集合排序更快。使用沒有二進位制排序 key 的 Kryo 序列化的資料排序比其他方法慢得多。這是因為反序列化帶來很大的開銷。在String 欄位上對 Tuple 進行排序比在 Integer 欄位上排序更快,因為長尾值分佈顯著減少了成對比較的數量。為了更好地瞭解排序過程中發生的狀況,我們使用 VisualVM 監控執行的 JVM。以下截圖顯示了執行 10次 執行時的堆記憶體使用情況、垃圾收集情況和 CPU 使用情況。
測試是在 8 核機器上執行單執行緒,因此一個核心的完全利用僅對應 12.5% 的總體利用率。截圖顯示,對二進位制資料進行操作可顯著減少垃圾回收活動。對於物件存在堆中,垃圾收集器在排序緩衝區被填滿時以非常短的時間間隔執行,並且即使對於單個處理執行緒也會導致大量 CPU 使用(排序本身不會觸發垃圾收集器)。JVM 垃圾收集多個並行執行緒,解釋了高CPU 總體利用率。另一方面,對序列化資料進行操作的方法很少觸發垃圾收集器並且 CPU 利用率低得多。實際上,如果使用 Flink 序列化的方式在 Integer 欄位上對 Tuple 進行排序,則垃圾收集器根本不執行,因為對於成對比較,不需要反序列化任何物件。Kryo 序列化需要比較多的垃圾收集,因為它不使用二進位制排序 key 並且每次排序都要反序列化兩個物件。
記憶體使用情況上圖顯示 Flink 序列化和 Kryo 序列化不斷的佔用大量記憶體
存使用情況圖表顯示flink-serialized和kryo-serialized不斷佔用大量記憶體。這是由於 MemorySegments 的預分配。實際記憶體使用率要低得多,因為排序緩衝區並未完全填充。下表顯示了每種方法的記憶體消耗。1000 萬條資料產生大約 280 MB 的二進位制資料(物件資料、指標和排序 key),具體取決於使用的序列化程式以及二進位制排序 key 的存在和大小。將其與資料儲存在堆上的方法進行比較,我們發現對二進位制資料進行操作可以顯著提高記憶體效率。在我們的基準測試中,如果序列化為排序緩衝區而不是將其作為堆上的物件儲存,則可以在記憶體中對兩倍以上的資料進行排序。
佔用記憶體 | 物件存在堆中 | Flink 序列化 | Kryo 序列化 |
---|---|---|---|
對 Integer 排序 | 約 700 MB(堆記憶體) | 277 MB(排序緩衝區) | 266 MB(排序緩衝區) |
對 String 排序 | 約 700 MB(堆記憶體) | 315 MB(排序緩衝區) | 266 MB(排序緩衝區) |
總而言之,測試驗證了文章前面說的對二進位制資料進行操作的好處。
展望未來
Apache Flink 具有相當多的高階技術,可以通過有限的記憶體資源安全有效地處理大量資料。但是有幾點可以使 Flink 更有效率。Flink 社群正在努力將管理記憶體移動到堆外記憶體。這將允許更小的 JVM,更低的垃圾收集開銷,以及更容易的系統配置。使用 Flink 的 Table API,所有操作(如 aggregation 和 projection)的語義都是已知的(與黑盒使用者定義的函式相反)。因此,我們可以為直接對二進位制資料進行操作的 Table API 操作生成程式碼。進一步的改進包括序列化設計,這些設計針對應用於二進位制資料的操作和針對序列化器和比較器的程式碼生成而定製。
總結
-
Flink 的主動記憶體管理減少了因觸發 OutOfMemoryErrors 而殺死 JVM 程序和垃圾收集開銷的問題。
-
Flink 具有高效的資料序列化和反序列化機制,有助於對二進位制資料進行操作,並使更多資料適合記憶體。
-
Flink 的 DBMS 風格的運算子本身在二進位制資料上執行,在必要時可以在記憶體中高效能地傳輸到磁碟。
本文地址: http://www.54tianzhisheng.cn/2019/03/24/Flink-code-memory-management/
本文翻譯自:https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html 翻譯:zhisheng,二次轉載請註明地址,否則保留追究法律責任。
關注我
微信公眾號:zhisheng
另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然後回覆關鍵字:Flink 即可無條件獲取到。
更多私密資料請加入知識星球!
Github 程式碼倉庫
https://github.com/zhisheng17/flink-learning/
以後這個專案的所有程式碼都將放在這個倉庫裡,包含了自己學習 flink 的一些 demo 和部落格。
部落格
1、Flink 從0到1學習 —— Apache Flink 介紹
2、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建執行簡單程式入門
3、Flink 從0到1學習 —— Flink 配置檔案詳解
4、Flink 從0到1學習 —— Data Source 介紹
5、Flink 從0到1學習 —— 如何自定義 Data Source ?
6、Flink 從0到1學習 —— Data Sink 介紹
7、Flink 從0到1學習 —— 如何自定義 Data Sink ?
8、Flink 從0到1學習 —— Flink Data transformation(轉換)
9、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows
10、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解
11、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 ElasticSearch
12、Flink 從0到1學習 —— Flink 專案如何執行?
13、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Kafka
14、Flink 從0到1學習 —— Flink JobManager 高可用性配置
15、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹
16、Flink 從0到1學習 —— Flink 讀取 Kafka 資料批量寫入到 MySQL
17、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 RabbitMQ
18、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 HBase
19、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 HDFS
20、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Redis
21、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Cassandra
22、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 Flume
23、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 InfluxDB
24、Flink 從0到1學習 —— Flink 讀取 Kafka 資料寫入到 RocketMQ
25、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裡去了
26、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裡去了
28、Flink 從0到1學習 —— Flink 中如何管理配置?
29、Flink 從0到1學習—— Flink 不可以連續 Split(分流)?
30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文
32、為什麼說流處理即未來?
33、OPPO 資料中臺之基石:基於 Flink SQL 構建實時資料倉庫
36、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理
38、如何基於Flink+TensorFlow打造實時智慧異常檢測平臺?只看這一篇就夠了
40、Flink 全網最全資源(視訊、部落格、PPT、入門、實戰、原始碼解析、問答等持續更新)
原始碼解析
4、Flink 原始碼解析 —— standalone session 模式啟動流程
5、Flink 原始碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Job Manager 啟動
6、Flink 原始碼解析 —— Standalone Session Cluster 啟動流程深度分析之 Task Manager 啟動
7、Flink 原始碼解析 —— 分析 Batch WordCount 程式的執行過程
8、Flink 原始碼解析 —— 分析 Streaming WordCount 程式的執行過程
9、Flink 原始碼解析 —— 如何獲取 JobGraph?
10、Flink 原始碼解析 —— 如何獲取 StreamGraph?
11、Flink 原始碼解析 —— Flink JobManager 有什麼作用?
12、Flink 原始碼解析 —— Flink TaskManager 有什麼作用?
13、Flink 原始碼解析 —— JobManager 處理 SubmitJob 的過程
14、Flink 原始碼解析 —— TaskManager 處理 SubmitJob 的過程
15、Flink 原始碼解析 —— 深度解析 Flink Checkpoint 機制
16、Flink 原始碼解析 —— 深度解析 Flink 序列化機制
17、Flink 原始碼解析 —— 深度解析 Flink 是如何管理好記憶體的?
18、Flink Metrics 原始碼解析 —— Flink-metrics-core
19、Flink Metrics 原始碼解析 —— Flink-metrics-datadog
20、Flink Metrics 原始碼解析 —— Flink-metrics-dropwizard
21、Flink Metrics 原始碼解析 —— Flink-metrics-graphite
22、Flink Metrics 原始碼解析 —— Flink-metrics-influxdb
23、Flink Metrics 原始碼解析 —— Flink-metrics-jmx
24、Flink Metrics 原始碼解析 —— Flink-metrics-slf4j
25、Flink Metrics 原始碼解析 —— Flink-metrics-statsd
26、Flink Metrics 原始碼解析 —— Flink-metrics-prometheus
27、Flink 原始碼解析 —— 如何獲取 ExecutionGraph ?
30、Flink Clients 原始碼解析原文出處:zhisheng的部落格,歡迎關注我