1. 程式人生 > >Storm+HBASE+MySQL 實時讀取Kafka資訊計算儲存

Storm+HBASE+MySQL 實時讀取Kafka資訊計算儲存

本文是通過Storm將生產出來的資料進行實時的計算統計,整理出來之後將資料寫到hbase和mysql資料中,並將結果展示在前端頁面上,頁面展示部分在下一篇說明

題目要求

一、機組執行資料清洗規則
1、執行資料日期不是當日資料
2、執行資料風速 為空||=-902||風速在 3~12之外
3、執行資料功率 為空||=-902||功率在 -0.5*1500~2*1500之外
二、清洗資料後儲存HBase
1、正常資料 & 不合理資料 全部存入HBase中
2、劃分兩個表(Normal/Abnormal);Rowkey設計:年月日時分秒_機組編號;列:Value(把資料寫入一個列中)
三、實時監控報警
對於正常資料監控異常指標,並輸出到MySQL中記錄,Web顯示報警資訊。
規則:每5S監控30S內發電機溫度高於80度以上5次,報警(機組編號、報警時間、報警描述:過去30S內發電機溫度高於80
度以上出現:6/10(次))

Storm實時計算部分

  • 通過在終端啟動了一個消費者檢視使用kafka生產出來的資料被消費者消費的格式如下,是以逗號分割的一條資料流。
    這裡寫圖片描述
  • 首先定義一個SplitdataBolt()用來將資料欄位進行分割,並且將下面的步驟需要的欄位傳送到下一個bolt中
    這裡寫圖片描述
    這裡寫圖片描述
  • 機組執行資料清洗規則
    定義FliterBolt()用來進行資料的篩選。定義一個flag標識,當符合條件時,將flag標誌為false,然後在發射資料的時候根據flag的值進行發射,同時標記了streamid,用於後面資料的分流。其中主要的不同還是streamid的不同,其他的欄位都相同。
    篩選條件
    根據是否為當天日期,風速的值和功率值來進行篩選。當天日期只選取了2016/1/1的資料為當天資料。
    這裡寫圖片描述

    為了驗證資料的分流是否準確,使用了兩個printbolt用來列印兩個分流的資料。分流資料的使用在shuffleGrouping中加上分流資料的id,就可以將正常和不正常的進行分類分別使用兩個bolt進行輸出。
    這裡寫圖片描述
    輸出的資料如下
    這裡寫圖片描述
  • 清洗資料後儲存HBase
    之前已經將資料的分流標記好id,只需要定義兩個hbaseBolt分別進行配置和資料的接收,並將資料寫入到hbase中
     兩個mapper和hbasebolt的設定
    這裡寫圖片描述
     輸入到hbase接收分流資料
    這裡寫圖片描述
    輸入到hbase結果
    可以看到rowkey和最後的結果和要求相同。
    正常資料
    這裡寫圖片描述
    異常資料
    這裡寫圖片描述
  • 實時監控報警(使用滑動視窗進行計算)
    使用滑動視窗進行溫度超過80度的資料的總和統計
    定義WindowMonitor繼承BaseWindowedBolt類,通過execute方法將一個視窗內的資料進行統計。通過後面的if語句進行條件的判斷,將溫度大於80的風機編號進行計數,使用map來進行資料的儲存
    這裡寫圖片描述

     資料的傳送:當一個視窗統計結束後,將map中key的value>5的map進行資料的傳送
    這裡寫圖片描述
     對接資料流
    要定義視窗的長度和間隔,並且指定streamId
 builder.setBolt("Filter", new FliterBolt(), 1)
 .shuffleGrouping("data-spilter");
builder.setBolt("window",new WindowMonitor().withWindow(new BaseWindowedBolt.Duration(30, TimeUnit.SECONDS),new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)),1)
.shuffleGrouping("Filter","Normal");
  • 將統計資料輸入到mysql資料庫中
    定義一個PersistentBolt類來進行jdbc的連線
    Mysql資料庫的配置
 private static Map<String,Object> hikariConfigMap = new HashMap<String, Object>(){{
        put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        put("dataSource.url", "jdbc:mysql://172.17.11.183:3306/ExceptionMonitor");
        put("dataSource.user","root");
        put("dataSource.password", "root");
    }};

建立getJdbcInsertBolt方法,用來向資料庫中插入資料
首先定義
List<> schemaColumns用來存放資料庫中的表的欄位名
建立如下兩個物件使用withInsertQuery方法來進行資料庫的插入

public static JdbcInsertBolt getJdbcInsertBolt(){

        List<Column> schemaColumns = Lists.newArrayList(new Column("fan_no", Types.VARCHAR),
                new Column("call_time", Types.VARCHAR), new Column("call_count", Types.INTEGER));
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(schemaColumns);
        JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider,simpleJdbcMapper)
                .withInsertQuery("insert into exception_output(fan_no,call_time,call_count) values(?,?,?)")
                .withQueryTimeoutSecs(50);
        return jdbcInsertBolt;
        //使用tablename進行插入資料,需要指定表中的所有欄位
        /*String tableName="ExceptionOutput";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt jdbcInsertBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                        .withTableName("ExceptionOutput")
                                        .withQueryTimeoutSecs(50);*/
        //使用schemaColumns,可以指定欄位要插入的欄位
    }

MySQL資料庫展示
這裡寫圖片描述
Storm部分計算程式碼
寫入到mysql中
主要還是要把Storm的上下游機制以及各種元件的原理搞清楚,程式碼就好寫了