apache-storm example: counting the words in sentences
Topology diagram: sentenceSpout -> splitBolt -> countBolt
Code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhyoulun</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
        </dependency>
    </dependencies>
</project>
MainTopology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class MainTopology {
    public void runLocal(int waitSeconds) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentenceSpout", new SentenceSpout(), 1);
        builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");
        builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word_count", config, builder.createTopology());
        try {
            Thread.sleep(waitSeconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology("word_count");
        cluster.shutdown();
    }

    public static void main(String[] args) {
        MainTopology topology = new MainTopology();
        topology.runLocal(60);
    }
}
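Every component above runs with a parallelism of 1, so shuffleGrouping is fine for the count bolt. If countBolt's parallelism were raised, shuffleGrouping would scatter occurrences of the same word across tasks and split its counts. A hedged sketch of how the wiring could change (this variant is not part of the original code; it needs import backtype.storm.tuple.Fields;), using a fields grouping on the "word" field so every occurrence of a word reaches the same task:

// hypothetical variant of the countBolt wiring with two tasks and a fields grouping
builder.setBolt("countBolt", new CountBolt(), 2)
        .fieldsGrouping("splitBolt", new Fields("word"));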
SentenceSpout.java
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = {"hello world", "study storm"};
    private int index = 0;

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    public void nextTuple() {
        // emit the next sentence, cycling through the array
        this.collector.emit(new Values(this.sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        // wait 500 ms before the next emit
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
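The spout emits each sentence without a message ID, so Storm does not track the tuples and nothing is replayed on failure, which is fine for this demo. As a hedged sketch only (the message-ID scheme and the method bodies are assumptions, not part of the original), a reliable variant of nextTuple() could look like this:

// hypothetical reliable variant: attach a message ID so Storm tracks the tuple tree
public void nextTuple() {
    String sentence = sentences[index];
    this.collector.emit(new Values(sentence), sentence + "-" + index); // second argument is the message ID
    index = (index + 1) % sentences.length;
}

@Override
public void ack(Object msgId) {
    // called once the whole tuple tree has been fully processed
}

@Override
public void fail(Object msgId) {
    // a real spout would look up and re-emit the tuple identified by msgId
}

For the tuple tree to be completed, downstream bolts also have to anchor and ack, which the BaseBasicBolt sketch after SplitBolt below handles automatically.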
SplitBolt.java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        // split the sentence into words and emit each word separately
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
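SplitBolt extends BaseRichBolt and never anchors or acks the tuples it processes, which again is fine here because the spout emits unreliably. A hedged alternative sketch (the class name SplitBasicBolt is an assumption) using BaseBasicBolt, which anchors emitted tuples and acks the input automatically:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// hypothetical drop-in replacement for SplitBolt with automatic anchoring and acking
public class SplitBasicBolt extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        for (String word : tuple.getStringByField("sentence").split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}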
CountBolt.java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CountBolt extends BaseRichBolt {
    private HashMap<String, Integer> wordMap = new HashMap<String, Integer>();

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    public void execute(Tuple tuple) {
        // read the word from the incoming tuple
        String word = tuple.getStringByField("word");
        // update the running count for this word
        int num;
        if (wordMap.containsKey(word)) {
            num = wordMap.get(word);
        } else {
            num = 0;
        }
        wordMap.put(word, 1 + num);
        // print the current counts
        Set<String> keys = wordMap.keySet();
        for (String key : keys) {
            System.out.print(key + ":" + wordMap.get(key) + ",");
        }
        System.out.println();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
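declareOutputFields is empty because CountBolt is the last bolt in the topology and only prints. If another bolt were to consume the counts, a hedged sketch (not part of the original code; it assumes the OutputCollector from prepare() is kept in a field and Fields/Values are imported) of what CountBolt would declare and emit:

// hypothetical: declare a (word, count) output stream
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("word", "count"));
}
// ...and at the end of execute():
// this.collector.emit(new Values(word, wordMap.get(word)));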
Run output
...
5978 [Thread-9-countBolt] INFO backtype.storm.daemon.executor - Prepared bolt countBolt:(2)
hello:1,
world:1,hello:1,
study:1,world:1,hello:1,
study:1,world:1,storm:1,hello:1,
study:1,world:1,storm:1,hello:2,
study:1,world:2,storm:1,hello:2,
study:2,world:2,storm:1,hello:2,
study:2,world:2,storm:2,hello:2,
study:2,world:2,storm:2,hello:3,
...
study:57,world:58,storm:57,hello:58,
study:58,world:58,storm:57,hello:58,
study:58,world:58,storm:58,hello:58,
study:58,world:58,storm:58,hello:59,
study:58,world:59,storm:58,hello:59,
64444 [main] INFO backtype.storm.daemon.nimbus - Delaying event :remove for 30 secs for word_count-1-1511510371
study:59,world:59,storm:58,hello:59,
study:59,world:59,storm:59,hello:59,
64490 [main] INFO backtype.storm.daemon.nimbus - Updated word_count-1-1511510371 with status {:type :killed, :kill-time-secs 30}
...
Submitting to Storm
Modified MainTopology.java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class MainTopology {
    private TopologyBuilder builder;
    private Config config;

    public MainTopology() {
        this.builder = new TopologyBuilder();
        this.builder.setSpout("sentenceSpout", new SentenceSpout(), 1);
        this.builder.setBolt("splitBolt", new SplitBolt(), 1).shuffleGrouping("sentenceSpout");
        this.builder.setBolt("countBolt", new CountBolt(), 1).shuffleGrouping("splitBolt");
        this.config = new Config();
    }

    public void runCluster() {
        try {
            StormSubmitter.submitTopology("word_count", this.config, this.builder.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void runLocal(int waitSeconds) {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word_count", this.config, this.builder.createTopology());
        try {
            Thread.sleep(waitSeconds * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        cluster.killTopology("word_count");
        cluster.shutdown();
    }

    public static void main(String[] args) {
        MainTopology topology = new MainTopology();
        // topology.runLocal(60);
        topology.runCluster();
    }
}
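With this version, switching between local and cluster mode means editing main() and recompiling. A hedged sketch (the "cluster" argument convention is an assumption, not from the original) of picking the mode from the command line instead:

public static void main(String[] args) {
    MainTopology topology = new MainTopology();
    if (args.length > 0 && "cluster".equals(args[0])) {
        topology.runCluster();   // e.g. storm jar ... MainTopology cluster
    } else {
        topology.runLocal(60);   // default: quick local test
    }
}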
Modified pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zhyoulun</groupId>
    <artifactId>storm_study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.7</version>
            <!-- provided by the Storm cluster, so it must not be packaged into the jar -->
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>MainTopology</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Packaging
mvn package assembly:single
This generates target/storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar.
Submitting the jar to Storm
storm jar storm_study-1.0-SNAPSHOT-jar-with-dependencies.jar MainTopology
Checking the running status
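Besides the Storm UI, the running topology can also be checked with Storm's own command-line client:

storm list                 # lists the running topologies, including word_count
storm kill word_count      # stop the topology when finished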
Viewing the worker log
tail -f apache-storm/logs/worker-6700.log
2017-11-24T08:27:37.260+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.260+0000 STDIO [INFO] hello:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] study:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] world:114,
2017-11-24T08:27:37.760+0000 STDIO [INFO] storm:113,
2017-11-24T08:27:37.760+0000 STDIO [INFO] hello:114,
...