程式人生 > Storm自定義分組

Storm自定義分組

以 WordCount 為例

自定義策略
/**
 * Custom stream grouping that sends every tuple to a fixed leading slice of the
 * target tasks: the first {@code size / 2 + 1} task ids, never more than the
 * list actually holds.
 */
public class MyGrouping implements CustomStreamGrouping {

    // Ids of the target tasks, received in prepare().
    private List<Integer> targetTasks;

    /**
     * Stores the target task ids for later use by {@link #chooseTasks}.
     *
     * @param context     worker topology context (unused)
     * @param stream      id of the stream being grouped (unused)
     * @param targetTasks ids of the tasks that may receive tuples
     */
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    /**
     * Picks the tasks that receive the given tuple.
     *
     * @param taskId id of the emitting task (unused)
     * @param values tuple values (unused; the choice does not depend on them)
     * @return a new list with the first {@code size / 2 + 1} target task ids,
     *         or an empty list when there are no target tasks
     */
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int total = targetTasks.size();
        // size / 2 + 1 only exceeds the list size when the list is empty;
        // the cap turns that case into an empty result instead of get(0) failing.
        int limit = Math.min(total / 2 + 1, total);
        return new ArrayList<Integer>(targetTasks.subList(0, limit));
    }
}
源頭 Spout
/**
 * Source spout of the word-count example. It holds four fixed lines and emits
 * at most three tuples in total, each carrying one randomly picked line in the
 * single output field "line".
 */
public class WordCountSpout implements IRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;

    // Candidate lines, filled in open().
    private List<String> states;

    private Random r = new Random();

    // Number of tuples emitted so far; also used as the message id of each emit.
    private int index = 0;

    /**
     * Keeps the context and collector and builds the list of candidate lines.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;

        List<String> lines = new ArrayList<String>();
        lines.add("hello world tom");
        lines.add("hello world tomas");
        lines.add("hello world tomasLee");
        lines.add("hello world tomson");
        states = lines;
    }

    public void close() {
        // nothing to release
    }

    public void activate() {
        // no-op
    }

    public void deactivate() {
        // no-op
    }

    /**
     * Emits one randomly chosen line with the current index as message id,
     * until three tuples have been emitted; afterwards does nothing.
     */
    public void nextTuple() {
        if (index >= 3) {
            return;
        }
        int pick = r.nextInt(4);
        Values tuple = new Values(states.get(pick));
        collector.emit(tuple, index);
        index++;
    }

    public void ack(Object msgId) {
        // no-op
    }

    public void fail(Object msgId) {
        // no-op
    }

    /** Declares the single output field "line". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields fields = new Fields("line");
        declarer.declare(fields);
    }

    /** Returns null: no component-specific configuration is supplied. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
分片 Bolt
/**
 * Splitting bolt of the word-count example: takes a line from the first tuple
 * field, splits it on single spaces and emits one ("word", "count") tuple per
 * piece with a count of 1.
 */
public class SplitBolt implements IRichBolt {

    private TopologyContext context;
    private OutputCollector collector;

    /**
     * Logs the call and keeps the context and collector for later use.
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        System.out.println(this + " : prepare()");
        this.context = context;
        this.collector = collector;
    }

    /**
     * Splits the incoming line and emits each piece with a count of 1.
     *
     * @param tuple input tuple whose field 0 is the line to split
     */
    public void execute(Tuple tuple) {
        String line = tuple.getString(0);
        System.out.println(this + " : excute() : " + line);
        String[] arr = line.split(" ");
        for (String s : arr) {
            // Anchor each output to the input so it belongs to the same tuple tree.
            collector.emit(tuple, new Values(s, 1));
        }
        // The spout emits with a message id, so the input must be acked explicitly;
        // otherwise the tracked tuple is never completed.
        collector.ack(tuple);
    }

    public void cleanup() {
        // nothing to release
    }

    /** Declares the output fields "word" and "count". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /** Returns null: no component-specific configuration is supplied. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
WCApp
/**
 * Entry point of the word-count example: wires the spout to the split bolt
 * through the custom grouping and submits the topology to a local cluster.
 */
public class WCApp {

    // Component id of the spout; the bolt subscribes to it by this name.
    private static final String SPOUT_ID = "wcspout";

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = buildTopology();
        Config conf = buildConfig();

        // Local-mode Storm cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc", conf, builder.createTopology());
        System.out.println("hello world");
    }

    /** Registers the spout (2 tasks) and the split bolt (parallelism 4, 4 tasks). */
    private static TopologyBuilder buildTopology() {
        TopologyBuilder topology = new TopologyBuilder();

        // Configure the spout.
        topology.setSpout(SPOUT_ID, new WordCountSpout())
                .setNumTasks(2);

        // Configure the split bolt, fed from the spout via the custom grouping.
        topology.setBolt("split-bolt", new SplitBolt(), 4)
                .customGrouping(SPOUT_ID, new MyGrouping())
                .setNumTasks(4);

        return topology;
    }

    /** Builds the topology configuration: two workers, debug output enabled. */
    private static Config buildConfig() {
        Config settings = new Config();
        settings.setNumWorkers(2);
        settings.setDebug(true);
        return settings;
    }
}