程式人生 > apache-storm例子:統計句子中的單詞數量

apache-storm例子:統計句子中的單詞數量

模型圖

程式碼

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhyoulun</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
        </dependency>
    </dependencies>
</project>

MainTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class MainTopology {
    public void
runLocal(int waitSeconds) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentenceSpout", new SentenceSpout(), 1); builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout"); builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt"); Config config = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word_count", config, builder.createTopology()); try { Thread.sleep(waitSeconds * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cluster.killTopology("word_count"); cluster.shutdown(); } public static void main(String[] args) { MainTopology topology = new MainTopology(); topology.runLocal(60); } }

SentenceSpout.java

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Spout that endlessly cycles through a fixed list of sentences, emitting one whole sentence
 * in the {@code "sentence"} field roughly every 500 ms.
 */
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[] sentences = {"hello world", "study storm"};
    // Position of the next sentence to emit; wraps back to 0 after the last one.
    private int index = 0;

    /** Stores the collector that {@link #nextTuple()} emits through. */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /**
     * Emits the current sentence as-is (it is not split into words here), advances the cursor
     * with wrap-around, then pauses 500 ms to throttle the demo output.
     */
    public void nextTuple() {
        this.collector.emit(new Values(this.sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }

        // Wait 500 ms between sentences.
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Preserve the interrupt so whoever owns this thread (presumably Storm's executor,
            // e.g. during shutdown) can still observe it instead of it being swallowed here.
            Thread.currentThread().interrupt();
        }
    }

    /** Declares the single output field {@code "sentence"}. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

SplitBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

/**
 * Bolt that splits each incoming {@code "sentence"} into words and emits one tuple with the
 * field {@code "word"} per word.
 */
public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    /** Stores the collector used to emit words and ack inputs. */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    /**
     * Splits the sentence on runs of whitespace, emits every non-empty word anchored to the
     * input tuple, then acks the input.
     */
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        // Split on runs of whitespace (after trimming) so repeated, leading or trailing spaces
        // do not produce empty "words"; an all-blank sentence yields [""], which is skipped.
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                // Anchor to the input so a reliable (message-id) spout would see downstream failures.
                this.collector.emit(tuple, new Values(word));
            }
        }
        // BaseRichBolt does not ack automatically; ack explicitly once all words are emitted.
        this.collector.ack(tuple);
    }

    /** Declares the single output field {@code "word"}. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

CountBolt.java

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;


/**
 * Terminal bolt that keeps a running count per word and, after every tuple, prints all
 * current counts on one line as {@code word:count,word:count,...}. It declares no output
 * fields and emits nothing downstream.
 */
public class CountBolt extends BaseRichBolt {
    private OutputCollector collector;
    // word -> number of times it has been seen by this task
    private Map<String, Integer> wordMap;

    /**
     * Stores the collector and creates this task's count table. Per Storm's IBolt contract,
     * the bolt instance is serialized at submit time and prepare() is called on the worker,
     * so per-task state is initialized here.
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.wordMap = new HashMap<String, Integer>();
    }

    /** Increments the count of the tuple's {@code "word"}, prints all counts, then acks. */
    public void execute(Tuple tuple) {
        // Read the word from the tuple.
        String word = tuple.getStringByField("word");

        // Count it: a single lookup instead of containsKey + get.
        Integer previous = wordMap.get(word);
        wordMap.put(word, previous == null ? 1 : previous + 1);

        // Build the whole line first and print it once: each separate print call shows up as
        // its own entry once stdout is redirected into the worker log.
        StringBuilder line = new StringBuilder();
        for (Map.Entry<String, Integer> entry : wordMap.entrySet()) {
            line.append(entry.getKey()).append(':').append(entry.getValue()).append(',');
        }
        System.out.println(line);

        // BaseRichBolt does not ack automatically.
        collector.ack(tuple);
    }

    /** Terminal bolt: no output fields. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

執行結果

...
5978 [Thread-9-countBolt] INFO  backtype.storm.daemon.executor - Prepared bolt countBolt:(2)
hello:1,
world:1,hello:1,
study:1,world:1,hello:1,
study:1,world:1,storm:1,hello:1,
study:1,world:1,storm:1,hello:2,
study:1,world:2,storm:1,hello:2,
study:2,world:2,storm:1,hello:2,
study:2,world:2,storm:2,hello:2,
study:2,world:2,storm:2,hello:3,
...
study:57,world:58,storm:57,hello:58,
study:58,world:58,storm:57,hello:58,
study:58,world:58,storm:58,hello:58,
study:58,world:58,storm:58,hello:59,
study:58,world:59,storm:58,hello:59,
64444 [main] INFO  backtype.storm.daemon.nimbus - Delaying event :remove for 30 secs for word_count-1-1511510371
study:59,world:59,storm:58,hello:59,
study:59,world:59,storm:59,hello:59,
64490 [main] INFO  backtype.storm.daemon.nimbus - Updated word_count-1-1511510371 with status {:type :killed, :kill-time-secs 30}
...

提交到storm

修改MainTopology.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class MainTopology {
    private TopologyBuilder builder;
    private Config config;

    public MainTopology() {
        this.builder = new TopologyBuilder();
        this.builder.setSpout("sentenceSpout", new SentenceSpout(), 1);
        this.builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");
        this.builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");

        this.config = new Config();
    }

    public void runCluster() {
        try {
            StormSubmitter.submitTopology("word_count",this.config,this.builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runLocal(int waitSeconds) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word_count", this.config, this.builder.createTopology());

        try {
            Thread.sleep(waitSeconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        cluster.killTopology("word_count");
        cluster.shutdown();
    }

    public static void main(String[] args) {
        MainTopology topology = new MainTopology();
//        topology.runLocal(60);
        topology.runCluster();
    }
}

修改pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhyoulun</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
            <!-- 不需要將這個依賴打入jar包中 -->
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>MainTopology</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

打包

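mvn package

(pom.xml 已將 maven-assembly-plugin 的 single 目標綁定到 package 階段,執行 mvn package 即會產生 jar-with-dependencies;若寫成 mvn package assembly:single 也能成功,但組裝步驟會重複執行一次。)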

target資料夾中會生成檔案storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar

上傳到storm

storm jar storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar MainTopology

檢視執行狀態

檢視日誌

tail -f apache-storm/logs/worker-6700.log
2017-11-24T08:27:37.260+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.260+0000 STDIO [INFO] hello:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
...

參考

  • Storm分散式實時計算模式
  • Storm實時資料處理