理解Storm Topology的併發機制
執行時topology的組成:worker processes、executors(執行者執行緒)以及tasks
Storm區分了以下三個主要的實體並用來在Storm叢集上執行起一個topology:
1.Worker processes
2.Executors(執行緒)
3.Tasks
下面是這3者關係的一個簡單示例:
一個worker process負責執行topology的部分子集。單個worker process歸屬於一個特定的topology並且可以為該topology的一個或多個元件(spouts或者bolts)執行一個或者多個executors。一個執行的topology由Storm叢集中多臺機器上執行著的多個processes(程序)組成。
一個executor其實就是由一個worker process生成的執行緒。executor將會為同一個元件(spout/bolt)執行多個tasks(任務)。
一個task負責進行資料處理---我們的程式碼實現的每個spout或者bolt會在叢集上執行許多的任務。一個元件的tasks的數量在topology的整個生命週期中都是一樣的。但是一個元件的executors的數量卻會改變。這就意味著出出現這樣一種情形:#threads <= #tasks。預設情況下,executor的數量與task的數量會設定成一樣。例如:Storm中會每個executor執行一個task。
topology的併發設定
注意Storm中的術語"parallelism"被用來特指Storm中的parallelism hint,parallelism hint的意思是一個元件的初識executor的數量。這個文件中我們就用"parallelism"(併發)這個更寬泛的術語來描述:不僅僅限於executors的數量,還有worker processes的數量以及tasks的數量。在用"parallelism"來標識正常或者更特指的定義時,我們會特別提醒的。
接下來對多種配置選項的一個概覽。我在Storm配置概覽這篇文章中提到了Storm配置的優先順序:defaults.yaml<storm.yaml<topology-specific configuration<internal component-specific configuration<external component-specific configuration。
worker processes的數量
·描述:為topology在叢集機器上建立多少work processes。
·配置項:TOPOLOGY_WORKERS
·如何在程式碼中設定
executors(執行緒)的數量
·描述:每個元件產生多少executors
·配置項:無(通過setSpout或者setBolt中的parallelism_hint引數設定)
task的數量
·描述:為每個元件建立多少task
·配置項:TOPOLOGY_TASKS
·如何在程式碼中設定(只是個例子):
·ComponentConfigurationDeclarer#setNumTasks()
下面給出了這些配置的一個例子
-
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
-
.setNumTasks(4)
-
.shuffleGrouping("blue-spout");
在程式碼中我們配置了Storm中的bolt(GreenBolt)其中初始的executor數量為2以及相關taks數為4。Storm將會使每個executor執行兩個task。如果沒有顯式的配置task數,Storm會每個executor執行一個task。
一個執行中的topology示例
下面這個示例展示了一個簡單的topology的執行。這個topology有3個元件組成:一個spout(BlueSpout)以及兩個bolt(GreenBolt和YellowBolt)。由BlueSpout傳送線性給GreenBolt,然後,GreenBolt傳送訊息給YellowBolt。
下面給出相應程式碼:
-
Config conf = new Config();
-
conf.setNumWorkers(2); // use two worker processes
-
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2
-
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
-
.setNumTasks(4)
-
.shuffleGrouping("blue-spout");
-
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
-
.shuffleGrouping("green-bolt");
-
StormSubmitter.submitTopology(
-
"mytopology",
-
conf,
-
topologyBuilder.createTopology()
-
);
可以看到,BlueSpout與YellowBolt未設定task數,故預設task數與executor數相同。而GreenBolt設定了task數為4,故每個executor執行兩個task。
Storm有額外的配置設定來控制topology的併發數:
·TOPOLOGY_MAX_TASK_PARALLELISM:這個設定對每個元件能生成的executor數設定了一個上限。這個典型的是在測試的時候,我們在本地模式下限制執行緒的數量。我們可以通過 Config#setMaxTaskParallelism()來進行設定。
如何改變一個執行中的topology的併發量
Storm中的一個靈活的特徵就是我們可以在不重啟叢集或者topology的情況下改變worker proceses以及executors的數量。這個過程叫做rebalancing(再均衡)
我們有兩個選項來進行再均衡:
1.用Storm提供的web介面
2.通過命令列(CLI)工具
下面給出命令列工具的例子:
## Reconfigure the topology "mytopology" to use 5 worker processes,
## the spout "blue-spout" to use 3 executors and
## the bolt "yellow-bolt" to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10