Storm 系列(八)—— Storm 整合 HDFS 和 HBase
阿新 • • 發佈:2019-12-31
一、Storm整合HDFS
1.1 專案結構
本用例原始碼下載地址:storm-hdfs-integration
1.2 專案主要依賴
專案主要依賴如下,有兩個地方需要注意:
- 這裡由於我伺服器上安裝的是 CDH 版本的 Hadoop,在匯入依賴時引入的也是 CDH 版本的依賴,需要使用
<repository>
標籤指定 CDH 的倉庫地址; -
hadoop-common
、hadoop-client
、hadoop-hdfs
均需要排除slf4j-log4j12
依賴,原因是storm-core
中已經有該依賴,不排除的話有 JAR 包衝突的風險;
<properties >
<storm.version>1.2.2</storm.version>
</properties>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies >
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!--Storm 整合 HDFS 依賴-->
<dependency>
<groupId>org.apache.storm</groupId >
<artifactId>storm-hdfs</artifactId>
<version>${storm.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
複製程式碼
1.3 DataSourceSpout
/**
* 產生詞頻樣本的資料來源
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark","Hadoop","HBase","Storm","Flink","Hive");
private SpoutOutputCollector spoutOutputCollector;
@Override
public void open(Map map,TopologyContext topologyContext,SpoutOutputCollector spoutOutputCollector) {
this.spoutOutputCollector = spoutOutputCollector;
}
@Override
public void nextTuple() {
// 模擬產生資料
String lineData = productData();
spoutOutputCollector.emit(new Values(lineData));
Utils.sleep(1000);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("line"));
}
/**
* 模擬資料
*/
private String productData() {
Collections.shuffle(list);
Random random = new Random();
int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
return StringUtils.join(list.toArray(),"\t",0,endIndex);
}
}
複製程式碼
產生的模擬資料格式如下:
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
複製程式碼
1.4 將資料儲存到HDFS
這裡 HDFS 的地址和資料儲存路徑均使用了硬編碼,在實際開發中可以通過外部傳參指定,這樣程式更為靈活。
public class DataToHdfsApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String HDFS_BOLT = "hdfsBolt";
public static void main(String[] args) {
// 指定 Hadoop 的使用者名稱 如果不指定,則在 HDFS 建立目錄時候有可能丟擲無許可權的異常 (RemoteException: Permission denied)
System.setProperty("HADOOP_USER_NAME","root");
// 定義輸出欄位 (Field) 之間的分隔符
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// 同步策略: 每 100 個 tuples 之後就會把資料從快取重新整理到 HDFS 中
SyncPolicy syncPolicy = new CountSyncPolicy(100);
// 檔案策略: 每個檔案大小上限 1M,超過限定時,建立新檔案並繼續寫入
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f,Units.MB);
// 定義儲存路徑
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/storm-hdfs/");
// 定義 HdfsBolt
HdfsBolt hdfsBolt = new HdfsBolt()
.withFsUrl("hdfs://hadoop001:8020")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
// 構建 Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT,new DataSourceSpout());
// save to HDFS
builder.setBolt(HDFS_BOLT,hdfsBolt,1).shuffleGrouping(DATA_SOURCE_SPOUT);
// 如果外部傳參 cluster 則代表線上環境啟動,否則代表本地啟動
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterDataToHdfsApp",new Config(),builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalDataToHdfsApp",builder.createTopology());
}
}
}
複製程式碼
1.5 啟動測試
可以用直接使用本地模式執行,也可以打包後提交到伺服器叢集執行。本倉庫提供的原始碼預設採用 maven-shade-plugin
進行打包,打包命令如下:
# mvn clean package -D maven.test.skip=true
複製程式碼
執行後,資料會儲存到 HDFS 的 /storm-hdfs
目錄下。使用以下命令可以檢視目錄內容:
# 檢視目錄內容
hadoop fs -ls /storm-hdfs
# 監聽文內容變化
hadoop fs -tail -f /strom-hdfs/檔名
複製程式碼
二、Storm整合HBase
2.1 專案結構
整合用例: 進行詞頻統計並將最後的結果儲存到 HBase,專案主要結構如下:
本用例原始碼下載地址:storm-hbase-integration
2.2 專案主要依賴
<properties>
<storm.version>1.2.2</storm.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!--Storm 整合 HBase 依賴-->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hbase</artifactId>
<version>${storm.version}</version>
</dependency>
</dependencies>
複製程式碼
2.3 DataSourceSpout
/**
* 產生詞頻樣本的資料來源
*/
public class DataSourceSpout extends BaseRichSpout {
private List<String> list = Arrays.asList("Spark",endIndex);
}
}
複製程式碼
產生的模擬資料格式如下:
Spark HBase
Hive Flink Storm Hadoop HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
複製程式碼
2.4 SplitBolt
/**
* 將每行資料按照指定分隔符進行拆分
*/
public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split("\t");
for (String word : words) {
collector.emit(tuple(word,1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
複製程式碼
2.5 CountBolt
/**
* 進行詞頻統計
*/
public class CountBolt extends BaseRichBolt {
private Map<String,Integer> counts = new HashMap<>();
private OutputCollector collector;
@Override
public void prepare(Map stormConf,OutputCollector collector) {
this.collector=collector;
}
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word,count);
// 輸出
collector.emit(new Values(word,String.valueOf(count)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
複製程式碼
2.6 WordCountToHBaseApp
/**
* 進行詞頻統計 並將統計結果儲存到 HBase 中
*/
public class WordCountToHBaseApp {
private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
private static final String SPLIT_BOLT = "splitBolt";
private static final String COUNT_BOLT = "countBolt";
private static final String HBASE_BOLT = "hbaseBolt";
public static void main(String[] args) {
// storm 的配置
Config config = new Config();
// HBase 的配置
Map<String,Object> hbConf = new HashMap<>();
hbConf.put("hbase.rootdir","hdfs://hadoop001:8020/hbase");
hbConf.put("hbase.zookeeper.quorum","hadoop001:2181");
// 將 HBase 的配置傳入 Storm 的配置中
config.put("hbase.conf",hbConf);
// 定義流資料與 HBase 中資料的對映
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withColumnFields(new Fields("word","count"))
.withColumnFamily("info");
/*
* 給 HBaseBolt 傳入表名、資料對映關係、和 HBase 的配置資訊
* 表需要預先建立: create 'WordCount','info'
*/
HBaseBolt hbase = new HBaseBolt("WordCount",mapper)
.withConfigKey("hbase.conf");
// 構建 Topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(DATA_SOURCE_SPOUT,new DataSourceSpout(),1);
// split
builder.setBolt(SPLIT_BOLT,new SplitBolt(),1).shuffleGrouping(DATA_SOURCE_SPOUT);
// count
builder.setBolt(COUNT_BOLT,new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
// save to HBase
builder.setBolt(HBASE_BOLT,hbase,1).shuffleGrouping(COUNT_BOLT);
// 如果外部傳參 cluster 則代表線上環境啟動,否則代表本地啟動
if (args.length > 0 && args[0].equals("cluster")) {
try {
StormSubmitter.submitTopology("ClusterWordCountToRedisApp",config,builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalWordCountToRedisApp",builder.createTopology());
}
}
}
複製程式碼
2.7 啟動測試
可以用直接使用本地模式執行,也可以打包後提交到伺服器叢集執行。本倉庫提供的原始碼預設採用 maven-shade-plugin
進行打包,打包命令如下:
# mvn clean package -D maven.test.skip=true
複製程式碼
執行後,資料會儲存到 HBase 的 WordCount
表中。使用以下命令檢視錶的內容:
hbase > scan 'WordCount'
複製程式碼
2.8 withCounterFields
在上面的用例中我們是手動編碼來實現詞頻統計,並將最後的結果儲存到 HBase 中。其實也可以在構建 SimpleHBaseMapper
的時候通過 withCounterFields
指定 count 欄位,被指定的欄位會自動進行累加操作,這樣也可以實現詞頻統計。需要注意的是 withCounterFields 指定的欄位必須是 Long 型別,不能是 String 型別。
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withColumnFields(new Fields("word"))
.withCounterFields(new Fields("count"))
.withColumnFamily("cf");
複製程式碼
參考資料
更多大資料系列文章可以參見 GitHub 開源專案: 大資料入門指南