
Understanding all of Storm's basics through wordcount

A not-so-ordinary wordcount

The wordcount example is to Storm what HelloWorld is to a Java beginner. Below we use a wordcount to walk through all of Storm's basic usage.

The overall project structure is shown below:
(project structure diagram)
It consists of 5 parts: topology, spout, and bolt hold the main wordcount computation code, source is a custom sentence emitter, and util contains a log-parsing utility class.

1. A custom, countable sentence emitter

wordcount's job is to count words. We use a SentenceEmitter to record the total number of each word emitted, then compare its tally against Storm's tally to check whether the two agree.
The SentenceEmitter code is as follows:


/**
 * @author JasonLin
 * @version V1.0
 */
public class SentenceEmitter {
    private AtomicLong atomicLong = new AtomicLong(0);

    private final AtomicLongMap<String> COUNTS = AtomicLongMap.create();

    private final String[] SENTENCES = {"The logic for a realtime application is packaged into a Storm topology",
            " A Storm topology is analogous to a MapReduce job ",
            "One key difference is that a MapReduce job eventually finishes ",
            "whereas a topology runs forever or until you kill it of course ",
            "A topology is a graph of spouts and bolts that are connected with stream groupings"};


    /**
     * Emit a random sentence and record its word counts; the tally is later
     * compared against Storm's to verify that the two agree.
     * Once the total number of emits reaches 1000, stop emitting (by sleeping)
     * so that the bolts can finish counting the already-emitted data before
     * the program stops.
     *
     * @return a random sentence, or null if the sleep is interrupted
     */
    public String emit() {
        if (atomicLong.incrementAndGet() >= 1000) {
            try {
                // park this thread so no further sentences are emitted
                Thread.sleep(10000 * 1000);
            } catch (InterruptedException e) {
                return null;
            }
        }
        int randomIndex = (int) (Math.random() * SENTENCES.length);
        String sentence = SENTENCES[randomIndex];
        for (String s : sentence.split(" ")) {
            COUNTS.incrementAndGet(s);
        }
        return sentence;
    }

    public void printCount() {
        System.out.println("--- Emitter COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(COUNTS.asMap().keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.COUNTS.get(key));
        }
        System.out.println("--------------");
    }

    public AtomicLongMap<String> getCount() {
        return COUNTS;
    }

    public static void main(String[] args) {
        SentenceEmitter sentenceEmitter = new SentenceEmitter();
        for (int i = 0; i < 20; i++) {
            System.out.println(sentenceEmitter.emit());
        }
        sentenceEmitter.printCount();
    }
}

This class defines an emit() method that emits a random sentence, and printCount(), which prints the per-word emission counts the emitter has recorded.
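For readers without Guava on the classpath, the same per-word tallying can be sketched with only the JDK. This is an illustrative stand-in, not the project's code:

```java
import java.util.HashMap;
import java.util.Map;

public class JdkWordTally {
    private final Map<String, Long> counts = new HashMap<>();

    // Split on single spaces, mirroring SentenceEmitter, and bump each word's tally.
    public void add(String sentence) {
        for (String w : sentence.split(" ")) {
            counts.merge(w, 1L, Long::sum);
        }
    }

    public long get(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        JdkWordTally tally = new JdkWordTally();
        tally.add("a topology is a graph");
        System.out.println(tally.get("a"));  // prints 2
    }
}
```

`Map.merge` gives the same atomic-looking increment-or-initialize behavior within a single thread; Guava's AtomicLongMap adds thread safety on top.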

2. The spout component

A spout is the topology's data source. SentenceSpout obtains a random sentence from SentenceEmitter's emit() method and emits it.

/**
 * @author JasonLin
 * @version V1.0
 */
public class SentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = -5335326175089829338L;
    private SpoutOutputCollector collector;
    private SentenceEmitter sentenceEmitter;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.sentenceEmitter = new SentenceEmitter();
    }

    @Override
    public void nextTuple() {
        String sentence = sentenceEmitter.emit();
        if (sentence == null) {
            return; // the emitter's sleep was interrupted; nothing to emit
        }
        collector.emit(new Values(sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void close() {
        super.close();
        sentenceEmitter.printCount();
    }
}

open() handles initialization. nextTuple() is the spout's core method: it feeds data into the tuple tree (under the hood via SpoutOutputCollector's emit(List tuple) method). declareOutputFields() declares the output schema of all of this component's streams; here the emitted value is declared under the field name "sentence". close() does cleanup when the spout shuts down; here it prints sentenceEmitter's final word tally.

3. WordSplitBolt, the word-splitting bolt

WordSplitBolt receives the sentences emitted by SentenceSpout, splits them into words on spaces, and emits the words downstream.

/**
 * @author JasonLin
 * @version V1.0
 */
public class WordSplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = 2932049413480818649L;
    private static final Logger LOGGER = Logger.getLogger(WordSplitBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word));
        }
        LOGGER.info("--sentence--" + sentence);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

prepare() is the bolt's counterpart to the spout's open() and initializes the bolt. execute() is the bolt's core method: data is processed here, and the bolt decides whether to emit the processed data downstream. The logic is simple: the spout's output is bound to the field "sentence", so this bolt retrieves the spout's tuple via input.getStringByField("sentence"). It is a bit like a key-value store: the spout stores data under a key and the bolt fetches the value for that key, except that Storm uses tuples to define the key-value pairs.
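The key-value analogy can be made concrete with a toy stand-in. The class below is a hypothetical simplification for illustration only, not Storm's actual org.apache.storm.tuple.Tuple:

```java
import java.util.Arrays;
import java.util.List;

// Toy model: a tuple pairs the field names declared in declareOutputFields()
// with the values passed to emit(), and getStringByField() looks a value up
// by field name.
public class ToyTuple {
    private final List<String> fields;
    private final List<Object> values;

    public ToyTuple(List<String> fields, Object... values) {
        this.fields = fields;
        this.values = Arrays.asList(values);
    }

    public String getStringByField(String field) {
        int i = fields.indexOf(field);
        if (i < 0) throw new IllegalArgumentException("no such field: " + field);
        return (String) values.get(i);
    }

    public static void main(String[] args) {
        // like declarer.declare(new Fields("sentence")) plus emit(new Values("hello storm"))
        ToyTuple t = new ToyTuple(Arrays.asList("sentence"), "hello storm");
        System.out.println(t.getStringByField("sentence"));  // prints hello storm
    }
}
```

The real Tuple also supports positional access (getString(0)) and typed getters, but the name-to-value lookup shown here is the essence of getStringByField.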

4. WordCountBolt, the word-counting bolt

Now for the heart of wordcount: WordCountBolt counts how many times each word appears in the tuples emitted by WordSplitBolt.

/**
 * @author JasonLin
 * @version V1.0
 */
public class WordCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = -7753338296650445257L;
    private static final Logger LOGGER = Logger.getLogger(WordCountBolt.class);
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
        LOGGER.info("--word--" + word);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

The code is again very simple: counts caches each word this bolt receives and accumulates its number of occurrences. The output fields are declared with declarer.declare(new Fields("word", "count")), and the word and its running count are then emitted to the downstream bolt.

5. ReportBolt, which outputs the final result

ReportBolt consolidates WordCountBolt's results in a map. WordCountBolt emits a count for the same word many times, each emission being the previous value plus one, so ReportBolt keeps only the latest count for each word in its map.

public class ReportBolt extends BaseRichBolt {
    private static final Logger LOGGER = Logger.getLogger(ReportBolt.class);

    private static final long serialVersionUID = -3973016696731250995L;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        this.counts.put(word, count);
        LOGGER.info("--globalreport--" + word);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}

Since there are no downstream bolts, declareOutputFields() no longer does anything. Likewise, cleanup() runs housekeeping when the bolt shuts down; here it prints the final tally.
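The last-write-wins behavior that makes ReportBolt's map correct can be seen with a minimal JDK snippet (illustration only):

```java
import java.util.HashMap;
import java.util.Map;

public class LastWriteWins {
    public static void main(String[] args) {
        Map<String, Long> counts = new HashMap<>();
        // WordCountBolt emits ("storm", 1), ("storm", 2), ("storm", 3) over time;
        // each put replaces the previous value, so only the latest running
        // count survives in ReportBolt's map.
        counts.put("storm", 1L);
        counts.put("storm", 2L);
        counts.put("storm", 3L);
        System.out.println(counts.get("storm"));  // prints 3
    }
}
```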

6. WordCountTopology: submitting a local topology

At last, the final step: submitting a topology.

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        WordSplitBolt splitBolt = new WordSplitBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // shuffle the emitted sentences randomly across the splitBolt instances
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 3).shuffleGrouping(SENTENCE_SPOUT_ID);

        // splitBolt splits each sentence into words on spaces and emits them to countBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt, 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")).setNumTasks(6);

        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setNumWorkers(1);
        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        Thread.sleep(15*1000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

In the topology we build a Topology via TopologyBuilder and submit it locally via LocalCluster. In the code above the spout's parallelism is 1; splitBolt's parallelism is 3, i.e. 3 executors; countBolt's parallelism is 3; reportBolt's is 1; and there is 1 worker. The topology's total parallelism is therefore 1 + 3 + 3 + 1 = 8 executors. The topology's parallelism diagram is below:

(topology parallelism diagram)
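The executor arithmetic above can be double-checked with a trivial sketch (totalExecutors is a hypothetical helper, not a Storm API):

```java
public class ParallelismCheck {
    // Sum the executor counts implied by the parallelism hints. Note that
    // setNumTasks changes the number of tasks, not the number of executors.
    static int totalExecutors(int... parallelismHints) {
        int total = 0;
        for (int hint : parallelismHints) {
            total += hint;
        }
        return total;
    }

    public static void main(String[] args) {
        // spout defaults to 1, splitBolt = 3, countBolt = 3, reportBolt defaults to 1
        System.out.println(totalExecutors(1, 3, 3, 1));  // prints 8
    }
}
```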

7. Running locally

After running locally, check the logs. The topology's final tally is:

--- FINAL COUNTS ---
 : 203
A : 412
MapReduce : 393
One : 190
Storm : 400
The : 197
a : 1196
analogous : 203
and : 209
application : 197
are : 209
bolts : 209
connected : 209
course : 200
difference : 190
eventually : 190
finishes : 190
for : 197
forever : 200
graph : 209
groupings : 209
into : 197
is : 799
it : 200
job : 393
key : 190
kill : 200
logic : 197
of : 409
or : 200
packaged : 197
realtime : 197
runs : 200
spouts : 209
stream : 209
that : 399
to : 203
topology : 809
until : 200
whereas : 200
with : 209
you : 200
--------------

The emitter's tally is:

--- Emitter COUNTS ---
 : 203
A : 412
MapReduce : 393
One : 190
Storm : 400
The : 197
a : 1196
analogous : 203
and : 209
application : 197
are : 209
bolts : 209
connected : 209
course : 200
difference : 190
eventually : 190
finishes : 190
for : 197
forever : 200
graph : 209
groupings : 209
into : 197
is : 799
it : 200
job : 393
key : 190
kill : 200
logic : 197
of : 409
or : 200
packaged : 197
realtime : 197
runs : 200
spouts : 209
stream : 209
that : 399
to : 203
topology : 809
until : 200
whereas : 200
with : 209
you : 200
--------------

The two tallies match, which shows our wordcount computation is completely correct.

8. A closer look at Storm groupings

Storm has eight built-in groupings; see the stream groupings documentation for details. Let us continue by analyzing how wordcount's data is grouped, since wordcount covers all of Storm's commonly used groupings.

builder.setBolt(SPLIT_BOLT_ID, splitBolt,3) .shuffleGrouping(SENTENCE_SPOUT_ID);

Here splitBolt's grouping is shuffleGrouping, i.e. random grouping: tuples emitted by the spout are distributed randomly across all the splitBolt instances.

builder.setBolt(COUNT_BOLT_ID, countBolt, 3).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")).setNumTasks(6);

splitBolt then emits its data to countBolt using fieldsGrouping. Put simply, the same word is always sent to the same countBolt instance; only then is each word's final count correct.
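Conceptually, fieldsGrouping routes each tuple by hashing the grouping field's value, so equal words always reach the same task. A simplified sketch of the idea (not Storm's actual partitioning code) might look like:

```java
public class FieldsGroupingSketch {
    // Route a word to one of numTasks downstream tasks by hashing it;
    // equal words always hash to the same task index.
    static int taskFor(String word, int numTasks) {
        return Math.abs(word.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3;
        // The same word lands on the same task index every time.
        System.out.println(taskFor("storm", numTasks) == taskFor("storm", numTasks));  // prints true
    }
}
```

This is why all occurrences of a given word end up in one countBolt task, and why the per-word counts come out correct despite countBolt running with parallelism 3.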

builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

Finally, globalGrouping funnels all of the upstream countBolt's data into a single reportBolt.

In the code above, each bolt component logs the data processed by the current thread. Next we parse the run logs with the utility class in util to deepen our understanding of these groupings.

The log-parsing code is as follows:

/**
 * @author JasonLin
 * @version V1.0
 */
public class LogUtils {

    public static void sysoWordByWhatThread(String fileName, String prefix, String splitPointStr) throws IOException {
        List<String> strings = FileUtils.readFromFileToList(fileName);
        List<String> need = Lists.newArrayList();
        for (String s : strings) {
            if (!s.contains(prefix)) {
                continue;
            }
            need.add(s);
        }
        HashMultimap<String, String> result = HashMultimap.create();
        for (String n : need) {
            String[] strs = n.split(splitPointStr);
            if (strs.length > 1) {
                result.put(strs[0], strs[1]);
            } else {
                result.put(strs[0], "");
            }
        }
        for (String key : result.keySet()) {
            System.out.println(key + " : " + result.get(key));
        }
    }

    public static void main(String[] args) throws IOException {
        sysoWordByWhatThread("C:\\Users\\LinYu\\Desktop\\new 1.txt", "--word--", "--word--");
        sysoWordByWhatThread("C:\\Users\\LinYu\\Desktop\\new 1.txt", "--sentence--", "--sentence--");
    }
}

WordSplitBolt logs the current thread's name, the sentence it processed, and the "--sentence--" marker; WordCountBolt logs the current thread's name, the word it processed, and the "--word--" marker. The log utility then tallies which words and sentences each thread processed.

The parsed log output is as follows:

[Thread-24-count-bolt-executor[6 7]]  : [realtime, or, topology, for, is, eventually, MapReduce, into, of, spouts, until, logic, whereas, job]
[Thread-34-count-bolt-executor[4 5]]  : [a, bolts, One, kill, graph, The, that, application, groupings, and, difference, finishes, forever, you, analogous]
[Thread-22-count-bolt-executor[2 3]]  : [, A, Storm, it, connected, with, are, stream, course, to, runs, key, packaged]
[Thread-30-split-bolt-executor[11 11]]  : [A topology is a graph of spouts and bolts that are connected with stream groupings, whereas a topology runs forever or until you kill it of course , The logic for a realtime application is packaged into a Storm topology,  A Storm topology is analogous to a MapReduce job , One key difference is that a MapReduce job eventually finishes ]
[Thread-20-split-bolt-executor[12 12]]  : [A topology is a graph of spouts and bolts that are connected with stream groupings, whereas a topology runs forever or until you kill it of course , The logic for a realtime application is packaged into a Storm topology,  A Storm topology is analogous to a MapReduce job , One key difference is that a MapReduce job eventually finishes ]
[Thread-36-split-bolt-executor[10 10]]  : [whereas a topology runs forever or until you kill it of course , A topology is a graph of spouts and bolts that are connected with stream groupings, The logic for a realtime application is packaged into a Storm topology,  A Storm topology is analogous to a MapReduce job , One key difference is that a MapReduce job eventually finishes ]
[Thread-18-report-bolt-executor[8 8]]  : [, A, Storm, One, for, eventually, MapReduce, The, that, into, are, stream, and, of, spouts, finishes, course, whereas, you, key, analogous, packaged, a, realtime, or, topology, bolts, is, it, kill, graph, connected, with, application, groupings, difference, until, logic, to, job, forever, runs]


We can see 6 worker threads for the two middle bolts, i.e. 6 executors: 3 count-bolt executors and 3 split-bolt executors. This confirms the parallelism settings above, and also confirms that the task count does not factor into the parallelism (executor) calculation!

Next, recall that splitBolt uses shuffleGrouping, which sends all tuples randomly. The log tally shows the same sentences spread across the different split-bolt executor threads, and repeated runs show each thread's set of sentences varying.
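shuffleGrouping, by contrast with fieldsGrouping, picks a downstream task without regard to the tuple's content. A random pick is one way to sketch it (Storm's real implementation balances load across tasks; treat this purely as an illustration):

```java
import java.util.Random;

public class ShuffleGroupingSketch {
    private final Random random = new Random();

    // Pick any one of the numTasks downstream tasks, independent of the tuple.
    int taskFor(int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        ShuffleGroupingSketch s = new ShuffleGroupingSketch();
        for (int i = 0; i < 5; i++) {
            System.out.println("tuple " + i + " -> split-bolt task " + s.taskFor(3));
        }
    }
}
```

Because the routing ignores the tuple's content, any split-bolt executor may see any sentence, which matches the overlapping sentence sets in the parsed logs above.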

WordCountBolt uses fieldsGrouping, which sends tuples with equal field values to the same task. The logs show that no word is duplicated across the 3 count-bolt executors, confirming this.

Because report-bolt uses globalGrouping, it runs as a single thread and receives every word emitted upstream. Even if you raise its parallelism to 2 with builder.setBolt(REPORT_BOLT_ID, reportBolt, 2).globalGrouping(COUNT_BOLT_ID), only one thread ends up doing the work, so increasing the parallelism of a globalGrouping bolt is pointless.

**finally:** The complete code is at: https://github.com/Json-Lin/storm-practice.git
ps:

  1. When inspecting the FINAL COUNTS and Emitter COUNTS output, it is best to comment out the other components' logging first: these tallies are printed to the console, and too much other logging can scroll them out of view.
  2. The LOGGER output goes to the logs folder at the project's sibling level; the path can be configured in logback.xml:

<!-- log file path -->
<fileNamePattern>../logs/${log.context.name}-%d{yyyy-MM-dd}.%i.log</fileNamePattern>