
A Reliable Storm WordCount Implementation

Reliable WordCount

1. Implementing Storm's reliability API

Implementing the reliable API requires roughly the following steps:

  • Implement the spout's ack and fail methods
  • When the spout emits a tuple, bind it to a unique messageId
  • When a bolt emits new tuples, anchor them to the current input tuple
  • Call collector.ack when a bolt processes a tuple successfully, and collector.fail when it fails
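The spout-side half of these steps can be condensed into a plain-Java simulation of the contract (no Storm API; the class and method names here are illustrative only, not Storm classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// A sketch of the reliability contract: emit binds each tuple to a unique
// message id and caches it; fail replays the cached tuple; ack drops it.
public class ReliabilitySketch {
    private final Map<UUID, String> pending = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();
    private int failuresLeft;

    public ReliabilitySketch(int failuresToSimulate) {
        this.failuresLeft = failuresToSimulate;
    }

    // "Spout" side: remember the tuple under its message id, then hand it off.
    public void emit(String tuple) {
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, tuple);
        process(msgId, tuple);
    }

    // "Bolt" side: on failure notify fail(msgId), on success notify ack(msgId).
    private void process(UUID msgId, String tuple) {
        if (failuresLeft > 0) {
            failuresLeft--;
            fail(msgId);            // downstream failed: replay
        } else {
            delivered.add(tuple);
            ack(msgId);             // downstream succeeded: forget the tuple
        }
    }

    private void fail(UUID msgId) {
        process(msgId, pending.get(msgId));  // re-emit the cached tuple
    }

    private void ack(UUID msgId) {
        pending.remove(msgId);
    }

    public int pendingSize()  { return pending.size(); }
    public List<String> out() { return delivered; }
}
```

Even with two simulated failures, a tuple is eventually delivered and the pending cache drains to empty, which is exactly the behavior the real spout below relies on.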

2. Implementing a reliable WordCount

2.1 A custom sentence emitter

An emitter that keeps a count of every word emitted so far, similar to the plain WordCount implementation from before.

public class SentenceEmitter {

    private final AtomicLongMap<String> COUNTS = AtomicLongMap.create();

    private final String[] SENTENCES = {"The logic for a realtime application is packaged into a Storm topology",
            " A Storm topology is analogous to a MapReduce job ",
            "One key difference is that a MapReduce job eventually finishes ",
            "whereas a topology runs forever or until you kill it of course ",
            "A topology is a graph of spouts and bolts that are connected with stream groupings"};


    /**
     * Randomly emits a sentence and records its word counts. The tally is later
     * compared against Storm's own count to verify that they match. (The cap of
     * 1000 emissions is enforced in the spout, so that after emission stops the
     * downstream bolts have time to finish counting.)
     *
     * @return the emitted sentence
     */
    public String emit() {
        int randomIndex = (int) (Math.random() * SENTENCES.length);
        String sentence = SENTENCES[randomIndex];
        for (String s : sentence.split(" ")) {
            COUNTS.incrementAndGet(s);
        }
        return sentence;
    }

    public void printCount() {
        System.out.println("--- Emitter COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(COUNTS.asMap().keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.COUNTS.get(key));
        }
        System.out.println("--------------");
    }

    public AtomicLongMap<String> getCount() {
        return COUNTS;
    }

    public static void main(String[] args) {
        SentenceEmitter sentenceEmitter = new SentenceEmitter();
        for (int i = 0; i < 20; i++) {
            System.out.println(sentenceEmitter.emit());
        }
        sentenceEmitter.printCount();
    }
}

It emits sentences at random, keeps a running word count, and provides a print method to dump the tally.

2.2 A reliable spout: SentenceSpout

Here we override BaseRichSpout's ack and fail methods. A ConcurrentHashMap<UUID, Values> named emitted caches every tuple sent so far. nextTuple() stops emitting once 1000 sentences have been sent, so the downstream counts can settle; for each emission it generates a UUID, stores the UUID-to-tuple mapping in emitted, and passes the UUID along with the emit as the message id. A call to ack means every downstream tuple anchored to that message was processed successfully, so the tuple is removed from emitted. fail is the counterpart: processing failed somewhere downstream and the tuple must be replayed, so the failed UUID's tuple is fetched from the cache and re-emitted. When the topology is killed, close prints everything the spout has sent.

public class SentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = -5335326175089829338L;
    private static final Logger LOGGER = Logger.getLogger(SentenceSpout.class);

    private AtomicLong atomicLong = new AtomicLong(0);
    private SpoutOutputCollector collector;
    private SentenceEmitter sentenceEmitter;
    private ConcurrentHashMap<UUID, Values> emitted;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.sentenceEmitter = new SentenceEmitter();
        this.emitted = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        // A small change here: we no longer sleep inside sentenceEmitter,
        // since that would block the spout thread from resending failed data.
        if (atomicLong.incrementAndGet() >= 1000) {
            return;
        }
        String sentence = sentenceEmitter.emit();
        Values values = new Values(sentence);
        UUID msgId = UUID.randomUUID();

        // Assign an id to each tuple as it is emitted; this id identifies the tuple later on
        collector.emit(values, msgId);
        // Record every emitted sentence so it can be resent on failure
        this.emitted.put(msgId, values);
    }

    /**
     * To guarantee reliability, both ack and fail must be implemented.
     * ack means everything downstream processed successfully; remove the acked tuple from emitted.
     *
     * @param msgId
     */
    @Override
    public void ack(Object msgId) {
        this.emitted.remove(msgId);
    }

    /**
     * To guarantee reliability, both ack and fail must be implemented.
     * fail means some downstream step failed (a program error, or perhaps the network);
     * fetch the failed tuple from emitted and resend it.
     *
     * @param msgId
     */
    @Override
    public void fail(Object msgId) {
        Values values = this.emitted.get(msgId);
        this.collector.emit(values, msgId);
        LOGGER.info(String.format("Resending failed tuple: messageId:%s, values:%s", msgId, values));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void close() {
        super.close();
        sentenceEmitter.printCount();
    }
}

2.3 The word-splitting bolt

In this bolt, execute includes a small twist to simulate failures. When emitting, collector.emit(input, new Values(word)) anchors the new tuple to the current input tuple. After emitting successfully, the bolt acks the input tuple, telling the spout that processing succeeded so it can drop the tuple from its cache; on failure it calls fail, telling the spout to resend the tuple.

public class WordSplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2932049413480818649L;
    private static final Logger LOGGER = Logger.getLogger(WordSplitBolt.class);
    private OutputCollector collector;

    private AtomicInteger atomicInteger = new AtomicInteger(1);

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            atomicInteger.getAndIncrement();

            String sentence = input.getStringByField("sentence");

            // Throw on the 20th and 200th tuples to simulate failures
            if (atomicInteger.get() == 20 || atomicInteger.get() == 200) {
                throw new RuntimeException(String.format("Simulated failure, sourceStreamId:%s, messageId:%s, sentence:%s", input.getSourceStreamId(), input.getMessageId(), sentence));
            }

            String[] words = sentence.split(" ");
            for (String word : words) {
                // Anchor each emitted word tuple to the input tuple
                collector.emit(input, new Values(word));
            }
            // Ack the input tuple on success
            this.collector.ack(input);
            LOGGER.info("--sentence--" + sentence);
        } catch (Exception e) {
            // Call fail on processing failure
            collector.fail(input);
            LOGGER.error(e.getMessage());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

2.4 The word-counting bolt

Similar to the splitting bolt.

public class WordCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = -7753338296650445257L;
    private static final Logger LOGGER = Logger.getLogger(WordCountBolt.class);
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        counts.put(word, count);
        // Anchor the emitted tuple to the input tuple, just as in the split bolt
        collector.emit(input, new Values(word, count));
        // Ack the input tuple on success
        collector.ack(input);
        LOGGER.info("--word--" + word);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.5 Printing the results

Nothing special here: ack on success, and print Storm's word counts in cleanup when the topology is killed.

public class ReportBolt extends BaseRichBolt {
    private static final Logger LOGGER = Logger.getLogger(ReportBolt.class);

    private static final long serialVersionUID = -3973016696731250995L;
    private HashMap<String, Long> counts = null;
    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<>();
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.counts.put(word, count);
        // Ack the input tuple on success
        collector.ack(input);
        LOGGER.info("--globalreport--" + word);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}

3. Running and analyzing the results

3.1 Running the topology

  • splitBolt: parallelism 3, shuffle grouping
  • countBolt: parallelism 4, fields grouping
  • reportBolt: global grouping

Note config.setNumAckers(1): this configures a single acker. A Storm topology has special "acker" tasks that track the DAG of tuples descending from each spout tuple. When an acker sees that a DAG is complete, it notifies the spout task that created the root tuple, and that task acknowledges the message. Storm defaults the number of ackers to one; under heavy message volume you may need to raise it.
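How can one acker task track an arbitrarily large tuple DAG? Storm's ackers use an XOR trick: each tuple gets a random 64-bit id, and the acker XORs that id into a running value once when the tuple is anchored and once when it is acked. XOR is its own inverse, so once every id has been XORed in exactly twice the value returns to zero and the tree is known to be complete, regardless of order or size. A minimal sketch (fixed ids for determinism; Storm uses random ids and accepts the astronomically small collision chance):

```java
// One 64-bit running value is enough to track a whole tuple tree.
public class AckerSketch {
    private long ackVal = 0L;

    public void anchored(long tupleId) { ackVal ^= tupleId; }
    public void acked(long tupleId)    { ackVal ^= tupleId; }
    public boolean treeComplete()      { return ackVal == 0L; }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();
        long root = 0x1234L, childA = 0x55aaL, childB = 0x77ffL;

        acker.anchored(root);     // spout emits the root tuple
        acker.anchored(childA);   // the split bolt anchors two word tuples...
        acker.anchored(childB);
        acker.acked(root);        // ...then acks its input
        System.out.println(acker.treeComplete());   // false: children pending

        acker.acked(childA);      // the count bolt acks each word tuple
        acker.acked(childB);
        System.out.println(acker.treeComplete());   // true: DAG fully processed
    }
}
```

This is why an acker needs only constant memory per spout tuple, no matter how many downstream tuples the bolts fan out.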

public class GuaranteedWordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        WordSplitBolt splitBolt = new WordSplitBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // Shuffle the emitted sentences across the split bolts
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 3).shuffleGrouping(SENTENCE_SPOUT_ID);

        // splitBolt splits each sentence into words on spaces and emits them to countBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt, 4).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));

        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt, 1).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setNumWorkers(1);
        config.setNumAckers(1);
        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        Thread.sleep(30*1000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

3.2 Result analysis

The counts printed by the emitter itself:

--- Emitter COUNTS ---
 : 203
A : 404
MapReduce : 391
One : 188
Storm : 391
The : 188
a : 1187
analogous : 203
and : 201
application : 188
are : 201
bolts : 201
connected : 201
course : 219
difference : 188
eventually : 188
finishes : 188
for : 188
forever : 219
graph : 201
groupings : 201
into : 188
is : 780
it : 219
job : 391
key : 188
kill : 219
logic : 188
of : 420
or : 219
packaged : 188
realtime : 188
runs : 219
spouts : 201
stream : 201
that : 389
to : 203
topology : 811
until : 219
whereas : 219
with : 201
you : 219
--------------

Storm's counts, as printed by ReportBolt:

--- FINAL COUNTS ---
 : 203
A : 404
MapReduce : 391
One : 188
Storm : 391
The : 188
a : 1187
analogous : 203
and : 201
application : 188
are : 201
bolts : 201
connected : 201
course : 219
difference : 188
eventually : 188
finishes : 188
for : 188
forever : 219
graph : 201
groupings : 201
into : 188
is : 780
it : 219
job : 391
key : 188
kill : 219
logic : 188
of : 420
or : 219
packaged : 188
realtime : 188
runs : 219
spouts : 201
stream : 201
that : 389
to : 203
topology : 811
until : 219
whereas : 219
with : 201
you : 219
--------------

Comparing the two outputs: even though the word-splitting bolt threw exceptions, the reliability mechanism produced exactly the same counts on both sides, so the mechanism works. It does not, however, guarantee that a tuple is processed exactly once. A failure can occur at any point; for example, if the data is stored successfully but the acknowledgement times out, resending the "failed" tuple causes duplicate processing. What the mechanism does guarantee is that every tuple is processed at least once. Exactly-once processing requires Storm's transactional spouts, now implemented by Trident.
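A tiny sketch of why at-least-once redelivery inflates counts, and a common mitigation: make the consumer idempotent by deduplicating on the message id before counting. (This is a simplified illustration of the idea, not how Trident actually achieves exactly-once semantics.)

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Two consumers fed the same redelivered stream: a naive one that counts
// every delivery, and an idempotent one that skips message ids it has seen.
public class DedupSketch {
    private final Map<String, Long> naiveCounts = new HashMap<>();
    private final Map<String, Long> dedupCounts = new HashMap<>();
    private final Set<UUID> seen = new HashSet<>();

    public void receive(UUID msgId, String word) {
        // Naive consumer: counts every delivery, including replays.
        naiveCounts.merge(word, 1L, Long::sum);
        // Idempotent consumer: a replayed msgId is counted only once.
        if (seen.add(msgId)) {
            dedupCounts.merge(word, 1L, Long::sum);
        }
    }

    public long naive(String w) { return naiveCounts.getOrDefault(w, 0L); }
    public long dedup(String w) { return dedupCounts.getOrDefault(w, 0L); }
}
```

In practice the seen-id state must be bounded and persistent to survive restarts; Trident sidesteps per-message tracking by processing tuples in batches with transaction ids stored alongside the state.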

3.3 Analyzing the retry logs

Both the spout and the split bolt log their failures. The log entries below help clarify how failed tuples are resent:

[Thread-30-split-bolt-executor[9 9]] com.foo.bolt.WordSplitBolt [54] - Simulated failure, sourceStreamId:default, messageId:{-897755601482291185=1436976737464555631}, sentence:whereas a topology runs forever or until you kill it of course 
[Thread-36-split-bolt-executor[10 10]] com.foo.bolt.WordSplitBolt [54] - Simulated failure, sourceStreamId:default, messageId:{1355559750003096469=8699297777170365670}, sentence:One key difference is that a MapReduce job eventually finishes 
[Thread-36-split-bolt-executor[10 10]] com.foo.bolt.WordSplitBolt [54] - Simulated failure, sourceStreamId:default, messageId:{-6711225109969092935=5915661248356017371}, sentence:A topology is a graph of spouts and bolts that are connected with stream groupings
[Thread-18-split-bolt-executor[8 8]] com.foo.bolt.WordSplitBolt [54] - Simulated failure, sourceStreamId:default, messageId:{-692291341444179168=-4371551265892401171}, sentence: A Storm topology is analogous to a MapReduce job 
[Thread-30-split-bolt-executor[9 9]] com.foo.bolt.WordSplitBolt [54] - Simulated failure, sourceStreamId:default, messageId:{-7981624255431253628=6508027834482650707}, sentence:A topology is a graph of spouts and bolts that are connected with stream groupings
[Thread-18-split-bolt-executor[8 8]] com.foo.bolt.WordSplitBolt [54] - Simulated failure, sourceStreamId:default, messageId:{-9071636350309294887=-1076991837303452874}, sentence:A topology is a graph of spouts and bolts that are connected with stream groupings

[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - Resending failed tuple: messageId:239b1155-0eec-4a11-a8aa-3368ab947cee, values:[whereas a topology runs forever or until you kill it of course ]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - Resending failed tuple: messageId:bae350f5-8548-4e6b-852d-7280d03e0ea2, values:[One key difference is that a MapReduce job eventually finishes ]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - Resending failed tuple: messageId:22821cab-9fd0-4532-b731-2b995b98a381, values:[ A Storm topology is analogous to a MapReduce job ]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - Resending failed tuple: messageId:1195ba50-d2f3-4cea-9eab-e426ffbb5bf9, values:[A topology is a graph of spouts and bolts that are connected with stream groupings]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - Resending failed tuple: messageId:42d14bdc-8e42-4eb6-81ec-3496a9dc855d, values:[A topology is a graph of spouts and bolts that are connected with stream groupings]
[Thread-22-sentence-spout-executor[7 7]] com.foo.bolt.WordSplitBolt [76] - Resending failed tuple: messageId:20f77240-21bb-4e65-aee7-6e2918857a63, values:[A topology is a graph of spouts and bolts that are connected with stream groupings]

Of course, the entries do not actually appear in this order in the log. The log shows that every failed tuple was indeed resent. The split bolt throws in two situations (on its 20th and 200th tuple), and since its parallelism is 3, a total of 6 exceptions occurred.

PS: the full code is at https://github.com/Json-Lin/storm-practice/tree/master/guaranteed-word-count