Learn Storm with Me, Part 2: Parallelism and Stream Grouping

The four components of a topology

Nodes (servers)

  • A node is a supervisor machine in the Storm cluster that runs part of the topology's computation; a Storm cluster usually contains multiple nodes.

Workers (JVM processes)

  • Independent JVM processes running on a node; each node can run one or more workers. A complex topology is distributed across many workers.

Executors (threads)

  • A Java thread running inside a worker JVM. Several tasks can be assigned to the same executor; by default, Storm assigns one task per executor.

Tasks (spout/bolt instances)

  • A task is an instance of a spout or bolt; its nextTuple() or execute() method is invoked by an executor thread.

Example

builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
// SentenceSpout --> SplitSentenceBolt
builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
        .setNumTasks(4)
        .shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt, 6)
        .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt)
        .globalGrouping(COUNT_BOLT_ID);

Config conf = JStormHelper.getConfig(null);
conf.setNumWorkers(2);

With the topology configured as above, its parallelism layout is shown in the figure below:
[Figure: workers, executors, and tasks of the word-count topology]

  • The figure shows 2 workers; the rounded rectangles are executors. Counting the spout, split, and count components gives 10 executors and 12 tasks (2 spout tasks, 10 bolt tasks); the single-task ReportBolt adds one more executor, for 11 executors and 13 tasks overall.
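
  • To check those numbers against the configuration (the parallelism hint sets the executor count, and setNumTasks overrides the default of one task per executor):

sentence-spout: 2 executors x 1 task        = 2 tasks
split-bolt:     2 executors, setNumTasks(4) = 4 tasks (2 per executor)
count-bolt:     6 executors x 1 task        = 6 tasks
report-bolt:    1 executor  x 1 task        = 1 task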

Stream groupings

  • A stream grouping tells the topology how to send tuples between two components. Part of defining a topology is declaring, for each bolt, which streams it consumes as input; the stream grouping then defines how each of those streams is partitioned among the bolt's tasks.
  • Storm ships with seven built-in stream groupings:

Shuffle Grouping

  • Random grouping: tuples are distributed randomly across the bolt's tasks, so each task receives roughly the same number of tuples.

Fields Grouping

  • Grouping by field: for example, when grouping on a "user-id" field, tuples with the same "user-id" are always routed to the same task of the bolt, while tuples with different "user-id" values may go to different tasks.

All Grouping

  • Broadcast: every task of the subscribing bolt receives a copy of every tuple.

Global Grouping

  • Global grouping: the entire stream is routed to a single task of the bolt, specifically the task with the lowest task id.

None Grouping

  • No grouping: declares that you don't care how the stream is grouped. It is currently equivalent to shuffle grouping, except that Storm will, where possible, run the subscribing bolt in the same thread as the component it subscribes to.

Direct Grouping

  • Direct grouping: a special grouping in which the sender of a tuple decides which task of the consumer will process it. It can only be used on streams that have been declared as direct streams, and such tuples must be emitted with emitDirect. A component can look up the task ids of its consumers through TopologyContext (the OutputCollector.emit method also returns the task ids a tuple was sent to).
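
  • Below is a minimal sketch of a direct-grouping variant of the sentence split, assuming the word-count topology later in this post; DirectSplitBolt and the stream id "direct-stream" are illustrative names, not part of the original code:

public class DirectSplitBolt extends BaseRichBolt {
    private OutputCollector collector;
    private List<Integer> countTasks; // task ids of the downstream count-bolt

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Look up the consumer's task ids once, at startup.
        this.countTasks = context.getComponentTasks("count-bolt");
    }

    public void execute(Tuple tuple) {
        for (String word : tuple.getStringByField("sentence").split(" ")) {
            // The sender picks the receiving task explicitly and emits with emitDirect.
            int target = countTasks.get(Math.floorMod(word.hashCode(), countTasks.size()));
            this.collector.emitDirect(target, "direct-stream", tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Only a stream declared with direct=true may be emitted via emitDirect.
        declarer.declareStream("direct-stream", true, new Fields("word"));
    }
}

// The consumer must subscribe with directGrouping:
// builder.setBolt(COUNT_BOLT_ID, countBolt, 6).directGrouping(SPLIT_BOLT_ID, "direct-stream");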

Local or Shuffle Grouping

  • Local-or-shuffle grouping: if the target bolt has one or more tasks in the same worker process as the source task, tuples are shuffled only among those in-process tasks; otherwise it behaves exactly like shuffle grouping.
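
  • The example topology below already uses shuffle, fields, and global groupings. For completeness, here is a sketch of the builder calls for the all, none, and local-or-shuffle groupings (MetricsBolt and the bolt ids are illustrative):

builder.setBolt("all-bolt", new MetricsBolt())
        .allGrouping(COUNT_BOLT_ID);            // broadcast: every task gets every tuple
builder.setBolt("none-bolt", new MetricsBolt())
        .noneGrouping(COUNT_BOLT_ID);           // "don't care"; currently behaves like shuffle
builder.setBolt("local-bolt", new MetricsBolt())
        .localOrShuffleGrouping(COUNT_BOLT_ID); // prefer tasks in the same worker process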

Miscellaneous

messageId

  • A quick aside on messageId: a messageId uniquely identifies a message. With it we can trace how a message is processed and verify that grouping behaves as expected; it can be read with tuple.getMessageId(), as the code below demonstrates.

taskId

  • Every task in Storm has a unique taskId, which can be obtained via topologyContext.getThisTaskId().

Demo

  • We trace the life cycle of a single message via its messageId, as shown in the figure below.
  • [Figure: log lines tracing one message from SentenceSpout through ReportBolt]

  • You can clearly see that once a sentence is received by SplitSentenceBolt it is split into words and sent on to WordCountBolt; WordCountBolt counts each word it receives and forwards the result to ReportBolt for printing.

  • Because SplitSentenceBolt's output is fields-grouped on the word before being passed to WordCountBolt, the figure below shows identical words always being routed to the same WordCountBolt task. You can grep the logs for other words and verify the result yourself.
  • [Figure: log lines showing the same word delivered to a single WordCountBolt task]

Code

SentenceSpout

public class SentenceSpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(SentenceSpout.class);

    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };
    private AtomicInteger index = new AtomicInteger(0);

    private Integer taskId = null;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        this.taskId = context.getThisTaskId();
    }

    public void nextTuple() {
        Values values = new Values(sentences[index.getAndIncrement()]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        this.collector.emit(values, msgId);
        if (index.get() >= sentences.length) {
            index = new AtomicInteger(0);
        }
        logger.warn(String.format("SentenceSpout with taskId: %d emit msgId: %s and tuple is: %s",
                taskId,
                msgId,
                JSONObject.toJSON(values)));
        Utils.waitForMillis(100);
    }

    public void ack(Object msgId) {
        this.pending.remove(msgId);
        logger.warn(String.format("SentenceSpout taskId: %d receive msgId: %s and remove it from the pendingmap",
                taskId,
                JSONObject.toJSONString(msgId)));
    }

    public void fail(Object msgId) {
        // Look up the failed tuple in the pending map and re-emit it for retry.
        logger.error(String.format("SentenceSpout taskId: %d received fail for msgId: %s and is re-emitting it",
                taskId,
                JSONObject.toJSONString(msgId)));
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}

SplitSentenceBolt

public class SplitSentenceBolt extends BaseRichBolt {

    private static final Logger logger = LoggerFactory.getLogger(SplitSentenceBolt.class);
    private OutputCollector collector;
    private Integer taskId = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.taskId = context.getThisTaskId();
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            this.collector.emit(tuple, new Values(word));
        }
        this.collector.ack(tuple);

        logger.warn(String.format("SplitSentenceBolt taskid: %d acked tuple: %s and messageId is: %s",
                taskId,
                JSONObject.toJSONString(tuple, SerializerFeature.WriteMapNullValue),
                tuple.getMessageId()));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

WordCountBolt

public class WordCountBolt extends BaseRichBolt {

    private static final Logger logger = LoggerFactory.getLogger(WordCountBolt.class);

    private OutputCollector collector;
    private HashMap<String, Long> counts = null;
    private Integer taskId = null;

    public void prepare(Map config, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
        this.taskId = context.getThisTaskId();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if(count == null){
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.ack(tuple);
        logger.warn(String.format("WordCountBolt taskId: %d receive tuple: %s messageId is: %s and going to emit it",
                taskId,
                JSONObject.toJSONString(tuple),
                tuple.getMessageId()));
        this.collector.emit(tuple, new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

WordCountTopology

public class WordCountTopology {

    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();


        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
                .setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt, 6)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID);


        Config conf = JStormHelper.getConfig(null);
        conf.setNumWorkers(2);
        conf.setDebug(true);
        boolean isLocal = true;

        JStormHelper.runTopology(builder.createTopology(), TOPOLOGY_NAME, conf, 10,
                new JStormHelper.CheckAckedFail(conf), isLocal);
    }
}