Flink-CEP之帶版本的共享緩衝區
帶版本的共享緩衝區
當股票模式以一個事件流作為輸入時,狀態轉換將會作用於事件流從而引起事件的狀態變化。結合視窗對參與匹配的事件的限制以及模式中結合事件上下文(狀態)的過濾條件,同一事件流隨著時間的流動或者多次執行都會產生多種不同的匹配結果。在此我們為示例模式構建了一個事件流以及其可能產生的三種匹配結果,如下圖:
在事件e6到達後,會產生兩個結果:R1和R2,而結果R3將會在e8到來之後匹配成功。圖中可見R1、R2和R3這三個匹配結果在一些事件上產生了重疊。
為了保留已匹配的結果,需要將匹配結果中包含的事件儲存起來,這種資料結構在論文中稱之為緩衝區。首先,初步的解決方案是為獨立匹配而設計緩衝區,在緩衝區中為了讓不同的狀態儲存不同的事件,每個狀態對應一個棧空間(除了最終態),針對上面三個匹配的獨立緩衝區如下圖a-c所示:
上圖中的a-c描述了儲存R1-R3三個匹配結果的獨立緩衝區。每個棧包含事件和指向事件的指標,它們通常是因為“begin”或者“take”狀態轉換而被加入到緩衝區中。每個事件有一個前置指標指向之前被選擇的事件,之前的事件要麼在相同的棧中要麼在之前的棧中。當一個事件被加入到緩衝區中,它的指標也一同被設定,在緩衝區中從該事件開始沿著前置指標的一次遍歷將能檢索到完整的匹配。
為每個匹配單獨構建緩衝區,從技術實現上來看是沒有問題的,但隨著事件的流入,模式的匹配結果也將會變得更多,從而導致緩衝區的數量也極具上升。為了避免緩衝區、棧的數目過多以及在棧中頻繁地複製事件,一種優化措施是將這些獨立的緩衝區合併為單一共享的緩衝區。這個過程最終是基於合併這些獨立緩衝區中相應的棧來實現的。為了在遍歷時找到匹配的事件流,合併棧中相同的事件時必須保留他們的前置指標,這一步是整個優化措施的關鍵,如果草率地合併這些棧中的事件,在共享緩衝區中沿著這些已存在的指標所進行的遍歷將會導致錯誤的結果。舉個例子,假設我們將R2的a[i]棧中的e4元素以及b棧中的e6元素與R3緩衝區裡的a[i]棧以及b棧合併(來達到合併R1和R2緩衝區的目的),從e6開始的一次遍歷會產生包含:e1,e2,e3,e4和e6元素的結果,而這是一個錯誤的結果。產生這一問題的原因是因為在合併的過程中,沒有區分來自不同緩衝區中的不同指標。
為了解決這個問題,
在程式實現上,Flink定義了一個DeweyNumber類來表示這種點分十進位制形式的版本號,其內部使用陣列來儲存“點分”的每一個數值並提供了幾個對版本號操作的方法。比如,增加版本號:
public DeweyNumber increase() {
//原樣拷貝出一個新陣列
int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length);
//最後一個數值加一
newDeweyNumber[deweyNumber.length - 1]++;
//構建新陣列
return new DeweyNumber(newDeweyNumber);
}
進入到一個新狀態,將新增一位版本號:
public DeweyNumber addStage() {
//拷貝原先陣列的資料,並將陣列的容量加1
int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length + 1);
return new DeweyNumber(newDeweyNumber);
}
以及檢測當前DeweyNumber與另一個DeweyNumber是否相容:
public boolean isCompatibleWith(DeweyNumber other) {
//當前的數值數目多於另一個,則從頭開始比對,字首必須完全相等
if (length() > other.length()) {
for (int i = 0; i < other.length(); i++) {
if (other.deweyNumber[i] != deweyNumber[i]) {
return false;
}
}
return true;
}
// 數值數目相等,前n-1個必須相等,最後一個數值,當前的必須比另一個大
else if (length() == other.length()) {
int lastIndex = length() - 1;
for (int i = 0; i < lastIndex; i++) {
if (other.deweyNumber[i] != deweyNumber[i]) {
return false;
}
}
return deweyNumber[lastIndex] >= other.deweyNumber[lastIndex];
}
//如果當前數值數目比另一個的數值數目少,則明顯不相容
else {
return false;
}
}
�一個帶版本的共享緩衝區合併那三個獨立緩衝區後的結果如上圖中的圖(d)所示,所有來自單個緩衝區的指標現在都被標記了相容的版本號。而之前提到的那個因為不具備版本號而導致遍歷產生錯誤結果的問題在這裡也將不再出現,因為從e6指向e4版本號2.0.0跟e4指向e3(處於a[i]棧中)的版本號1.0不相容,而只有版本號相容,遍歷才會繼續。帶有版本號的緩衝區對所有的匹配提供簡潔的編碼,並且被標記了相容版本號的指標和事件構建了一個滿足恰好匹配一次的帶版本的檢視。為了返回一次匹配成功的結果,檢索演算法會沿著相容指標從棧中最近的事件開始遍歷。
版本號的實現以及理論分析完成之後,我們來看程式碼中如何實現這個共享緩衝區。SharedBuffer就是這一資料結構的實現,它是一個巢狀多層略顯複雜的資料結構。一個SharedBuffer包含一個鍵與SharedBufferPage的對映(Map):
SharedBufferPage表示一組擁有相同鍵的元素的儲存。但是元素也是由對映構成的,該對映的鍵是ValueTimeWrapper型別,而值為SharedBufferEntry型別:
其中ValueTimeWarpper類似於一個封裝了值和時間戳的二元組。對於SharedBufferEntry,它儲存了一組關聯著的SharedBufferEdge。SharedBufferEdge包含指向目標SharedBufferEntry的指標(多個SharedBufferEntry之間的邊)以及邊上的版本號DeweyNumber。因此,SharedBuffer的整體檢視如下所示:
從類圖上來展示它們之間的關聯關係如下圖:
從上面的關係圖可見,往SharedBuffer中新增元素如果有前置元素將會涉及到跟前置元素的SharedBufferEntry構建關聯關係。因此對於設定元素的put方法被分成了兩種情況:
- 無前置元素:不需要處理跟之前元素的關係,也不需要初始化對應的SharedBufferEntry;
- 有前置元素:需要提供前置元素的資訊,並在內部查詢到前置元素所對應的SharedBufferEntry,然後再構建ValueTimeWrapper與SharedBufferEntry的對映關係。
通常SharedBuffer如果使用者的模式配置了時間視窗,那麼它會基於視窗長度來對過期元素進行清理。提供該服務的方法是:prune。該方法會在每個page上進行prune。而在page上的prune則會對其內部Map的每一項的ValueTimeWrapper的時間戳進行比對,凡是小於等於清理時間戳的元素,都予以清理。
另外,由<key, value, timestamp>結合所對映到的SharedBufferEntry,可能會被多次引用(如之前三次匹配中的e4),SharedBuffer採用的是引用計數機制(它是一種資源回收時常用的機制)來標記引用次數。具體而言是由lock、release以及remove這三個方法共同組合來完成這一功能的,而引用計數器實現在SharedBufferEntry上。當然,在刪除該SharedBufferEntry時需要一併清除它被其他SharedBufferEdge的引用關係。
為了基於版本號提取某個匹配的的所有元素,Flink定義了一個ExtractionState來儲存提取狀態的資訊,該資料結構內部以棧結構來儲存向前遍歷的整個路徑。下面我們來分析一下,SharedBuffer是如何提取模式的匹配元素,該邏輯被封裝在方法extractPatterns中:
public Collection<LinkedHashMultimap<K, V>> extractPatterns(
final K key,
final V value,
final long timestamp,
final DeweyNumber version) {
Collection<LinkedHashMultimap<K, V>> result = new ArrayList<>();
//構建一個棧來記住當前提取的狀態
Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
//為了構建前置關係,根據鍵、值以及時間戳獲得首個共享緩衝區項
SharedBufferEntry<K, V> entry = get(key, value, timestamp);
//如果記錄項存在
if (entry != null) {
//根據記錄項,首先構建一個提取狀態加入棧
extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
//當提取狀態的棧不為空時,使用深度優先的搜尋來重構之前的關係
while (!extractionStates.isEmpty()) {
//出棧一個物件
ExtractionState<K, V> extractionState = extractionStates.pop();
//獲得其版本號
DeweyNumber currentVersion = extractionState.getVersion();
//獲得其棧來儲存當前路徑,深度優先搜尋
Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
//終止條件:某個提取狀態的版本號為單一數值,說明深度搜索已到達頭狀態
if (currentVersion.length() == 1) {
LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
//出棧構建正向的完整路徑儲存到LinkedHashMultimap中,並加入到結果集
while(!currentPath.isEmpty()) {
SharedBufferEntry<K, V> currentEntry = currentPath.pop();
completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
}
result.add(completePath);
} else {
SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
//追加到路徑中
currentPath.push(currentEntry);
boolean firstMatch = true;
//從當前記錄項開始探索與其關聯的邊,檢測版本是否相容
for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
// 如果版本號相容
if (currentVersion.isCompatibleWith(edge.getVersion())) {
//首次匹配,構建提取狀態並直接加入棧中,後續匹配需要為提取狀態構建新的路徑棧,通過深度拷貝路徑
//因為除了首次匹配路徑唯一之外,後續的匹配路徑都可能不一致,因此不能共享狀態
if (firstMatch) {
extractionStates.push(
new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
firstMatch = false;
} else {
Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
copy.addAll(currentPath);
extractionStates.push(
new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), copy));
}
}
}
}
}
}
return result;
}
注意,上面程式碼段中有兩個棧:
- extractionStates:型別為Stack<ExtractionState<K, V>>,對當前處理狀態壓棧,輔助深度遍歷;
- currentPath:型別為Stack<SharedBufferEntry<K, V>>,儲存匹配模式中各狀態的“路徑”因為是從後往前遍歷,所以恰好適合用棧來儲存,出棧時正好是順序的。
微信掃碼關注公眾號:Apache_Flink
QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)