[A First Look at Storm] wordcount demo
I need to work with Storm for my current job, so I wrote a small demo to get some hands-on practice.
The code is mainly adapted from the official Storm project on GitHub: https://github.com/apache/storm
with minor modifications.
1. pom.xml dependency
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <type>jar</type>
    <version>1.0.0</version>
</dependency>
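This is enough for running in local mode. If the topology is later submitted to a real cluster, storm-core is already on the cluster's classpath, so the dependency is usually given provided scope to keep it out of the packaged jar; a sketch of that variant:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>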
2. Spout
A Spout is the component that produces the source stream of tuples in Storm. It usually reads from an external data source and turns that data into tuples inside the Topology, so it plays the active role.
The nextTuple method is called over and over by Storm; this is where the data is produced.
Here, as an example, the spout randomly picks a sentence from an array and emits it as the source stream.
import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomSentenceSpout extends BaseRichSpout {
    private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

    SpoutOutputCollector _collector;
    Random _rand;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
                sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
        // Pick one sentence at random and emit it as a single-field tuple.
        final String sentence = sentences[_rand.nextInt(sentences.length)];
        LOG.info("Emitting tuple: {}", sentence);
        _collector.emit(new Values(sentence));
    }

    protected String sentence(String input) {
        return input;
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // One output field per tuple; it carries a whole sentence but is declared as "word".
        declarer.declare(new Fields("word"));
    }
}
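One detail worth noting: the ack and fail callbacks above are empty, and because emit is called without a message ID Storm will never invoke them anyway. To have Storm track each tuple and call ack/fail, the spout would emit with a message ID; a minimal sketch (the msgId counter is only an illustration, not part of the original code):

    private long msgId = 0;

    // inside nextTuple()
    _collector.emit(new Values(sentence), msgId++);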
3. Bolt
A Bolt is the component in a Topology that receives data and processes it; it plays the passive role.
The execute method handles each incoming tuple and can emit new tuples downstream.
Two bolts are defined here: the first splits each sentence into words, and the second counts the words.
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBlot extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The incoming tuple carries a whole sentence; split it and emit one tuple per word.
        String sentence = tuple.getString(0);
        String[] words = sentence.split(" ");
        for (String s : words) {
            System.out.println("==========" + s);
            collector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
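A small aside: the bolt reads the sentence positionally with tuple.getString(0). Since the spout declared its output field as "word", an equivalent, slightly more self-documenting lookup would be the field-based accessor:

    String sentence = tuple.getStringByField("word");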
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBlot extends BaseBasicBolt {
    // Per-task running count of each word.
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        System.out.println("==============[" + word + "]:" + count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
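Both bolts extend BaseBasicBolt, which automatically acks each input tuple once execute returns normally. If you need to control acking yourself (for example to ack in batches), you would extend BaseRichBolt and ack explicitly. A minimal sketch of that pattern; the PrintBolt class here is hypothetical and not part of this demo:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println(tuple.getString(0));
        collector.ack(tuple);   // must ack (or fail) manually with BaseRichBolt
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output fields; this bolt only prints
    }
}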
4. Topology
The topology is the key concept in Storm: it is essentially a real-time application running on the cluster, and it is what the main class below defines.
See the comments for the details.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class SimpleTopology {
    public static void main(String[] args) throws Exception {
        // Instantiate the TopologyBuilder.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Register the spout and set its parallelism hint, which controls how many threads the component gets on the cluster.
        topologyBuilder.setSpout("spout", new RandomSentenceSpout(), 1);
        // Register the processing bolts and their parallelism.
        // The split bolt receives tuples from the spout via shuffle grouping (random distribution).
        topologyBuilder.setBolt("split", new SplitSentenceBlot(), 3).shuffleGrouping("spout");
        // The count bolt uses fields grouping on "word", so the same word always goes to the same task.
        topologyBuilder.setBolt("count", new WordCountBlot(), 3).fieldsGrouping("split", new Fields("word"));

        String topologyName = "word-count";
        Config config = new Config();
        // config.setDebug(true);

        if (args != null && args.length > 0) {
            // With arguments, submit to a real cluster under the given topology name.
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            // Otherwise run in local mode.
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
        }
    }
}
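As written, the local-mode branch keeps running until the process is killed. If you prefer the demo to stop on its own, a common pattern (also used in the official example) is to let it run for a while and then shut the local cluster down; a sketch of that variant for the else branch, assuming org.apache.storm.utils.Utils is imported:

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, config, topologyBuilder.createTopology());
            Utils.sleep(10000);                 // let the topology run for about 10 seconds
            cluster.killTopology(topologyName); // then kill it and shut the local cluster down
            cluster.shutdown();

To run on a real cluster instead, package the project into a jar and submit it with the storm jar command, passing the desired topology name as the first argument.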