1. 程式人生 > >storm筆記:storm基本概念

storm筆記:storm基本概念

本文主要介紹storm中的基本概念,從基礎上了解strom的體系結構,便於後續程式設計過程中作為基礎指導。主要的概念包括:

  1. topology(拓撲)
  2. stream(資料流)
  3. spout(水龍頭、資料來源)
  4. bolt(螺栓,資料篩選處理)
  5. stream group(資料流分組)
  6. reliability(可靠性)
  7. task(任務)
  8. worker(執行者)

因為上述概念中除了可靠性reliability翻譯起來比較合適,其他幾個詞實在找不到合適的對應詞語,就直接使用原詞。
另外一點需要注意的是,本文使用的storm-core版本是0.10.0,包路徑為backtype.storm。因為阿里巴巴開源了jstorm,據說strom2.0之後使用jstorm作為master主幹,從

github上可以看到包路徑修改為了org.apache.storm,如果發現有包路徑錯誤的地方,請對應修改。

topology

Storm實時執行應用包邏輯上成為一個topology,一個Storm的topology相當於MapReduce的job。關鍵的不同是MapReduce的job有明確的起始和結束,而Storm的topology會一直執行下去(除非程序被殺死或取消部署)。一個topology是有多個spout、bolt通過資料流分組連線起來的圖結構。

storm topology

本地除錯

本地除錯模擬了叢集模式執行方式,對於開發和除錯topology很有用。而且本地模式下執行topology與叢集模式下類似,只是使用backtype.storm.LocalCluster

來模擬叢集狀態。使用backtype.storm.LocalCluster#submitTopology方法提交topology,定義topology唯一名字、topology的配置(使用的是backtype.storm.Config物件)、以及topology物件(通過backtype.storm.topology.TopologyBuilder#createTopology方法建立)。通過backtype.storm.LocalCluster#killTopology殺掉指定topology,通過backtype.storm.LocalCluster#shutdown停止執行的本地叢集模式。比如:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
Utils.sleep(100000);
cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
cluster.shutdown();

本地模式常用的配置如下:

  1. Config.TOPOLOGY_MAX_TASK_PARALLELISM:這個配置項主要用來設定每個元件執行緒數的上限。在生產環境中,每個topology中有很多並行執行緒,但是在本地除錯過程中,沒有必要存在這麼多並行執行緒,可以通過這個配置來進行設定。
  2. Config.TOPOLOGY_DEBUG:設定為true,Storm將記錄每個tuple提交後的日誌資訊,對於除錯程式很有用。

叢集模式執行

叢集模式下執行topology與本地模式下類似,具體步驟如下:

  • 定義topology(java下使用backtype.storm.topology.TopologyBuilder#createTopology建立)
  • 通過backtype.storm.StormSubmitter#submitTopology提交topology到叢集。StormSubmitter需要的引數與LocalCluster`的引數一致:topology名、topology配置、topology物件。比如:
Config conf = new Config();
conf.setNumWorkers(20);
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
  • 將自己的程式碼與依賴的程式碼打成jar包(除了storm自己的程式碼,storm自己的程式碼已經在classpath下了)。
    如果使用的是Mava,可以使用Maven Assembly Plugin打包,在pom.xml中加入如下程式碼:
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>  
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.path.to.main.Class</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
  • 使用storm客戶端將topology提交到叢集,需要指定jar包路徑、類名、以及提交到main方法的引數列表:
./bin/storm jar path/to/allmycode.jar org.me.MyTopology arg1 arg2 arg3
  • 可以使用storm kill命令停止一個topology:
./bin/storm kill topologyName

資料流

資料流是Storm核心定義的抽象概念,由無限制的tuple組成的序列,tuple包含一個或多個鍵值對列表,可以包含java自帶的型別或者自定義的可序列化的型別。

每個資料流可以在定義時通過backtype.storm.topology.OutputFieldsDeclarer的declareStream方法指定id。預設的id是“default”(直接使用declare將使用預設id)。

在上面的topology圖中,每個藍色、綠色、紅色的條帶是一個數據流,每個資料流內部由tuple組成。

spout

spout是topology中資料流的資料入口,充當資料採集器功能,通常spout從外部資料來源讀取資料,將資料轉化為tuple,然後將它們傳送到topology中。spout可以是可靠的或不可靠的。可靠的spout能夠保證在storm處理tuple出現異常情況下,能夠重新發送該tuple,而不可靠的spout不再處理已傳送的tuple。

spout通過backtype.storm.topology.OutputFieldsDeclarerdeclareStream方法定義資料流,通過backtype.storm.spout.SpoutOutputCollectoremit方法傳送tream。

backtype.storm.spout.ISpout#nextTuple方法是spout的主要方法,可以傳送用於傳送新的tuple,或直接return(不需要傳送新的tuple時,可以直接return)。

當Storm檢測到由某一spout傳送的tuple成功處理後,將呼叫backtype.storm.spout.ISpout#ack方法;當呼叫失敗,將呼叫backtype.storm.spout.ISpout#fail方法。具體可以檢視後面的可靠性

bolt

在topology中所有操作都是在bolt中執行的,它可以進行過濾、計算、連線、聚合、資料庫讀寫,以及其他操作。可以將一個或多個spout作為輸入,對資料進行運算後,選擇性的輸出一個或多個數據流。一個bolt可以做一些簡單的資料變換,複雜的資料處理需要多個步驟或多個bolt。

bolt可以訂閱一個或多個spout或bolt的資料,通過backtype.storm.topology.OutputFieldsDeclarer#declareStream方法定義輸出的資料流,通過backtype.storm.topology.BasicOutputCollector#emit方法提交資料。

bolt通過backtype.storm.topology.InputDeclarer類的shuffleGrouping方法指定需要訂閱的資料流,比如:declarer.shuffleGrouping("1", "stream_id"),同時InputDeclarer也提供了接收所有資料流的語法糖,比如:declarer.shuffleGrouping("1"),相當於declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)。這個地方有點亂,簡單的說,bolt B前面有一個spout A或bolt A,從A中傳送一個id為a_id的資料流,如果B向只訂閱id為a_id的資料流,就使用第一個方法,如果可以接收所有id型別的資料流,就用第二個方法。

該型別中主要執行的方法是cn.howardliu.demo.storm.kafka.wordCount.SentenceBolt#execute,用來獲取新的tuple,並進行處理。同樣使用backtype.storm.topology.BasicOutputCollector#emit方法傳送新的tuple。bolt可以呼叫backtype.storm.task.OutputCollector#ack方法來通知Storm該tuple已經處理完成。

資料流分組

定義topology的很重要的一部分就是定義資料流資料流應該傳送到那些bolt中。資料流分組就是將資料流進行分組,按需要進入不同的bolt中。可以使用Storm提供的分組規則,也可以實現backtype.storm.grouping.CustomStreamGrouping自定義分組規則。Storm定義了8種內建的資料流分組方法:

  1. Shuffle grouping(隨機分組):隨機分發tuple給bolt的各個task,每個bolt例項接收到相同數量的tuple;
  2. Fields grouping(按欄位分組):根據指定欄位的值進行分組。比如,一個數據流按照”user-id”分組,所有具有相同”user-id”的tuple將被路由到同一bolt的task中,不同”user-id”可能路由到不同bolt的task中;
  3. All grouping(全複製分組):將所有tuple複製後分發給所有bolt的task。小心使用。
  4. Global grouping(全域性分組):將所有的tuple路由到唯一一個task上。Storm按照最小的task ID來選取接收資料的task;(注意,當時用全域性分組是,設定bolt的task併發是沒有意義的,因為所有tuple都轉發到一個task上。同時需要注意的是,所有tuple轉發到一個jvm例項上,可能會引起storm叢集某個jvm或伺服器出現效能瓶頸或崩潰)
  5. None grouping(不分組):這種分組方式指明不需要關心分組方式。實際上,不分組功能與隨機分組相同。預留功能。
  6. Direct grouping(指向型分組):資料來源會呼叫emitDirect來判斷一個tuple應該由哪個storm元件接收,只能在聲明瞭指向型的資料流上使用。
  7. Local or shuffle grouping(本地或隨機分組):當同一個worker程序中有目標bolt,將把資料傳送到這些bolt中。否則,功能將與隨機分組相同。該方法取決與topology的併發度,本地或隨機分組可以減少網路傳輸,降低IO,提高topology效能。

可靠行

storm可以保證每一個spout發出的tuple能夠被完整處理,通過跟蹤tuple樹上的每個tuple,檢查是否被成功處理。每個topology有一個超時時間,如果storm檢查到某個tuple已經超時,將重新發送該tuple。為了使用這種特性,需要定義tuple的起點,以及tuple被成功處理。更多內容檢視Guaranteeing message processing

task

task是spout和bolt的例項,他們的nextTuple()和execute()方法會被executors執行緒呼叫執行。根據資料流分組來確定如何從某個task中的tuple傳送到其他的task。

worker

topology執行在一個或多個worker程序上,worker是jvm虛擬機器,執行topology所有task的一部分。比如,topology的併發是300,有50個worker,那每個worker就有6個task。Storm會平衡所有worker的task數量。通過Config.TOPOLOGY_WORKERS來設定topology的worker數量。