Storm Kafka + Storm + HBase例項
阿新 • • 發佈:2019-02-03
需求
- WordCount案例 Kafka + Storm + HBase
- HBase表名:wordcount;
列族:result;
RowKey:word;
Field:count - 打包叢集部署執行
開發過程
1.
配置kafkaSpout,通過KafkaSpout獲取Kafka叢集中的資料
//從zookeeper動態讀取broker BrokerHosts hosts = new ZkHosts("172.17.11.120:2181,172.17.11.117:2181,172.17.11.118:2181"); String topic="TOPIC-STORM-HBASE"; String zkRoot="/storm";//用於儲存當前處理到哪個Offset SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, SPOUTID); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());//如何解碼資料 KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
2.
實現一個Bolt用於切割字串並記錄每個單詞出現的次數,關鍵程式碼
方法一
@Override public void execute(Tuple input) { String line=input.getString(0); String[] words=line.split(" "); for(String word:words){ if (!word.equals("")){ // this.collector.emit(tuple(word,1)); if(map.containsKey(word)){ map.put(word,map.get(word)+1); }else { map.put(word,1); } } } for (Map.Entry<String,Integer> e:map.entrySet()){ this.collector.emit(tuple(e.getKey(),e.getValue().toString())); } this.collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); }
方法二
使用計數器
在Topology中
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withCounterFields(new Fields("count"))
//.withColumnFields(new Fields("count"))
.withColumnFamily("result");
在Bolt中可以直接傳送分割後的字串,count為1
this.collector.emit(tuple(word,1));
3.
配置HBase,儲存storm輸出的實時資料
Config config = new Config();
config.setDebug(true);
Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir","hdfs://master:9000/hbase");
hbConf.put("hbase.zookeeper.quorum","master,slave1,slave2");//不加入該項配置,會出現連線到HBase失敗的錯誤
config.put("hbase.conf", hbConf);
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withColumnFields(new Fields("count"))
.withColumnFamily("result");
HBaseBolt hbase = new HBaseBolt("Wordcount", mapper)
.withConfigKey("hbase.conf");
4.
在Topology中加入spout,bolt
TopologyBuilder builder=new TopologyBuilder();
builder.setSpout(SPOUTID,new KafkaSpout(spoutConfig),1);
builder.setBolt(COUNT_BOLT,bolt,1).shuffleGrouping(SPOUTID);
builder.setBolt(HBASE_BOLT,hbase,1).fieldsGrouping(COUNT_BOLT,new Fields("word"));
打包提交執行
沒有指定拓撲名,在控制檯執行,並不提交
bin/storm jar examples/Storm-Case-02-0.0.1-SNAPSHOT.jar com.horizon.storm.kafkahbase.KHTopology
指定拓撲名,提交執行
bin/storm jar examples/Storm-Case-02-0.0.1-SNAPSHOT.jar com.horizon.storm.kafkahbase.KHTopology KHTopology
Topology成功提交執行
測試一下,在Kafka啟動的Producer中輸入
在HBase中查看錶,可以看到新生產的資料已經加入,及對應的value
之前的測試扔進去一大段話,所以表中顯示這麼多,但是可以發現HBase儲存的value型別為十六進位制
修改Bolt中傳遞到HBaseBolt的資料型別為String,可以使在HBase表中檢視的效果為十進位制
this.collector.emit(tuple(e.getKey(),e.getValue().toString()));
開發過程中遇到的問題
1.
jar包的依賴中含有配置檔案,和叢集環境中的配置檔案衝突
刪掉jar包中的檔案即可
原因:KafkaSpout初始化時,會去取spoutConfig.zkServers 和 spoutConfig.zkPort 變數的值,而該值預設是沒塞的,所以是空,那麼它就會去取當前執行的Storm所配置的zookeeper地址和埠,而本地執行的Storm,是一個臨時的zookeeper例項,並不會真正持久化。所以,每次關閉後,資料就沒了。本地模式,要顯示的去配置
spoutConfig.zkServers = new ArrayList<String>(){{
add("10.1.110.20");
add("10.1.110.21");
add("10.1.110.24");
}};
spoutConfig.zkPort = 2181;
自己實現Scheme
Scheme解碼方式是與Producer端生成時加入資料的編碼方式配套的
public class MyKHScheme implements Scheme{
@Override
public List<Object> deserialize(ByteBuffer byteBuffer) {
String word=decode(byteBuffer);
return new Values(word);
}
@Override
public Fields getOutputFields() {
return new Fields("word");
}
public String decode(ByteBuffer byteBuffer){
Charset charset = null;
CharsetDecoder decoder = null;
CharBuffer charBuffer = null;
charset = Charset.forName("UTF-8");
decoder = charset.newDecoder();
try {
charBuffer = decoder.decode(byteBuffer);
} catch (CharacterCodingException e) {
e.printStackTrace();
}
return charBuffer.toString();
}
}
修改Topology中scheme為自定義的scheme
spoutConfig.scheme = new SchemeAsMultiScheme(new MyKHScheme());
在Bolt中測試一下讀入的資料
正確打印出我想要的資料格式(可以改成錯誤的試試,我試過了)