A Java Demo of Reading and Processing Data with Storm
1. Flow diagram of the demo:
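(The diagram image is not reproduced here. Based on the code below, the data flow is: MySpout emits whole sentences → MyBolt splits each sentence into words → MySecondBolt records each word's length and prints the results when the topology shuts down.)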
2. Create the Spout
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class MySpout extends BaseRichSpout {

    SpoutOutputCollector spoutOutputCollector; // the spout's output collector
    int index = 0;
    String[] data = {"hello world", "apache storm", "can see you"};

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        if (index >= data.length) {
            return;
        }
        // emit (send out) the next piece of data
        this.spoutOutputCollector.emit(new Values(data[index]));
        index++;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("data")); // declare the output field name
    }
}
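This spout emits unanchored tuples, so Storm does not track whether they are fully processed. If you want at-least-once processing, one common pattern (shown here only as a sketch, not part of the original demo) is to emit each tuple with a message ID and override ack/fail on the spout:

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

// Sketch only, not part of the original demo: same data as MySpout,
// but each tuple carries a message ID so Storm reports ack/fail back to the spout.
public class MyReliableSpout extends BaseRichSpout {

    SpoutOutputCollector collector;
    int index = 0;
    String[] data = {"hello world", "apache storm", "can see you"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (index >= data.length) {
            return;
        }
        // The second argument is the message ID; Storm tracks the tuple tree under this ID.
        collector.emit(new Values(data[index]), index);
        index++;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("acked msgId=" + msgId);
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("failed msgId=" + msgId); // a real spout would re-emit the data here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("data"));
    }
}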
3. Create the word-splitting Bolt
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class MyBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String data = tuple.getStringByField("data");
        String[] split = data.split(" ");
        for (String s : split) {
            System.out.println("=======first======" + s);
            basicOutputCollector.emit(new Values(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("world")); // declare the field passed to the next bolt
    }
}
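MyBolt extends BaseBasicBolt, which anchors every emitted tuple to the input tuple and acks the input automatically once execute() returns. For comparison, here is a sketch (not part of the original demo) of the same splitting logic written against BaseRichBolt, where anchoring and acking must be done by hand:

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

// Sketch only: the same splitting logic on top of BaseRichBolt.
public class MyRichSplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String data = tuple.getStringByField("data");
        for (String s : data.split(" ")) {
            // Anchor the new tuple to the input so failures propagate back to the spout.
            collector.emit(tuple, new Values(s));
        }
        collector.ack(tuple); // with BaseRichBolt you must ack explicitly
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("world"));
    }
}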
4. Create the word-length Bolt
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class MySecondBolt extends BaseBasicBolt {

    Map<String, Integer> map = new HashMap<>();

//    @Override
//    public void prepare(Map stormConf, TopologyContext context) {
//        this.map = new HashMap<>();
//    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String world = tuple.getStringByField("world");
        System.out.println("========world=======" + world + "=======length===" + world.length());
        map.put(world, world.length());
    }

    /**
     * Runs when the topology is shut down.
     */
    @Override
    public void cleanup() {
        System.out.println("=====topology end=====");
        for (String s : map.keySet()) {
            System.out.println("==key=" + s + "==and value=" + map.get(s));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
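A note on the commented-out prepare() method: a bolt instance is serialized on the client and deserialized on the worker before prepare() is called, so per-task state such as this HashMap is usually created in prepare() rather than in a field initializer. It works either way here because the map is empty and serializable at submission time. Also keep in mind that cleanup() is not guaranteed to run on a real cluster; it is mainly useful in local mode, which is why this demo can rely on it to print the results.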
5. Create the Topology
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class MyTopology {

    public static void main(String[] args) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new MySpout());
        topologyBuilder.setBolt("bolt", new MyBolt()).shuffleGrouping("spout");
        topologyBuilder.setBolt("secondBolt", new MySecondBolt()).fieldsGrouping("bolt", new Fields("world"));

        Config config = new Config();
        config.setDebug(false);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("world-length", config, topologyBuilder.createTopology());
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        localCluster.killTopology("world-length");
        localCluster.shutdown();
    }
}
Notes: 1. Storm has two modes, local mode and remote (cluster) mode. This demo was developed locally, so it uses local mode: just run MyTopology's main method. The core of local mode is the LocalCluster class, which simulates the Storm cluster and the ZooKeeper cluster it depends on. (A sketch of what remote submission would look like follows the program output below.)
2. In local mode I use Thread.sleep(20000), i.e. sleep for 20 seconds. I originally used 10 seconds and never saw any output, and could not find the reason; it turned out my machine was too slow, so localCluster.shutdown() shut the cluster down before Storm had finished running. Spinning up the LocalCluster simulation and its embedded ZooKeeper apparently takes a fair amount of time.
3. In local mode you will sometimes see this error:
org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 0x164f9d7461e0008, likely client has closed socket
at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.1.1.jar:1.1.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_161-1-redhat]
You can ignore it; it does not affect the program. From what I have read online, it seems to be thrown on the first connection to ZooKeeper, during ZooKeeper initialization.
4. Under normal conditions the program's output is:
=======first======hello
=======first======world
=======first======apache
=======first======storm
=======first======can
=======first======see
=======first======you
24713 [Thread-20-secondBolt-executor[3 3]] INFO o.a.s.d.executor - Preparing bolt secondBolt:(3)
24714 [Thread-20-secondBolt-executor[3 3]] INFO o.a.s.d.executor - Prepared bolt secondBolt:(3)
========world=======hello=======length===5
========world=======world=======length===5
========world=======apache=======length===6
========world=======storm=======length===5
========world=======can=======length===3
========world=======see=======length===3
========world=======you=======length===3
=====topology end=====
==key=can==and value=3
==key=see==and value=3
==key=world==and value=5
==key=apache==and value=6
==key=storm==and value=5
==key=hello==and value=5
==key=you==and value=3
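For reference, the remote (cluster) mode mentioned in note 1 would replace the LocalCluster part of MyTopology with StormSubmitter, and the packaged jar would be submitted with the storm command. A minimal sketch, assuming the same topology built above (the class and jar names are made up for illustration):

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

// Sketch only: submitting the same topology to a real cluster instead of LocalCluster.
// Run with something like: storm jar word-length-demo.jar MyRemoteTopology  (jar name is hypothetical)
public class MyRemoteTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new MySpout());
        topologyBuilder.setBolt("bolt", new MyBolt()).shuffleGrouping("spout");
        topologyBuilder.setBolt("secondBolt", new MySecondBolt()).fieldsGrouping("bolt", new Fields("world"));

        Config config = new Config();
        config.setNumWorkers(2); // number of worker processes on the cluster

        // StormSubmitter ships the topology to Nimbus; no Thread.sleep/shutdown is needed.
        StormSubmitter.submitTopology("world-length", config, topologyBuilder.createTopology());
    }
}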
Addendum: for Storm to run in local mode, the Storm jars need to be on the classpath. In other words, just add the Storm dependency to your Maven pom.xml:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.1</version>
</dependency>
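One caveat worth adding: this works for local mode because storm-core ends up on the compile classpath. If you later package the topology and submit it to a real cluster with the storm command, storm-core is already on the cluster's classpath, so the usual practice is to mark this dependency <scope>provided</scope> for that build.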