1. 程式人生 > >Storm文檔詳解

Storm文檔詳解

set child submit nbsp efault zookeep 核心 效果 分布

1、Storm基礎概念

1.1、什麽是storm?

Apache Storm is a free and open source distributed realtime computation system.

Storm是免費開源的分布式實時計算系統

實時和離線的區別:

  1 離線計算:批量獲取數據、批量傳輸數據、周期性批量計算數據、數據展示

代表技術:Sqoop批量導入數據、HDFS批量存儲數據、MapReduce批量計算數據、Hive批量計算數據、***任務調度

  2 流式計算:數據實時產生、數據實時傳輸、數據實時計算、實時展示

代表技術:Flume實時獲取數據、Kafka/metaq

實時數據存儲、Storm/JStorm實時數據計算、Redis實時結果緩存、持久化存儲(mysql)

Storm實時處理數據,特點:低延遲、高可用、分布式、可擴展、數據不丟失。提供簡單容易理解的接口,便於開發。

1.2、Storm的核心組件

技術分享圖片

  • TopologyStorm中運行的一個實時應用程序的名稱。(拓撲)
  • Nimbus:負責資源分配和任務調度。
  • Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程。---通過配置文件設置當前supervisor上啟動多少個worker(Slot的個數)
  • Worker:運行具體處理組件邏輯的進程(其實就是一個
    JVM)。Worker運行的任務類型只有兩種,一種是Spout任務,一種是Bolt任務。
  • Taskworker中每一個spout/bolt的線程稱為一個task. storm0.8之後,task不再與物理線程對應,不同spout/bolttask可能會共享一個物理線程,該線程稱為executor(Task=線程=executor)
  • Zookeeper:保存任務分配的信息、心跳信息、元數據信息。

1.3、並發度

用戶指定的一個任務,可以被多個線程執行。

並發度的數量等於線程的數量。一個任務的多個線程,會被運行在多個WorkerJVM)上,有一種類似於平均算法的負載均衡策略。盡可能減少網絡

IO,和Hadoop中的MapReduce中的本地計算的道理一樣。

1.4、Worker與topology

一個worker只屬於一個topology,每個worker中運行的task只能屬於這個topology。反之,一個topology包含多個worker,其實就是這個topology運行在多個worker上。

一個topology要求的worker數量如果不被滿足,集群在任務分配時,根據現有的worker先運行topology

如果當前集群中worker數量為0,那麽最新提交的topology將只會被標識active,不會運行,只有當集群有了空閑資源之後,才會被運行。

1.5、Storm的編程模型

技術分享圖片

  • DataSource:外部數據源
  • Spout:接受外部數據源的組件,將外部數據源轉化成Storm內部的數據,以Tuple為基本的傳輸單元下發給Bolt
  • Bolt:接受Spout發送的數據,或上遊的bolt的發送的數據。根據業務邏輯進行處理。發送給下一個Bolt或者是存儲到某種介質上。介質可以是Redis可以是mysql,或者其他。
  • TupleStorm內部中數據傳輸的基本單元,裏面封裝了一個List對象,用來保存數據。
  • StreamGrouping:數據分組策略
  1. Shuffle Grouping: 隨機分組, 隨機派發stream裏面的tuple,保證每個bolt接收到的tuple數目大致相同。(Random函數)
  2. Fields Grouping:按字段分組,比如按userid來分組,具有同樣useridtuple會被分到相同的Bolts裏的一個task,而不同的userid則會被分配到不同的bolts裏的task(Hash取模)
  3. All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
  4. Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task
  5. Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裏面去執行。(Random函數),
  6. Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味著消息的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的taskid OutputCollector.emit方法也會返回taskid)。
  7. Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。

2、Storm程序的並發機制

2.1、概念

  • Workers (JVMs): 在一個物理節點上可以運行一個或多個獨立的JVM 進程。一個Topology可以包含一個或多個worker(並行的跑在不同的物理機上), 所以worker process就是執行一個topology的子集, 並且worker只能對應於一個topology
  • Executors (threads): 在一個worker JVM進程中運行著多個Java線程。一個executor線程可以執行一個或多個tasks。但一般默認每個executor只執行一個task。一個worker可以包含一個或多個executor, 每個component (spoutbolt)至少對應於一個executor, 所以可以說executor執行一個compenent的子集, 同時一個executor只能對應於一個component
  • Tasks(bolt/spout instances)Task就是具體的處理邏輯對象,每一個SpoutBolt會被當作很多task在整個集群裏面執行。每一個task對應到一個線程,而stream grouping則是定義怎麽從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder.setSpoutTopBuilder.setBolt來設置並行度 — 也就是有多少個task

2.2、配置並行度

l 對於並發度的配置, storm裏面可以在多個地方進行配置, 優先級為:

defaults.yaml < storm.yaml < topology-specific configuration <

<internal component-specific configuration < external component-specific configuration

  • worker processes的數目, 可以通過配置文件和代碼中配置, worker就是執行進程, 所以考慮並發的效果, 數目至少應該大於machines的數目
  • executor的數目, component的並發線程數,只能在代碼中配置(通過setBoltsetSpout的參數), 例如, setBolt("green-bolt", new GreenBolt(), 2)
  • tasks的數目, 可以不配置, 默認和executor1:1, 也可以通過setNumTasks()配置

Topologyworker數通過config設置,即執行該topologyworkerjava)進程數。它可以通過 storm rebalance 命令任意調整。

Config conf = newConfig();

conf.setNumWorkers(2); //用2個worker

topologyBuilder.setSpout("blue-spout", newBlueSpout(), 2); //設置2個並發度

topologyBuilder.setBolt("green-bolt", newGreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); //設置2個並發度,4個任務

topologyBuilder.setBolt("yellow-bolt", newYellowBolt(), 6).shuffleGrouping("green-bolt"); //設置6個並發度

StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology());

技術分享圖片

3個組件的並發度加起來是10,就是說拓撲一共有10executor,一共有2worker,每個worker產生10 / 2 = 5條線程。

綠色的bolt配置成2executor4task。為此每個executor為這個bolt運行2task

l 動態的改變並行度

Storm支持在不 restart topology 的情況下, 動態的改變(增減) worker processes 的數目和 executors 的數目, 稱為rebalancing. 通過Storm web UI,或者通過storm rebalance命令實現:

storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

3、Storm組件本地樹

技術分享圖片

4、Storm zookeeper目錄樹

技術分享圖片

5、Storm 任務提交的過程

技術分享圖片

TopologyMetricsRunnable.TaskStartEvent[oldAssignment=<null>,newAssignment=Assignment[masterCodeDir=C:\Users\MAOXIA~1\AppData\Local\Temp\\e73862a8-f7e7-41f3-883d-af494618bc9f\nimbus\stormdist\double11-1-1458909887,nodeHost={61ce10a7-1e78-4c47-9fb3-c21f43a331ba=192.168.1.106},taskStartTimeSecs={1=1458909910, 2=1458909910, 3=1458909910, 4=1458909910, 5=1458909910, 6=1458909910, 7=1458909910, 8=1458909910},workers=[ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]],timeStamp=1458909910633,type=Assign],task2Component=<null>,clusterName=<null>,topologyId=double11-1-1458909887,timestamp=0]

技術分享圖片

技術分享圖片

6.2、基本實現

Storm 系統中有一組叫做"acker"的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每個消息。

acker任務保存了spout id到一對值的映射。第一個值就是spout的任務id,通過這個idacker就知道消息處理完成時該通知哪個spout任務。第二個值是一個64bit的數字,我們稱之為"ack val", 它是樹中所有消息的隨機id的異或計算結果。

ack val表示了整棵樹的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當消息被創建和被應答的時候都會有相同的消息id發送過來做異或。 每當acker發現一棵樹的ack val值為0的時候,它就知道這棵樹已經被完全處理了

技術分享圖片

技術分享圖片

技術分享圖片

技術分享圖片

6.3、可靠性配置

有三種方法可以去掉消息的可靠性:

將參數Config.TOPOLOGY_ACKERS設置為0,通過此方法,當Spout發送一個消息的時候,它的ack方法將立刻被調用;

Spout發送一個消息時,不指定此消息的messageID。當需要關閉特定消息可靠性的時候,可以使用此方法;

最後,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在emit方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發送消息。

7、Storm的安裝

7.1、Storm安裝

1、上傳解壓安裝包

tar -zxvf apache-storm-1.1.1.tar.gz

mv apache-storm-1.1.1 storm

mv storm.yaml storm.yaml.bak

2、修改配置文件

#指定storm使用的zk集群

storm.zookeeper.servers:

- "zk-datanode-01"

- "zk-datanode-02"

- "zk-datanode-03"

#指定storm本地狀態保存地址

storm.local.dir: "/usr/local/data/storm/workdir"

#指定storm集群中的nimbus節點所在的服務器

nimbus.host: "zk-datanode-01"

#指定nimbus啟動JVM最大可用內存大小

nimbus.childopts: "-Xmx1024m"

#指定supervisor啟動JVM最大可用內存大小

supervisor.childopts: "-Xmx4096m"

#指定supervisor節點上,每個worker啟動JVM最大可用內存大小

worker.childopts: "-Xmx512m"

#指定ui啟動JVM最大可用內存大小,ui服務一般與nimbus同在一個節點上。

ui.childopts: "-Xmx768m"

#指定supervisor節點上,啟動worker時對應的端口號,每個端口對應槽,每個槽位對應一個worker

supervisor.slots.ports:

- 6700

- 6701

- 6702

- 6703

3、分發安裝包

scp -r storm/ zk-datanode-02:/usr/local/

scp -r storm/ zk-datanode-03:/usr/local/

4、啟動集群

cd /usr/local/storm

1、在nimbus.host所屬的機器上啟動 nimbus服務

nohup bin/storm nimbus &

2、在nimbus.host所屬的機器上啟動ui服務

nohup bin/storm ui &

3、在其它個點擊上啟動supervisor服務

nohup bin/storm supervisor &

7.2、Storm任務提交

提交任務到storm集群上運行

bin/storm jar /usr/local/data/package/rcp-streamingengine-cardhz-V0.0.1.jar

com.dinpay.bdp.rcp.CardHzTopology CardHzTopology

Storm文檔詳解