Storm部分:程式碼模板【Java版純程式碼】
阿新 • • 發佈:2019-02-07
總結:構成部分:
Spout部分:繼承BaseRichSpout類,實現裡邊的三個方法:nextTuple,open,declareOutputFields。主要的邏輯寫在nextTuple中:把資料打包成集合(Values)的形式,並在這個方法中用emit傳送;declareOutputFields則用來宣告所傳送資料的欄位名稱(本身並不傳送資料)。
Bolt部分:繼承BaseRichBolt類,實現內部的三個方法:execute,prepare,declareOutputFields。主要的邏輯寫在execute中,包括切分集合等。如果後面還有下一個步驟的話,那還需要在execute中用emit再發送給下一步,並在declareOutputFields中宣告輸出的欄位。
Main方法:建立:TopologyBuilder tb=new TopologyBuilder();
用tb分別註冊Spout和Bolt(setSpout、setBolt),並在setBolt之後設定Grouping的方式(例如shuffleGrouping)
LocalCluster lc=new LocalCluster();
最後呼叫lc.submitTopology(名稱, new Config(), tb.createTopology())提交拓撲。
1.Test測試程式碼
package com.bjsxt.storm.test; import com.bjsxt.storm.bolt.WsBolt; import com.bjsxt.storm.spout.WsSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; /** * * 構建拓撲結構 Topology ----- 》job * * @author Administrator * */ public class Test { public static void main(String[] args) { TopologyBuilder tb = new TopologyBuilder(); tb.setSpout("wsspout", new WsSpout()); tb.setBolt("wsbolt", new WsBolt()).shuffleGrouping("wsspout"); //建立本地服務叢集 LocalCluster lc = new LocalCluster(); lc.submitTopology("ws", new Config(), tb.createTopology()); } }
2.Spout:傳送端
package com.bjsxt.storm.spout; import java.util.List; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * * 資料累加: 1+2+3+4...... * * @author Administrator * */ public class WsSpout extends BaseRichSpout { Map map; TopologyContext context; SpoutOutputCollector collector;// 傳送器 int i = 0; /** * 傳送資料,不斷被執行緒呼叫 */ @Override public void nextTuple() { i++; List tuple = new Values(i); this.collector.emit(tuple); System.err.println("spout -----------------------" + i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void open(Map map, TopologyContext context, SpoutOutputCollector collector) { this.map = map; this.context = context; this.collector = collector; } /** * 聲明發送資料的資訊 */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("num")); } }
3.Bolt:計算與分析
package com.bjsxt.storm.bolt;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
/**
 * Bolt that keeps a running total of the integers received in the
 * {@code "num"} field and prints the total after every tuple.
 */
public class WsBolt extends BaseRichBolt {
    Map stormConf;
    TopologyContext context;
    OutputCollector collector;
    int sum = 0;

    /**
     * Stores the configuration, context and collector handed in by the
     * framework for later use.
     *
     * @param stormConf topology configuration
     * @param context   topology context
     * @param collector collector used to acknowledge processed tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.stormConf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    /**
     * Adds the tuple's {@code "num"} value to the running total, prints the
     * total, and acknowledges the tuple.
     *
     * @param input tuple carrying an integer in the field {@code "num"}
     */
    @Override
    public void execute(Tuple input) {
        // 1. Read the value carried by the tuple.
        int i = input.getIntegerByField("num");
        // 2. Accumulate.
        sum += i;
        // 3. Print the running total.
        System.out.println("sum-------------------------------" + sum);
        // 4. BaseRichBolt does not ack automatically; acknowledge the tuple so
        //    it is not left pending when the upstream emits tracked tuples.
        collector.ack(input);
    }

    /**
     * Declares no output fields: this bolt does not emit any tuples.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}