1. 程式人生 > >Storm入門 第二章準備開始

Storm入門 第二章準備開始

準備開始

在本章,我們要建立一個Storm工程和我們的第一個Storm拓撲結構。

NOTE: 下面假設你的JRE版本在1.6以上。我們推薦Oracle提供的JRE。你可以到http://www.java .com/downloads/下載。

操作模式

開始之前,有必要了解一下Storm的操作模式。有下面兩種方式。

本地模式

在本地模式下,Storm拓撲結構執行在本地計算機的單一JVM程序上。這個模式用於開發、測試以及除錯,因為這是觀察所有元件如何協同工作的最簡單方法。在這種模式下,我們可以調整引數,觀察我們的拓撲結構如何在不同的Storm配置環境下執行。要在本地模式下執行,我們要下載Storm開發依賴,以便用來開發並測試我們的拓撲結構。我們建立了第一個Storm工程以後,很快就會明白如何使用本地模式了。

NOTE: 在本地模式下,跟在叢集環境執行很像。不過很有必要確認一下所有元件都是執行緒安全的,因為當把它們部署到遠端模式時它們可能會執行在不同的JVM程序甚至不同的物理機上,這個時候它們之間沒有直接的通訊或共享記憶體。

我們要在本地模式執行本章的所有例子。

遠端模式

在遠端模式下,我們向Storm叢集提交拓撲,它通常由許多執行在不同機器上的流程組成。遠端模式不會出現除錯資訊, 因此它也稱作生產模式。不過在單一開發機上建立一個Storm叢集是一個好主意,可以在部署到生產環境之前,用來確認拓撲在叢集環境下沒有任何問題。

你將在第六章學到更多關於遠端模式的內容,並在附錄B學到如何安裝一個Storm叢集。

我們在這個工程裡建立一個簡單的拓撲,數單詞數量。我們可以把這個看作Storm的“Hello World”。不過,這是一個非常強大的拓撲,因為它能夠擴充套件到幾乎無限大的規模,而且只需要做一些小修改,就能用它構建一個統計系統。舉個例子,我們可以修改一下工程用來找出Twitter上的熱點話題。

要建立這個拓撲,我們要用一個spout讀取文字,第一個bolt用來標準化單詞,第二個bolt為單詞計數,如圖2-1所示。

NOTE: 如果你使用git(一個分散式版本控制與原始碼管理工具),你可以執行git clone [email protected]:storm-book/examples-ch02-getting_started.git,把原始碼檢出到你指定的目錄。

構建Storm執行環境的第一步是檢查你安裝的Java版本。開啟一個控制檯視窗並執行命令:java -version。控制檯應該會顯示出類似如下的內容:

    java -version

    java version "1.6.0_26"
    Java(TM) SE Runtime Enviroment (build 1.6.0_26-b03)

    Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)

建立工程

開始之前,先為這個應用建一個目錄(就像你平常為Java應用做的那樣)。這個目錄用來存放工程原始碼。

接下來我們要下載Storm依賴包,這是一些jar包,我們要把它們新增到應用類路徑中。你可以採用如下兩種方式之一完成這一步:

  • 下載所有依賴,解壓縮它們,把它 們新增到類路徑

NOTE: Maven是一個軟體專案管理的綜合工具。它可以用來管理專案的開發週期的許多方面,從包依賴到版本釋出過程。在這本書中,我們將廣泛使用它。如果要檢查是否已經安裝了maven,在命令列執行mvn。如果沒有安裝你可以從http://maven.apache.org/download.html下載。

沒有必要先成為一個Maven專家才能使用Storm,不過了解一下關於Maven工作方式的基礎知識仍然會對你有所幫助。你可以在Apache Maven的網站上找到更多的資訊(http://maven.apache.org/)。

NOTE: Storm的Maven依賴引用了執行Storm本地模式的所有庫。

要執行我們的拓撲,我們可以編寫一個包含基本元件的pom.xml檔案。

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <modelVersion>4.0.0</modelVersion>
             <groupId>storm.book</groupId>
             <artifactId>Getting-Started</artifactId>
             <version>0.0.1-SNAPSHOT</version>
             <build>
                 <plugins>
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-compiler-plugin</artifactId>
                         <version>2.3.2</version>
                         <configuration>
                             <source>1.6</source>
                             <target>1.6</target>
                             <compilerVersion>1.6</compilerVersion>
                         </configuration>
                     </plugin>
                 </plugins>
             </build>
             <repositories>
                 <!-- Repository where we can found the storm dependencies -->
                 <repository>
                     <id>clojars.org</id>
                     <url>http://clojars.org/repo</url>
                 </repository>
             </repositories>
             <dependencies>
                 <!-- Storm Dependency -->
                 <dependency>
                     <groupId>storm</groupId>
                     <artifactId>storm</artifactId>
                     <version>0.6.0</version>
                 </dependency>
             </dependencies>
    </project>

開頭幾行指定了工程名稱和版本號。然後我們添加了一個編譯器外掛,告知Maven我們的程式碼要用Java1.6編譯。接下來我們定義了Maven倉庫(Maven支援為同一個工程指定多個倉庫)。clojars是存放Storm依賴的倉庫。Maven會為執行本地模式自動下載必要的所有子包依賴。

一個典型的Maven Java工程會擁有如下結構:

    我們的應用目錄/
             ├── pom.xml
             └── src
                   └── main
                      └── java
                   |  ├── spouts
                   |  └── bolts
                   └── resources

java目錄下的子目錄包含我們的程式碼,我們把要統計單詞數的檔案儲存在resource目錄下。

NOTE:命令mkdir -p 會建立所有需要的父目錄。

我們將為執行單詞計數建立所有必要的類。可能這個例子中的某些部分,現在無法講的很清楚,不過我們會在隨後的章節做進一步的講解。

Spout

pout WordReader類實現了IRichSpout介面。我們將在第四章看到更多細節。WordReader負責從檔案按行讀取文字,並把文字行提供給第一個bolt

NOTE: 一個spout釋出一個定義域列表。這個架構允許你使用不同的bolts從同一個spout流讀取資料,它們的輸出也可作為其它bolts的定義域,以此類推。

例2-1包含WordRead類的完整程式碼(我們將會分析下述程式碼的每一部分)。

       /**
         *  例2-1.src/main/java/spouts/WordReader.java
         */
        package spouts;

        import java.io.BufferedReader;
        import java.io.FileNotFoundException;
        import java.io.FileReader;
        import java.util.Map;
        import backtype.storm.spout.SpoutOutputCollector;
        import backtype.storm.task.TopologyContext;
        import backtype.storm.topology.IRichSpout;
        import backtype.storm.topology.OutputFieldsDeclarer;
        import backtype.storm.tuple.Fields;
        import backtype.storm.tuple.Values;

        public class WordReader implements IRichSpout {
            private SpoutOutputCollector collector;
            private FileReader fileReader;
            private boolean completed = false;
            private TopologyContext context;
            public boolean isDistributed() {return false;}
            public void ack(Object msgId) {
                    System.out.println("OK:"+msgId);
            }
            public void close() {}
            public void fail(Object msgId) {
                 System.out.println("FAIL:"+msgId);
            }
            /**
             * 這個方法做的惟一一件事情就是分發檔案中的文字行
             */
            public void nextTuple() {
            /**
             * 這個方法會不斷的被呼叫,直到整個檔案都讀完了,我們將等待並返回。
             */
                 if(completed){
                     try {
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         //什麼也不做
                     }
                    return;
                 }
                 String str;
                 //建立reader
                 BufferedReader reader = new BufferedReader(fileReader);
                 try{
                     //讀所有文字行
                    while((str = reader.readLine()) != null){
                     /**
                      * 按行釋出一個新值
                      */
                         this.collector.emit(new Values(str),str);
                     }
                 }catch(Exception e){
                     throw new RuntimeException("Error reading tuple",e);
                 }finally{
                     completed = true;
                 }
             }
             /**
              * 我們將建立一個檔案並維持一個collector物件
              */
             public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                     try {
                         this.context = context;
                         this.fileReader = new FileReader(conf.get("wordsFile").toString());
                     } catch (FileNotFoundException e) {
                         throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
                     }
                     this.collector = collector;
             }
             /**
              * 宣告輸入域"word"
              */
             public void declareOutputFields(OutputFieldsDeclarer declarer) {
                 declarer.declare(new Fields("line"));
             }
        }

第一個被呼叫的spout方法都是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下引數:配置物件,在定義topology物件是建立;TopologyContext物件,包含所有拓撲資料;還有SpoutOutputCollector物件,它能讓我們釋出交給bolts處理的資料。下面的程式碼主是這個方法的實現。

    public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
        }
        this.collector = collector;
    }

我們在這個方法裡建立了一個FileReader物件,用來讀取檔案。接下來我們要實現public void nextTuple(),我們要通過它向bolts釋出待處理的資料。在這個例子裡,這個方法要讀取檔案並逐行釋出資料。

    public void nextTuple() {
        if(completed){
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                //什麼也不做
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            while((str = reader.readLine()) != null){
                this.collector.emit(new Values(str));
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }

NOTE: Values是一個ArrarList實現,它的元素就是傳入構造器的引數。

nextTuple()會在同一個迴圈內被ack()fail()週期性的呼叫。沒有任務時它必須釋放對執行緒的控制,其它方法才有機會得以執行。因此nextTuple的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負載,會在返回前休眠一毫秒。如果任務完成了,檔案中的每一行都已被讀出並分發了。

NOTE:元組(tuple)是一個具名值列表,它可以是任意java物件(只要它是可序列化的)。預設情況,Storm會序列化字串、位元組陣列、ArrayList、HashMap和HashSet等型別。

Bolts

現在我們有了一個spout,用來按行讀取檔案並每行釋出一個元組,還要建立兩個bolts,用來處理它們(看圖2-1)。bolts實現了介面backtype.storm.topology.IRichBolt

bolt最重要的方法是void execute(Tuple input),每次接收到元組時都會被呼叫一次,還會再發布若干個元組。

NOTE: 只要必要,boltspout會發布若干元組。當呼叫nextTupleexecute方法時,它們可能會發布0個、1個或許多個元組。你將在第五章學習更多這方面的內容。

第一個boltWordNormalizer,負責得到並標準化每行文字。它把文字行切分成單詞,大寫轉化成小寫,去掉頭尾空白符。

首先我們要宣告bolt的出參:

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("word"));
    }

這裡我們宣告bolt將釋出一個名為“word”的域。

下一步我們實現public void execute(Tuple input),處理傳入的元組:

    public void execute(Tuple input){
        String sentence=input.getString(0);
        String[] words=sentence.split(" ");
        for(String word : words){
            word=word.trim();
            if(!word.isEmpty()){
                word=word.toLowerCase();
                //釋出這個單詞
                collector.emit(new Values(word));
            }
        }
        //對元組做出應答
        collector.ack(input);
    }

第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理並用collector物件釋出。最後,每次都呼叫collector物件的ack()方法確認已成功處理了一個元組。

例2-2是這個類的完整程式碼。

    //例2-2 src/main/java/bolts/WordNormalizer.java
    package bolts;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    public class WordNormalizer implements IRichBolt{
        private OutputCollector collector;
        public void cleanup(){}
        /**
          * *bolt*從單詞檔案接收到文字行,並標準化它。
          * 文字行會全部轉化成小寫,並切分它,從中得到所有單詞。
         */
        public void execute(Tuple input){
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for(String word : words){
                word = word.trim();
                if(!word.isEmpty()){
                    word=word.toLowerCase();
                    //釋出這個單詞
                    List a = new ArrayList();
                    a.add(input);
                    collector.emit(a,new Values(word));
                }
            }
            //對元組做出應答
            collector.ack(input);
        }
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
        }

        /**
          * 這個*bolt*只會釋出“word”域
          */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

NOTE:通過這個例子,我們瞭解了在一次execute呼叫中釋出多個元組。如果這個方法在一次呼叫中接收到句子“This is the Storm book”,它將會發布五個元組。

下一個boltWordCounter,負責為單詞計數。這個拓撲結束時(cleanup()方法被呼叫時),我們將顯示每個單詞的數量。

NOTE: 這個例子的bolt什麼也沒釋出,它把資料儲存在map裡,但是在真實的場景中可以把資料儲存到資料庫。

package bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map<String,Integer> counters;
    private OutputCollector collector;

    /**
      * 這個spout結束時(叢集關閉的時候),我們會顯示單詞數量
      */
    @Override
    public void cleanup(){
        System.out.println("-- 單詞數 【"+name+"-"+id+"】 --");
        for(Map.Entry<String,Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /**
     *  為每個單詞計數
     */
    @Override
    public void execute(Tuple input) {
        String str=input.getString(0);
        /**
         * 如果單詞尚不存在於map,我們就建立一個,如果已在,我們就為它加1
         */
        if(!counters.containsKey(str)){
            conters.put(str,1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str,c);
        }
        //對元組作為應答
        collector.ack(input);
    }

    /**
     * 初始化
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

execute方法使用一個map收集單詞並計數。拓撲結束時,將呼叫clearup()方法列印計數器map。(雖然這只是一個例子,但是通常情況下,當拓撲關閉時,你應當使用cleanup()方法關閉活動的連線和其它資源。)

主類

你可以在主類中建立拓撲和一個本地叢集物件,以便於在本地測試和除錯。LocalCluster可以通過Config物件,讓你嘗試不同的叢集配置。比如,當使用不同數量的工作程序測試你的拓撲時,如果不小心使用了某個全域性變數或類變數,你就能夠發現錯誤。(更多內容請見第三章

NOTE:所有拓撲節點的各個程序必須能夠獨立執行,而不依賴共享資料(也就是沒有全域性變數或類變數),因為當拓撲執行在真實的叢集環境時,這些程序可能會執行在不同的機器上。

接下來,TopologyBuilder將用來建立拓撲,它決定Storm如何安排各節點,以及它們交換資料的方式。

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader", new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
    builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

spoutbolts之間通過shuffleGrouping方法連線。這種分組方式決定了Storm會以隨機分配方式從源節點向目標節點發送訊息。

下一步,建立一個包含拓撲配置的Config物件,它會在執行時與叢集配置合併,並通過prepare方法傳送給所有節點。

    Config conf = new Config();
    conf.put("wordsFile", args[0]);
    conf.setDebug(true);

spout讀取的檔案的檔名,賦值給wordFile屬性。由於是在開發階段,設定debug屬性為true,Strom會列印節點間交換的所有訊息,以及其它有助於理解拓撲執行方式的除錯資料。

正如之前講過的,你要用一個LocalCluster物件執行這個拓撲。在生產環境中,拓撲會持續執行,不過對於這個例子而言,你只要執行它幾秒鐘就能看到結果。

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology());
    Thread.sleep(2000);
    cluster.shutdown();

呼叫createTopologysubmitTopology,執行拓撲,休眠兩秒鐘(拓撲在另外的執行緒執行),然後關閉叢集。

例2-3是完整的程式碼

    //例2-3 src/main/java/TopologyMain.java
    import spouts.WordReader;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import bolts.WordCounter;
    import bolts.WordNormalizer;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
        //定義拓撲
            TopologyBuilder builder = new TopologyBuilder());
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
            builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));

        //配置
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);

        //執行拓撲
             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology();
            Thread.sleep(1000);
            cluster.shutdown();
        }
    }

你已經為執行你的第一個拓撲準備好了。在這個目錄下面建立一個檔案,/src/main/resources/words.txt,一個單詞一行,然後用下面的命令執行這個拓撲:mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt

舉個例子,如果你的words.txt檔案有如下內容: Storm test are great is an Storm simple application but very powerful really Storm is great 你應該會在日誌中看到類似下面的內容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個例子中,每類節點只有一個例項。但是如果你有一個非常大的日誌檔案呢?你能夠很輕鬆的改變系統中的節點數量實現並行工作。這個時候,你就要建立兩個WordCounter例項。

    builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");

程式返回時,你將看到: — 單詞數 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 單詞數 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒極了!修改並行度實在是太容易了(當然對於實際情況來說,每個例項都會執行在單獨的機器上)。不過似乎有一個問題:單詞isgreat分別在每個WordCounter各計數一次。怎麼會這樣?當你呼叫shuffleGrouping時,就決定了Storm會以隨機分配的方式向你的bolt例項傳送訊息。在這個例子中,理想的做法是相同的單詞問題傳送給同一個WordCounter例項。你把shuffleGrouping(“word-normalizer”)換成fieldsGrouping(“word-normalizer”, new Fields(“word”))就能達到目的。試一試,重新執行程式,確認結果。 你將在後續章節學習更多分組方式和訊息流型別。

結論 

我們已經討論了Storm的本地和遠端操作模式之間的不同,以及Storm的強大和易於開發的特性。你也學習了一些Storm的基本概念,我們將在後續章節深入講解它們。