1. 程式人生 > >storm叢集配置以及java編寫拓撲例子

storm叢集配置以及java編寫拓撲例子

storm叢集配置

storm配置相當簡單

安裝

tar -zxvf apache-storm-1.2.2.tar.gz
rm apache-storm-1.2.2.tar.gz
mv apache-storm-1.2.2 storm

sudo vim /etc/profile
    export STORM_HOME=/usr/local/storm
    export PATH=$PATH:$STORM_HOME/bin

source /etc/profile

apt install python

準備 master worker1 worker2 worker3 這四臺機器

首先確保你的zookeeper叢集能夠正常執行,這裡以worker1 worker2 worker3作為zk叢集
具體配置參照我的部落格https://www.cnblogs.com/ye-hcj/p/9889585.html

修改配置檔案

  1. storm.yaml

    sudo vim storm.yaml
    在四臺機器中都加入如下配置
        # ZooKeeper ensemble used by Storm for coordination.
        storm.zookeeper.servers:
        - "worker1"
        - "worker2"
        - "worker3"

        # Local directory where the Storm daemons keep their state.
        storm.local.dir: "/usr/local/tmpdata/storm"

        # One worker slot per listed port on every supervisor node.
        supervisor.slots.ports:
        - 6700
        - 6701
        - 6702
        - 6703

        # Host(s) on which nimbus runs.
        nimbus.seeds: ["master"]

        storm.zookeeper.port: 2181

        # JVM heap limits. According to the author, the topology would not start
        # on this setup without the three settings below.
        nimbus.childopts: "-Xmx1024m"

        supervisor.childopts: "-Xmx1024m"

        worker.childopts: "-Xmx768m"
  2. 啟動

    在master中執行
        storm nimbus >/dev/null 2>&1 &
        storm ui >/dev/null 2>&1 &
    在worker1,worker2,worker3中執行
        storm supervisor >/dev/null 2>&1 &
        storm logviewer >/dev/null 2>&1 &
    
    直接訪問http://master:8080即可

使用java編寫拓撲

  1. 四個檔案如圖

  2. pom.xml

        <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
            <groupId>test</groupId>
            <artifactId>test</artifactId>
            <version>1.0.0</version>
            <name>test</name>
            <description>Test project for an Apache Storm topology</description>
            <packaging>jar</packaging>

            <properties>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
                <java.version>1.8</java.version>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
            </properties>

            <dependencies>
                <!-- "provided": storm-core is supplied by the cluster at runtime,
                     so it is kept out of the assembled jar. -->
                <dependency>
                    <groupId>org.apache.storm</groupId>
                    <artifactId>storm-core</artifactId>
                    <version>1.2.2</version>
                    <scope>provided</scope>
                </dependency>
                <!-- "test": keeps junit off the runtime classpath and out of the
                     jar-with-dependencies artifact. -->
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>3.8.1</version>
                    <scope>test</scope>
                </dependency>
            </dependencies>

            <build>
                <plugins>
                    <!-- Builds test-1.0.0-jar-with-dependencies.jar during the package phase. -->
                    <plugin>
                        <artifactId>maven-assembly-plugin</artifactId>
                        <configuration>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                        <executions>
                            <execution>
                                <id>make-assembly</id>
                                <phase>package</phase>
                                <goals>
                                    <goal>single</goal>
                                </goals>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </project>
  3. App.java

        package test;
        import org.apache.storm.Config;
        import org.apache.storm.LocalCluster;
        import org.apache.storm.StormSubmitter;
        import org.apache.storm.topology.TopologyBuilder;
        import org.apache.storm.utils.Utils;
    
        /**
         * Entry point of the demo: wires a three-stage topology
         * ("word" spout, then "receive" bolt, then "print" bolt) and submits it
         * to the cluster under the name "teststorm".
         */
        public class App {

            /**
             * Registers the spout and both bolts, each with a parallelism hint of 1.
             *
             * @return the builder holding the fully wired components
             */
            private static TopologyBuilder wireComponents() {
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("word", new WordSpout(), 1);
                // "receive" consumes the spout's stream; "print" consumes the output of "receive".
                builder.setBolt("receive", new RecieveBolt(), 1).shuffleGrouping("word");
                builder.setBolt("print", new ConsumeBolt(), 1).shuffleGrouping("receive");
                return builder;
            }

            public static void main(String[] args) throws Exception {
                TopologyBuilder builder = wireComponents();

                // Cluster run: three worker processes, debug logging switched on.
                Config clusterConf = new Config();
                clusterConf.setNumWorkers(3);
                clusterConf.setDebug(true);
                StormSubmitter.submitTopology("teststorm", clusterConf, builder.createTopology());

                // Local test: replace the cluster run above with the lines below.
                // Config config = new Config();
                // config.setNumWorkers(3);
                // config.setDebug(true);
                // config.setMaxTaskParallelism(20);
                // LocalCluster cluster = new LocalCluster();
                // cluster.submitTopology("wordCountTopology", config, builder.createTopology());
                // Utils.sleep(60000);
                // Done running, shut the local cluster down.
                // cluster.shutdown();
            }
        }
  4. WordSpout.java

        package test;
    
        import java.util.Map;
        import java.util.Random;
    
        import org.apache.storm.spout.SpoutOutputCollector;
        import org.apache.storm.task.TopologyContext;
        import org.apache.storm.topology.OutputFieldsDeclarer;
        import org.apache.storm.topology.base.BaseRichSpout;
        import org.apache.storm.tuple.Fields;
        import org.apache.storm.tuple.Values;
        import org.apache.storm.utils.Utils;
    
        public class WordSpout extends BaseRichSpout {
    
            private static final long serialVersionUID = 6102239192526611945L;
    
            private SpoutOutputCollector collector;
    
            Random random = new Random();
    
    
            // 初始化tuple的collector
            public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector collector) {
                this.collector = collector;
            }
    
            public void nextTuple() {
                // 模擬產生訊息佇列
                String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
    
                final String word = words[random.nextInt(words.length)];
    
                // 提交一個tuple給預設的輸出流
                this.collector.emit(new Values(word));
    
                Utils.sleep(5000);
            }
    
            // 聲明發送訊息的欄位名
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("word"));
            }
        }
  5. RecieveBolt.java

        package test;
    
        import java.util.Map;
    
        import org.apache.storm.task.OutputCollector;
        import org.apache.storm.task.TopologyContext;
        import org.apache.storm.topology.OutputFieldsDeclarer;
        import org.apache.storm.topology.base.BaseRichBolt;
        import org.apache.storm.tuple.Fields;
        import org.apache.storm.tuple.Tuple;
        import org.apache.storm.tuple.Values;
    
        public class RecieveBolt extends BaseRichBolt {
    
            private static final long serialVersionUID = -4758047349803579486L;
    
            private OutputCollector collector;
    
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
            }
    
            public void execute(Tuple tuple) {
                // 將spout傳遞過來的tuple值進行轉換
                this.collector.emit(new Values(tuple.getStringByField("word") + "!!!")); 
            }
    
            // 聲明發送訊息的欄位名
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
                outputFieldsDeclarer.declare(new Fields("word"));
            }
        }
  6. ConsumeBolt.java

        package test;
    
        import java.io.FileWriter;
        import java.io.IOException;
        import java.util.Map;
        import java.util.UUID;
    
        import org.apache.storm.task.OutputCollector;
        import org.apache.storm.task.TopologyContext;
        import org.apache.storm.topology.OutputFieldsDeclarer;
        import org.apache.storm.topology.base.BaseRichBolt;
        import org.apache.storm.tuple.Tuple;
    
        public class ConsumeBolt extends BaseRichBolt {
    
            private static final long serialVersionUID = -7114915627898482737L;
    
            private FileWriter fileWriter = null;
    
            private OutputCollector collector;
    
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    
                this.collector = collector;
    
                try {
                    fileWriter = new FileWriter("/usr/local/tmpdata/" + UUID.randomUUID());
                    // fileWriter = new FileWriter("C:\\Users\\26401\\Desktop\\test\\" + UUID.randomUUID());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
    
            public void execute(Tuple tuple) {
    
                try {
                    String word = tuple.getStringByField("word") + "......." + "\n";
                    fileWriter.write(word);
                    fileWriter.flush();
                    System.out.println(word);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
    
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    
            }
        }
  7. 在叢集中執行

        storm jar test-1.0.0-jar-with-dependencies.jar test.App # 提交拓撲到叢集
        storm kill teststorm # 終止拓撲