Integrating Storm with HBase
Persist the results of a Storm streaming computation to HBase, using WordCount as the example.
Preparation
1. Start the clusters
- Start the ZooKeeper cluster
$> zkServer.sh start
- Start the Hadoop cluster (Hadoop must be up before HBase starts, and the NameNode must be in the active state)
$> start-dfs.sh
- Start the HBase cluster
$> start-hbase.sh
- Start the Storm cluster
// start the master
$> storm nimbus
// start a slave
$> storm supervisor
- Create a table named wordcount with column family f1
$hbase shell> create 'ns1:wordcount','f1'
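Note that the table lives in the ns1 namespace; if that namespace does not exist yet, create it first:
$hbase shell> create_namespace 'ns1'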
2. Import the Maven dependencies
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-hbase</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.6</version>
</dependency>
3. Import the configuration files
Copy hbase-site.xml and hdfs-site.xml into the resources directory so the HBase client can find the cluster.
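For reference, a minimal hbase-site.xml only needs to point the client at the ZooKeeper quorum; the hostnames below are placeholders for your own nodes:
<configuration>
    <!-- ZooKeeper quorum the HBase client connects to; replace with your hosts -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>s101,s102,s103</value>
    </property>
</configuration>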
Implementation
The source Spout
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class WordCountSpout implements IRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private List<String> states;
    private Random r = new Random();
    private int index = 0;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.context = context;
        this.collector = collector;
        // sample sentences to emit
        states = new ArrayList<String>();
        states.add("hello world tom");
        states.add("hello world tomas");
        states.add("hello world tomasLee");
        states.add("hello world tomson");
    }

    public void close() {
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public void nextTuple() {
        // emit only three random lines in total so the demo terminates quickly
        if (index < 3) {
            String line = states.get(r.nextInt(4));
            collector.emit(new Values(line));
            index++;
        }
    }

    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // each tuple carries one full line of text
        declarer.declare(new Fields("line"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
The split Bolt
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SplitBolt implements IRichBolt {
    private TopologyContext context;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        // split each line into words and emit (word, 1) pairs
        String line = tuple.getString(0);
        String[] arr = line.split(" ");
        for (String s : arr) {
            collector.emit(new Values(s, 1));
        }
        // acknowledge the input tuple; IRichBolt does not ack automatically
        collector.ack(tuple);
    }

    public void cleanup() {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
HbaseBolt: writing the results to HBase
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

import java.io.IOException;
import java.util.Map;

public class HbaseBolt implements IRichBolt {
    private Connection connection;
    private Table table;
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            // reads hbase-site.xml from the classpath (the resources directory)
            Configuration conf = HBaseConfiguration.create();
            connection = ConnectionFactory.createConnection(conf);
            table = connection.getTable(TableName.valueOf("ns1:wordcount"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple tuple) {
        // the word
        String word = tuple.getString(0);
        // its count
        Integer count = tuple.getInteger(1);
        // use an HBase counter to accumulate the word count
        byte[] rowKey = Bytes.toBytes(word);
        byte[] f = Bytes.toBytes("f1");
        byte[] c = Bytes.toBytes("count");
        try {
            table.incrementColumnValue(rowKey, f, c, count);
            collector.ack(tuple);
        } catch (IOException e) {
            collector.fail(tuple);
            e.printStackTrace();
        }
    }

    public void cleanup() {
        // release HBase resources when the bolt shuts down
        try {
            if (table != null) table.close();
            if (connection != null) connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
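Once the topology has run, the counters can be checked from the HBase shell. incrementColumnValue stores the value as an 8-byte long, so a plain scan shows it in binary; get_counter decodes it (the row key 'hello' comes from the sample sentences emitted by the spout):
$hbase shell> scan 'ns1:wordcount'
$hbase shell> get_counter 'ns1:wordcount', 'hello', 'f1:count'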
WCApp
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WCApp {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wcspout", new WordCountSpout()).setNumTasks(1);
        // shuffle raw lines across the split bolts
        builder.setBolt("split-bolt", new SplitBolt(), 2).shuffleGrouping("wcspout").setNumTasks(2);
        // group by word so the same word always reaches the same HbaseBolt task
        builder.setBolt("hbase-bolt", new HbaseBolt(), 2).fieldsGrouping("split-bolt", new Fields("word")).setNumTasks(2);
        Config conf = new Config();
        // run in-process for testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc", conf, builder.createTopology());
    }
}
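As an aside, the storm-hbase dependency declared earlier already ships a ready-made HBaseBolt, so the hand-written bolt above can be swapped out. The sketch below is an untested alternative driver (the class name WCAppWithStormHbase is made up for illustration): SimpleHBaseMapper maps the "word" field to the row key and writes "count" as a counter in family f1, and the empty map stored under "hbase.conf" makes the bolt fall back to the hbase-site.xml on the classpath:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.util.HashMap;

public class WCAppWithStormHbase {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wcspout", new WordCountSpout());
        builder.setBolt("split-bolt", new SplitBolt(), 2).shuffleGrouping("wcspout");

        // "word" becomes the row key; "count" is written as a counter in family f1
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withCounterFields(new Fields("count"))
                .withColumnFamily("f1");
        // the bolt reads extra HBase client settings from the topology
        // config under the key passed to withConfigKey
        HBaseBolt hbaseBolt = new HBaseBolt("ns1:wordcount", mapper)
                .withConfigKey("hbase.conf");
        builder.setBolt("hbase-bolt", hbaseBolt, 2)
               .fieldsGrouping("split-bolt", new Fields("word"));

        Config conf = new Config();
        // empty map: rely on hbase-site.xml from the classpath
        conf.put("hbase.conf", new HashMap<String, Object>());

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc-storm-hbase", conf, builder.createTopology());
    }
}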