
Storm Installation and Word Count Example (Part 2)

Installation

 1: Extract the installation package: tar -zxvf apache-storm-0.9.5.tar.gz

 2: Edit the configuration file: vi storm/conf/storm.yaml

# the ZooKeeper cluster that Storm uses
storm.zookeeper.servers:
     - "hadoop01"
     - "hadoop02"
     - "hadoop03"
# the server in the Storm cluster that hosts the nimbus node
nimbus.host: "hadoop01"
# maximum JVM heap size for the nimbus process
nimbus.childopts: "-Xmx1024m"
# maximum JVM heap size for each supervisor process
supervisor.childopts: "-Xmx1024m"
# maximum JVM heap size for each worker JVM launched on a supervisor node
worker.childopts: "-Xmx768m"
# maximum JVM heap size for the ui process; the ui service usually runs on the same node as nimbus
ui.childopts: "-Xmx768m"
# ports used to launch workers on a supervisor node; each port is one slot, and each slot runs one worker
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

When editing the configuration file, the leading whitespace on each line must be preserved, otherwise Storm will report an error at startup (see the example below).
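
storm.yaml is whitespace-sensitive YAML: keep the indentation of the list items exactly as shown above, and keep the space after every colon. A minimal sketch of the expected layout:

storm.zookeeper.servers:
     - "hadoop01"              # keep the leading spaces before the dash
nimbus.host: "hadoop01"        # keep the space after the colon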

 3: Distribute the installation package

 scp -r /usr/apps/apache-storm-0.9.5 hadoop02:/usr/apps

 scp -r /usr/apps/apache-storm-0.9.5 hadoop03:/usr/apps
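
With more nodes, a simple shell loop performs the same distribution (a sketch, assuming every node shares the same /usr/apps path and passwordless ssh is configured):

for host in hadoop02 hadoop03; do
    scp -r /usr/apps/apache-storm-0.9.5 ${host}:/usr/apps
done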

 4: Start the cluster

On the machine specified by nimbus.host, start the nimbus service

         Foreground: storm nimbus ; in the background: nohup storm nimbus &

On the machine specified by nimbus.host, start the ui service

         Foreground: storm ui ; in the background: nohup storm ui &

On each supervisor node, start the supervisor service

         Foreground: storm supervisor ; in the background: nohup storm supervisor &
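
By default nohup appends each service's console output to a nohup.out file; to discard that output instead, redirect it explicitly, for example:

nohup storm supervisor > /dev/null 2>&1 &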

 5: Check the cluster

Visit nimbus.host:8080 (here, http://hadoop01:8080) to see the Storm UI.
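
The storm client offers a quick check from the command line as well; once a topology has been submitted, storm list shows its name, status, and number of workers:

storm list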

Word Count Demo

1: Spout (file reading)

package cn.itcast.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.commons.lang.StringUtils;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MyLocalFileSpout extends BaseRichSpout {

    private SpoutOutputCollector spoutOutputCollector;
    private BufferedReader bufferedReader;

    /**
     * Initialization method, called once when this spout instance starts
     * (the method Storm calls in a loop is nextTuple, below)
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
        try {
            this.bufferedReader = new BufferedReader( new FileReader( new File("D:/bigDataJob/wordcount/input/1.txt") ));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    /**
     * Storm's streaming model processes data one record at a time; the framework
     * keeps invoking this method in a loop, effectively: while(true){ this.nextTuple(); }
     */
    @Override
    public void nextTuple() {
        try {
            String line = bufferedReader.readLine();
            if(StringUtils.isNotBlank( line )){
                // each call emits at most one line of data
                List<Object> list = new ArrayList<Object>();
                list.add( line );
                spoutOutputCollector.emit( list );
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // declare the name of the field this spout emits ("juzi" means "sentence")
        outputFieldsDeclarer.declare( new Fields( "juzi" ));
    }
}
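
One caveat in nextTuple() above: when readLine() returns null at the end of the file, the method returns immediately and Storm calls it again at once, busy-spinning the spout thread. A minimal refinement (a sketch, assuming backtype.storm.utils.Utils and backtype.storm.tuple.Values are also imported) backs off when there is no new data:

    @Override
    public void nextTuple() {
        try {
            String line = bufferedReader.readLine();
            if (StringUtils.isNotBlank(line)) {
                // Values extends ArrayList<Object>, replacing the manual list
                spoutOutputCollector.emit(new Values(line));
            } else {
                Utils.sleep(100);   // no new data: wait 100 ms instead of spinning
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }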

2: Bolt (line splitting)

package cn.itcast.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseBasicBolt {


    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // read the sentence from the tuple by its field name
        String juzi = (String) tuple.getValueByField("juzi");
        // split it into words
        String[] strings = juzi.split( " " );
        // emit one tuple per word
        for( String word : strings ){
            // Values builds the underlying List<Object> for us
            basicOutputCollector.emit( new Values(word,1));
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare( new Fields( "word","num" ));
    }
}
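
For reference, Values is simply a convenience subclass of ArrayList<Object>, so the emit above is equivalent to building the list by hand the way the spout did (with the java.util imports added):

        List<Object> out = new ArrayList<Object>();
        out.add(word);
        out.add(1);
        basicOutputCollector.emit(out);   // same as emit(new Values(word, 1))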

3: Bolt (word counting)

package cn.itcast.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class MyWordCountBolt extends BaseBasicBolt {

    Map<String,Integer> map = new HashMap<String,Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {

        String word = (String) tuple.getValueByField( "word" );
        Integer num = (Integer) tuple.getValueByField( "num" );

        // running count per word, kept in memory by this bolt instance
        Integer count = map.get( word );
        if( count == null ){
            map.put( word, num );
        }else{
            map.put( word, count + num );
        }
        System.out.println( map );

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // this is the last bolt in the topology, so no output fields are declared
    }
}
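
In local mode you can also print the final result once instead of on every tuple, by overriding cleanup() (a sketch; Storm only guarantees that cleanup() runs when the topology is shut down in local mode):

    @Override
    public void cleanup() {
        System.out.println("final counts: " + map);
    }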

4: Topology submission driver

package cn.itcast.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class StormTopologyDriver {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        //1. Build the topology: wire the spout and the two bolts together
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("mySpout", new MyLocalFileSpout());
        topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountBolt()).shuffleGrouping("bolt1");

        //2. Submit the topology
        //To which cluster, and with what configuration?
        Config config = new Config();
        //config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();
        // local mode: run the topology inside this JVM
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount", config, stormTopology);
        // cluster mode (requires import backtype.storm.StormSubmitter)
        //StormSubmitter.submitTopology("wordcount1", config, stormTopology);
    }
}
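
Two notes on the wiring above. First, shuffleGrouping distributes tuples to bolt instances at random, which only yields correct counts here because bolt2 runs a single instance; if you raise its parallelism, route by word instead with fieldsGrouping, so that all tuples for the same word reach the same MyWordCountBolt instance (a sketch, assuming import backtype.storm.tuple.Fields):

        topologyBuilder.setBolt("bolt1", new MySplitBolt(), 2).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountBolt(), 2)
                .fieldsGrouping("bolt1", new Fields("word"));

Second, to run on the cluster instead of locally, uncomment the StormSubmitter line (it requires import backtype.storm.StormSubmitter), package the project as a jar, and submit it through the storm client, e.g. storm jar wordcount.jar cn.itcast.storm.StormTopologyDriver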