Storm自定義分組
阿新 • • 發佈:2018-12-14
以 WordCount 為例
自定義策略
/**
 * Custom stream grouping for the word-count demo.
 *
 * <p>Every tuple is routed to the same leading subset of the downstream tasks:
 * the tasks stored at indices {@code 0 .. size/2} (inclusive) of the target
 * task list received in {@code prepare}. For four target tasks this selects
 * the first three.
 */
public class MyGrouping implements CustomStreamGrouping {
    // Ids of the candidate target tasks, stored by prepare().
    private List<Integer> targetTasks;

    /**
     * Remembers the target task ids so that chooseTasks can pick from them.
     *
     * @param context     worker topology context (unused)
     * @param stream      stream this grouping is applied to (unused)
     * @param targetTasks ids of the tasks that may receive tuples
     */
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    /**
     * Picks the tasks that receive the given tuple.
     *
     * @param taskId id of the emitting task (unused)
     * @param values tuple values (unused; the choice does not depend on them)
     * @return the task ids at indices {@code 0 .. size/2} of the target list,
     *         or an empty list when no target tasks are known
     */
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> subTaskIds = new ArrayList<Integer>();
        // Without this guard the loop below would call get(0) on an empty
        // (or missing) list and throw.
        if (targetTasks == null || targetTasks.isEmpty()) {
            return subTaskIds;
        }
        int lastIndex = targetTasks.size() / 2;
        for (int i = 0; i <= lastIndex; i++) {
            subTaskIds.add(targetTasks.get(i));
        }
        return subTaskIds;
    }
}
源頭 Spout
/**
 * Source spout for the word-count demo.
 *
 * <p>Emits a small, fixed number of randomly chosen sentences on a single
 * output field named "line". Each emitted tuple carries the running emit
 * counter as its message id.
 */
public class WordCountSpout implements IRichSpout {
    // Maximum number of tuples a spout instance emits before going quiet.
    private static final int MAX_EMITS = 3;

    private TopologyContext context;
    private SpoutOutputCollector collector;
    // Candidate sentences; populated in open().
    private List<String> states;
    private Random r = new Random();
    // Number of tuples emitted so far; also used as the message id.
    private int index = 0;

    /**
     * Stores the context and collector and fills the list of candidate sentences.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context for this task
     * @param collector collector used to emit tuples
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
        states = new ArrayList<String>();
        states.add("hello world tom");
        states.add("hello world tomas");
        states.add("hello world tomasLee");
        states.add("hello world tomson");
    }

    public void close() {
    }

    public void activate() {
    }

    public void deactivate() {
    }

    /**
     * Emits one randomly selected sentence per call until MAX_EMITS tuples
     * have been emitted; later calls do nothing.
     */
    public void nextTuple() {
        if (index < MAX_EMITS) {
            // Bound the random index by the actual list size rather than a
            // literal, so editing the sentence list cannot go out of range.
            String line = states.get(r.nextInt(states.size()));
            collector.emit(new Values(line), index);
            index++;
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    /** Declares the single output field "line". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    /** @return {@code null}; this component supplies no component-specific configuration */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
分片 Bolt
/**
 * Bolt that splits each incoming line into words.
 *
 * <p>Reads the first field of the input tuple as a string, splits it on single
 * spaces and emits one {@code (word, 1)} tuple per token on the fields
 * "word" and "count".
 */
public class SplitBolt implements IRichBolt {
    private TopologyContext context;
    private OutputCollector collector;

    /**
     * Logs the call and stores the context and collector for later use.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context for this task
     * @param collector collector used to emit and ack tuples
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        System.out.println(this + " : prepare()");
        this.context = context;
        this.collector = collector;
    }

    /**
     * Splits the incoming line and emits one tuple per word.
     *
     * @param tuple input tuple whose first field is the line to split
     */
    public void execute(Tuple tuple) {
        String line = tuple.getString(0);
        System.out.println(this + " : excute() : " + line);
        String[] arr = line.split(" ");
        for (String s : arr) {
            // Anchor each word to its input so it belongs to the same tuple tree.
            collector.emit(tuple, new Values(s, 1));
        }
        // The spout emits with a message id, so the input must be acked
        // explicitly; otherwise it would time out and be reported as failed.
        collector.ack(tuple);
    }

    public void cleanup() {
    }

    /** Declares the output fields "word" and "count". */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /** @return {@code null}; this component supplies no component-specific configuration */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
WCApp
/**
 * Entry point of the word-count demo: wires the spout to the split bolt
 * through the custom grouping and submits the topology to a local cluster.
 */
public class WCApp {
    public static void main(String[] args) throws Exception {
        // Topology-wide settings: two workers, debug output enabled.
        Config topologyConf = new Config();
        topologyConf.setNumWorkers(2);
        topologyConf.setDebug(true);

        TopologyBuilder topology = new TopologyBuilder();

        // Source spout, two tasks.
        topology.setSpout("wcspout", new WordCountSpout())
                .setNumTasks(2);

        // Split bolt: parallelism hint 4, four tasks, fed from the spout
        // through the custom grouping.
        topology.setBolt("split-bolt", new SplitBolt(), 4)
                .customGrouping("wcspout", new MyGrouping())
                .setNumTasks(4);

        // Run in local mode.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wc", topologyConf, topology.createTopology());

        System.out.println("hello world");
    }
}