
Storm Trident Example: partitionBy


The code below uses partitionBy to repartition the stream. partitionBy assigns each tuple to a target partition by applying a hash to the values of the specified fields: Target Partition = hash(fields) % (number of target partitions).
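As a rough illustration of this formula, the following Java sketch computes a target partition from a list of field values. `PartitionFormula` and `targetPartition` are hypothetical names, not Storm code. The sketch assumes `List.hashCode()` as the hash and uses `Math.floorMod` so that a negative hash still yields a partition in `[0, n)`. Storm's real fields-grouping hash may differ, so the numbers it prints are not expected to match the log further down.

```java
import java.util.List;

public class PartitionFormula {
    // Hypothetical helper mirroring: Target Partition = hash(fields) % (number of target partitions).
    // ASSUMPTION: List.hashCode() stands in for Storm's internal hash function.
    static int targetPartition(List<?> fieldValues, int numPartitions) {
        // Math.floorMod keeps the result in [0, numPartitions) even for a negative hash,
        // which a plain % would not guarantee.
        return Math.floorMod(fieldValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 5; // same count as parallelismHint(5) in this example
        for (String user : List.of("nickt1", "nickt2", "nickt4", "nickt5")) {
            System.out.println(user + " -> partition " + targetPartition(List.of(user), numPartitions));
        }
    }
}
```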

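Tuples with the same field values are always assigned to the same partition. The converse does not hold: different values can also end up in the same partition, either because their hashes collide or, more generally, because their hashes leave the same remainder when divided by the number of partitions.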
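The claim that different values can share a partition can be checked with the same assumed formula (`List.hashCode()` with `Math.floorMod`, not Storm's actual hash). This hypothetical sketch uses six distinct user names, where `nickt3` and `nickt6` are made up for illustration, and five partitions. By the pigeonhole principle at least two names must land in the same partition, whatever hash function is used.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionCollision {
    // ASSUMPTION: List.hashCode() stands in for Storm's real hash function.
    static int targetPartition(List<?> fieldValues, int numPartitions) {
        return Math.floorMod(fieldValues.hashCode(), numPartitions);
    }

    // Returns two distinct values assigned to the same partition, or null if none collide.
    static String[] findSharedPartition(List<String> candidates, int numPartitions) {
        Map<Integer, String> firstSeen = new HashMap<>();
        for (String value : candidates) {
            int p = targetPartition(List.of(value), numPartitions);
            // putIfAbsent returns the value already stored for partition p, if any
            String previous = firstSeen.putIfAbsent(p, value);
            if (previous != null && !previous.equals(value)) {
                return new String[] {previous, value};
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Six distinct users, five partitions: at least two must share a partition.
        List<String> users = List.of("nickt1", "nickt2", "nickt3", "nickt4", "nickt5", "nickt6");
        String[] pair = findSharedPartition(users, 5);
        System.out.println(pair[0] + " and " + pair[1] + " share partition "
                + targetPartition(List.of(pair[0]), 5));
    }
}
```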

Some code is omitted here; for the omitted parts, see: https://blog.csdn.net/nickta/article/details/79666918

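import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Spout emitting (user, score) tuples, at most 3 tuples per batch
FixedBatchSpout spout = new FixedBatchSpout(new Fields("user", "score"), 3,
        new Values("nickt1", 4),
        new Values("nickt2", 7),
        new Values("nickt1", 8),
        new Values("nickt4", 9),
        new Values("nickt5", 7),
        new Values("nickt1", 11),
        new Values("nickt4", 5)
);
spout.setCycle(false); // emit the tuples once instead of repeating them forever
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))               // repartition by the hash of "user"
        .each(new Fields("user"), new Debug("print:")) // print each user along with its partition
        .parallelismHint(5);                           // run this section in 5 partitions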
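To see how this input would be routed without starting a Storm cluster, the sketch below replays the spout's seven (user, score) tuples and groups them by target partition, keyed only on `user`. It is a local simulation under an assumed hash (`List.hashCode()` with `Math.floorMod`), not Trident itself; `PartitionBySimulation`, `sampleTuples` and `route` are hypothetical names. The concrete partition numbers will likely differ from a real run, but the property it shows is the same one the topology relies on: all tuples of one user end up in one partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionBySimulation {
    // Plain stand-in for a Trident (user, score) tuple.
    static final class UserScore {
        final String user;
        final int score;
        UserScore(String user, int score) { this.user = user; this.score = score; }
        @Override public String toString() { return "[" + user + ", " + score + "]"; }
    }

    // The seven tuples emitted by the FixedBatchSpout in the example.
    static List<UserScore> sampleTuples() {
        return List.of(
            new UserScore("nickt1", 4), new UserScore("nickt2", 7),
            new UserScore("nickt1", 8), new UserScore("nickt4", 9),
            new UserScore("nickt5", 7), new UserScore("nickt1", 11),
            new UserScore("nickt4", 5));
    }

    // ASSUMPTION: List.hashCode() + Math.floorMod stands in for Storm's real hash,
    // so partition numbers here need not match a real Trident run.
    static int targetPartition(String user, int numPartitions) {
        return Math.floorMod(List.of(user).hashCode(), numPartitions);
    }

    // Groups tuples by target partition using only the "user" field as the key;
    // the score plays no part, as with partitionBy(new Fields("user")).
    static Map<Integer, List<UserScore>> route(List<UserScore> tuples, int numPartitions) {
        Map<Integer, List<UserScore>> byPartition = new TreeMap<>();
        for (UserScore t : tuples) {
            byPartition.computeIfAbsent(targetPartition(t.user, numPartitions), k -> new ArrayList<>()).add(t);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        route(sampleTuples(), 5).forEach((partition, tuples) ->
            System.out.println("partition" + partition + ": " + tuples));
    }
}
```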

  

Output: the output shows that every nickt1 tuple is printed in partition1 and every nickt4 tuple is printed in partition4.

<Fri Mar 23 15:21:46 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [nickt2]
<Fri Mar 23 15:21:46 CST 2018[partition1-Thread-128-b-0-executor[34 34]]> DEBUG(print:): [nickt1]
<Fri Mar 23 15:21:46 CST 2018[partition1-Thread-128-b-0-executor[34 34]]> DEBUG(print:): [nickt1]
<Fri Mar 23 15:21:46 CST 2018[partition1-Thread-128-b-0-executor[34 34]]> DEBUG(print:): [nickt1]
<Fri Mar 23 15:21:46 CST 2018[partition4-Thread-126-b-0-executor[37 37]]> DEBUG(print:): [nickt4]
<Fri Mar 23 15:21:46 CST 2018[partition0-Thread-72-b-0-executor[33 33]]> DEBUG(print:): [nickt5]
<Fri Mar 23 15:21:46 CST 2018[partition4-Thread-126-b-0-executor[37 37]]> DEBUG(print:): [nickt4]
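The observation about this output can also be checked mechanically. The sketch below parses the Debug lines of this run with a regular expression and collects, for each user, the set of partitions it was printed in. The pattern is an assumption based only on the format of these particular lines, and `PartitionLogCheck` / `partitionsPerUser` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionLogCheck {
    // ASSUMPTION: matches the "[partitionN-Thread-... DEBUG(print:): [user]" layout of this log only.
    private static final Pattern LINE =
        Pattern.compile("\\[(partition\\d+)-Thread-.*DEBUG\\(print:\\): \\[(\\w+)\\]");

    // The Debug output of this run, copied verbatim.
    static final List<String> SAMPLE_LOG = List.of(
        "<Fri Mar 23 15:21:46 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [nickt2]",
        "<Fri Mar 23 15:21:46 CST 2018[partition1-Thread-128-b-0-executor[34 34]]> DEBUG(print:): [nickt1]",
        "<Fri Mar 23 15:21:46 CST 2018[partition1-Thread-128-b-0-executor[34 34]]> DEBUG(print:): [nickt1]",
        "<Fri Mar 23 15:21:46 CST 2018[partition1-Thread-128-b-0-executor[34 34]]> DEBUG(print:): [nickt1]",
        "<Fri Mar 23 15:21:46 CST 2018[partition4-Thread-126-b-0-executor[37 37]]> DEBUG(print:): [nickt4]",
        "<Fri Mar 23 15:21:46 CST 2018[partition0-Thread-72-b-0-executor[33 33]]> DEBUG(print:): [nickt5]",
        "<Fri Mar 23 15:21:46 CST 2018[partition4-Thread-126-b-0-executor[37 37]]> DEBUG(print:): [nickt4]");

    // Maps each user to the sorted set of partitions it appeared in.
    static Map<String, Set<String>> partitionsPerUser(List<String> logLines) {
        Map<String, Set<String>> result = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                result.computeIfAbsent(m.group(2), k -> new TreeSet<>()).add(m.group(1));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> seen = partitionsPerUser(SAMPLE_LOG);
        // prints: nickt1 -> [partition1], nickt2 -> [partition2],
        //         nickt4 -> [partition4], nickt5 -> [partition0] (one per line)
        seen.forEach((user, parts) -> System.out.println(user + " -> " + parts));
        boolean consistent = seen.values().stream().allMatch(s -> s.size() == 1);
        System.out.println("each user in exactly one partition: " + consistent); // true
    }
}
```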
