程式人生 > Storm-wordcount實時統計單詞次數

Storm-wordcount實時統計單詞次數

一、本地模式

1、WordCountSpout類

package com.demo.wc;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values; /** * 需求:單詞計數 hello world hello Beijing China * * 實現介面: IRichSpout IRichBolt * 繼承抽象類:BaseRichSpout BaseRichBolt 常用*/ public class WordCountSpout extends BaseRichSpout { //定義收集器 private SpoutOutputCollector collector; //傳送資料 @Override
public void nextTuple() { //1.傳送資料 到bolt collector.emit(new Values("I like China very much")); //2.設定延遲 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } //建立收集器 @Override public
void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector = collector; } //宣告描述 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { //起別名 declarer.declare(new Fields("wordcount")); } }

2、WordCountSplitBolt類

package com.demo.wc;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that splits each incoming sentence into words and emits one
 * {@code (word, 1)} tuple per word for the downstream counting bolt.
 */
public class WordCountSplitBolt extends BaseRichBolt {

    // Collector used to forward tuples to the next bolt; assigned in prepare().
    private OutputCollector collector;

    /**
     * Reads the sentence from the {@code "wordcount"} field, splits it on
     * single spaces and emits {@code (word, 1)} for every non-empty token.
     * Each emitted tuple is anchored to the input, and the input is acked
     * once all words have been emitted.
     *
     * @param in tuple carrying the sentence in its {@code "wordcount"} field
     */
    @Override
    public void execute(Tuple in) {
        // 1. Fetch the data.
        String line = in.getStringByField("wordcount");

        if (line != null) {
            // 2. Split the data.
            String[] fields = line.split(" ");

            // 3. Send <word, 1> on to the next bolt (which sums them up).
            for (String w : fields) {
                if (w.isEmpty()) {
                    // Leading or repeated spaces yield empty tokens; do not count them as words.
                    continue;
                }
                collector.emit(in, new Values(w, 1));
            }
        }

        // BaseRichBolt does not ack automatically; mark the input as processed.
        collector.ack(in);
    }

    /**
     * Stores the collector handed in by the framework.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares the two output fields: {@code "word"} and {@code "sum"}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "sum"));
    }
}

3、WordCountBolt類

package com.demo.wc;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Terminal bolt that accumulates a running count per word and prints the
 * current count to the console for every tuple it receives.
 *
 * <p>The counts live in an instance-level map, so each instance of this bolt
 * only knows about the tuples it has received itself.
 */
public class WordCountBolt extends BaseRichBolt {

    // Running total per word, as seen by this bolt instance.
    private Map<String, Integer> map = new HashMap<>();

    // Collector used to ack processed tuples; assigned in prepare().
    private OutputCollector collector;

    /**
     * Adds the tuple's {@code "sum"} value to the running total of its
     * {@code "word"}, prints the new total, and acks the tuple.
     *
     * @param in tuple with a String field {@code "word"} and an Integer field {@code "sum"}
     */
    @Override
    public void execute(Tuple in) {
        // 1. Fetch the data.
        String word = in.getStringByField("word");
        Integer sum = in.getIntegerByField("sum");

        // 2. Accumulate: a single lookup decides between first sighting and increment.
        Integer previous = map.get(word);
        Integer total;
        if (previous == null) {
            total = sum;
        } else {
            total = previous + sum;
        }
        map.put(word, total);

        // 3. Print to the console.
        System.out.println(Thread.currentThread().getName() + "\t 單詞為:" + word + "\t 當前已出現次數為:" + total);

        // BaseRichBolt does not ack automatically; mark the input as processed.
        collector.ack(in);
    }

    /**
     * Stores the collector handed in by the framework so that
     * {@link #execute(Tuple)} can ack tuples.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector used to ack tuples
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Declares nothing: this bolt emits no tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}

4、WordCountDriver類

package com.demo.wc;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Local-mode entry point: wires the spout and the two bolts into a topology
 * and submits it to an in-process {@code LocalCluster}.
 */
public class WordCountDriver {

    // Component ids used both to register components and to subscribe to them.
    private static final String SPOUT_ID = "WordCountSpout";
    private static final String SPLIT_BOLT_ID = "WordCountSplitBolt";
    private static final String COUNT_BOLT_ID = "WordCountBolt";

    /**
     * Builds the word-count topology and submits it under the name
     * {@code "wordcounttopology"}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // A Storm topology plays the role a Job plays in Hadoop.
        TopologyBuilder topology = new TopologyBuilder();

        // Spout: parallelism hint 1.
        topology.setSpout(SPOUT_ID, new WordCountSpout(), 1);

        // Splitter: parallelism hint 4, grouped on the spout's "wordcount" field.
        topology.setBolt(SPLIT_BOLT_ID, new WordCountSplitBolt(), 4)
                .fieldsGrouping(SPOUT_ID, new Fields("wordcount"));

        // Counter: parallelism hint 2, grouped on the splitter's "word" field.
        topology.setBolt(COUNT_BOLT_ID, new WordCountBolt(), 2)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

        // Default configuration.
        Config config = new Config();

        // Submit to an in-process cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordcounttopology", config, topology.createTopology());
    }
}

5、直接執行(4)裡面的main方法即可啟動本地模式。

二、叢集模式

前三個類和上面的本地模式完全一樣;第4個類WordCountDriver改用StormSubmitter(而不是LocalCluster)提交拓撲,拓撲名稱由命令列的第一個引數(args[0])傳入:

package com.demo.wc;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Cluster-mode entry point: builds the word-count topology and submits it
 * through {@code StormSubmitter} under the name given on the command line.
 */
public class WordCountDriver {

    /**
     * Builds and submits the topology.
     *
     * <p>Exits with status 1 if the topology name is missing or if the
     * submission fails, so that a calling script can detect the failure.
     *
     * @param args {@code args[0]} is the name to submit the topology under
     */
    public static void main(String[] args) {
        // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 1) {
            System.err.println("Usage: storm jar stormwordcount.jar com.demo.wc.WordCountDriver <topology-name>");
            System.exit(1);
            return;
        }

        // 1. A Storm topology plays the role a Job plays in Hadoop.
        TopologyBuilder builder = new TopologyBuilder();

        // 2. Register the components and their groupings.
        builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
        builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).fieldsGrouping("WordCountSpout", new Fields("wordcount"));
        builder.setBolt("WordCountBolt", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word"));

        // 3. Configuration; the number of workers can be set here via conf.setNumWorkers(n).
        Config conf = new Config();

        // 4. Submit to the cluster.
        try {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (Exception e) {
            // Top-level boundary: report the failure and signal it via the exit status.
            e.printStackTrace();
            System.exit(1);
        }
    }
}

把程式打成jar包放在啟動了Storm叢集的機器裡,在stormwordcount.jar所在目錄下執行

storm jar stormwordcount.jar com.demo.wc.WordCountDriver wordcount01

即可啟動程式。

三、併發度和分組策略

1、WordCountDriver_Shuffle類

package com.demo.wc;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Local-mode driver that demonstrates parallelism settings together with
 * shuffle grouping.
 *
 * <p>NOTE(review): the counting bolt keeps its totals in a per-instance map,
 * and every stream here uses shuffle grouping, so the same word can presumably
 * land on different counting tasks and each task would then print only its own
 * partial count. Confirm this is the intended demonstration.
 */
public class WordCountDriver_Shuffle {

    /**
     * Builds the topology with shuffle grouping on both bolts and submits it
     * to an in-process {@code LocalCluster} as {@code "wordcounttopology"}.
     *
     * @param args command-line arguments (unused in local mode)
     */
    public static void main(String[] args) {
        // A Storm topology plays the role a Job plays in Hadoop.
        TopologyBuilder topology = new TopologyBuilder();

        // Spout: parallelism hint 2.
        topology.setSpout("WordCountSpout", new WordCountSpout(), 2);

        // Splitter: parallelism hint 2 with 4 tasks, shuffle-grouped from the spout.
        topology.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 2)
                .setNumTasks(4)
                .shuffleGrouping("WordCountSpout");

        // Counter: parallelism hint 6, shuffle-grouped from the splitter.
        topology.setBolt("WordCountBolt", new WordCountBolt(), 6)
                .shuffleGrouping("WordCountSplitBolt");

        // Default configuration; the number of workers is left unset.
        Config config = new Config();

        // Submit to an in-process cluster. For cluster mode, submit through
        // StormSubmitter with the topology name taken from args[0] instead.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wordcounttopology", config, topology.createTopology());
    }
}

2、併發度與分組策略