Storm安裝以及單詞統計案例二
阿新 • • 發佈:2018-12-17
安裝
1 :解壓安裝包 tar -zxvf apache-storm-0.9.5.tar.gz
2 :修改配置檔案 vi storm/conf/storm.yaml
#指定storm使用的zk叢集
 storm.zookeeper.servers:
     - "hadoop01"
     - "hadoop02"
     - "hadoop03"
#指定storm叢集中的nimbus節點所在的伺服器
 nimbus.host: "hadoop01"
#指定nimbus啟動JVM最大可用記憶體大小
 nimbus.childopts: "-Xmx1024m"
#指定supervisor啟動JVM最大可用記憶體大小
 supervisor.childopts: "-Xmx1024m"
#指定supervisor節點上,每個worker啟動JVM最大可用記憶體大小
 worker.childopts: "-Xmx768m"
#指定ui啟動JVM最大可用記憶體大小,ui服務一般與nimbus同在一個節點上。
 ui.childopts: "-Xmx768m"
#指定supervisor節點上,啟動worker時對應的埠號,每個埠對應一個槽位,每個槽位對應一個worker
 supervisor.slots.ports:
     - 6700
     - 6701
     - 6702
     - 6703
修改配置檔案時,每行左側的縮排空格需要保留(所有頂層配置項的縮排要一致),冒號和列表符號「-」後面也要保留一個空格;YAML 對空白敏感,否則啟動會報錯
3 :分發安裝包
scp -r /usr/apps/apache-storm-0.9.5 <user>@hadoop02:/usr/apps
scp -r /usr/apps/apache-storm-0.9.5 <user>@hadoop03:/usr/apps
4 :啟動叢集
在nimbus.host所屬的機器上啟動 nimbus 服務
一般啟動 storm nimbus ; 後臺啟動 nohup storm nimbus &
在nimbus.host所屬的機器上啟動 ui 服務
一般啟動 storm ui ; 後臺啟動 nohup storm ui &
在每個 supervisor 節點所在的機器上啟動 supervisor 服務
一般啟動 storm supervisor ; 後臺啟動 nohup storm supervisor &
5 :檢視叢集
訪問 http://<nimbus.host>:8080 (例如 http://hadoop01:8080),即可看到storm的ui介面
單詞統計Demo
1:Spout 檔案讀取
package cn.itcast.storm; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import org.apache.commons.lang.StringUtils; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.Map; public class MyLocalFileSpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; private BufferedReader bufferedReader ; /** * 初始化方法(被迴圈呼叫的方法) * @param map * @param topologyContext * @param spoutOutputCollector */ @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; try { this.bufferedReader = new BufferedReader( new FileReader( new File("D:/bigDataJob/wordcount/input/1.txt") )); } catch (FileNotFoundException e) { e.printStackTrace(); } } /** * Storm實時計算的特性就是對資料一條一條的處理 * while(true){ this.nextTuple(); } */ @Override public void nextTuple() { try { String line = bufferedReader.readLine(); if(StringUtils.isNotBlank( line )){ //每呼叫一次就會發送一次資料 List<Object> list = new ArrayList<Object>(); list.add( line ); spoutOutputCollector.emit( list ); } } catch (IOException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //定義傳送的資料是什麼 outputFieldsDeclarer.declare( new Fields( "juzi" )); } }
2:Bolt 句子切割
package cn.itcast.storm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
/**
 * Bolt that splits each incoming sentence (field {@code "juzi"}) into words and emits one
 * {@code (word, 1)} tuple per word on the fields {@code "word"} and {@code "num"}.
 */
public class MySplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        // Read the sentence emitted upstream under the "juzi" field.
        String juzi = (String) tuple.getValueByField("juzi");
        if (juzi == null) {
            return;
        }
        // Trim first and split on runs of whitespace. split(" ") would produce empty
        // "words" for leading or repeated spaces, and those would then be counted.
        String trimmed = juzi.trim();
        if (trimmed.isEmpty()) {
            return; // "".split(...) yields [""], which must not be emitted as a word
        }
        for (String word : trimmed.split("\\s+")) {
            // Values builds the (word, 1) output list.
            basicOutputCollector.emit(new Values(word, 1));
        }
    }

    /** Declares the two output fields: the word itself and its occurrence count. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "num"));
    }
}
3:Bolt 單詞統計
package cn.itcast.storm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * Terminal bolt that keeps an in-memory running total per word and prints the whole
 * table after every update. It declares no output fields, so it emits nothing downstream.
 */
public class MyWordCountBolt extends BaseBasicBolt {
    // word -> running total accumulated by this bolt instance
    Map<String,Integer> map = new HashMap<String,Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        final String word = (String) tuple.getValueByField("word");
        final Integer delta = (Integer) tuple.getValueByField("num");

        final Integer current = map.get(word);
        // A missing entry and a zero total both restart from the incoming value.
        final boolean startFresh = current == null || current.intValue() == 0;
        map.put(word, startFresh ? delta : Integer.valueOf(current.intValue() + delta));

        System.out.println(map);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // No output fields: this bolt only aggregates and prints.
    }
}
4:任務提交工具類
package cn.itcast.storm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 * Builds the word-count topology (file spout -> sentence-splitting bolt -> counting bolt)
 * and submits it to an in-process {@link LocalCluster}.
 */
public class StormTopologyDriver {
    /**
     * Entry point. Submits the topology in local mode.
     *
     * <p>The LocalCluster is never shut down here, so the JVM keeps running until it is
     * stopped externally.
     *
     * @param args unused
     * @throws AlreadyAliveException    if a topology with the same name is already running
     * @throws InvalidTopologyException if the topology definition is rejected
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // 1. Describe the topology.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("mySpout", new MyLocalFileSpout());
        // Splitting is stateless, so any bolt1 task may handle any sentence.
        topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("mySpout");
        // Counting is stateful per word. Route by "word" so every occurrence of a word reaches
        // the same bolt2 task. With shuffleGrouping, counts would be split across tasks as soon
        // as bolt2 ran with parallelism > 1.
        topologyBuilder.setBolt("bolt2", new MyWordCountBolt()).fieldsGrouping("bolt1", new Fields("word"));

        // 2. Submit the topology.
        Config config = new Config();
        //config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        // Local mode: runs the whole topology inside this JVM.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount", config, stormTopology);

        // Cluster mode (requires importing backtype.storm.StormSubmitter):
        //StormSubmitter.submitTopology("wordcount1", config, stormTopology);
    }
}