storm1.2.1-wordcount可靠的單詞計數
阿新 • • 發佈:2019-01-23
專案原始碼下載:
https://download.csdn.net/download/adam_zs/10294019
測試程式運行了5次,每次失敗的訊息都會再次傳送。
SentenceSpout->SplitSentenceBolt->WordCountBolt->ReportBolt
這個TopologyBuilder順序如上,
在SentenceSpout中定義ConcurrentHashMap<UUID, Values> pending用來儲存傳送訊息的msgId,values;
在SplitSentenceBolt中,訊息處理成功時呼叫this.outputCollector.ack(tuple),處理失敗時呼叫this.outputCollector.fail(tuple);
根據程式列印結果看到在WordCountBolt,ReportBolt中msgId為空,經測試在WordCountBolt,ReportBolt中設定this.outputCollector.fail(tuple),顯示的結果都是#####[ack]######;
所以在WordCountBolt,ReportBolt中設定的ack,fail無效;
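總結:bolt在emit時如果沒有把輸入tuple作為錨點(anchor),也就是只呼叫emit(new Values(...)),這個程式就只能保證SentenceSpout->SplitSentenceBolt的訊息傳遞的可靠性;要讓WordCountBolt,ReportBolt中的ack,fail生效,需要改用emit(tuple, new Values(...))把新tuple錨定到輸入tuple上;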
package com.wangzs.chapter1.wordcountreliable; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; /** * @title: 資料來源 <br/> * @author: wangzs <br/> * @date: 2018年3月18日 */ public class SentenceSpout extends BaseRichSpout { private ConcurrentHashMap<UUID, Values> pending; private SpoutOutputCollector spoutOutputCollector; private String[] sentences = { "a b c d", "a b c ", "a b", "a" }; @Override public void open(Map map, TopologyContext topologycontext, SpoutOutputCollector spoutoutputcollector) { this.spoutOutputCollector = spoutoutputcollector; this.pending = new ConcurrentHashMap<UUID, Values>(); } @Override public void nextTuple() { for (String sentence : sentences) { Values values = new Values(sentence); UUID msgId = UUID.randomUUID(); this.spoutOutputCollector.emit(values, msgId); this.pending.put(msgId, values); System.out.println("SentenceSpout==> " + values + " msgId=" + msgId); } Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) { outputfieldsdeclarer.declare(new Fields("sentence")); } @Override public void ack(Object msgId) { System.out.println("#####[ack]###### msgId=" + msgId + " values=" + this.pending.get(msgId)); this.pending.remove(msgId); } @Override public void fail(Object msgId) { System.out.println("#####[fail]###### msgId=" + msgId + " values=" + this.pending.get(msgId)); this.spoutOutputCollector.emit(this.pending.get(msgId), msgId); } }
package com.wangzs.chapter1.wordcountreliable; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * @title: 分隔單詞 <br/> * @author: wangzs <br/> * @date: 2018年3月18日 */ public class SplitSentenceBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { this.outputCollector.emit(new Values(word)); } System.out.println("SplitSentenceBolt==> " + sentence + " msgId=" + tuple.getMessageId()); if (sentence.equals("a b c d")) { this.outputCollector.fail(tuple); } else { this.outputCollector.ack(tuple); } } @Override public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) { this.outputCollector = outputcollector; } @Override public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) { outputfieldsdeclarer.declare(new Fields("word")); } }
package com.wangzs.chapter1.wordcountreliable;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Keeps a running count per word and emits the updated count for every input word.
 *
 * @author wangzs
 * @date 2018-03-18
 */
public class WordCountBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    /** Running count per word; created in {@link #prepare}. */
    private HashMap<String, Integer> counts = null;

    /**
     * Stores the collector and creates an empty count table.
     *
     * @param map             topology configuration (unused here)
     * @param topologycontext topology context (unused here)
     * @param outputcollector collector of this bolt
     */
    @Override
    public void prepare(Map map, TopologyContext topologycontext, OutputCollector outputcollector) {
        this.outputCollector = outputcollector;
        this.counts = new HashMap<String, Integer>();
    }

    /**
     * Increments the count of the tuple's {@code "word"} field, emits
     * {@code (word, count)} anchored to the input tuple, and acks the input.
     *
     * @param tuple input tuple carrying a {@code "word"} string field
     */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Integer previous = this.counts.get(word);
        int count = (previous == null) ? 1 : previous + 1;
        this.counts.put(word, count);
        // Anchor the emitted tuple to the input tuple so that ack/fail in the
        // next bolt stays part of the same tuple tree; an unanchored emit is
        // not tracked.
        this.outputCollector.emit(tuple, new Values(word, count));
        this.outputCollector.ack(tuple);
        System.out.println("WordCountBolt==> " + word + " msgId=" + tuple.getMessageId());
    }

    /**
     * Declares the output fields {@code "word"} and {@code "count"}.
     *
     * @param outputfieldsdeclarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputfieldsdeclarer) {
        outputfieldsdeclarer.declare(new Fields("word", "count"));
    }
}
package com.wangzs.chapter1.wordcountreliable;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
/**
 * Reliable word count: wires
 * SentenceSpout -> SplitSentenceBolt -> WordCountBolt -> ReportBolt
 * and runs the topology on a local cluster for ten seconds.
 *
 * @author wangzs
 * @date 2018-03-18
 */
public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentenceSpout-1";
    private static final String SPLIT_BOLT_ID = "splitSentenceBolt-1";
    private static final String COUNT_BOLT_ID = "wordCountBolt-1";
    private static final String REPORT_BOLT_ID = "reportBolt-1";
    private static final String TOPOLOGY_NAME = "wordCountTopology-1";

    /** How long the topology is left running before it is killed, in milliseconds. */
    private static final int RUN_MILLIS = 10000;

    /**
     * Builds the topology, submits it to a {@link LocalCluster}, waits, then
     * kills the topology and shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SentenceSpout sentenceSpout = new SentenceSpout();
        SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
        WordCountBolt wordCountBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout);
        builder.setBolt(SPLIT_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // Group by the "word" field so the same word is always counted by the same task.
        builder.setBolt(COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        LocalCluster cluster = new LocalCluster();
        try {
            // Local submission
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(RUN_MILLIS);
            cluster.killTopology(TOPOLOGY_NAME);
        } finally {
            // Shut the local cluster down even when submit/kill throws, so it is not left running.
            cluster.shutdown();
        }
    }
}
執行結果: