storm叢集配置以及java編寫拓撲例子
阿新 • • 發佈:2019-01-10
storm叢集配置
storm配置相當簡單
安裝
# Unpack the Storm 1.2.2 release, remove the archive and rename the directory to "storm".
# NOTE(review): STORM_HOME below is /usr/local/storm, so these commands are
# presumably run from inside /usr/local - confirm before copying.
tar -zxvf apache-storm-1.2.2.tar.gz
rm apache-storm-1.2.2.tar.gz
mv apache-storm-1.2.2 storm

# Open /etc/profile, append the two export lines below, then reload the profile.
sudo vim /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin
source /etc/profile

# Install Python (package installation needs root, like the /etc/profile edit above).
sudo apt install python

# Prepare four machines: master, worker1, worker2 and worker3.
# First make sure your ZooKeeper cluster is running normally;
# worker1, worker2 and worker3 form the ZooKeeper cluster.
# For the ZooKeeper setup see the author's blog post:
# https://www.cnblogs.com/ye-hcj/p/9889585.html
修改配置檔案
storm.yaml
# Edit the file with: sudo vim storm.yaml
# Add the following configuration on all four machines.

# ZooKeeper ensemble used by Storm.
storm.zookeeper.servers:
  - "worker1"
  - "worker2"
  - "worker3"

# Local directory where Storm keeps its state.
storm.local.dir: "/usr/local/tmpdata/storm"

# One worker slot per listed port on each supervisor.
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

nimbus.seeds: ["master"]
storm.zookeeper.port: 2181

# Author's note: without the following JVM heap settings the topology
# does not start at all.
nimbus.childopts: "-Xmx1024m"
supervisor.childopts: "-Xmx1024m"
worker.childopts: "-Xmx768m"
啟動
# On master: start Nimbus and the UI in the background.
# Both stdout and stderr are discarded so that no daemon keeps writing to the terminal.
storm nimbus >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &

# On worker1, worker2 and worker3: start the supervisor and the log viewer.
storm supervisor >/dev/null 2>&1 &
storm logviewer >/dev/null 2>&1 &

# Then open http://master:8080 in a browser.
使用java編寫拓撲
專案包含 pom.xml 以及四個 Java 檔案:App.java、WordSpout.java、RecieveBolt.java、ConsumeBolt.java(原文此處為專案結構截圖)
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0.0</version>
    <name>test</name>
    <description>Test project for spring boot mybatis</description>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- "provided": the Storm runtime on the cluster supplies storm-core,
             so it is kept out of the assembled jar. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.2.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- "test": keeps JUnit off the runtime classpath and out of the
             jar-with-dependencies artifact. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds test-1.0.0-jar-with-dependencies.jar during the package phase. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
App.java
package test; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class App { public static void main( String[] args ) throws Exception { TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("word",new WordSpout(),1); topologyBuilder.setBolt("receive",new RecieveBolt(),1).shuffleGrouping("word"); topologyBuilder.setBolt("print",new ConsumeBolt(),1).shuffleGrouping("receive"); // 叢集執行 Config config = new Config(); config.setNumWorkers(3); config.setDebug(true); StormSubmitter.submitTopology("teststorm", config, topologyBuilder.createTopology()); // 本地測試 // Config config = new Config(); // config.setNumWorkers(3); // config.setDebug(true); // config.setMaxTaskParallelism(20); // LocalCluster cluster = new LocalCluster(); // cluster.submitTopology("wordCountTopology", config, topologyBuilder.createTopology()); // Utils.sleep(60000); // 執行完畢,關閉cluster // cluster.shutdown(); } }
WordSpout.java
package test; import java.util.Map; import java.util.Random; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; public class WordSpout extends BaseRichSpout { private static final long serialVersionUID = 6102239192526611945L; private SpoutOutputCollector collector; Random random = new Random(); // 初始化tuple的collector public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { // 模擬產生訊息佇列 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; final String word = words[random.nextInt(words.length)]; // 提交一個tuple給預設的輸出流 this.collector.emit(new Values(word)); Utils.sleep(5000); } // 聲明發送訊息的欄位名 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
RecieveBolt.java
package test; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class RecieveBolt extends BaseRichBolt { private static final long serialVersionUID = -4758047349803579486L; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { // 將spout傳遞過來的tuple值進行轉換 this.collector.emit(new Values(tuple.getStringByField("word") + "!!!")); } // 聲明發送訊息的欄位名 public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("word")); } }
ConsumeBolt.java
package test; import java.io.FileWriter; import java.io.IOException; import java.util.Map; import java.util.UUID; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; public class ConsumeBolt extends BaseRichBolt { private static final long serialVersionUID = -7114915627898482737L; private FileWriter fileWriter = null; private OutputCollector collector; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; try { fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID()); // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID()); } catch (IOException e) { throw new RuntimeException(e); } } public void execute(Tuple tuple) { try { String word = tuple.getStringByField("word") + "......." + "\n"; fileWriter.write(word); fileWriter.flush(); System.out.println(word); } catch (IOException e) { throw new RuntimeException(e); } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } }
在叢集中執行
# Submit the topology to the cluster (test.App registers it under the name "teststorm").
storm jar test-1.0.0-jar-with-dependencies.jar test.App

# Kill the running topology by name.
storm kill teststorm