Apache Flink中的廣播狀態實用指南
感謝英文原文作者:https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
不過,原文最近好像不能訪問了。應該是https://www.da-platform.com/網站移除了blog板塊了。
從版本1.5.0開始,Apache FlinkⓇ具有一種新的狀態,稱為廣播狀態。
在這篇文章中,我們解釋了廣播狀態是什麼,並展示瞭如何將其應用於評估事件流上的動態模式的應用程式的示例。我們將引導您完成處理步驟和原始碼,以實現此應用程式。
什麼是廣播狀態?
廣播狀態可用於以特定方式組合和聯合處理兩個事件流。第一個流的事件被廣播到運營商的所有並行例項,這些例項將它們維持為狀態。
不廣播另一個流的事件,而是將其傳送到同一運營商的各個例項,並與廣播流的事件一起處理。
新的廣播狀態非常適合需要加入低吞吐量和高吞吐量流或需要動態更新其處理邏輯的應用程式。
我們將使用後一個用例的具體示例來解釋廣播狀態,並在本文的其餘部分更詳細地展示其API。
廣播狀態下的動態模式評估
想象一下,一個電子商務網站將所有使用者的互動捕獲為使用者操作流。運營該網站的公司有興趣分析互動以增加收入,改善使用者體驗,以及檢測和防止惡意行為。
該網站實現了一個流應用程式,用於檢測使用者事件流上的模式。
但是,公司希望每次模式更改時都避免修改和重新部署應用程式。相反,應用程式在從模式流接收新模式時攝取第二個模式流並更新其活動模式。
在下文中,我們將逐步討論此應用程式,並展示它如何利用Apache Flink中的廣播狀態功能。
我們的示例應用程式攝取了兩個資料流。
第一個流在網站上提供使用者操作,並在上圖的左上方顯示。使用者互動事件包括操作的型別(使用者登入,使用者登出,新增到購物車或完成付款)和使用者的ID,其由顏色編碼。
圖示中的使用者動作事件流包含使用者1001的登出動作,其後是使用者1003的支付完成事件,以及使用者1002的“新增到購物車”動作。
第二流提供應用將執行的動作模式。評估。模式由兩個連續的動作組成。在上圖中,模式流包含以下兩個:
- 模式#1:使用者登入並立即登出而無需瀏覽電子商務網站上的其他頁面。
- 模式#2:使用者將商品新增到購物車並在不完成購買的情況下注銷。
這些模式有助於企業更好地分析使用者行為,檢測惡意行為並改善網站體驗。
例如,如果專案被新增到購物車而沒有後續購買,網站團隊可以採取適當的措施來更好地瞭解使用者未完成購買的原因並啟動特定程式以改善網站轉換(如提供折扣程式碼,限時免費送貨優惠等)
在右側,該圖顯示了操作員的三個並行任務,即攝取模式和使用者操作流,評估操作流上的模式,並在下游發出模式匹配。
為簡單起見,我們示例中的運算子僅評估具有兩個後續操作的單個模式。當從模式流接收到新模式時,替換當前活動模式。
原則上,還可以實現運算子以同時評估更復雜的模式或多個模式,這些模式可以單獨新增或移除。
我們將描述模式匹配應用程式如何處理使用者操作和模式流。
首先,將模式傳送給操作員。該模式被廣播到運營商的所有三個並行任務。任務將模式儲存在其廣播狀態中。由於廣播狀態只應使用廣播資料進行更新,因此所有任務的狀態始終預期相同。
接下來,第一個使用者操作按使用者ID分割槽併發送到操作員任務。分割槽可確保同一使用者的所有操作都由同一任務處理。上圖顯示了操作員任務消耗第一個模式和前三個操作事件後應用程式的狀態。
當任務收到新的使用者操作時,它會通過檢視使用者的最新和先前操作來評估當前活動的模式。
對於每個使用者,操作員將先前的操作儲存在鍵控狀態。由於上圖中的任務到目前為止僅為每個使用者收到了一個操作(我們剛剛啟動了應用程式),因此不需要評估該模式。
最後,使用者鍵控狀態中的先前操作被更新為最新動作,以便能夠在同一使用者的下一個動作到達時查詢它。
在處理前三個動作之後,下一個事件(使用者1001的登出動作)被運送到處理使用者1001的事件的任務。
當任務接收到動作時,它從廣播狀態中查詢當前模式並且使用者1001的先前操作。由於模式匹配兩個動作,因此任務發出模式匹配事件。
最後,任務通過使用最新操作覆蓋上一個事件來更新其鍵控狀態。
當新模式到達模式流時,它被廣播到所有任務,並且每個任務通過用新模式替換當前模式來更新其廣播狀態。
一旦用新模式更新廣播狀態,匹配邏輯就像之前一樣繼續,即,使用者動作事件由金鑰分割槽並由負責任務評估。
如何使用廣播狀態實現應用程式?
到目前為止,我們在概念上討論了該應用程式並解釋了它如何使用廣播狀態來評估事件流上的動態模式。
接下來,我們將展示如何使用Flink的DataStream API和廣播狀態功能實現示例應用程式。
讓我們從應用程式的輸入資料開始。我們有兩個資料流,操作和模式。
在這一點上,我們並不關心流來自何處。這些流可以從Apache Kafka或Kinesis或任何其他系統中攝取。並與各兩個欄位的POJO:
DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???
Action
Pattern
Action
:Long userId
,String action
Pattern
:,String firstAction
String secondAction
作為第一步,我們在屬性上鍵入操作流。接下來,我們準備廣播狀態。廣播狀態始終表示為 Flink提供的最通用的狀態原語。
由於我們的應用程式一次只評估和儲存一個,我們將廣播狀態配置為具有鍵型別和值型別。該總是儲存在與關鍵。
使用 廣播狀態,我們在流上應用轉換並接收a 。在我們獲得了金鑰 流和廣播流之後,我們都流式傳輸並應用了一個userId
KeyedStream<Action, Long> actionsByUser = actions .keyBy((KeySelector<Action, Long>) action -> action.userId); MapState MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>( "patterns", Types.VOID, Types.POJO(Pattern.class)); PatternMapStateVoidPatternPatternMapStatenull BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor); MapStateDescriptorbroadcast()patternsBroadcastStream bcedPatterns DataStream<Tuple2<Long, Pattern>> matches = actionsByUser .connect(bcedPatterns) .process(new PatternEvaluator());
PatternEvaluator
是一個實現介面的自定義函式。
它應用我們之前討論過的模式匹配邏輯,併發出包含使用者ID和匹配模式的記錄。該 介面提供了三種處理記錄和發出結果的方法。
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> { // handle for keyed state (per user) ValueState<String> prevActionState; @Override public void open(Configuration conf) { // initialize keyed state prevActionState = getRuntimeContext().getState( new ValueStateDescriptor<>("lastAction", Types.STRING));</code } /** * Called for each user action. * Evaluates the current pattern against the previous and * current action of the user. */ @Override public void processElement( Action action, ReadOnlyContext ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception { // get current pattern from broadcast state Pattern pattern = ctx .getBroadcastState( new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))) // access MapState with null as VOID default value .get(null); // get previous action of current user from keyed state String prevAction = prevActionState.value(); if (pattern != null && prevAction != null) { // user had an action before, check if pattern matches if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) { // MATCH out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern)); } } // update keyed state and remember action for next pattern evaluation prevActionState.update(action.action); } /** * Called for each new pattern. * Overwrites the current pattern with the new pattern. */ @Override public void processBroadcastElement( Pattern pattern, Context ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception { // store the new pattern by updating the broadcast state BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))); // storing in MapState with null as VOID default value bcState.put(null, pattern); } }
KeyedBroadcastProcessFunction
processBroadcastElement()
為廣播流的每個記錄呼叫。在我們的 函式中,我們只是使用鍵將接收到的記錄放入廣播狀態(記住,我們只儲存一個模式)。PatternEvaluator
Pattern
null
MapState
processElement()
為鍵控流的每個記錄呼叫。它提供對廣播狀態的只讀訪問,以防止修改導致跨函式的並行例項的不同廣播狀態。 從廣播狀態檢索當前模式的方法和從鍵控狀態檢索使用者的先前動作。如果兩者都存在,則檢查先前和當前操作是否與模式匹配,並且如果是這種情況則發出模式匹配記錄。最後,它將鍵控狀態更新為當前使用者操作。processElement()
PatternEvaluator
onTimer()
在先前註冊的計時器觸發時呼叫。定時器可以在任何處理方法中註冊,並用於執行計算或將來清理狀態。我們在示例中沒有實現此方法以保持程式碼簡潔。但是,當用戶在一段時間內未處於活動狀態時,它可用於刪除使用者的最後一個操作,以避免由於非活動使用者而導致狀態增長。
您可能已經注意到了處理方法的上下文物件。上下文物件提供對其他功能的訪問,例如KeyedBroadcastProcessFunction
- 廣播狀態(讀寫或只讀,取決於方法),
- A,可以訪問記錄的時間戳,當前的水印,以及可以註冊計時器,
TimerService
- 當前金鑰(僅適用於 ),和
processElement()
- 一種將函式應用於每個註冊金鑰的鍵控狀態的方法(僅適用於)
processBroadcastElement()
在具有就像任何其他ProcessFunction完全進入狀態弗林克和時間特性,因此可以用來實現複雜的應用程式邏輯。廣播狀態被設計為適應不同場景和用例的多功能特性。雖然我們只討論了一個相當簡單且受限制的應用程式,但您可以通過多種方式使用廣播狀態來實現應用程式的要求。KeyedBroadcastProcessFunction
結論
在這篇博文中,我們向您介紹了一個示例應用程式,以解釋Apache Flink的廣播狀態以及它如何用於評估事件流上的動態模式。我們還討論了API並展示了我們的示例應用程式的原始碼。
我們邀請您檢視Apache Flink網站上的文件,並通過Apache Flink 郵件列表提供反饋或建議以進一步改進。