Storm案例之自增數字求和
阿新 • • 發佈:2018-11-14
1.案例需求
實現自增數字相加的和 1+2+3+4+5+6+........
2.需求分析
Spout來發送數字作為input
使用Bolt來實現求和邏輯
將結果輸出到控制檯
3.匯入Storm的pom依賴
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.1</version> </dependency>
4.具體程式碼實現
package cn.ysjh; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Map; /* 使用Storm實現求和功能 */ public class SumStorm { /* Spout需要繼承Base */ public static class DataSourceSpout extends BaseRichSpout{ private SpoutOutputCollector spoutOutputCollector; /* 初始化方法,只會被呼叫一次 */ @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector=spoutOutputCollector; } int num=0; /* 會產生資料,在實際生產中肯定是從訊息佇列中獲取資料 這個方法是一個死迴圈,會一直執行 */ @Override public void nextTuple() { this.spoutOutputCollector.emit(new Values(++num)); System.out.println("資料:"+num); //防止資料產生太快 Utils.sleep(1000); } /* 宣告輸出欄位 */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //這裡要和上邊nextTuple()方法中的Values中的對應 outputFieldsDeclarer.declare(new Fields("number")); } } /* 資料的累計求和Bolt:接收資料並處理 */ private static class SumBolt extends BaseRichBolt { /* 初始化方法,會被執行一次 */ @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } int sum=0; /* 獲取Spout傳送過來的資料 Bolt中獲取值可以根據index獲取,也可以根據上一個環節中定義的fields名稱獲取,建議使用後一種方法 */ @Override public void execute(Tuple tuple) { Integer Num = tuple.getIntegerByField("number"); sum+=Num; System.out.println("Sum:"+sum); } /* */ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } public static void main(String[] args){ //TopologyBuilder根據Spout和Bolt來構建topology,Storm中任何一個作業都是通過Topology的方式進行提交的,Topology中需要指定Spout和Bolt執行的順序 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout",new DataSourceSpout()); builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout"); //建立本地模式,只需使用LocalCluster類 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("SumStorm",new Config(),builder.createTopology()); } }
可以看出先實現Spout和Bolt,最後實現Topology進行本地執行
注意:
這裡是本地執行模式,不需要Storm叢集環境,如果要提交到叢集上執行,最後的LocalCluster需要修改為StormSubmitter將Topology提交到叢集上執行
5.執行截圖