1. 程式人生 > >Storm案例之自增數字求和

Storm案例之自增數字求和

1.案例需求

實現自增數字相加的和 1+2+3+4+5+6+........

2.需求分析

Spout來發送數字作為input

使用Bolt來實現求和邏輯

將結果輸出到控制檯

3.匯入Storm的pom依賴

<dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>1.1.1</version>
</dependency>

4.具體程式碼實現

package cn.ysjh;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;

/*
使用Storm實現求和功能
 */
public class SumStorm {

    /*
    Spout需要繼承Base
     */

    public static class DataSourceSpout extends BaseRichSpout{

       private SpoutOutputCollector spoutOutputCollector;
        /*
        初始化方法,只會被呼叫一次
         */
        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
           this.spoutOutputCollector=spoutOutputCollector;
        }


        int num=0;
        /*
         會產生資料,在實際生產中肯定是從訊息佇列中獲取資料

         這個方法是一個死迴圈,會一直執行
         */
        @Override
        public void nextTuple() {
            this.spoutOutputCollector.emit(new Values(++num));

            System.out.println("資料:"+num);

            //防止資料產生太快
            Utils.sleep(1000);
        }


        /*
          宣告輸出欄位
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            //這裡要和上邊nextTuple()方法中的Values中的對應
          outputFieldsDeclarer.declare(new Fields("number"));
        }
    }



    /*
     資料的累計求和Bolt:接收資料並處理
     */
  private static class SumBolt extends BaseRichBolt {

      /*
      初始化方法,會被執行一次
       */
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

        int sum=0;
        /*
        獲取Spout傳送過來的資料
        Bolt中獲取值可以根據index獲取,也可以根據上一個環節中定義的fields名稱獲取,建議使用後一種方法
         */
        @Override
        public void execute(Tuple tuple) {

            Integer Num = tuple.getIntegerByField("number");

            sum+=Num;

            System.out.println("Sum:"+sum);
        }

        /*

         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }

   public static void main(String[] args){

      //TopologyBuilder根據Spout和Bolt來構建topology,Storm中任何一個作業都是通過Topology的方式進行提交的,Topology中需要指定Spout和Bolt執行的順序
       TopologyBuilder builder = new TopologyBuilder();
       builder.setSpout("DataSourceSpout",new DataSourceSpout());
       builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");


       //建立本地模式,只需使用LocalCluster類
       LocalCluster cluster = new LocalCluster();
       cluster.submitTopology("SumStorm",new Config(),builder.createTopology());

   }

}

可以看出先實現Spout和Bolt,最後實現Topology進行本地執行

注意:

這裡是本地執行模式,不需要Storm叢集環境,如果要提交到叢集上執行,最後的LocalCluster需要修改為StormSubmitter將Topology提交到叢集上執行

5.執行截圖