Storm入門 第二章準備開始
準備開始
在本章,我們要建立一個Storm工程和我們的第一個Storm拓撲結構。
NOTE: 下面假設你的JRE版本在1.6以上。我們推薦Oracle提供的JRE。你可以到http://www.java .com/downloads/下載。
操作模式
開始之前,有必要了解一下Storm的操作模式。有下面兩種方式。
本地模式
在本地模式下,Storm拓撲結構執行在本地計算機的單一JVM程序上。這個模式用於開發、測試以及除錯,因為這是觀察所有元件如何協同工作的最簡單方法。在這種模式下,我們可以調整引數,觀察我們的拓撲結構如何在不同的Storm配置環境下執行。要在本地模式下執行,我們要下載Storm開發依賴,以便用來開發並測試我們的拓撲結構。我們建立了第一個Storm工程以後,很快就會明白如何使用本地模式了。
NOTE: 在本地模式下,跟在叢集環境執行很像。不過很有必要確認一下所有元件都是執行緒安全的,因為當把它們部署到遠端模式時它們可能會執行在不同的JVM程序甚至不同的物理機上,這個時候它們之間沒有直接的通訊或共享記憶體。
我們要在本地模式執行本章的所有例子。
遠端模式
在遠端模式下,我們向Storm叢集提交拓撲,它通常由許多執行在不同機器上的流程組成。遠端模式不會出現除錯資訊, 因此它也稱作生產模式。不過在單一開發機上建立一個Storm叢集是一個好主意,可以在部署到生產環境之前,用來確認拓撲在叢集環境下沒有任何問題。
你將在第六章學到更多關於遠端模式的內容,並在附錄B學到如何安裝一個Storm叢集。
我們在這個工程裡建立一個簡單的拓撲,數單詞數量。我們可以把這個看作Storm的“Hello World”。不過,這是一個非常強大的拓撲,因為它能夠擴充套件到幾乎無限大的規模,而且只需要做一些小修改,就能用它構建一個統計系統。舉個例子,我們可以修改一下工程用來找出Twitter上的熱點話題。
要建立這個拓撲,我們要用一個spout讀取文字,第一個bolt用來標準化單詞,第二個bolt為單詞計數,如圖2-1所示。
NOTE: 如果你使用git(一個分散式版本控制與原始碼管理工具),你可以執行git clone [email protected]:storm-book/examples-ch02-getting_started.git,把原始碼檢出到你指定的目錄。
構建Storm執行環境的第一步是檢查你安裝的Java版本。開啟一個控制檯視窗並執行命令:java -version。控制檯應該會顯示出類似如下的內容:
java -version java version "1.6.0_26" Java(TM) SE Runtime Enviroment (build 1.6.0_26-b03) Java HotSpot(TM) Server VM (build 20.1-b02, mixed mode)
建立工程
開始之前,先為這個應用建一個目錄(就像你平常為Java應用做的那樣)。這個目錄用來存放工程原始碼。
接下來我們要下載Storm依賴包,這是一些jar包,我們要把它們新增到應用類路徑中。你可以採用如下兩種方式之一完成這一步:
- 下載所有依賴,解壓縮它們,把它 們新增到類路徑
NOTE: Maven是一個軟體專案管理的綜合工具。它可以用來管理專案的開發週期的許多方面,從包依賴到版本釋出過程。在這本書中,我們將廣泛使用它。如果要檢查是否已經安裝了maven,在命令列執行mvn。如果沒有安裝你可以從http://maven.apache.org/download.html下載。
沒有必要先成為一個Maven專家才能使用Storm,不過了解一下關於Maven工作方式的基礎知識仍然會對你有所幫助。你可以在Apache Maven的網站上找到更多的資訊(http://maven.apache.org/)。
NOTE: Storm的Maven依賴引用了執行Storm本地模式的所有庫。
要執行我們的拓撲,我們可以編寫一個包含基本元件的pom.xml檔案。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>storm.book</groupId> <artifactId>Getting-Started</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>2.3.2</version> <configuration> <source>1.6</source> <target>1.6</target> <compilerVersion>1.6</compilerVersion> </configuration> </plugin> </plugins> </build> <repositories> <!-- Repository where we can found the storm dependencies --> <repository> <id>clojars.org</id> <url>http://clojars.org/repo</url> </repository> </repositories> <dependencies> <!-- Storm Dependency --> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <version>0.6.0</version> </dependency> </dependencies> </project>
開頭幾行指定了工程名稱和版本號。然後我們添加了一個編譯器外掛,告知Maven我們的程式碼要用Java1.6編譯。接下來我們定義了Maven倉庫(Maven支援為同一個工程指定多個倉庫)。clojars是存放Storm依賴的倉庫。Maven會為執行本地模式自動下載必要的所有子包依賴。
一個典型的Maven Java工程會擁有如下結構:
我們的應用目錄/
├── pom.xml
└── src
└── main
└── java
| ├── spouts
| └── bolts
└── resources
java目錄下的子目錄包含我們的程式碼,我們把要統計單詞數的檔案儲存在resource目錄下。
NOTE:命令mkdir -p 會建立所有需要的父目錄。
我們將為執行單詞計數建立所有必要的類。可能這個例子中的某些部分,現在無法講的很清楚,不過我們會在隨後的章節做進一步的講解。
Spout
pout WordReader類實現了IRichSpout介面。我們將在第四章看到更多細節。WordReader負責從檔案按行讀取文字,並把文字行提供給第一個bolt。
NOTE: 一個spout釋出一個定義域列表。這個架構允許你使用不同的bolts從同一個spout流讀取資料,它們的輸出也可作為其它bolts的定義域,以此類推。
例2-1包含WordRead類的完整程式碼(我們將會分析下述程式碼的每一部分)。
/** * 例2-1.src/main/java/spouts/WordReader.java */ package spouts; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class WordReader implements IRichSpout { private SpoutOutputCollector collector; private FileReader fileReader; private boolean completed = false; private TopologyContext context; public boolean isDistributed() {return false;} public void ack(Object msgId) { System.out.println("OK:"+msgId); } public void close() {} public void fail(Object msgId) { System.out.println("FAIL:"+msgId); } /** * 這個方法做的惟一一件事情就是分發檔案中的文字行 */ public void nextTuple() { /** * 這個方法會不斷的被呼叫,直到整個檔案都讀完了,我們將等待並返回。 */ if(completed){ try { Thread.sleep(1000); } catch (InterruptedException e) { //什麼也不做 } return; } String str; //建立reader BufferedReader reader = new BufferedReader(fileReader); try{ //讀所有文字行 while((str = reader.readLine()) != null){ /** * 按行釋出一個新值 */ this.collector.emit(new Values(str),str); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } } /** * 我們將建立一個檔案並維持一個collector物件 */ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.collector = collector; } /** * 宣告輸入域"word" */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("line")); } }
第一個被呼叫的spout方法都是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下引數:配置物件,在定義topology物件是建立;TopologyContext物件,包含所有拓撲資料;還有SpoutOutputCollector物件,它能讓我們釋出交給bolts處理的資料。下面的程式碼主是這個方法的實現。
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { try { this.context = context; this.fileReader = new FileReader(conf.get("wordsFile").toString()); } catch (FileNotFoundException e) { throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]"); } this.collector = collector; }
我們在這個方法裡建立了一個FileReader物件,用來讀取檔案。接下來我們要實現public void nextTuple(),我們要通過它向bolts釋出待處理的資料。在這個例子裡,這個方法要讀取檔案並逐行釋出資料。
public void nextTuple() { if(completed){ try { Thread.sleep(1); } catch (InterruptedException e) { //什麼也不做 } return; } String str; BufferedReader reader = new BufferedReader(fileReader); try{ while((str = reader.readLine()) != null){ this.collector.emit(new Values(str)); } }catch(Exception e){ throw new RuntimeException("Error reading tuple",e); }finally{ completed = true; } }
NOTE: Values是一個ArrarList實現,它的元素就是傳入構造器的引數。
nextTuple()會在同一個迴圈內被ack()和fail()週期性的呼叫。沒有任務時它必須釋放對執行緒的控制,其它方法才有機會得以執行。因此nextTuple的第一行就要檢查是否已處理完成。如果完成了,為了降低處理器負載,會在返回前休眠一毫秒。如果任務完成了,檔案中的每一行都已被讀出並分發了。
NOTE:元組(tuple)是一個具名值列表,它可以是任意java物件(只要它是可序列化的)。預設情況,Storm會序列化字串、位元組陣列、ArrayList、HashMap和HashSet等型別。
Bolts
現在我們有了一個spout,用來按行讀取檔案並每行釋出一個元組,還要建立兩個bolts,用來處理它們(看圖2-1)。bolts實現了介面backtype.storm.topology.IRichBolt。
bolt最重要的方法是void execute(Tuple input),每次接收到元組時都會被呼叫一次,還會再發布若干個元組。
NOTE: 只要必要,bolt或spout會發布若干元組。當呼叫nextTuple或execute方法時,它們可能會發布0個、1個或許多個元組。你將在第五章學習更多這方面的內容。
第一個bolt,WordNormalizer,負責得到並標準化每行文字。它把文字行切分成單詞,大寫轉化成小寫,去掉頭尾空白符。
首先我們要宣告bolt的出參:
public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("word")); }
這裡我們宣告bolt將釋出一個名為“word”的域。
下一步我們實現public void execute(Tuple input),處理傳入的元組:
public void execute(Tuple input){ String sentence=input.getString(0); String[] words=sentence.split(" "); for(String word : words){ word=word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //釋出這個單詞 collector.emit(new Values(word)); } } //對元組做出應答 collector.ack(input); }
第一行從元組讀取值。值可以按位置或名稱讀取。接下來值被處理並用collector物件釋出。最後,每次都呼叫collector物件的ack()方法確認已成功處理了一個元組。
例2-2是這個類的完整程式碼。
//例2-2 src/main/java/bolts/WordNormalizer.java package bolts; import java.util.ArrayList; import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class WordNormalizer implements IRichBolt{ private OutputCollector collector; public void cleanup(){} /** * *bolt*從單詞檔案接收到文字行,並標準化它。 * 文字行會全部轉化成小寫,並切分它,從中得到所有單詞。 */ public void execute(Tuple input){ String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //釋出這個單詞 List a = new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } //對元組做出應答 collector.ack(input); } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } /** * 這個*bolt*只會釋出“word”域 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
NOTE:通過這個例子,我們瞭解了在一次execute呼叫中釋出多個元組。如果這個方法在一次呼叫中接收到句子“This is the Storm book”,它將會發布五個元組。
下一個bolt,WordCounter,負責為單詞計數。這個拓撲結束時(cleanup()方法被呼叫時),我們將顯示每個單詞的數量。
NOTE: 這個例子的bolt什麼也沒釋出,它把資料儲存在map裡,但是在真實的場景中可以把資料儲存到資料庫。
package bolts; import java.util.HashMap; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class WordCounter implements IRichBolt{ Integer id; String name; Map<String,Integer> counters; private OutputCollector collector; /** * 這個spout結束時(叢集關閉的時候),我們會顯示單詞數量 */ @Override public void cleanup(){ System.out.println("-- 單詞數 【"+name+"-"+id+"】 --"); for(Map.Entry<String,Integer> entry : counters.entrySet()){ System.out.println(entry.getKey()+": "+entry.getValue()); } } /** * 為每個單詞計數 */ @Override public void execute(Tuple input) { String str=input.getString(0); /** * 如果單詞尚不存在於map,我們就建立一個,如果已在,我們就為它加1 */ if(!counters.containsKey(str)){ conters.put(str,1); }else{ Integer c = counters.get(str) + 1; counters.put(str,c); } //對元組作為應答 collector.ack(input); } /** * 初始化 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){ this.counters = new HashMap<String, Integer>(); this.collector = collector; this.name = context.getThisComponentId(); this.id = context.getThisTaskId(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {} }
execute方法使用一個map收集單詞並計數。拓撲結束時,將呼叫clearup()方法列印計數器map。(雖然這只是一個例子,但是通常情況下,當拓撲關閉時,你應當使用cleanup()方法關閉活動的連線和其它資源。)
主類
你可以在主類中建立拓撲和一個本地叢集物件,以便於在本地測試和除錯。LocalCluster可以通過Config物件,讓你嘗試不同的叢集配置。比如,當使用不同數量的工作程序測試你的拓撲時,如果不小心使用了某個全域性變數或類變數,你就能夠發現錯誤。(更多內容請見第三章)
NOTE:所有拓撲節點的各個程序必須能夠獨立執行,而不依賴共享資料(也就是沒有全域性變數或類變數),因為當拓撲執行在真實的叢集環境時,這些程序可能會執行在不同的機器上。
接下來,TopologyBuilder將用來建立拓撲,它決定Storm如何安排各節點,以及它們交換資料的方式。
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word-reader", new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
在spout和bolts之間通過shuffleGrouping方法連線。這種分組方式決定了Storm會以隨機分配方式從源節點向目標節點發送訊息。
下一步,建立一個包含拓撲配置的Config物件,它會在執行時與叢集配置合併,並通過prepare方法傳送給所有節點。
Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(true);
由spout讀取的檔案的檔名,賦值給wordFile屬性。由於是在開發階段,設定debug屬性為true,Strom會列印節點間交換的所有訊息,以及其它有助於理解拓撲執行方式的除錯資料。
正如之前講過的,你要用一個LocalCluster物件執行這個拓撲。在生產環境中,拓撲會持續執行,不過對於這個例子而言,你只要執行它幾秒鐘就能看到結果。
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology()); Thread.sleep(2000); cluster.shutdown();
呼叫createTopology和submitTopology,執行拓撲,休眠兩秒鐘(拓撲在另外的執行緒執行),然後關閉叢集。
例2-3是完整的程式碼
//例2-3 src/main/java/TopologyMain.java import spouts.WordReader; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import bolts.WordCounter; import bolts.WordNormalizer; public class TopologyMain { public static void main(String[] args) throws InterruptedException { //定義拓撲 TopologyBuilder builder = new TopologyBuilder()); builder.setSpout("word-reader", new WordReader()); builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader"); builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word")); //配置 Config conf = new Config(); conf.put("wordsFile", args[0]); conf.setDebug(false); //執行拓撲 conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology(); Thread.sleep(1000); cluster.shutdown(); } }
你已經為執行你的第一個拓撲準備好了。在這個目錄下面建立一個檔案,/src/main/resources/words.txt,一個單詞一行,然後用下面的命令執行這個拓撲:mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt。
舉個例子,如果你的words.txt檔案有如下內容: Storm test are great is an Storm simple application but very powerful really Storm is great 你應該會在日誌中看到類似下面的內容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在這個例子中,每類節點只有一個例項。但是如果你有一個非常大的日誌檔案呢?你能夠很輕鬆的改變系統中的節點數量實現並行工作。這個時候,你就要建立兩個WordCounter例項。
builder.setBolt("word-counter", new WordCounter(),2).shuffleGrouping("word-normalizer");
程式返回時,你將看到: — 單詞數 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 單詞數 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒極了!修改並行度實在是太容易了(當然對於實際情況來說,每個例項都會執行在單獨的機器上)。不過似乎有一個問題:單詞is和great分別在每個WordCounter各計數一次。怎麼會這樣?當你呼叫shuffleGrouping時,就決定了Storm會以隨機分配的方式向你的bolt例項傳送訊息。在這個例子中,理想的做法是相同的單詞問題傳送給同一個WordCounter例項。你把shuffleGrouping(“word-normalizer”)換成fieldsGrouping(“word-normalizer”, new Fields(“word”))就能達到目的。試一試,重新執行程式,確認結果。 你將在後續章節學習更多分組方式和訊息流型別。
我們已經討論了Storm的本地和遠端操作模式之間的不同,以及Storm的強大和易於開發的特性。你也學習了一些Storm的基本概念,我們將在後續章節深入講解它們。