1. 程式人生 > >理解Storm Topology的併發機制

理解Storm Topology的併發機制

執行時topology的組成:worker processes、executors(執行者執行緒)以及tasks

Storm區分了以下三個主要的實體並用來在Storm叢集上執行起一個topology:

1.Worker processes

2.Executors(執行緒)

3.Tasks

下面是這3者關係的一個簡單示例:

一個worker process負責執行topology的部分子集。單個worker process歸屬於一個特定的topology並且可以為該topology的一個或多個元件(spouts或者bolts)執行一個或者多個executors。一個執行的topology由Storm叢集中多臺機器上執行著的多個processes(程序)組成。

一個executor其實就是由一個worker process生成的執行緒。executor將會為同一個元件(spout/bolt)執行多個tasks(任務)。

一個task負責進行資料處理---我們的程式碼實現的每個spout或者bolt會在叢集上執行許多的任務。一個元件的tasks的數量在topology的整個生命週期中都是一樣的。但是一個元件的executors的數量卻會改變。這就意味著出出現這樣一種情形:#threads <= #tasks。預設情況下,executor的數量與task的數量會設定成一樣。例如:Storm中會每個executor執行一個task。

topology的併發設定

注意Storm中的術語"parallelism"被用來特指Storm中的parallelism hint,parallelism hint的意思是一個元件的初識executor的數量。這個文件中我們就用"parallelism"(併發)這個更寬泛的術語來描述:不僅僅限於executors的數量,還有worker processes的數量以及tasks的數量。在用"parallelism"來標識正常或者更特指的定義時,我們會特別提醒的。

 

接下來對多種配置選項的一個概覽。我在Storm配置概覽這篇文章中提到了Storm配置的優先順序:defaults.yaml<storm.yaml<topology-specific configuration<internal component-specific configuration<external component-specific configuration。

 

worker processes的數量

·描述:為topology在叢集機器上建立多少work processes。

·配置項:TOPOLOGY_WORKERS

·如何在程式碼中設定

 

 

executors(執行緒)的數量

·描述:每個元件產生多少executors

·配置項:無(通過setSpout或者setBolt中的parallelism_hint引數設定)

 

task的數量

·描述:為每個元件建立多少task

·配置項:TOPOLOGY_TASKS

·如何在程式碼中設定(只是個例子):

 

·ComponentConfigurationDeclarer#setNumTasks()

下面給出了這些配置的一個例子

 

 
  1. topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)

  2. .setNumTasks(4)

  3. .shuffleGrouping("blue-spout");

在程式碼中我們配置了Storm中的bolt(GreenBolt)其中初始的executor數量為2以及相關taks數為4。Storm將會使每個executor執行兩個task。如果沒有顯式的配置task數,Storm會每個executor執行一個task。

 

一個執行中的topology示例
下面這個示例展示了一個簡單的topology的執行。這個topology有3個元件組成:一個spout(BlueSpout)以及兩個bolt(GreenBolt和YellowBolt)。由BlueSpout傳送線性給GreenBolt,然後,GreenBolt傳送訊息給YellowBolt。

下面給出相應程式碼:

 
  1. Config conf = new Config();

  2. conf.setNumWorkers(2); // use two worker processes

  3.  
  4. topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2

  5.  
  6. topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)

  7. .setNumTasks(4)

  8. .shuffleGrouping("blue-spout");

  9.  
  10. topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)

  11. .shuffleGrouping("green-bolt");

  12.  
  13. StormSubmitter.submitTopology(

  14. "mytopology",

  15. conf,

  16. topologyBuilder.createTopology()

  17. );

可以看到,BlueSpout與YellowBolt未設定task數,故預設task數與executor數相同。而GreenBolt設定了task數為4,故每個executor執行兩個task。

 

Storm有額外的配置設定來控制topology的併發數:

·TOPOLOGY_MAX_TASK_PARALLELISM:這個設定對每個元件能生成的executor數設定了一個上限。這個典型的是在測試的時候,我們在本地模式下限制執行緒的數量。我們可以通過 Config#setMaxTaskParallelism()來進行設定。

 

如何改變一個執行中的topology的併發量 

Storm中的一個靈活的特徵就是我們可以在不重啟叢集或者topology的情況下改變worker proceses以及executors的數量。這個過程叫做rebalancing(再均衡)

我們有兩個選項來進行再均衡:

1.用Storm提供的web介面

2.通過命令列(CLI)工具

下面給出命令列工具的例子:

## Reconfigure the topology "mytopology" to use 5 worker processes,
## the spout "blue-spout" to use 3 executors and
## the bolt "yellow-bolt" to use 10 executors.

$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10