1. 程式人生 > >storm 學習教程

storm 學習教程

class 模擬 保存 oms 上進 知識 cluster eclipse wordcount

轉自:http://blog.csdn.net/hrn1216/article/details/51538962

翻譯太累了,再也不想去翻譯了,真的太累了:

在這個教程中, 你將學到如何創建一個Storm topologies以及怎樣把它部署到storm集群上。本教程中,Java將作為主要使用的語言,但在一小部分示例中將會使用Python來闡述storm處理多語言的能力。

預備工作

本教程使用的例子來自於 storm-starter 項目. 我們建議你拷貝該項目並跟隨這個例子來進行學習。 請閱讀 Setting up a development environment 和 Creating a new Storm project 創建好相應的基礎環境。

Storm集群的組件

Storm集群在表面上與Hadoop集群相似。在hadoop上運行"MapReduce jobs",而在Storm上運行的是"topologies"。 "Jobs" and "topologies" 它們本身非常的不同 -- 一個關鍵的不同的是MapReduce job最終會完成並結束,而topology的消息處理將無限期進行下去(除非你kill它)。

在storm集群中,有兩類節點。Master節點運行守護進程稱為"Nimbus",它有點像 Hadoop 的 "JobTracker"。Nimbus負責集群的代碼分發,任務分配,故障監控。

每個工作節點運行的守護進程稱為"Supervisor"。Supervisor 負責監聽分配到它自己機器的作業,根據需要啟動和停止相應的工作進程,當然這些工作進程也是Nimbus分派給它的。每個工作進程執行topology的一個子集;一個運行的topology是由分布在多個機器的多個工作進程組成的。

技術分享

Nimbus 與 Supervisors 所有的協調工作是由 Zookeeper 集群完成的. 此外,Nimbus 守護進程 和 Supervisor 守護進程 是無狀態的,快速失敗的機制。 所有的狀態保存在Zookeeper上或者本地磁盤中。這就是說,你用kill -9殺掉Nimbus 或者Supervisors,它們重新啟動後就像什麽都沒有發生一樣,這樣的設計讓storm集群擁有令人難以置信的穩定性。

Topologies

在Storm上進行實時計算,你需要創建名為 "topologies" 的這麽個東西。一個topology是一個計算的圖,每個在topology中的節點(以下部分也稱作“組件”)包含了處理邏輯,以及多個節點間數據傳送的聯系和方式。運行一個topology很直接簡單的。第一,你把你的java code和它所有的依賴打成一個單獨的jar包。然後,你用如下的命令去運行就可以了。

storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2

這個例子中運行的類是 org.apache.storm.MyTopology 且帶著兩個參數 arg1arg2. 這個類的主要功能是定義topology,並被提交到Nimbus中。命令 storm jar 就是用來加載這個topology jar的。

因為topology的定義方式就是Thrift的結構,Nimbus也是一個Thrift服務,所以你可以用任何語言去創建topologies並提交。以上的例子是最簡單的方式去使用基於JVM的語言(比如java)創建的topology。請閱讀 Running topologies on a production cluster 來獲得更多的信息關於topology的啟動和停止。

Storm裏的核心抽象就是 "流"。流 是一個無界的 元組序列。Storm提供原始地、分布式地、可靠地方式 把一個流轉變成一個新的流。舉例來說,

你可以把一個 tweet 流 轉換成一個 趨勢主題 的流。

Storm中提供 流轉換 的最主要的原生方式是 "spouts" and "bolts"。Spouts 和 bolts 有相應的接口,你需要用你的應用的特定邏輯實現接口即可。

Spout是 流 的源頭。舉例來說,一個spout也許會從 Kestrel 隊列中讀取數據並以流的方式發射出來,亦或者一個spout也許會連接Twitter的API,

並發出一個關於tweets的流。

一個bolt可以消費任意數量的輸入流,並做一些處理,也可以由此發出一個新的流。復雜流的轉換,像從一個tweet流中計算出一個關於趨勢話題的流,它要求多個步驟,因此也需要多個bolt。Bolts 能做任何事情,運行方法,過濾元組,做流的聚合,流的連接,寫入數據庫等。

Spouts 和 bolts 組成的網絡 打包到 "topology" 中,它是頂級的抽象,是你需要提交到storm集群執行的東西。一個topology是一個由spout和bolt組成的 做流轉換 的圖,其中圖中的每個節點都可以是一個spout或者一個bolt。圖中的邊表明了bolt訂閱了哪些流,亦或是當一個spout或者bolt發射元組到流中,它發出的元組數據到訂閱這個流的所有bolt中去。

技術分享

topology中節點之間的聯系表明了元組數據是怎樣去傳送的。舉例來說,Spout A 和 Bolt B 相連(A 到 B),Spout A 和 Bolt C 相連(A 到 C),Bolt B 和 Bolt C相連(B 到 C)。每當Spout A發出元組數據時,它會同時發給Bolt B 和 Bolt C。再者,所有Bolt B的輸出元組,也會發給Bolt C。

在topology中的每個節點都是並行運行的。因此在你的topology中,你可以為每個節點指定並行運行的數量,然後storm集群將會產生相應數量的線程來執行。

一個topology無限運行,直到你殺掉它才會停止。Storm將自動地重新分配失敗過的任務。此外,Storm保證不會有數據丟失,即便是機器掛掉,消息被丟棄。

數據模型

Storm用元組作為它的數據模型。一個元組是一個命名的,有值的,一般由過個字段組成的序列,序列中的每個字段可以是任何類型的對象。在沙箱之外,Storm提供所有的原始類型,字符串,byte數組作為元組的字段值。如果想用一個其他類型的對象,你需要實現a serializer 接口。

每個topology節點必須聲明輸出元組的字段。舉例來說,這個bolt聲明它將輸出帶有"double" and "triple"兩個字段的元組:

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

declareOutputFields 方法聲明了該bolt的輸出的字段 ["double", "triple"] .這個bolt的剩余部分將在接下來進行說明。

一個簡單topology

讓我們去看一個簡單的 topology,去探索更多的概念,去看一下它的代碼到底長什麽樣。讓我們看一下 ExclamationTopology 的定義,來自storm-starter的項目:

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

這個topology包含了一個spout和兩個bolt,這個spout發出words,然後每個bolt都在自己的讀入字符串上加上"!!!"。

這些節點被安排成一條線:spout發給第一個bolt,第一個bolt發給第二個bolt。如果spout發出的元組數據是["bob"] 和 ["john"],通過第二個

bolt後,將發出["bob!!!!!!"] 和 ["john!!!!!!"]。

代碼中用 setSpoutsetBolt 方法定義了節點。這些方法的輸入是 一個用戶指定的id,包含處理邏輯的對象,你希望該節點並行計算的數量。在這個例子中,這個 spout 的id是 "words" ,兩個bolt的id分別為 "exclaim1" 和 "exclaim2"。

包含處理邏輯的spout對象實現了 IRichSpout 接口,bolt對象實現了 IRichBolt 接口。

最後一個參數,是你希望該節點並行計算的數量是多少,這是可選的。它表明了會有多少線程會通過storm集群來執行這個組件(spout或bolt)。

如果你忽略它,Storm集群會分配單線程給該節點。

setBolt 返回一個 InputDeclarer 對象,它用來定義bolt的輸入。在這裏,bolt "exclaim1" 聲明了它希望通過shuffle分組的方式讀取 spout "words"中的所有元組。同理,Bolt "exclaim2" 聲明了它希望通過shuffle分組的方式讀取 bolt "exclaim1" 所發出的元組數據。

"shuffle 分組" 指元組數據 將會 隨機分布地 從輸入任務 到bolt任務中。在多個組件(spout或bolt)之間,這裏有很多數據分組的方式。

這在接下來的章節中會說明。

如果你希望bolt "exclaim2" 從 spout "words" 和 bolt "exclaim1" 讀取所有的元組數據,你需要像下面這樣定義:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

如你所見,輸入的定義可以是鏈式的,bolt可以指定多個源。

讓我們深入了解一下spout和bolt在topology中的實現。Spout負責發送新的數據到topology中。 TestWordSpout 在topology中每隔 100ms 發送了一個隨機的單詞,單詞來自列表["nathan", "mike", "jackson", "golda", "bertels"]。在 TestWordSpout 中的 nextTuple() 的實現細節如下:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

如你所見,這個實現非常簡單明了。

ExclamationBolt 給輸入的字符串追加上 "!!!" 。 讓我們看一下 ExclamationBolt 的完整實現吧:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

prepare 方法提供了一個 OutputCollector 對象,它用來發出元組數據給下遊節點。元組數據可以在任意時間從bolt發出 -- 可以在 prepare,execute, 或者 cleanup 方法,或者 在另一個線程,異步地發送。 這裏的 prepare 實現很簡單,初始化並保存了 OutputCollector 的引用,該引用將在接下來的execute 方法中使用。

execute 方法從該bolt的輸入中接收一個元組數據, ExclamationBolt 對象提取元組中的第一個字段,並追加字符串 "!!!" 。如果你實現的bolt訂閱了多個輸入源,你可以用 Tuple 中的 Tuple#getSourceComponent 方法來獲取你當前讀取的這個元組數據來自於哪個源。

execute 方法中,還有一點東西需要說明。 即輸入的元組作為第一個參數 發出 ,然後在最後一行中發出確認消息。這是Storm保證可靠性的API的一部分,它保證數據不會丟失,這在之後的教程會說明。

當一個Bolt將要停止、關閉時,它需要關閉當前打開的資源,此時 cleanup 方法可以被調用。 需要註意的是,這並不保證這個方法在storm集群中一定會被調用:舉例來說,如果機器上的任務爆發,這就不會調用這個方法。 cleanup 方法打算用於,當你在 local mode 上運行你的topology(模擬storm集群的仿真模式), 你能夠啟動和停止很多topology且不會遭受任何資源泄露的問題。

declareOutputFields 方法聲明 ExclamationBolt 發出一個名稱為 "word" 的帶一個字段的元組。

getComponentConfiguration 方法允許你從很多方面配置這個組件怎樣去運行。更多高級的話題,深入的解釋,請參見 Configuration.

通常像 cleanupgetComponentConfiguration 方法,在bolt中並不是必須去實現的。你可以用一個更為簡潔的方式,通過使用一個提供默認實現的基本類去定義bolt,這也許更為合適一些。 ExclamationBolt 可以通過繼承 BaseRichBolt,這會更簡單一點,就像這樣:

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}

在local mode上運行ExclamationTopology

讓我們來看一下如何在本地模式上運行 ExclamationTopology ,看到它工作起來。

Storm有兩種操作模式:本地模式和分布式模式。在本地模式中,Storm通過線程模擬工作節點並在一個進程中完成執行。本地模式在用於開發和測試topology時是很有用處的。當你在本地模式中運行 storm-starter 項目中的 topology 時,你就能看到每個組件發送了什麽信息。你可以獲取更多關於在本地模式上運行topology的信息,請參見 Local mode。

在分布式模式中,Storm操作的是機器集群。當你提交一個topology給master,你也需要提交運行這個topology所必須的代碼。Master將會關註於分發你的代碼並分配worker去運行你的topology。如果worker掛掉,master將會重新分配這些代碼、topology到其他地方。你可以獲取更多關於在分布式模式上運行topology的信息,請參見 Running topologies on a production cluster。

這是在本地模式上運行 ExclamationTopology 的代碼:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先,代碼中定義了單進程的偽集群,通過創建 LocalCluster 對象實現。提交topology到虛擬集群,和提交topology到真正的分布式集群是相同的。提交topology使用LocalClustersubmitTopology 方法。它需要的參數為 topology的名字,topology的配置,topology本身。

topology的名稱是為了標識topology,以便你之後可以停掉它。一個topology將無限期運行,直到你停掉它。

topology的配置可以從多個方面調整topology運行時的形態。這裏給出了兩個最為常見的配置:

  1. TOPOLOGY_WORKERS (用 setNumWorkers方法來設置) 表明你希望在storm集群中分配多少進程來執行你的topology。每個在topology中的組件(spout 或 bolt)將會被分配多個線程去執行。線程數的設置是通過組件的 setBoltsetSpout 方法。這些線程存在於worker進程中。 每個worker進程包含了處理一些組件的一些線程,例如,你橫跨集群指定了300個線程處理所有的組件,且指定了50個worker進程。也就是說,每個工作進程將執行6個線程, 其中的每一個可能又屬於不同的組件。調整topology的性能需要通過調整每個組件的並行線程數 和 工作進程中運行的線程數量。
  2. TOPOLOGY_DEBUG (用 setDebug方法來設置), 當設為true的時候,它將告訴Storm打印組件發出的每條信息。這在本地模式測試topology的時候很有用處。但是當你的topology在集群中運行的時候,或許你應該關掉它。

這裏有很多其他的topology的配置,更多細節請參見 the Javadoc for Config.

學習如何建立自己的開發環境,以便你能用本地模式運行你的topology(比如在eclipse裏),請參見 Creating a new Storm project.

流的分組方式

流的分組方式告訴一個topology,兩個組件是通過怎樣的方式傳遞元組數據。記住,spout 和 bolt 是並行執行在集群上的多個任務中的。如果你想知道一個topology是如何在任務層執行的,它也許就是這樣的:

技術分享

從上圖可知,bolt A 有4個task,bolt B有3個task。當bolt A的1個task 發出一個元組給 bolt B,那麽bolt A的這個task應該發給bolt B的哪個task呢?

流的分組方式通過告訴storm在兩個任務集(上圖的 bolt A 的任務集 合 bolt B的任務集)發送元組信息的方式 回答了這個問題。

在我們深入研究不同的流的分組策略前,我們先看一下另一個來自 storm-starter 項目的例子。 WordCountTopology 從spout讀取一句話,並分流給 WordCountBolt ,並統計句子中每個詞兒出現的次數:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentence 發出的元組來自於它接收到的每個句子的每個單詞,WordCount 在內存中保留一個map統計單詞的次數。每當WordCount

收到一個單詞,它將更新map的狀態並發出最新的單詞統計。

這裏有幾種不同的流的分組方式。

最簡單的分組方式稱作 "shuffle grouping" ,這種方式是將元組數據發送給隨機的任務。shuffle grouping 用於 WordCountTopology 中,RandomSentenceSpout 發送元組數據給 SplitSentence bolt 的部分。它很有效地把元組數組 均勻分給所有的 SplitSentence bolt 的任務。

一個更有趣的分組方式叫做 "fields grouping"。在本例中,字段分組用於 SplitSentence bolt 和 WordCount bolt 之間。它對 WordCount 的功能至關重要,它保證了相同的單詞只會去相同的任務中。否則,會有多個任務接收到相同的單詞,它們各自發出的單詞統計也是不正確的,因為它們獲得的都是不全的信息。 fields grouping 讓你可以通過字段來進行數據流的分組,這樣就導致了相同的字段值會進入到相同的任務中去。 WordCount 訂閱了 SplitSentence 的輸出流,並且是通過fields grouping的方式,本例針對 "word" 字段進行分組, 使相同的單詞進入了相同的任務,bolt 就可以給出正確的結果了。

Fields groupings 是 流的連接 和 聚合 的基礎實現,它也有其他很多的例子。

在底層, fields groupings 使用 mod hashing 算法來實現的。

這裏還有一些其他的分組方式,更多信息請參見 Concepts.

用其他語言定義Bolts

Bolts 可以用任何語言定義。用其他語言寫的 Bolts ,是以子進程的方式運行,storm與子進程通信的方式是 通過輸入輸出 JSON 數據來實現的。通信協議只需要1個100行的適配器代碼的庫, Storm附帶了 Ruby, python, and Fancy 的適配器的庫。

這是 SplitSentence bolt 的定義,來自於 WordCountTopology:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence 覆蓋 ShellBolt 並聲明用 python ,運行文件為 splitsentence.py. 這是 splitsentence.py的實現:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

更多關於用其他語言編寫 spout 和 bolt 的信息,創建topology的信息 (以及完全避免 JVM 的方式), 請參見 Using non-JVM languages with Storm.

保證消息的處理

本教程前面部分,我們跳過了一些方面如元組是怎麽發出的,這些方面涉及了Storm的可靠性的API:Storm是如何保證每條來自於spout的消息會被完全完整地被處理。 參見 Guaranteeing message processing 信息看它是如何工作的,以及作為用戶的你,如何使用storm的可靠性能的這一優點。

事務型的topologies

Storm 保證每條信息至少被處理一次. 一個大眾化的問題被提出: "你怎麽在storm頂層上進行計數操作?計數值會不會超量?" Storm 有一個特性稱作事務型topologies,在絕大部分的計算場景中,它的實現可以讓消息只傳遞一次。獲取更多事務型的topologies的信息: here.

分布式 RPC

本教程介紹了storm頂層的核心流的處理過程。storm原生部分 是有很多的事情可以去做的,其中最有意思的應用就是storm的分布式RPC,它具有很強的並行計算的性能。獲取更多的關於分布式RPC的信息: here.

結論

這個教程給出了一個關於storm開發,測試,部署的概覽,剩余的文檔將深入storm的每個方面。

storm 學習教程