Storm文檔詳解
1、Storm基礎概念
1.1、什麽是storm?
Apache Storm is a free and open source distributed realtime computation system.
Storm是免費開源的分布式實時計算系統
實時和離線的區別:
1 離線計算:批量獲取數據、批量傳輸數據、周期性批量計算數據、數據展示
代表技術:Sqoop批量導入數據、HDFS批量存儲數據、MapReduce批量計算數據、Hive批量計算數據、***任務調度
2 流式計算:數據實時產生、數據實時傳輸、數據實時計算、實時展示
代表技術:Flume實時獲取數據、Kafka/metaq
Storm用來實時處理數據,特點:低延遲、高可用、分布式、可擴展、數據不丟失。提供簡單容易理解的接口,便於開發。
1.2、Storm的核心組件
- Topology:Storm中運行的一個實時應用程序的名稱。(拓撲)
- Nimbus:負責資源分配和任務調度。
- Supervisor:負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程。---通過配置文件設置當前supervisor上啟動多少個worker。(Slot的個數)
- Worker:運行具體處理組件邏輯的進程(其實就是一個
- Task:worker中每一個spout/bolt的線程稱為一個task. 在storm0.8之後,task不再與物理線程對應,不同spout/bolt的task可能會共享一個物理線程,該線程稱為executor。(Task=線程=executor)
- Zookeeper:保存任務分配的信息、心跳信息、元數據信息。
1.3、並發度
用戶指定的一個任務,可以被多個線程執行。
並發度的數量等於線程的數量。一個任務的多個線程,會被運行在多個Worker(JVM)上,有一種類似於平均算法的負載均衡策略。盡可能減少網絡
1.4、Worker與topology
一個worker只屬於一個topology,每個worker中運行的task只能屬於這個topology。反之,一個topology包含多個worker,其實就是這個topology運行在多個worker上。
一個topology要求的worker數量如果不被滿足,集群在任務分配時,根據現有的worker先運行topology。
如果當前集群中worker數量為0,那麽最新提交的topology將只會被標識active,不會運行,只有當集群有了空閑資源之後,才會被運行。
1.5、Storm的編程模型
- DataSource:外部數據源
- Spout:接受外部數據源的組件,將外部數據源轉化成Storm內部的數據,以Tuple為基本的傳輸單元下發給Bolt
- Bolt:接受Spout發送的數據,或上遊的bolt的發送的數據。根據業務邏輯進行處理。發送給下一個Bolt或者是存儲到某種介質上。介質可以是Redis可以是mysql,或者其他。
- Tuple:Storm內部中數據傳輸的基本單元,裏面封裝了一個List對象,用來保存數據。
- StreamGrouping:數據分組策略
- Shuffle Grouping: 隨機分組, 隨機派發stream裏面的tuple,保證每個bolt接收到的tuple數目大致相同。(Random函數)
- Fields Grouping:按字段分組,比如按userid來分組,具有同樣userid的tuple會被分到相同的Bolts裏的一個task,而不同的userid則會被分配到不同的bolts裏的task。(Hash取模)
- All Grouping:廣播發送,對於每一個tuple,所有的bolts都會收到。
- Global Grouping:全局分組, 這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
- Non Grouping:不分組,這stream grouping個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程裏面去執行。(Random函數),
- Direct Grouping: 直接分組, 這是一種比較特別的分組方法,用這種分組意味著消息的發送者指定由消息接收者的哪個task處理這個消息。只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
- Local or shuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
2、Storm程序的並發機制
2.1、概念
- Workers (JVMs): 在一個物理節點上可以運行一個或多個獨立的JVM 進程。一個Topology可以包含一個或多個worker(並行的跑在不同的物理機上), 所以worker process就是執行一個topology的子集, 並且worker只能對應於一個topology
- Executors (threads): 在一個worker JVM進程中運行著多個Java線程。一個executor線程可以執行一個或多個tasks。但一般默認每個executor只執行一個task。一個worker可以包含一個或多個executor, 每個component (spout或bolt)至少對應於一個executor, 所以可以說executor執行一個compenent的子集, 同時一個executor只能對應於一個component。
- Tasks(bolt/spout instances):Task就是具體的處理邏輯對象,每一個Spout和Bolt會被當作很多task在整個集群裏面執行。每一個task對應到一個線程,而stream grouping則是定義怎麽從一堆task發射tuple到另外一堆task。你可以調用TopologyBuilder.setSpout和TopBuilder.setBolt來設置並行度 — 也就是有多少個task。
2.2、配置並行度
l 對於並發度的配置, 在storm裏面可以在多個地方進行配置, 優先級為:
defaults.yaml < storm.yaml < topology-specific configuration <
<internal component-specific configuration < external component-specific configuration
- worker processes的數目, 可以通過配置文件和代碼中配置, worker就是執行進程, 所以考慮並發的效果, 數目至少應該大於machines的數目
- executor的數目, component的並發線程數,只能在代碼中配置(通過setBolt和setSpout的參數), 例如, setBolt("green-bolt", new GreenBolt(), 2)
- tasks的數目, 可以不配置, 默認和executor1:1, 也可以通過setNumTasks()配置
Topology的worker數通過config設置,即執行該topology的worker(java)進程數。它可以通過 storm rebalance 命令任意調整。
Config conf = newConfig(); conf.setNumWorkers(2); //用2個worker topologyBuilder.setSpout("blue-spout", newBlueSpout(), 2); //設置2個並發度 topologyBuilder.setBolt("green-bolt", newGreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); //設置2個並發度,4個任務 topologyBuilder.setBolt("yellow-bolt", newYellowBolt(), 6).shuffleGrouping("green-bolt"); //設置6個並發度 StormSubmitter.submitTopology("mytopology", conf, topologyBuilder.createTopology()); |
3個組件的並發度加起來是10,就是說拓撲一共有10個executor,一共有2個worker,每個worker產生10 / 2 = 5條線程。
綠色的bolt配置成2個executor和4個task。為此每個executor為這個bolt運行2個task。
l 動態的改變並行度
Storm支持在不 restart topology 的情況下, 動態的改變(增減) worker processes 的數目和 executors 的數目, 稱為rebalancing. 通過Storm web UI,或者通過storm rebalance命令實現:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 |
3、Storm組件本地樹
4、Storm zookeeper目錄樹
5、Storm 任務提交的過程
TopologyMetricsRunnable.TaskStartEvent[oldAssignment=<null>,newAssignment=Assignment[masterCodeDir=C:\Users\MAOXIA~1\AppData\Local\Temp\\e73862a8-f7e7-41f3-883d-af494618bc9f\nimbus\stormdist\double11-1-1458909887,nodeHost={61ce10a7-1e78-4c47-9fb3-c21f43a331ba=192.168.1.106},taskStartTimeSecs={1=1458909910, 2=1458909910, 3=1458909910, 4=1458909910, 5=1458909910, 6=1458909910, 7=1458909910, 8=1458909910},workers=[ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=<null>,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]],timeStamp=1458909910633,type=Assign],task2Component=<null>,clusterName=<null>,topologyId=double11-1-1458909887,timestamp=0] |
6.2、基本實現
Storm 系統中有一組叫做"acker"的特殊的任務,它們負責跟蹤DAG(有向無環圖)中的每個消息。
acker任務保存了spout id到一對值的映射。第一個值就是spout的任務id,通過這個id,acker就知道消息處理完成時該通知哪個spout任務。第二個值是一個64bit的數字,我們稱之為"ack val", 它是樹中所有消息的隨機id的異或計算結果。
ack val表示了整棵樹的的狀態,無論這棵樹多大,只需要這個固定大小的數字就可以跟蹤整棵樹。當消息被創建和被應答的時候都會有相同的消息id發送過來做異或。 每當acker發現一棵樹的ack val值為0的時候,它就知道這棵樹已經被完全處理了
6.3、可靠性配置
有三種方法可以去掉消息的可靠性:
將參數Config.TOPOLOGY_ACKERS設置為0,通過此方法,當Spout發送一個消息的時候,它的ack方法將立刻被調用;
Spout發送一個消息時,不指定此消息的messageID。當需要關閉特定消息可靠性的時候,可以使用此方法;
最後,如果你不在意某個消息派生出來的子孫消息的可靠性,則此消息派生出來的子消息在發送時不要做錨定,即在emit方法中不指定輸入消息。因為這些子孫消息沒有被錨定在任何tuple tree中,因此他們的失敗不會引起任何spout重新發送消息。
7、Storm的安裝
7.1、Storm安裝
1、上傳解壓安裝包
tar -zxvf apache-storm-1.1.1.tar.gz
mv apache-storm-1.1.1 storm
mv storm.yaml storm.yaml.bak
2、修改配置文件
#指定storm使用的zk集群
storm.zookeeper.servers:
- "zk-datanode-01"
- "zk-datanode-02"
- "zk-datanode-03"
#指定storm本地狀態保存地址
storm.local.dir: "/usr/local/data/storm/workdir"
#指定storm集群中的nimbus節點所在的服務器
nimbus.host: "zk-datanode-01"
#指定nimbus啟動JVM最大可用內存大小
nimbus.childopts: "-Xmx1024m"
#指定supervisor啟動JVM最大可用內存大小
supervisor.childopts: "-Xmx4096m"
#指定supervisor節點上,每個worker啟動JVM最大可用內存大小
worker.childopts: "-Xmx512m"
#指定ui啟動JVM最大可用內存大小,ui服務一般與nimbus同在一個節點上。
ui.childopts: "-Xmx768m"
#指定supervisor節點上,啟動worker時對應的端口號,每個端口對應槽,每個槽位對應一個worker
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
3、分發安裝包
scp -r storm/ zk-datanode-02:/usr/local/
scp -r storm/ zk-datanode-03:/usr/local/
4、啟動集群
cd /usr/local/storm
1、在nimbus.host所屬的機器上啟動 nimbus服務
nohup bin/storm nimbus &
2、在nimbus.host所屬的機器上啟動ui服務
nohup bin/storm ui &
3、在其它個點擊上啟動supervisor服務
nohup bin/storm supervisor &
7.2、Storm任務提交
提交任務到storm集群上運行
bin/storm jar /usr/local/data/package/rcp-streamingengine-cardhz-V0.0.1.jar
com.dinpay.bdp.rcp.CardHzTopology CardHzTopology
Storm文檔詳解