A Reliable Storm WordCount Implementation
1. Implementing Storm's reliability API
Building on the reliability API requires roughly the following steps:
- implement the spout's ack and fail methods;
- bind every tuple the spout emits to a unique messageId at emit time;
- when a bolt emits a new tuple, anchor it to the current input tuple;
- call collector.fail when a bolt fails to process a tuple, and collector.ack when it succeeds.
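Before wiring these steps into Storm classes, the spout-side contract can be sketched in plain Java with no Storm dependency. This is only an illustration; ReplayCacheSketch and its method names are made up for this post:

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the replay contract a reliable spout follows:
// cache every emitted tuple under a msgId; ack removes it, fail looks it up for re-emit.
public class ReplayCacheSketch {
    private final ConcurrentHashMap<UUID, String> emitted = new ConcurrentHashMap<>();

    // Emit a tuple: remember it until the whole downstream tuple tree is acked.
    public UUID emit(String tuple) {
        UUID msgId = UUID.randomUUID();
        emitted.put(msgId, tuple);
        return msgId;
    }

    // ack: every downstream tuple succeeded, so the cached copy can be dropped.
    public void ack(UUID msgId) {
        emitted.remove(msgId);
    }

    // fail: return the cached value so the caller can re-emit it with the same msgId.
    public String fail(UUID msgId) {
        return emitted.get(msgId);
    }

    public int pending() {
        return emitted.size();
    }
}
```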
2. Implementing a reliable WordCount
2.1 A custom sentence emitter
The emitter records the count of every word it has emitted so far; otherwise it is similar to the plain wordcount implementation.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.google.common.util.concurrent.AtomicLongMap;

public class SentenceEmitter {

    private final AtomicLongMap<String> COUNTS = AtomicLongMap.create();

    private final String[] SENTENCES = {
            "The logic for a realtime application is packaged into a Storm topology",
            " A Storm topology is analogous to a MapReduce job ",
            "One key difference is that a MapReduce job eventually finishes ",
            "whereas a topology runs forever or until you kill it of course ",
            "A topology is a graph of spouts and bolts that are connected with stream groupings"};

    /**
     * Emits a random sentence and records its word counts. The recorded counts
     * are compared with Storm's counts at the end to verify they match.
     * The spout caps emission at 1000 sentences so that, when the program
     * shuts down, the downstream bolts can finish counting what was emitted.
     */
    public String emit() {
        int randomIndex = (int) (Math.random() * SENTENCES.length);
        String sentence = SENTENCES[randomIndex];
        for (String s : sentence.split(" ")) {
            COUNTS.incrementAndGet(s);
        }
        return sentence;
    }

    public void printCount() {
        System.out.println("--- Emitter COUNTS ---");
        List<String> keys = new ArrayList<String>(COUNTS.asMap().keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + COUNTS.get(key));
        }
        System.out.println("--------------");
    }

    public AtomicLongMap<String> getCount() {
        return COUNTS;
    }

    public static void main(String[] args) {
        SentenceEmitter sentenceEmitter = new SentenceEmitter();
        for (int i = 0; i < 20; i++) {
            System.out.println(sentenceEmitter.emit());
        }
        sentenceEmitter.printCount();
    }
}
It emits random sentences, counts each word, and exposes a print method to dump the counts.
2.2 SentenceSpout: a reliable spout
Here the spout overrides BaseRichSpout's ack and fail methods. A ConcurrentHashMap<UUID, Values> named emitted caches every tuple currently in flight. nextTuple() stops emitting once 1000 sentences have been sent, so the downstream bolts can finish counting; for each emit it generates a UUID, stores the UUID-to-tuple mapping in emitted, and passes the UUID along with the emitted tuple. A call to ack means that every downstream tuple derived from that tuple was processed successfully, so the tuple is removed from emitted. fail is the counterpart of ack: the tuple failed somewhere downstream and must be reprocessed, so the Values for the failed UUID are fetched from the cache and re-emitted. When the topology is killed, close() prints everything the spout has sent.
public class SentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = -5335326175089829338L;
    private static final Logger LOGGER = Logger.getLogger(SentenceSpout.class);
    private AtomicLong atomicLong = new AtomicLong(0);
    private SpoutOutputCollector collector;
    private SentenceEmitter sentenceEmitter;
    private ConcurrentHashMap<UUID, Values> emitted;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.sentenceEmitter = new SentenceEmitter();
        this.emitted = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        //Changed slightly: we no longer sleep inside sentenceEmitter, since that
        //would delay the spout thread from resending failed tuples.
        if (atomicLong.incrementAndGet() >= 1000) {
            return;
        }
        String sentence = sentenceEmitter.emit();
        Values values = new Values(sentence);
        UUID msgId = UUID.randomUUID();
        //Assign each tuple an id at emit time; Storm uses this id to identify the tuple later.
        collector.emit(values, msgId);
        //Remember every emitted sentence so it can be re-emitted on failure.
        this.emitted.put(msgId, values);
    }

    /**
     * ack and fail must both be implemented to guarantee reliability.
     * ack means every downstream tuple was processed successfully, so the
     * acked tuple is removed from emitted.
     *
     * @param msgId
     */
    @Override
    public void ack(Object msgId) {
        this.emitted.remove(msgId);
    }

    /**
     * fail means some downstream step failed, whether from a program error or
     * a network problem; fetch the failed tuple from emitted and resend it.
     *
     * @param msgId
     */
    @Override
    public void fail(Object msgId) {
        Values values = this.emitted.get(msgId);
        this.collector.emit(values, msgId);
        LOGGER.info(String.format("resending failed tuple: messageId:%s,values:%s", msgId, values));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void close() {
        super.close();
        sentenceEmitter.printCount();
    }
}
2.3 The word-split bolt
In this bolt, execute() contains a small tweak that simulates failures. When emitting, collector.emit(input, new Values(word)) anchors the new tuple to the current input tuple. After emitting successfully, the bolt acks the input tuple, which tells the spout the tuple was fully processed and can be removed from its cache; on failure it calls fail, which tells the spout to resend the tuple.
public class WordSplitBolt extends BaseRichBolt {
private static final long serialVersionUID = 2932049413480818649L;
private static final Logger LOGGER = Logger.getLogger(WordSplitBolt.class);
private OutputCollector collector;
private AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
try {
atomicInteger.getAndIncrement();
String sentence = input.getStringByField("sentence");
if (atomicInteger.get() == 20 || atomicInteger.get() == 200) {
throw new RuntimeException(String.format("simulated failure,sourceStreamId:%s,messageId:%s,sentence:%s", input.getSourceStreamId(), input.getMessageId(), sentence));
}
String[] words = sentence.split(" ");
for (String word : words) {
//anchor the new tuple to the input tuple when emitting
collector.emit(input, new Values(word));
}
//ack the tuple when processing succeeds
this.collector.ack(input);
LOGGER.info("--sentence--" + sentence);
} catch (Exception e) {
//call fail when processing fails
collector.fail(input);
LOGGER.error(e.getMessage());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
2.4 The word-count bolt
Similar to the split bolt.
public class WordCountBolt extends BaseRichBolt {
private static final long serialVersionUID = -7753338296650445257L;
private static final Logger LOGGER = Logger.getLogger(WordCountBolt.class);
private OutputCollector collector;
private HashMap<String, Long> counts = null;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.counts = new HashMap<String, Long>();
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = this.counts.get(word);
if (count == null) {
count = 0L;
}
count++;
counts.put(word, count);
//likewise, anchor the emitted tuple to the input tuple
collector.emit(input, new Values(word, count));
//ack the tuple when processing succeeds
collector.ack(input);
LOGGER.info("--word--" + word);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
2.5 Printing the results
Nothing special here: the bolt acks every tuple on success, and when killTopology is called, cleanup() prints Storm's final word counts.
public class ReportBolt extends BaseRichBolt {
private static final Logger LOGGER = Logger.getLogger(ReportBolt.class);
private static final long serialVersionUID = -3973016696731250995L;
private HashMap<String, Long> counts = null;
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
counts = new HashMap<>();
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = input.getLongByField("count");
this.counts.put(word, count);
//ack the tuple when processing succeeds
collector.ack(input);
LOGGER.info("--globalreport--" + word);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void cleanup() {
System.out.println("--- FINAL COUNTS ---");
List<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for (String key : keys) {
System.out.println(key + " : " + this.counts.get(key));
}
System.out.println("--------------");
}
}
3. Running and analyzing the results
3.1 Running the topology
splitBolt: parallelism 3, shuffle grouping
countBolt: parallelism 4, fields grouping
reportBolt: global grouping
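The fields grouping matters for correctness here: every occurrence of the same word must be routed to the same countBolt task, or the per-task counts would be split. Conceptually this is a hash of the grouping field modulo the task count; the sketch below illustrates the idea in plain Java (FieldsGroupingDemo is a made-up name, not Storm's internal code):

```java
public class FieldsGroupingDemo {
    // Conceptually, fields grouping picks the target task from a hash of the
    // grouping field, so equal field values always land on the same task.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        // every occurrence of "topology" goes to the same task, so its count
        // accumulates in exactly one place
        System.out.println("topology -> task " + taskFor("topology", 4));
        System.out.println("storm    -> task " + taskFor("storm", 4));
    }
}
```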
Note config.setNumAckers(1), which configures one acker. Storm topologies contain special "acker" tasks that track the DAG of tuples spawned by each spout tuple. When an acker sees that a DAG is complete, it sends a message back to the spout task that created the root tuple so that the spout can ack the message. Storm sets the number of ackers to one by default, but if you process a large volume of messages you may need to increase it.
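An acker can track an arbitrarily large tuple DAG in constant memory because it keeps only an XOR checksum per spout tuple: every anchored emit and every ack XORs an edge id into the checksum, and the checksum reaches zero exactly when each edge has been both emitted and acked. A simplified sketch of that bookkeeping (not Storm's actual acker code; class and method names are invented):

```java
import java.util.HashMap;
import java.util.Map;

public class AckerChecksumSketch {
    // One checksum per spout tuple, keyed by its root id.
    private final Map<Long, Long> pending = new HashMap<>();

    // Called when the spout emits a root tuple with a single edge id.
    public void init(long rootId, long edgeId) {
        pending.put(rootId, edgeId);
    }

    // Called for every bolt ack: 'update' is the XOR of the acked input edge id
    // and all newly emitted (anchored) edge ids. Returns true when the whole
    // tuple tree is complete.
    public boolean update(long rootId, long update) {
        long v = pending.get(rootId) ^ update;
        if (v == 0) {
            pending.remove(rootId);
            return true; // DAG complete: the spout can now be told to ack
        }
        pending.put(rootId, v);
        return false;
    }
}
```

Because XOR is order-independent, it does not matter in which order the acks arrive; the checksum still cancels to zero once every edge is accounted for.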
public class GuaranteedWordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main(String[] args) throws Exception {
SentenceSpout spout = new SentenceSpout();
WordSplitBolt splitBolt = new WordSplitBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout);
//shuffle the generated sentences across the split bolt's tasks
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 3).shuffleGrouping(SENTENCE_SPOUT_ID);
//splitBolt splits each sentence on spaces and emits the words to countBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt, 1).globalGrouping(COUNT_BOLT_ID);
Config config = new Config();
config.setNumWorkers(1);
config.setNumAckers(1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
Thread.sleep(30*1000);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}
3.2 Analyzing the results
The counts recorded by the emitter:
--- Emitter COUNTS ---
: 203
A : 404
MapReduce : 391
One : 188
Storm : 391
The : 188
a : 1187
analogous : 203
and : 201
application : 188
are : 201
bolts : 201
connected : 201
course : 219
difference : 188
eventually : 188
finishes : 188
for : 188
forever : 219
graph : 201
groupings : 201
into : 188
is : 780
it : 219
job : 391
key : 188
kill : 219
logic : 188
of : 420
or : 219
packaged : 188
realtime : 188
runs : 219
spouts : 201
stream : 201
that : 389
to : 203
topology : 811
until : 219
whereas : 219
with : 201
you : 219
--------------
Storm's counts, as printed by ReportBolt:
--- FINAL COUNTS ---
: 203
A : 404
MapReduce : 391
One : 188
Storm : 391
The : 188
a : 1187
analogous : 203
and : 201
application : 188
are : 201
bolts : 201
connected : 201
course : 219
difference : 188
eventually : 188
finishes : 188
for : 188
forever : 219
graph : 201
groupings : 201
into : 188
is : 780
it : 219
job : 391
key : 188
kill : 219
logic : 188
of : 420
or : 219
packaged : 188
realtime : 188
runs : 219
spouts : 201
stream : 201
that : 389
to : 203
topology : 811
until : 219
whereas : 219
with : 201
you : 219
--------------
Comparing the two outputs: even though the split bolt threw exceptions, the reliability mechanism produced identical counts on both sides, so the mechanism works. Note, however, that it does not guarantee that a tuple is processed exactly once. A failure can occur at any point, for example when the data has been stored successfully but the acknowledgement times out; resending the failed tuple then leads to duplicate processing. What the mechanism guarantees is that each tuple is processed at least once. Exactly-once processing requires Storm's transactional spouts, now implemented by Trident.
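One common way to live with at-least-once semantics is to make the consumer idempotent, for example by remembering processed message ids and skipping replays. A minimal sketch of that idea (IdempotentCounter is a made-up name for illustration; a real deployment would need a bounded or persistent store for the seen set):

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentCounter {
    private final Set<String> seenMsgIds = new HashSet<>();
    private long count = 0;

    // Returns true only the first time a given msgId is processed;
    // a replayed tuple carrying the same msgId is ignored.
    public boolean process(String msgId) {
        if (!seenMsgIds.add(msgId)) {
            return false; // duplicate delivery: skip
        }
        count++;
        return true;
    }

    public long count() {
        return count;
    }
}
```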
3.3 Analyzing the resend log
Both the spout and the split bolt log these failures; the log below helps clarify how resend-on-failure works:
[Thread-30-split-bolt-executor[9 9]] com.foo.bolt.WordSplitBolt [54] - simulated failure,sourceStreamId:default,messageId:{-897755601482291185=1436976737464555631},sentence:whereas a topology runs forever or until you kill it of course
[Thread-36-split-bolt-executor[10 10]] com.foo.bolt.WordSplitBolt [54] - simulated failure,sourceStreamId:default,messageId:{1355559750003096469=8699297777170365670},sentence:One key difference is that a MapReduce job eventually finishes
[Thread-36-split-bolt-executor[10 10]] com.foo.bolt.WordSplitBolt [54] - simulated failure,sourceStreamId:default,messageId:{-6711225109969092935=5915661248356017371},sentence:A topology is a graph of spouts and bolts that are connected with stream groupings
[Thread-18-split-bolt-executor[8 8]] com.foo.bolt.WordSplitBolt [54] - simulated failure,sourceStreamId:default,messageId:{-692291341444179168=-4371551265892401171},sentence: A Storm topology is analogous to a MapReduce job
[Thread-30-split-bolt-executor[9 9]] com.foo.bolt.WordSplitBolt [54] - simulated failure,sourceStreamId:default,messageId:{-7981624255431253628=6508027834482650707},sentence:A topology is a graph of spouts and bolts that are connected with stream groupings
[Thread-18-split-bolt-executor[8 8]] com.foo.bolt.WordSplitBolt [54] - simulated failure,sourceStreamId:default,messageId:{-9071636350309294887=-1076991837303452874},sentence:A topology is a graph of spouts and bolts that are connected with stream groupings
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - resending failed tuple: messageId:239b1155-0eec-4a11-a8aa-3368ab947cee,values:[whereas a topology runs forever or until you kill it of course ]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - resending failed tuple: messageId:bae350f5-8548-4e6b-852d-7280d03e0ea2,values:[One key difference is that a MapReduce job eventually finishes ]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - resending failed tuple: messageId:22821cab-9fd0-4532-b731-2b995b98a381,values:[ A Storm topology is analogous to a MapReduce job ]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - resending failed tuple: messageId:1195ba50-d2f3-4cea-9eab-e426ffbb5bf9,values:[A topology is a graph of spouts and bolts that are connected with stream groupings]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - resending failed tuple: messageId:42d14bdc-8e42-4eb6-81ec-3496a9dc855d,values:[A topology is a graph of spouts and bolts that are connected with stream groupings]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - resending failed tuple: messageId:20f77240-21bb-4e65-aee7-6e2918857a63,values:[A topology is a graph of spouts and bolts that are connected with stream groupings]
Of course, these lines do not appear in this order in the raw log. The log shows that every failed tuple really was resent. The split bolt throws in two cases (counter values 20 and 200), and since its parallelism is 3, a total of 6 exceptions occurred.
PS: the complete code is at https://github.com/Json-Lin/storm-practice/tree/master/guaranteed-word-count