Flink流處理之迭代案例
當前Flink將迭代的重心集中在批處理上,之前我們談及了批量迭代和增量迭代主要是針對批處理(DataSet)API而言的,並且Flink為批處理中的迭代提供了針對性的優化。但是對於流處理(DataStream),Flink同樣提供了對迭代的支援,這一節我們主要來分析流處理中的迭代,我們將會看到流處理中的迭代相較於批處理有相似之處,但差異也是十分之明顯。
可迭代的流處理程式允許定義“步函式”(step function)並將其內嵌到一個可迭代的流(IterativeStream)中。因為一個流處理程式可能永不終止,因此不同於批處理中的迭代機制,流處理中無法設定迭代的最大次數。取而代之的是,你可以指定等待反饋輸入的最大時間間隔(如果超過該時間間隔沒有反饋元素到來,那麼該迭代將會終止)。通過應用split或filter轉換,你可以指定流的哪一部分用於反饋給迭代頭,哪一部分分發給下游。這裡我們以filter作為示例來展示可迭代的流處理程式的API使用模式。
首先,基於輸入流構建IterativeStream,這是一個迭代的起始,通常稱之為迭代頭:
IterativeStream<Integer> iteration = inputStream.iterate();
接著,我們指定一系列的轉換操作用於表述在迭代過程中執行的邏輯(這裡簡單以map轉換作為示例),map API所接受的UDF就是我們上文所說的步函式:
DataStream<Integer> iteratedStream = iteration.map(/* this is executed many times */);
然後,作為迭代我們肯定需要有資料反饋給迭代頭進行重複計算,所以我們從迭代過的流中過濾出符合條件的元素組成的部分流,我們稱之為反饋流:
DataStream<Integer> feedbackStream = iteratedStream.filter(/* one part of the stream */);
將反饋流反饋給迭代頭就意味著一個迭代的完整邏輯的完成,那麼它就可以“關閉”這個閉合的“環”了。通過呼叫IterativeStream的closeWith這一例項方法可以關閉一個迭代(也可表述為定義了迭代尾)。傳遞給closeWith的資料流將會反饋給迭代頭:
iteration.closeWith(feedbackStream);
另外,一個慣用的模式是過濾出需要繼續向前分發的部分流,這個過濾轉換其實定義的是“終止迭代”的邏輯條件,符合條件的元素將被分發給下游而不用於進行下一次迭代:
DataStream<Integer> output = iteratedStream.filter(/* some other part of the stream */);
跟分析批處理中的迭代一樣,我們仍然以解決實際問題的案例作為切入點來看看流處理中的迭代跟批處理中的迭代有何不同。
首先描述一下需要解決的問題:產生一個由一系列二元組(兩個欄位都是在一個區間內產生的正整數來作為斐波那契數列的兩個初始值)構成的資料流,然後對該資料流中的二元組不斷地迭代使其產生斐波那契數列,直到某次產生的值大於給定的閾值,則停止迭代並輸出迭代次數。
該案例參考自Flink隨原始碼釋出的迭代示例,此案例問題規模較小並且能夠說明問題。但它示例程式碼中的一系列變數稍顯混亂,為了增強程式的表述性,筆者會對其稍作調整。
這個案例如果拆分到對單個元素(二元組)的角度來看,其執行過程如下圖所示:
n表示迭代次數,在最初的map轉換中初始化為0;m是判定迭代停止的閾值;
另外,T後面跟的是欄位索引,比如T2表示取元組中位置為3的欄位。且注意隨著迭代T在不斷變化。
上面我們已經對問題的核心過程進行了分析,接下來我們會分步解決這個問題的構建迭代的流處理程式。
首先,我們先通過source函式建立初始的流物件inputStream:
DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());
該source函式會生成二元組序列,二元組的兩個欄位值是隨機生成的作為斐波那契數列的初始值:
private static class RandomFibonacciSource
implements SourceFunction<Tuple2<Integer, Integer>> {
private Random random = new Random();
private volatile boolean isRunning = true;
private int counter = 0;
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
while (isRunning && counter < MAX_RANDOM_VALUE) {
int first = random.nextInt(MAX_RANDOM_VALUE / 2 - 1) + 1;
int second = random.nextInt(MAX_RANDOM_VALUE / 2 -1) + 1;
if (first > second) continue;
ctx.collect(new Tuple2<Integer, Integer>(first, second));
counter++;
Thread.sleep(50);
}
}
public void cancel() {
isRunning = false;
}
}
為了對新計算的斐波那契數列中的值以及累加的迭代次數進行儲存,我們需要將二元組資料流轉換為五元組資料流,並據此建立迭代物件:
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
inputStream.map(new TupleTransformMapFunction()).iterate(5000);
注意上面程式碼段中iterate API的引數5000,不是指迭代5000次,而是等待反饋輸入的最大時間間隔為5秒。流被認為是無界的,所以無法像批處理迭代那樣指定最大迭代次數。但它允許指定一個最大等待間隔,如果在給定的時間間隔裡沒有元素到來,那麼將會終止迭代。
元組轉換的map函式實現:
private static class TupleTransformMapFunction extends RichMapFunction<Tuple2<Integer,
Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
Tuple2<Integer, Integer> inputTuples) throws Exception {
return new Tuple5<Integer, Integer, Integer, Integer, Integer>(
inputTuples.f0,
inputTuples.f1,
inputTuples.f0,
inputTuples.f1,
0);
}
}
上面五元組中,其中索引為0,1這兩個位置的元素,始終都是最初生成的兩個元素不會變化,而後三個欄位都會隨著迭代而變化。
在迭代流iterativeStream建立完成之後,我們將基於它執行斐波那契數列的步函式併產生斐波那契數列流fibonacciStream:
DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
iterativeStream.map(new FibonacciCalcStepFunction());
這裡的fibonacciStream只是一個代稱,其中的資料並不是真正的斐波那契數列,其實就是上面那個五元組。
其中用於計算斐波那契數列的步函式實現如下:
private static class FibonacciCalcStepFunction extends
RichMapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
Tuple5<Integer, Integer, Integer, Integer, Integer>> {
public Tuple5<Integer, Integer, Integer, Integer, Integer> map(
Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) throws Exception {
return new Tuple5<Integer, Integer, Integer, Integer, Integer>(
inputTuple.f0,
inputTuple.f1,
inputTuple.f3,
inputTuple.f2 + inputTuple.f3,
++inputTuple.f4);
}
}
正如上文所述,後三個欄位會產生變化,在計算之前,數列最後一個元素會被保留,也就是f3對應的元素,然後通過f2元素加上f3元素會產生最新值並更新f3元素,而f4則會累加。
隨著迭代次數增加,不是整個數列都會被保留,只有最初的兩個元素和最新的兩個元素會被保留,這裡也沒必要保留整個數列,因為我們不需要完整的數列,我們只需要對最新的兩個元素進行判斷即可。
上文我們對每個元素計算斐波那契數列的新值併產生了fibonacciStream,但是我們需要對最新的兩個值進行判斷,看它們是否超過了指定的閾值。超過了閾值的元組將會被輸出,而沒有超過的則會再次參與迭代。因此這將產生兩個不同的分支,我們也為此構建了分支流:
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
fibonacciStream.split(new FibonacciOverflowSelector());
而對是否超過閾值的元組進行判斷並分離的實現如下:
private static class FibonacciOverflowSelector implements OutputSelector<
Tuple5<Integer, Integer, Integer, Integer, Integer>> {
public Iterable<String> select(
Tuple5<Integer, Integer, Integer, Integer, Integer> inputTuple) {
if (inputTuple.f2 < OVERFLOW_THRESHOLD && inputTuple.f3 < OVERFLOW_THRESHOLD) {
return Collections.singleton(ITERATE_FLAG);
}
return Collections.singleton(OUTPUT_FLAG);
}
}
在篩選方法select中,我們對不同的分支以不同的常量識別符號進行標識:ITERATE_FLAG(還要繼續迭代)和OUTPUT_FLAG(直接輸出)。
產生了分支流之後,我們就可以從中檢出不同的流分支做迭代或者輸出處理。對需要再次迭代的,就通過迭代流的closeWith方法反饋給迭代頭:
iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));
而對於不需要的迭代就直接讓其流向下游處理,這裡我們只是簡單得將流“重構”了一下然後直接輸出:
DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
.select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
outputStream.print();
所謂的重構就是將之前的五元組重新縮減為三元組,實現如下:
private static class BuildOutputTupleMapFunction extends RichMapFunction<
Tuple5<Integer, Integer, Integer, Integer, Integer>,
Tuple3<Integer, Integer, Integer>> {
public Tuple3<Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
Integer> inputTuple) throws Exception {
return new Tuple3<Integer, Integer, Integer>(
inputTuple.f0,
inputTuple.f1,
inputTuple.f4);
}
}
最終我們將會得到類似如下的輸出:
(7,14,5)
(18,37,3)
(3,46,3)
(23,32,3)
(31,43,2)
(13,45,2)
(37,42,2)
……
前兩個整數是斐波那契數列的兩個初始值,第三個整數表示其需要經歷多少次迭代其斐波那契數列最新的兩個值才會超過閾值。
最終完整的主幹程式程式碼如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment().setBufferTimeout(1);
DataStream<Tuple2<Integer, Integer>> inputStream = env.addSource(new RandomFibonacciSource());
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream =
inputStream.map(new TupleTransformMapFunction()).iterate(5000);
DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream =
iterativeStream.map(new FibonacciCalcStepFunction());
SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream =
fibonacciStream.split(new FibonacciOverflowSelector());
iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));
DataStream<Tuple3<Integer, Integer, Integer>> outputStream = branchedStream
.select(OUTPUT_FLAG).map(new BuildOutputTupleMapFunction());
outputStream.print();
env.execute("Streaming Iteration Example");
}
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)