1. 程式人生 > >Storm Grouping機制詳解(包含兩篇參考資料)

Storm Grouping機制詳解(包含兩篇參考資料)

參考資料1:
  1. shuffleGrouping

    將流分組定義為混排。這種混排分組意味著來自Spout的輸入將混排,或隨機分發給此Bolt中的任務。shuffle grouping對各個task的tuple分配的比較均勻。

  2. fieldsGrouping

    這種grouping機制保證相同field值的tuple會去同一個task,這對於WordCount來說非常關鍵,如果同一個單詞不去同一個task,那麼統計出來的單詞次數就不對了。

  3. All grouping

    廣播發送, 對於每一個tuple將會複製到每一個bolt中處理。

  4. Global grouping

    Stream中的所有的tuple都會發送給同一個bolt任務處理,所有的tuple將會發送給擁有最小task_id的bolt任務處理。

  5. None grouping

    不關注並行處理負載均衡策略時使用該方式,目前等同於shuffle grouping,另外storm將會把bolt任務和他的上游提供資料的任務安排在同一個執行緒下。

  6. Direct grouping

    由tuple的發射單元直接決定tuple將發射給那個bolt,一般情況下是由接收tuple的bolt決定接收哪個bolt發射的Tuple。這是一種比較特別的分組方法,用這種分組意味著訊息的傳送者指定由訊息接收者的哪個task處理這個訊息。 只有被宣告為Direct Stream的訊息流可以宣告這種分組方法。而且這種訊息tuple必須使用emitDirect方法來發射。訊息處理者可以通過TopologyContext來獲取處理它的訊息的taskid (OutputCollector.emit方法也會返回taskid)

fieldsGrouping

上面的資料我摘抄自:http://xumingming.sinaapp.com/127/twitter-storm%E5%A6%82%E4%BD%95%E4%BF%9D%E8%AF%81%E6%B6%88%E6%81%AF%E4%B8%8D%E4%B8%A2%E5%A4%B1/

如果你瞭解Storm,我想你能明白其中的大多數Grouping。這裡的Grouping策略我想著重介紹一下fieldsGrouping,也最難理解的。

fieldsGrouping是按照資料中欄位Field的值分組的。下面是我的測試程式碼:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
        .fieldsGrouping("words", new Fields("word"));

測試的例子Spout是Storm自帶的例子,Blot程式碼如下:

public void execute(Tuple tuple) {
    log.info("rev a message: " + tuple.getString(0));
    collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}

Storm自帶的例子Spout能隨機的返回new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”};列表中的幾個字串。這也是測試FieldGroup的好例子。

按照我最早做Storm開始前的理解,既然是按照Field分組,那麼是所有相同的Field值得資料都會到達一個Blot的。我測試很多次,其結果並不是這樣,一個Blot會收到多個不同的值。我沒有仔細探究Storm這樣分組有什麼特別的地方,以至於自己對Storm的學習停滯了很長時間。

Storm能保證所有相同Field值的資料到達的是相同的Blot,但是不保證一個Blot只處理一個值域。

也就是說,所有值是nathan能到達到一個Blot,但是到達同一個Blot的值可能有多個,如"nathan”, “mike"的資料都到達。

理解到這點上,fieldsGrouping就算是理解了。

下面是測試日誌:

9144 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9234 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9245 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9335 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9346 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9437 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9447 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
9537 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
9548 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9639 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9649 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9740 [Thread-33-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
9749 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
9841 [Thread-35-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels
9850 [Thread-26-exclaim2] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda

由上面的日誌可以看出,golda這個值的資料,的確歸併到一個Blot處理的。執行緒編號:Thread-26-exclaim2。 其它值也都是相同值都是在一個執行緒內被處理的。

參考資料2:

最近研究Storm的Stream Grouping的時候,對Field Grouping和Shuffle Grouping理解不是很透徹。去看WordCountTopology也不怎麼理解,後來腦洞一開,加了一行程式碼再次執行,徹底頓悟。只能說自己對Storm的基本概念還是沒吃透啊。(WordCountTopology這個例子請自行參考Storm-Starter)

  1. public void execute(Tuple tuple, BasicOutputCollector collector) {  
  2.     String word = tuple.getString(0);  
  3.     // 新增這行程式碼的作用是看看值相等的word是不是同一個例項執行的,實時證明確實如此  
  4.     System.out.println(this + "====" + word);  
  5.     Integer count = counts.get(word);  
  6.     if (count == null)  
  7.         count = 0;  
  8.     count++;  
  9.     counts.put(word, count);  
  10.     collector.emit(new Values(word, count));  
  11. }  

經過反覆測試,下面是我個人的一些總結,如果有缺少或者錯誤我會及時改正。

官方文件裡有這麼一句話:“if the stream is grouped by the “user-id” field, tuples with the same “user-id” will always go to the same task”

一個task就是一個處理邏輯的例項,所以fields能根據tuple stream的id,也就是下面定義的xxx

  1. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  2.         declarer.declare(new Fields("xxx"));  
  3. }  

xxx所代表的具體內容會由某一個task來處理,並且同一個xxx對應的內容,處理這個內容的task例項是同一個。

關聯到Strom裡面Field的概念


比如說:

bolt第一次emit三個流,即xxx有luonq pangyang qinnl三個值,假設分別建立三個task例項來處理:

  1. luonq -> instance1  
  2. pangyang -> instance2  
  3. qinnl -> instance3  


然後第二次emit四個流,即xxx有luonq qinnanluo py pangyang四個值,假設還是由剛才的三個task例項來處理:

  1. luonq -> instance1  
  2. qinnanluo -> instance2  
  3. py -> instance3  
  4. pangyang -> instance2  


然後第三次emit兩個流,即xxx有py qinnl兩個值,假設還是由剛才的三個task例項來處理:

  1. py -> instance3  
  2. qinnl -> instance3  


最後我們看看三個task例項都處理了哪些值,分別處理了多少次:

instance1: luonq(處理2次)
instance2: pangyang(處理2次) qinnanluo(處理1次)
instance3: qinnl(處理2次) py(處理2次)

結論:
1. emit發出的值第一次由哪個task例項處理是隨機的,此後再次出現這個值,就固定由最初處理他的那個task例項再次處理,直到topology結束

2. 一個task例項可以處理多個emit發出的值

3. 和shuffle Grouping的區別就在於,當emit發出同樣的值時,處理他的task是隨機的