
A Java Demo of Reading and Processing Data with Storm

1. The demo's flow diagram (image not included here):

2. Creating the Spout



import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;


public class MySpout extends BaseRichSpout {

    SpoutOutputCollector spoutOutputCollector; // the spout's output collector

    int index = 0;

    String[] data = {"hello world","apache storm","can see you"};

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        if(index >= data.length){
            return;
        }
        // emit the next sentence downstream
        this.spoutOutputCollector.emit(new Values(data[index]));
        index++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("data")); // declare the name of the output field
    }
}
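The spout's cursor logic can be checked without a Storm cluster at all. Below is a minimal, hypothetical harness (the `SimpleSpout` class is illustration only, not part of the Storm API) that mimics how Storm repeatedly calls `nextTuple()` and shows that each sentence is emitted exactly once:

```java
import java.util.ArrayList;
import java.util.List;

public class SpoutLogicDemo {
    // Mimics MySpout's cursor logic: emit each element once, then go idle.
    static class SimpleSpout {
        int index = 0;
        String[] data = {"hello world", "apache storm", "can see you"};

        // Returns the next sentence, or null once the data is exhausted
        // (a real spout would simply return without emitting anything).
        String nextTuple() {
            if (index >= data.length) {
                return null;
            }
            return data[index++];
        }
    }

    public static void main(String[] args) {
        SimpleSpout spout = new SimpleSpout();
        List<String> emitted = new ArrayList<>();
        String s;
        // Storm calls nextTuple() in a loop; calls after exhaustion are no-ops.
        while ((s = spout.nextTuple()) != null) {
            emitted.add(s);
        }
        System.out.println(emitted.size());  // 3
        System.out.println(emitted.get(0));  // hello world
    }
}
```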

3. Creating the word-splitting Bolt



import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;


public class MyBolt extends BaseBasicBolt {


    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String data = tuple.getStringByField("data");
        String[] split = data.split(" ");
        for (String s : split) {
            System.out.println("=======first======"+s);
            basicOutputCollector.emit(new Values(s));
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("world")); // declare the field passed to the next bolt
    }
}
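The heart of this bolt is just `String.split(" ")`. A quick standalone check in plain Java (no Storm dependency) confirms what the three sentences decompose into, matching the `=======first======` lines in the output later:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // Splits each sentence on a single space, as MyBolt.execute does.
    static List<String> splitAll(String[] sentences) {
        List<String> words = new ArrayList<>();
        for (String sentence : sentences) {
            for (String w : sentence.split(" ")) {
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        String[] data = {"hello world", "apache storm", "can see you"};
        List<String> words = splitAll(data);
        System.out.println(words.size());  // 7
        System.out.println(words);         // [hello, world, apache, storm, can, see, you]
    }
}
```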

4. Creating the Bolt that computes word lengths



import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;


public class MySecondBolt extends BaseBasicBolt {

    Map<String,Integer> map = new HashMap<>();

//    @Override
//    public void prepare(Map stormConf, TopologyContext context) {
//        this.map = new HashMap<>();
//    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String world = tuple.getStringByField("world");
        System.out.println("========world======="+world+"=======length==="+world.length());
        map.put(world,world.length());
//        System.out.println();
    }

    /**
     * Executed when the topology shuts down.
     */
    @Override
    public void cleanup() {
        System.out.println("=====topology end=====");
        for (String s : map.keySet()) {
            System.out.println("==key="+s+"==and value="+map.get(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}


5. Creating the Topology



import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;


public class MyTopology {

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout",new MySpout());
        topologyBuilder.setBolt("bolt",new MyBolt()).shuffleGrouping("spout");
        topologyBuilder.setBolt("secondBolt",new MySecondBolt()).fieldsGrouping("bolt",new Fields("world"));

        Config config = new Config();
        config.setDebug(false);
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("world-length",config,topologyBuilder.createTopology());

        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        localCluster.killTopology("world-length");
        localCluster.shutdown();
    }


}
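The topology uses `shuffleGrouping` from the spout to the first bolt, but `fieldsGrouping` on the `"world"` field into the second bolt. This matters because `MySecondBolt` keeps its `map` in each task instance: every occurrence of the same word must be routed to the same task. Fields grouping does this by hashing the grouping field. A hypothetical sketch of the idea in plain Java (this is not Storm's actual partitioner, which lives inside the framework):

```java
public class GroupingDemo {
    // Illustration only: route a field value to one of n tasks by hashing it,
    // so equal values always land on the same task (the idea behind fieldsGrouping).
    static int taskFor(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4;
        // The same word is always routed to the same task index...
        System.out.println(taskFor("storm", tasks) == taskFor("storm", tasks)); // true
        // ...which is what lets each task's map accumulate consistent state.
        for (String w : new String[]{"hello", "world", "apache", "storm"}) {
            System.out.println(w + " -> task " + taskFor(w, tasks));
        }
    }
}
```

With `shuffleGrouping` into the second bolt instead, occurrences of one word could be scattered across tasks and each task's `map` would see only a fraction of them.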

Notes:

1. Storm has two modes, local and remote. This demo was developed locally in local mode: just run the main method of MyTopology. The core of local mode is the LocalCluster class, which simulates both the Storm cluster and the ZooKeeper cluster it depends on.

2. In local mode, Thread.sleep(20000) sleeps for 20 seconds. I originally used 10 seconds and saw no output, and could not find the reason; it turned out my machine was too slow, so localCluster.shutdown() closed the cluster before Storm finished running. Simulating the cluster and its ZooKeeper dependency with LocalCluster evidently takes some time.

3. In local mode you may sometimes see this error:

org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x164f9d7461e0008, likely client has closed socket
	at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.1.1.jar:1.1.1]
	at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.1.1.jar:1.1.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161-1-redhat]

You can ignore it; it does not affect the program. From what I have read online, it seems to be thrown during the first connection to ZooKeeper, i.e. during ZooKeeper initialization.

4. The program's output under normal conditions:

=======first======hello
=======first======world
=======first======apache
=======first======storm
=======first======can
=======first======see
=======first======you
24713 [Thread-20-secondBolt-executor[3 3]] INFO  o.a.s.d.executor - Preparing bolt secondBolt:(3)
24714 [Thread-20-secondBolt-executor[3 3]] INFO  o.a.s.d.executor - Prepared bolt secondBolt:(3)
========world=======hello=======length===5
========world=======world=======length===5
========world=======apache=======length===6
========world=======storm=======length===5
========world=======can=======length===3
========world=======see=======length===3
========world=======you=======length===3
=====topology end=====
==key=can==and value=3
==key=see==and value=3
==key=world==and value=5
==key=apache==and value=6
==key=storm==and value=5
==key=hello==and value=5
==key=you==and value=3

Addendum: to run Storm in local mode, the Storm jar must be on the classpath, which simply means adding the Storm dependency to Maven's pom.xml:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
</dependency>