1. 程式人生 > >Storm Trident示例shuffle&parallelismHint

Storm Trident示例shuffle&parallelismHint

大並發 extends bool obj 輸出 bsp shuf shu private

本例包括Storm Trident中shuffle與parallelismHint的使用。

代碼當中包括註釋

import java.util.Date;  
import java.util.List;  
import java.util.Map;  
  
import org.apache.storm.Config;  
import org.apache.storm.LocalCluster;  
import org.apache.storm.generated.StormTopology;  
import org.apache.storm.trident.TridentTopology;  
import org.apache.storm.trident.operation.BaseFilter;  
import org.apache.storm.trident.operation.TridentOperationContext;  
import org.apache.storm.trident.testing.FixedBatchSpout;  
import org.apache.storm.trident.tuple.TridentTuple;  
import org.apache.storm.tuple.Fields;  
import org.apache.storm.tuple.Values;  
  
  
/**
 * Small Trident demo showing how {@code shuffle()} and {@code parallelismHint()}
 * are applied to a stream fed by a fixed-batch spout.
 */
public class TridentTest {

    /**
     * Filter that keeps every tuple and prints it to standard output, prefixed
     * with the current date, the partition index and the current thread name.
     */
    public static class Debug extends BaseFilter {

        private static final long serialVersionUID = -3136720361960744881L;

        /** Label printed in front of each tuple. */
        private final String name;

        /** Partition index read from the operation context in {@link #prepare}. */
        private int partitionIndex;

        /** Creates a filter with the plain {@code "DEBUG: "} label. */
        public Debug() {
            this(false);
        }

        /**
         * Creates a filter with the plain {@code "DEBUG: "} label.
         *
         * @param useLogger not used by this implementation
         */
        public Debug(boolean useLogger) {
            name = "DEBUG: ";
        }

        /**
         * Creates a filter whose label embeds the given name.
         *
         * @param name text placed inside {@code "DEBUG(...): "}
         */
        public Debug(String name) {
            this.name = "DEBUG(" + name + "): ";
        }

        /** Remembers the partition index of this instance, then delegates to the parent. */
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            partitionIndex = context.getPartitionIndex();
            super.prepare(conf, context);
        }

        /**
         * Prints the tuple and always keeps it.
         *
         * @param tuple the tuple being filtered
         * @return always {@code true}
         */
        @Override
        public boolean isKeep(TridentTuple tuple) {
            StringBuilder line = new StringBuilder();
            line.append("<").append(new Date())
                .append("[partition").append(partitionIndex)
                .append("-").append(Thread.currentThread().getName())
                .append("]").append("> ")
                .append(name).append(tuple.toString());
            System.out.println(line.toString());
            return true;
        }
    }

    /**
     * {@link FixedBatchSpout} subclass that supplies its own (currently empty)
     * component configuration.
     */
    public static class MyFixedBatchSpout extends FixedBatchSpout {

        public MyFixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
            super(fields, maxBatchSize, outputs);
        }

        /**
         * Returns a fresh, unmodified {@link Config}.
         *
         * @return the component configuration
         */
        @Override
        public Map<String, Object> getComponentConfiguration() {
            // The maximum parallelism of this component could be capped here,
            // e.g. with setMaxTaskParallelism(1) on the Config; intentionally left disabled.
            return new Config();
        }
    }

    /**
     * Builds the demo topology: spout -> shuffle -> Debug filter.
     *
     * @return the assembled Storm topology
     */
    public static StormTopology buildTopology() {
        // The spout emits two fields, "user" and "score", with a max batch size of 3.
        Fields outputFields = new Fields("user", "score");
        FixedBatchSpout source = new MyFixedBatchSpout(outputFields, 3,
                new Values("john1", 4),
                new Values("john2", 7),
                new Values("john3", 8),
                new Values("john4", 9),
                new Values("john5", 7),
                new Values("john6", 11),
                new Values("john7", 5));
        source.setCycle(false);

        TridentTopology trident = new TridentTopology();
        trident.newStream("spout1", source)
                // Spout parallelism of 2: per the original author's note, the 7 rows
                // (john1..john7) are then emitted 2 * 7 = 14 times in total.
                .parallelismHint(2)
                .shuffle()
                .each(new Fields("user"), new Debug("print:"))
                // Debug parallelism of 5: because of shuffle(), the tuples are
                // spread randomly over the 5 partitions.
                .parallelismHint(5);

        return trident.build();
    }

    /**
     * Submits the demo topology to an in-process local cluster.
     *
     * @param args ignored
     * @throws Exception if creating the cluster or submitting the topology fails
     */
    public static void main(String[] args) throws Exception {
        Config topologyConf = new Config();
        topologyConf.setMaxSpoutPending(200);
        topologyConf.setNumWorkers(30);
        topologyConf.setMessageTimeoutSecs(100000);

        LocalCluster cluster = new LocalCluster();
        StormTopology topology = buildTopology();
        cluster.submitTopology("test-topology", topologyConf, topology);
    }
}

  

輸出結果如下:一共14條tuples,分布在0-4的partition裏

<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john5]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john6]
<Fri Mar 23 14:17:13 CST 2018[partition4-Thread-119-b-0-executor[37 37]]> DEBUG(print:): [john7]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john4]
<Fri Mar 23 14:17:13 CST 2018[partition3-Thread-116-b-0-executor[36 36]]> DEBUG(print:): [john4]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john2]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john3]
<Fri Mar 23 14:17:13 CST 2018[partition1-Thread-146-b-0-executor[34 34]]> DEBUG(print:): [john7]
<Fri Mar 23 14:17:13 CST 2018[partition0-Thread-78-b-0-executor[33 33]]> DEBUG(print:): [john1]
<Fri Mar 23 14:17:13 CST 2018[partition0-Thread-78-b-0-executor[33 33]]> DEBUG(print:): [john3]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john1]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john5]
<Fri Mar 23 14:17:13 CST 2018[partition2-Thread-53-b-0-executor[35 35]]> DEBUG(print:): [john6]

Storm Trident示例shuffle&parallelismHint