Apache Storm 官方文件 —— Trident API 概述
Trident 的核心資料模型是“流”(Stream),不過與普通的拓撲不同的是,這裡的流是作為一連串 batch 來處理的。流是分佈在叢集中的不同節點上執行的,並且對流的操作也是在流的各個 partition 上並行執行的。
Trident 中有 5 類操作:
- 針對每個小分割槽(partition)的本地操作,這類操作不會產生網路資料傳輸;
- 針對一個數據流的重新分割槽操作,這類操作不會改變資料流中的內容,但是會產生一定的網路傳輸;
- 通過網路資料傳輸進行的聚合操作;
- 針對資料流的分組操作;
- 融合與聯結操作。
本地分割槽操作
本地分割槽操作是在每個分割槽塊上獨立執行的操作,其中不涉及網路資料傳輸。
函式
函式負責接收一個輸入域的集合並選擇輸出或者不輸出 tuple。輸出 tuple 的域會被新增到原始資料流的輸入域中。如果一個函式不輸出 tuple,那麼原始的輸入 tuple 就會被直接過濾掉。否則,每個輸出 tuple 都會複製一份輸入 tuple 。假設你有下面這樣的函式:
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { for(int i=0; i < tuple.getInteger(0); i++) { collector.emit(new Values(i)); } } }
再假設你有一個名為 “mystream” 的資料流,這個流中包含下面幾個 tuple,每個 tuple 中包含有 “a”、“b”、“c” 三個域:
[1, 2, 3]
[4, 1, 6]
[3, 0, 8]
如果你執行這段程式碼:
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
那麼最終輸出的結果 tuple 就會包含有 “a”、“b”、“c”、“d” 4 個域,就像下面這樣:
[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]
過濾器
過濾器負責判斷輸入的 tuple 是否需要保留。以下面的過濾器為例:
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2; } }
通過使用這段程式碼:
mystream.each(new Fields("b", "a"), new MyFilter())
就可以將下面這樣帶有 “a”、“b”、“c” 三個域的 tuple
[1, 2, 3]
[2, 1, 1]
[2, 3, 4]
最終轉化成這樣的結果 tuple:
[2, 1, 1]
partitionAggregate
partitionAggregate
會在一批 tuple 的每個分割槽上執行一個指定的功能操作。與上面的函式不同,由 partitionAggregate
傳送出的 tuple 會將輸入 tuple 的域替換。以下面這段程式碼為例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假如輸入流中包含有 “a”、“b” 兩個域並且有以下幾個 tuple 塊:
Partition 0:
["a", 1]
["b", 2]
Partition 1:
["a", 3]
["c", 8]
Partition 2:
["e", 1]
["d", 9]
["d", 10]
經過上面的程式碼之後,輸出就會變成帶有一個名為 “sum” 的域的資料流,其中的 tuple 就是這樣的:
Partition 0:
[3]
Partition 1:
[11]
Partition 2:
[20]
Storm 有三個用於定義聚合器的介面:CombinerAggregator
,ReducerAggregator
以及 Aggregator
。
這是 CombinerAggregator
介面:
public interface CombinerAggregator<T> extends Serializable { T init(TridentTuple tuple); T combine(T val1, T val2); T zero(); }
CombinerAggregator
會將帶有一個域的一個單獨的 tuple 返回作為輸出。CombinerAggregator
會在每個輸入 tuple 上執行初始化函式,然後使用組合函式來組合所有輸入的值。如果在某個分割槽中沒有 tuple, CombinerAggregator
就會輸出zero
方法的結果。例如,下面是 Count
的實現程式碼:
public class Count implements CombinerAggregator<Long> { public Long init(TridentTuple tuple) { return 1L; } public Long combine(Long val1, Long val2) { return val1 + val2; } public Long zero() { return 0L; } }
如果你使用 aggregate 方法來代替 partitionAggregate 方法,你就會發現 CombinerAggregator
的好處了。在這種情況下,Trident 會在傳送 tuple 之前通過分割槽聚合操作來優化計算過程。
ReducerAggregator
的介面實現是這樣的:
public interface ReducerAggregator<T> extends Serializable { T init(); T reduce(T curr, TridentTuple tuple); }
ReducerAggregator
會使用 init
方法來產生一個初始化的值,然後使用該值對每個輸入 tuple 進行遍歷,並最終生成並輸出一個單獨的 tuple,這個 tuple 中就包含有我們需要的計算結果值。例如,下面是將 Count 定義為 ReducerAggregator
的程式碼:
public class Count implements ReducerAggregator<Long> { public Long init() { return 0L; } public Long reduce(Long curr, TridentTuple tuple) { return curr + 1; } }
ReducerAggregator
同樣可以用於 persistentAggregate,你會在後面看到這一點。
最常用的聚合器介面還是下面的 Aggregator
介面:
public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); }
Aggregator
聚合器可以生成任意數量的 tuple,這些 tuple 也可以帶有任意數量的域。聚合器可以在執行過程中的任意一點輸出tuple,他們的執行過程是這樣的:
- 在處理一批資料之前先呼叫 init 方法。init 方法的返回值是一個代表著聚合狀態的物件,這個物件接下來會被傳入 aggregate 方法和 complete 方法中。
- 對於一個區塊中的每個 tuple 都會呼叫 aggregate 方法。這個方法能夠更新狀態並且有選擇地輸出 tuple。
- 在區塊中的所有 tuple 都被 aggregate 方法處理之後就會呼叫 complete 方法。
下面是使用 Count 作為聚合器的程式碼:
public class CountAgg extends BaseAggregator<CountState> { static class CountState { long count = 0; } public CountState init(Object batchId, TridentCollector collector) { return new CountState(); } public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) { state.count+=1; } public void complete(CountState state, TridentCollector collector) { collector.emit(new Values(state.count)); } }
有時你可能會需要同時執行多個聚合操作。這個過程叫做鏈式處理,可以使用下面這樣的程式碼來實現:
mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd()
這段程式碼會在每個分割槽上分別執行 Count 和 Sum 聚合器,而輸出中只會包含一個帶有 [“count”, “sum”] 域的單獨的 tuple。
stateQuery 與 partitionPersist
stateQuery 與 partitionPersist 會分別查詢、更新 state 資料來源。你可以參考 Trident State 文件 來了解如何使用它們。
projection
projection
方法只會保留操作中指定的域。如果你有一個帶有 [“a”, “b”, “c”, “d”] 域的資料流,通過執行這段程式碼:
mystream.project(new Fields("b", "d"))
就會使得輸出資料流中只包含有 [“b”, “d”] 域。
重分割槽操作
重分割槽操作會執行一個用來改變在不同的任務間分配 tuple 的方式的函式。在重分割槽的過程中分割槽的數量也可能會發生變化(例如,重分割槽之後的並行度就有可能會增大)。重分割槽會產生一定的網路資料傳輸。下面是重分割槽操作的幾個函式:
- shuffle:通過隨機輪詢演算法來重新分配目標區塊的所有 tuple。
- broadcast:每個 tuple 都會被複制到所有的目標區塊中。這個函式在 DRPC 中很有用 —— 比如,你可以使用這個函式來獲取每個區塊資料的查詢結果。
- partitionBy:該函式會接收一組域作為引數,並根據這些域來進行分割槽操作。可以通過對這些域進行雜湊化,並對目標分割槽的數量取模的方法來選取目標區塊。partitionBy 函式能夠保證來自同一組域的結果總會被髮送到相同的目標區間。
- global:這種方式下所有的 tuple 都會被髮送到同一個目標分割槽中,而且資料流中的所有的塊都會由這個分割槽處理。
- batchGlobal:同一個 batch 塊中的所有 tuple 會被髮送到同一個區塊中。當然,在資料流中的不同區塊仍然會分配到不同的區塊中。
- partition:這個函式使用自定義的分割槽方法,該方法會實現
backtype.storm.grouping.CustomStreamGrouping
介面。
聚類操作
Trident 使用 aggregate 方法和 persistentAggregate 方法來對資料流進行聚類操作。其中,aggregate 方法會分別對資料流中的每個 batch 進行處理,而 persistentAggregate 方法則會對資料流中的所有 batch 執行聚類處理,並將結果存入某個 state 中。
在資料流上執行 aggregate 方法會執行一個全域性的聚類操作。在你使用 ReducerAggregator
或者 Aggregator
時,資料流首先會被重新分割槽成一個單獨的分割槽,然後聚類函式就會在該分割槽上執行操作。而在你使用 CombinerAggregator
時,Trident 首先會計算每個分割槽的部分聚類結果,然後將這些結果重分割槽到一個單獨的分割槽中,最後在網路資料傳輸完成之後結束這個聚類過程。CombinerAggregator
比其他的聚合器的執行效率更高,在聚類時應該儘可能使用CombinerAggregator
。
下面是一個使用 aggregate 來獲取一個 batch 的全域性計數值的例子:
mystream.aggregate(new Count(), new Fields("count"))
與 partitionAggregate 一樣,aggregate 的聚合器也可以進行鏈式處理。然而,如果你在一個處理鏈中同時使用了CombinerAggregator
和非 CombinerAggregator
,Trident 就不能對部分聚類操作進行優化了。
想要了解更多使用 persistentAggregate 的方法,可以參考 Trident State 文件 一文。
對分組資料流的操作
通過對指定的域執行 partitionBy 操作,groupBy 操作可以將資料流進行重分割槽,使得相同的域的 tuple 分組可以聚集在一起。例如,下面是一個 groupBy 操作的示例:
如果你在分組資料流上執行聚合操作,聚合器會在每個分組(而不是整個區塊)上執行。persistentAggregate 同樣可以在一個分組資料裡上執行,這種情況下聚合結果會儲存在 MapState 中,其中的 key 就是分組的域名。
和其他操作一樣,對分組資料流的聚合操作也可以以鏈式的方式執行。
融合(Merge)與聯結(join)
Trident API 的最後一部分是聯結不同的資料流的操作。聯結資料流最簡單的方式就是將所有的資料流融合到一個流中。你可以使用 TridentTopology 的 merge 方法實現該操作,比如這樣:
topology.merge(stream1, stream2, stream3);
Trident 會將融合後的新資料流的域命名為為第一個資料流的輸出域。
聯結資料流的另外一種方法是使用 join。像 SQL 那樣的標準 join 操作只能用於有限的輸入資料集,對於無限的資料集就沒有用武之地了。Trident 中的 join 只會應用於每個從 spout 中輸出的小 batch。
下面是兩個流的 join 操作的示例,其中一個流含有 [“key”, “val1”, “val2”] 域,另外一個流含有 [“x”, “val1”] 域:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
上面的例子會使用 “key” 和 “x” 作為 join 的域來聯結 stream1 和 stream2。Trident 要求先定義好新流的輸出域,因為輸入流的域可能會覆蓋新流的域名。從 join 中輸出的 tuple 中會包含:
- join 域的列表。在這個例子裡,輸出的 “key” 域與 stream1 的 “key” 域以及 stream2 的 “x” 域對應。
- 來自所有流的非 join 域的列表。這個列表是按照傳入 join 方法的流的順序排列的。在這個例子裡,“ a” 和 “b” 域與 stream1 的 “val1” 和 “val2” 域對應;而 “c” 域則與 stream2 的 “val1” 域相對應。
在對不同的 spout 傳送出的流進行 join 時,這些 spout 上會按照他們傳送 batch 的方式進行同步處理。也就是說,一個處理中的 batch 中含有每個 spout 傳送出的 tuple。
到這裡你大概仍然會對如何進行視窗 join 操作感到困惑。視窗操作(包括平滑視窗、滾動視窗等 —— 譯者注)主要是指將當前的 tuple 與過去若干小時時間段內的 tuple 聯結起來的過程。
你可以使用 partitionPersist 和 stateQuery 來實現這個過程。過去一段時間內的 tuple 會以 join 域為關鍵字被儲存到一個 state 源中。然後就可以使用 stateQuery 查詢 join 域來實現這個“聯結”(join)的過程。