Storm+HBASE+MySQL 實時讀取Kafka資訊計算儲存
阿新 • • 發佈:2019-02-04
本文是通過Storm將生產出來的資料進行實時的計算統計,整理出來之後將資料寫到hbase和mysql資料中,並將結果展示在前端頁面上,頁面展示部分在下一篇說明
題目要求
一、機組執行資料清洗規則
1、執行資料日期不是當日資料
2、執行資料風速 為空||=-902||風速在 3~12之外
3、執行資料功率 為空||=-902||功率在 -0.5*1500~2*1500之外
二、清洗資料後儲存HBase
1、正常資料 & 不合理資料 全部存入HBase中
2、劃分兩個表(Normal/Abnormal);Rowkey設計:年月日時分秒_機組編號;列:Value(把資料寫入一個列中)
三、實時監控報警
對於正常資料監控異常指標,並輸出到MySQL中記錄,Web顯示報警資訊。
規則:每5S監控30S內發電機溫度高於80度以上5次,報警(機組編號、報警時間、報警描述:過去30S內發電機溫度高於80
度以上出現:6/10(次))
Storm實時計算部分
- 通過在終端啟動了一個消費者檢視使用kafka生產出來的資料被消費者消費的格式如下,是以逗號分割的一條資料流。
- 首先定義一個SplitdataBolt()用來將資料欄位進行分割,並且將下面的步驟需要的欄位傳送到下一個bolt中
- 機組執行資料清洗規則
定義FliterBolt()用來進行資料的篩選。定義一個flag標識,當符合條件時,將flag標誌為false,然後在發射資料的時候根據flag的值進行發射,同時標記了streamid,用於後面資料的分流。其中主要的不同還是streamid的不同,其他的欄位都相同。
篩選條件
根據是否為當天日期,風速的值和功率值來進行篩選。當天日期只選取了2016/1/1的資料為當天資料。
為了驗證資料的分流是否準確,使用了兩個printbolt用來列印兩個分流的資料。分流資料的使用在shuffleGrouping中加上分流資料的id,就可以將正常和不正常的進行分類分別使用兩個bolt進行輸出。
輸出的資料如下
- 清洗資料後儲存HBase
之前已經將資料的分流標記好id,只需要定義兩個hbaseBolt分別進行配置和資料的接收,並將資料寫入到hbase中
兩個mapper和hbasebolt的設定
輸入到hbase接收分流資料
輸入到hbase結果
可以看到rowkey和最後的結果和要求相同。
正常資料
異常資料
- 實時監控報警(使用滑動視窗進行計算)
使用滑動視窗進行溫度超過80度的資料的總和統計
定義WindowMonitor繼承BaseWindowedBolt類,通過execute方法將一個視窗內的資料進行統計。通過後面的if語句進行條件的判斷,將溫度大於80的風機編號進行計數,使用map來進行資料的儲存
資料的傳送:當一個視窗統計結束後,將map中key的value>5的map進行資料的傳送
對接資料流
要定義視窗的長度和間隔,並且指定streamId
builder.setBolt("Filter", new FliterBolt(), 1)
.shuffleGrouping("data-spilter");
builder.setBolt("window",new WindowMonitor().withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)),1)
.shuffleGrouping("Filter","Normal");
- 將統計資料輸入到mysql資料庫中
定義一個PersistentBolt類來進行jdbc的連線
Mysql資料庫的配置
private static Map<String,Object> hikariConfigMap = new HashMap<String, Object>(){{
put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
put("dataSource.url", "jdbc:mysql://172.17.11.183:3306/ExceptionMonitor");
put("dataSource.user","root");
put("dataSource.password", "root");
}};
建立getJdbcInsertBolt方法,用來向資料庫中插入資料
首先定義
List<> schemaColumns用來存放資料庫中的表的欄位名
建立如下兩個物件使用withInsertQuery方法來進行資料庫的插入
public static JdbcInsertBolt getJdbcInsertBolt(){
List<Column> schemaColumns = Lists.newArrayList(new Column("fan_no", Types.VARCHAR),
new Column("call_time", Types.VARCHAR), new Column("call_count", Types.INTEGER));
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(schemaColumns);
JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
.withInsertQuery("insert into exception_output(fan_no,call_time,call_count) values(?,?,?)")
.withQueryTimeoutSecs(50);
return jdbcInsertBolt;
//使用tablename進行插入資料,需要指定表中的所有欄位
/*String tableName="ExceptionOutput";
JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("ExceptionOutput")
.withQueryTimeoutSecs(50);*/
//使用schemaColumns,可以指定欄位要插入的欄位
}
MySQL資料庫展示
Storm部分計算程式碼
寫入到mysql中
主要還是要把Storm的上下游機制以及各種元件的原理搞清楚,程式碼就好寫了