Flink批處理中的增量迭代
對某些迭代而言並不是單次迭代產生的下一次工作集中的每個元素都需要重新參與下一輪迭代,有時只需要重新計算部分資料同時選擇性地更新解集,這種形式的迭代就是增量迭代。增量迭代能夠使得一些演算法執行得更高效,它可以讓演算法專注於工作集中的“熱點”資料部分,這導致工作集中的絕大部分資料冷卻得非常快,因此隨後的迭代面對的資料規模將會大幅縮小。增量迭代的示意圖如下:
我們來梳理一下上圖中的流程:
- Iteration Input:從source或之前的運算子中讀取的初始工作集與解集作為首次迭代的輸入;
- Step Function:也即步函式,將會在每次迭代時被執行。它可以是由map、reduce等運算子組成的任意資料流形成的邏輯體;
- Next Workset/Update Solution Set:下一個工作集驅動著迭代計算並且將會被反饋給迭代頭。除此之外,解集將會被更新並間接地向前推進。這兩個資料集都可以通過步函式的不同的運算子進行更新;
- Iteration Result:在最後一次迭代之後產生最終的解集被寫入到sink或者被用於後續運算子的輸入;
增量迭代可以指定三種終止條件:
- 空工作集收斂標準;
- 最大迭代次數;
- 自定義的聚合器收斂;
在1.1版本中,沒有發現可定義收斂標準的地方;
跟批量迭代一樣,對於增量迭代我們同樣先結合案例來分析解決方案並給出程式碼實現。
現在給定一系列的事件以及它們兩兩之間的關聯關係(連線起來是多個樹結構),找到每個事件對應的根節點。事件之間的關聯關係如下圖:
給定頂點陣列和邊陣列作為輸入,頂點和邊都以二元組來表示。上圖對應的頂點和邊如下所示:
Vertex | Edge |
---|---|
<1,1> | <1,2> |
<2,2> | <2,3> |
<3,3> | <2,4> |
<4,4> | <4,5> |
<5,5> | <6,7> |
<6,6> | <5,8> |
<7,7> | <9,10> |
<8,8> | <9,11> |
<9,9> | <8,12> |
<10,10> | <10,13> |
<11,11> | <1,14> |
<12,12> | <11,15> |
<13,13> | |
<14,14> | |
<15,15> |
最終我們期望得到的結果資料集如下,其中第一個元素表示事件編號,第二個元素表示對應的根節點編號:
Final DataSet |
---|
<1,1> |
<2,1> |
<3,1> |
<4,1> |
<5,1> |
<6,6> |
<7,6> |
<8,1> |
<9,9> |
<10,9> |
<11,9> |
<12,1> |
<13,9> |
<14,1> |
<15,9> |
由於增量迭代比批量迭代更抽象、複雜,這裡我們會將每個迭代步驟圖形化,以方便理解。
首先第一步是初始化頂點資料集以及邊資料集,這兩個資料集中的元素就是上文我們第一個表格裡的二元組集合,這裡頂點的二元組集合同時也是增量迭代的工作集:
DataSet<Tuple2<Long, Long>> verticesAsWorkset = generateWorksetWithVertices(env);
DataSet<Tuple2<Long, Long>> edges = generateDefaultEdgeDataSet(env);
接下來會構建一個增量迭代物件DeltaIteration的例項變數iteration,這裡會以verticesAsWorkset作為初始化工作集,並指定最大迭代次數以及用於分割槽的鍵。
int vertexIdIndex = 0;
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = verticesAsWorkset
.iterateDelta(verticesAsWorkset, MAX_ITERATION_NUM, vertexIdIndex);
別看這裡區區兩行程式碼,但其內部經歷了一系列的初始化過程,示意如下圖:
下文進行原始碼解讀時,會看到iterateDelta方法會觸發對初始解集的構建。
在上文我們闡述增量迭代原理時,我們知道在每次迭代過程中會執行步函式,增量迭代在步函式執行之後會產生增量解集(delta solution set),該增量解集會更新或者合併到解集中來。
接下來是一個完整的步函式,我們會將其進行拆分,第一步將工作集(頂點集合)與邊集合進行連線:
DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
.join(edges).where(0).equalTo(0)
.with(new NeighborWithParentIDJoin())
對於連線所匹配的結果,將會應用一個特定的函式:NeighborWithParentIDJoin。該函式會對連線匹配上的頂點和邊產生一個新的頂點元組,第一個欄位是邊的目的頂點,而第二個欄位是匹配頂點的父頂點:
public static final class NeighborWithParentIDJoin implements
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexAndParent,
Tuple2<Long, Long> edgeSourceAndTarget) throws Exception {
return new Tuple2<Long, Long>(edgeSourceAndTarget.f1, vertexAndParent.f1);
}
}
我們以事件編號1和事件編號2為例,展示連線的過程,圖示如下:
接下來,建立在上面連線產生的資料集的基礎上跟解集進行連線,然後在連線產生的資料上應用FlatJoinFunction函式的實現:RootIdFilter。
DataSet<Tuple2<Long, Long>> delta = ......
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new RootIdFilter());
DataSet<Tuple2<Long, Long>> finalDataSet = iteration.closeWith(delta, delta);
RootIdFilter是個過濾器的實現,它會對Join後的結果集進行過濾,它會選擇性地輸出源節點相同但父節點更小的節點元組。因為從以上樹中元素的規律來看,父節點越小,越靠近真正的根節點。注意,它實現的是FlatJoinFunction函式,而不是JoinFunction函式。因為FlatJoinFunction支援輸出零個或若干個元素(在這個案例裡,該過濾器有可能不輸出記錄):
public static final class RootIdFilter implements FlatJoinFunction<Tuple2<Long, Long>,
Tuple2<Long, Long>, Tuple2<Long, Long>> {
public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old,
Collector<Tuple2<Long, Long>> collector) throws Exception {
if (candidate.f1 < old.f1) {
collector.collect(candidate);
}
}
}
我們仍然以事件編號1及事件編號2作為示例,分析兩個連線的過程。首先拿上一步新生成的頂點<2,1>跟解集進行連線(初始解集為原始的頂點資料集,隨著迭代越接近最終的解集),以元組的第一個欄位作為連線條件,在進行連線之後對於事件編號2產生兩個元組,分別是<2,1>,<2,2>。再應用RootIdFilter過濾器,<2,2>被過濾而<2,1>被輸出。該輸出就對應著生產的delta。隨後處於增量解集中的<2,1>會初始解集中的<2,2>進行替換。這段過程,圖示如下:
回過頭來看這個完整的步函式,所產生的是一個增量解集delta(也就是RootIdFilter過濾後的輸出,比如上面的<2,1>元組),它將會被更新到最終的解集。
增量迭代最終會呼叫closeWith方法來關閉一個迭代邏輯並得到最終的結果集finalDataSet。先解釋一下closeWith的兩個引數的含義:
- solutionSetDelta:也即增量解集,在每次迭代之後,它將會被更新到解集中去;
- newWorkset:新的工作集,它將會被反饋給下一次迭代作為輸入;
這裡得到的delta變數不僅僅是增量解集,同時也是新的工作集。所以上圖中的<2,1>將會被用來更新<2,2>。
接下來,我們來分步展示迭代的執行過程以及各個資料集產生的變化。首先,第一次迭代之後:
對應的資料集變化:
第二次迭代之後:
對應的資料集變化:
因篇幅受限,我們略去第三次、第四次迭代產生的變化圖示,進入到第五次迭代後:
與此同時,資料集的變化:
可以看到這裡新的工作集已經逐漸減少到只剩下一個元組元素<12,1>。當執行完第六次迭代,工作集變為空:
對應的新工作集為空,意味著增量迭代將會終止執行:
完整的實現程式碼如下:
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, Long>> verticesAsWorkset = generateWorksetWithVertices(env);
DataSet<Tuple2<Long, Long>> edges = generateDefaultEdgeDataSet(env);
int vertexIdIndex = 0;
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = verticesAsWorkset
.iterateDelta(verticesAsWorkset, MAX_ITERATION_NUM, vertexIdIndex);
DataSet<Tuple2<Long, Long>> delta = iteration.getWorkset()
.join(edges).where(0).equalTo(0)
.with(new NeighborWithParentIDJoin())
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.with(new RootIdFilter());
DataSet<Tuple2<Long, Long>> finalDataSet = iteration.closeWith(delta, delta);
finalDataSet.print();
}
接下來我們分析一下增量迭代的API,由於增量迭代與批量迭代設計上的差異,它們的實現也迥然不同。增量迭代用DeltaIteration來表示迭代的資料集物件,而批量迭代用IterativeDataSet來表示。DeltaIteration是一個獨立的類,而IterativeDataSet本質上是DataSet的特例。這兩者都是通過DataSet的例項方法來進行初始化,IterativeDataSet通過iterate方法,而DeltaIteration則通過iterateDelta方法。
我們來看一段示例程式碼:
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
initialState.iterateDelta(initialFeedbackSet, 100, 0);
DataSet<Tuple2<Long, Long>> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1)
.join(iteration.getSolutionSet()).where(0).equalTo(0)
.flatMap(new ProjectAndFilter());
DataSet<Tuple2<Long, Long>> feedBack = delta.join(someOtherSet).where(...).equalTo(...).with(...);
// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, feedBack);
上述程式碼中initialState是DataSet的例項,iterateDelta以初始化一個DeltaIteration物件iteration。它接收三個引數:
- initialFeedbackSet:它是DataSet的例項,表示參與迭代的初始資料集。在Flink中稱之為工作集(workset);
- 100:整型值,表示最大迭代次數為100次;
- 0:元組中欄位的下標,該下標所表示的欄位將會作為解集的鍵;
解集是迭代所處的當前狀態,通過iteration的getSolutionSet例項方法來進行訪問的。解集從何而來?回到DeltaIteration類中,我們看到它內部封裝了初始的工作集和初始的解集兩個欄位:
private final DataSet<ST> initialSolutionSet;
private final DataSet<WT> initialWorkset;
它們都是通過DeltaIteration的構造器進行設定的。在DataSet的iterateDelta方法中,我們來看一下這兩個引數所傳遞的值,程式碼如下:
public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset,
int maxIterations, int... keyPositions) {
//...
return new DeltaIteration<>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations);
}
從程式碼段可見,工作集物件是通過引數從外部傳入,而初始的解集則為當前DataSet的例項(this引用)。因此對於上面的示例而言,初始的解集就是initialState物件,隨著迭代的進行,步函式一輪輪被執行,解集也會被增量地更新從而向前演進,同時作為下一輪迭代的輸入。
iteration通過對一系列轉換函式的呼叫形成了任意資料流組成的步函式最終產生delta這一資料集。增量迭代跟批量迭代類似,都是通過迭代物件的closeWith方法來關閉迭代邏輯。跟批量迭代類似,這裡的closeWith方法也返回表示增量迭代結果資料集DeltaIterationResultSet的例項,它也充當迭代尾的角色。
雖然我們沒有應用到聚合器以及收斂標準,而是以空的工作集作為迭代的執行的終止條件。但是,在增量迭代中聚合器、收斂標準同樣適用。
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)