Storm Trident Example: shuffle & parallelismHint
阿新 • Published: 2018-03-23
This example shows how to use shuffle and parallelismHint in Storm Trident.
The code is commented inline.
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.TridentOperationContext;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentTest {

    // Filter that prints each tuple with its partition index and thread name, then keeps it
    public static class Debug extends BaseFilter {
        private static final long serialVersionUID = -3136720361960744881L;
        private final String name;
        private int partitionIndex;

        public Debug() {
            this(false);
        }

        public Debug(boolean useLogger) {
            // useLogger is not used; output always goes to System.out
            this.name = "DEBUG: ";
        }

        public Debug(String name) {
            this.name = "DEBUG(" + name + "): ";
        }

        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            // Remember which partition this filter instance is running in
            this.partitionIndex = context.getPartitionIndex();
            super.prepare(conf, context);
        }

        @Override
        public boolean isKeep(TridentTuple tuple) {
            System.out.println("<" + new Date() + "[partition" + partitionIndex + "-"
                    + Thread.currentThread().getName() + "]" + "> " + name + tuple.toString());
            return true;
        }
    }

    public static class MyFixedBatchSpout extends FixedBatchSpout {
        public MyFixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
            super(fields, maxBatchSize, outputs);
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            // Set the maximum task parallelism for this component
            //conf.setMaxTaskParallelism(1);
            return conf;
        }
    }

    public static StormTopology buildTopology() {
        // FixedBatchSpout emitting two fields, user and score; each batch holds up to 3 tuples
        FixedBatchSpout spout = new MyFixedBatchSpout(new Fields("user", "score"), 3,
                new Values("john1", 4),
                new Values("john2", 7),
                new Values("john3", 8),
                new Values("john4", 9),
                new Values("john5", 7),
                new Values("john6", 11),
                new Values("john7", 5)
        );
        spout.setCycle(false);

        TridentTopology topology = new TridentTopology();
        topology.newStream("spout1", spout)
                // Spout parallelism is 2; with the 7 records john1 to john7, 2 * 7 = 14 tuples are emitted in total
                .parallelismHint(2)
                .shuffle()
                .each(new Fields("user"), new Debug("print:"))
                // Debug parallelism is 5; because of shuffle, the 14 tuples are spread randomly across 5 partitions
                .parallelismHint(5);
        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(200);
        conf.setNumWorkers(30);
        conf.setMessageTimeoutSecs(100000);

        LocalCluster local = new LocalCluster();
        local.submitTopology("test-topology", conf, buildTopology());
    }
}
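The arithmetic in the comments (2 spout tasks, 7 records each, batches of at most 3, 5 downstream partitions) can be checked without a Storm cluster. The following is only a plain-Java simulation under simplifying assumptions: the class `ShuffleSketch`, its methods `toBatches` and `shuffle`, and the `seed` parameter are hypothetical names made up for this illustration, and the uniform random assignment is a stand-in, not Storm's actual shuffle implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class ShuffleSketch {

    // Splits the records into batches of at most maxBatchSize items.
    public static List<List<String>> toBatches(List<String> tuples, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i += maxBatchSize) {
            int end = Math.min(i + maxBatchSize, tuples.size());
            batches.add(new ArrayList<>(tuples.subList(i, end)));
        }
        return batches;
    }

    // Every simulated spout task emits all records; each tuple is then assigned
    // to a randomly chosen target partition (a simplification, see the lead-in).
    public static List<List<String>> shuffle(List<String> tuples, int spoutTasks,
                                             int maxBatchSize, int targetPartitions, long seed) {
        Random random = new Random(seed);
        List<List<String>> partitions = new ArrayList<>();
        for (int p = 0; p < targetPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int task = 0; task < spoutTasks; task++) {
            for (List<String> batch : toBatches(tuples, maxBatchSize)) {
                for (String tuple : batch) {
                    partitions.get(random.nextInt(targetPartitions)).add(tuple);
                }
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList(
                "john1", "john2", "john3", "john4", "john5", "john6", "john7");
        List<List<String>> partitions = shuffle(users, 2, 3, 5, 42L);
        int total = 0;
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition" + p + " -> " + partitions.get(p));
            total += partitions.get(p).size();
        }
        System.out.println("total = " + total);
    }
}
```

Whatever the seed, the total is always 14 (2 tasks times 7 records); only the per-partition split changes, and some partitions may end up with very few tuples or none.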
The output is shown below: 14 tuples in total, distributed across partitions 0 to 4.
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john5]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john6]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john7]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john4]
<Fri Mar 23 14:17:13 CST 2018[partition3-Thread-116-b-0-executor[36 36]]> DEBUG(print:): [john4]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john3]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john7]
<Fri Mar 23 14:17:13 CST 2018[partition0-Thread-78-b-0-executor[33 33]]> DEBUG(print:): [john1]
<Fri Mar 23 14:17:13 CST 2018[partition0-Thread-78-b-0-executor[33 33]]> DEBUG(print:): [john3]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john1]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john5]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john6]
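To see how unevenly a particular run spread its tuples, the log lines can be tallied per partition. This is a minimal sketch, not part of the original example: the class `PartitionTally`, its `tally` method, and the regular expression are hypothetical helpers, and the pattern assumes the `[partitionN-Thread-...` layout printed by the Debug filter above.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PartitionTally {

    // Captures the partition index from "[partition<N>-Thread-..." in a log line.
    private static final Pattern PARTITION = Pattern.compile("\\[partition(\\d+)\\s*-");

    // Counts how many log lines were printed by each partition; lines that do
    // not match the expected layout are skipped.
    public static Map<Integer, Integer> tally(String... logLines) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = PARTITION.matcher(line);
            if (m.find()) {
                counts.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The first three lines of the output above.
        Map<Integer, Integer> counts = tally(
                "<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]",
                "<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john5]",
                "<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john6]");
        System.out.println(counts); // prints {1=1, 4=2}
    }
}
```

Passing every line of a run to `tally` gives the per-partition counts for that run; because shuffle is random, the split differs from run to run.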