Storm 使用經驗與效能優化(一)
阿新 • • 發佈:2019-02-01
提交任務:storm jar storm-starter-topologies-1.0.1.jar org.apache.storm.starter.WordCountTopology word-count
查詢任務:storm list
Kill任務:storm kill word-count
1) 使用rebalance命令動態調整併發度
Storm提供了rebalance命令,可以動態調整Topology的Worker數量和Component的併發度,而不用修改Topology的程式碼。如果想動態增加某個Component的併發度,需要設定Component的NumTask數量或者MaxTaskParallelism的引數值,並且大於併發度引數值(parallelism_hint)的值。如果不設定NumTask數量,預設值等同於併發度,重新平衡的時候就只能減少併發度,而不能再增加了。
SentenceSpout spout=new SentenceSpout(); SplitSentenceBolt splitbolt=new SplitSentenceBolt(); WordCountBolt countbolt=new WordCountBolt(); ReportBolt reportbolt=new ReportBolt(); TopologyBuilder builder=new TopologyBuilder(); // 設定併發為2個executor,每個Task指派各自的executor執行緒 builder.setSpout(SENTENCE_SPOUT_ID,spout,5).setMaxTaskParallelism(10); // 設定併發為2個executor,每個executor執行2個task builder.setBolt(SPLIT_BOLT_ID,splitbolt,8).shuffleGrouping(SENTENCE_SPOUT_ID).setNumTasks(16); // 有時候我們需要將特定資料的tuple路由到特殊的bolt例項中,在此我們使用fieldsGrouping // 來保證所有"word"欄位值相同的tuple會被路由到同一個WordCountBolt例項中 builder.setBolt(COUNT_BOLT_ID,countbolt,12).fieldsGrouping(SPLIT_BOLT_ID,new Fields("words")); builder.setBolt(REPORT_BOLT_ID,reportbolt).globalGrouping(COUNT_BOLT_ID); /*Map conf=new HashMap(); conf.put(Config.TOPOLOGY_WORKERS,4); conf.put(Config.TOPOLOGY_DEBUG,true);*/ Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(3); LocalCluster cluster=new LocalCluster(); cluster.submitTopology(TOPOLOGY_NAME,conf,builder.createTopology()); // Thread.sleep(1000); // cluster.shutdown();
在構建Topology的過程中,指定了Spout的初始併發度為5,也就是會啟動5個Executor執行緒來執行,然後設定最大併發度為10,上限為10個Executor執行緒;同時也指定了Split初始併發度為8,最大的Task數量為16個。在執行過程中,我們發現Spout或者Split的效能是瓶頸,可以通過調整併發度來提供效能。Spout併發度上限為10個,Split的最大併發度為16個。
storm rebalance word-count -w 10 -n 4 -e spout=8 -e split=12
Storm通過自己內部的重新部署和分配,來完成併發度的調整。等待10s後,Topology進入重新分配狀態,等重新分配完成後,就可以在Storm UI上看見新的結果了。