1. 程式人生 > >Flink流計算程式設計--Flink sink to Oracle

Flink流計算程式設計--Flink sink to Oracle

關於Flink connectors,Flink 1.1提供了許多內建的第三方聯結器,這些connectors包括:

Apache Kafka (sink/source)
Elasticsearch (sink)
Elasticsearch 2x (sink)
Hadoop FileSystem (sink)
RabbitMQ (sink/source)
Amazon Kinesis Streams (sink/source)
Twitter Streaming API (source)
Apache NiFi (sink/source)
Apache Cassandra (sink)
Redis (sink)

可以看到,第三方軟體中,可以作為source的軟體有:
Apache Kafka、RabbitMQ、Twitter Streaming API和Apache NiFi。
可以作為sink的軟體包括Apache Kafka、Apache Cassandra、Redis等。

除了Flink內建支援的這些第三方軟體之外,Flink也提供了自定義的source以及自定義的Sink。

2、關於Sink to JDBC

Flink的DataStream在計算完成後,就要將結果輸出,目前除了上述提到的Kafka、Redis等之外,Flink也提供了其他幾種方式:

writeAsText() / TextOutputFormat: 將元素按照行輸出,每行當做一個字串

writeAsCsv(...
) / CsvOutputFormat: 將每行的元組按照特定的格式劃分,然後輸出到csv print() / printToErr() :標準輸出,錯誤輸出。也是把每行按照字串方式輸出到taskmanager的out檔案 writeUsingOutputFormat() / FileOutputFormat:自定義的檔案輸出 writeToSocket:根據序列化的sckame將元素寫入socket addSink:通過invoke方法自定義sink

其中,addSink就是我們這裡要說的“自定義Flink sink”。既然是自定義,我們就可以將DataStream輸出到JDBC,例如Mysql、Oracle。這在很多時候都很用。

一般情況下,我們通常會將DataStream sink到類似於Redis這種記憶體資料庫,同時也會將結果入庫,作為以後分析使用,例如sink到Mysql或oracle等JDBC。

Flink內建並沒有支援JDBC,除了要覆寫addSink方法外,我們還需要匯入JDBC相應的依賴。
在大多數Maven管理的專案中,我們通常需要手動匯入JDBC的依賴包。

3、Maven匯入Oracle的依賴包

(1)獲取oracle的lib包
這裡要根據目標資料庫的版本下載,也可以去資料庫伺服器中的“{ORACLE_HOME}\jdbc\lib\ojdbc.jar”,例如oracle12.1.0.2.0,此時對應的jar包是ojdbc6.jar。
(2)手動安裝ojdbc6.jar到本地maven的repository

mvn install:install-file -DgroupId=com.oracle -DartifactId=ojdbc6 -Dversion=12.1.0.2.0 -Dpackaging=jar -Dfile=ojdbc6.jar

通常,cmd後先cd到剛才我們獲得的ojdbc6.jar所在的路徑,然後執行上邊的命令即可。
(3)在pom.xml中新增引用

<!-- 新增oracle jdbc driver -->
        <dependency>
            <groupId>com.oracle</groupId>
            <artifactId>ojdbc6</artifactId>
            <version>12.1.0.2.0</version>
        </dependency>

4、Flink自定義Sink to Oracle

這裡Flink對DataStrem提供了一個addSink方法,我們自定義一個類實現SinkFunction或者RichSinkFunction即可,例如:

txSumNew.addSink(new TX1MinuteSinkToOracle)

這個新的物件TX1MinuteSinkToOracle要繼承RichSinkFunction,因為對資料庫的insert操作,我們只需要一次性建立1個session即可,不需要每次insert都建立一個連線,因此在RichSinkFunction的open方法中建立連線。這樣,就可以open一次,insert多次了。

這裡採用java.sql類庫建立連線,但是在連線之前,我們要告訴Flink我們準備使用的JdbcDriver是oracle Driver,如下:

Class.forName ("oracle.jdbc.OracleDriver")

之後,就可以建立連線(這裡的IP要自己連線自己的伺服器,s1是服務名,使用者名稱和密碼自己設定):

conn = DriverManager.getConnection("jdbc:oracle:thin:@<IP>:1521:s1", "s1", "s1")
      val sql = "insert into flink_tx(tx_date,minute,window_start_time, window_end_time,code,minute_volume,minute_turnover,minute_size,minute_avg_volume,sum_size,minute_volume_super,minute_volume_big,minute_volume_middle,minute_volume_small," +
        "sum_volume_super,sum_volume_big,sum_volume_middle,sum_volume_small,minute_vwap,sum_vwap,vwap_sd) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
      ps = conn.prepareStatement(sql)

完整的程式碼如下:

object TX1MinuteSinkToOracle {

  // *****************************************************************************
  // open()中只執行一次,開始時執行;invoke根據input進行sql執行;close()最後時關閉
  // *****************************************************************************
  class TX1MinuteSinkToOracle extends RichSinkFunction[TX1MinSliding]{

    var conn : Connection = null
    var ps: PreparedStatement = null

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")


    override def open(parameters: Configuration): Unit = {


      Class.forName ("oracle.jdbc.OracleDriver")

      conn = DriverManager.getConnection("jdbc:oracle:thin:@<IP>:1521:s1", "s1", "s1")
      val sql = "insert into flink_tx(tx_date,minute,window_start_time, window_end_time,code,minute_volume,minute_turnover,minute_size,minute_avg_volume,sum_size,minute_volume_super,minute_volume_big,minute_volume_middle,minute_volume_small," +
        "sum_volume_super,sum_volume_big,sum_volume_middle,sum_volume_small,minute_vwap,sum_vwap,vwap_sd) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
      ps = conn.prepareStatement(sql)
    }


    override def invoke(in: TX1MinSliding): Unit = {

      try {

        val avg_volume_1Min = BigDecimal.valueOf(in.volume_1Min)./(BigDecimal.valueOf(in.size_1Min.toDouble)).setScale(4,BigDecimal.RoundingMode.HALF_UP)

        ps.setString(1,in.date)//時間
        ps.setString(2,in.minute)//分鐘
        ps.setString(3,in.window_start_time)//視窗開始範圍
        ps.setString(4,in.window_end_time)//視窗結束範圍
        ps.setString(5,in.code)//股票程式碼
        ps.setLong(6,in.volume_1Min)//分鐘成交量
        ps.setDouble(7,BigDecimal.valueOf(in.turnover_1Min).setScale(1,BigDecimal.RoundingMode.HALF_UP).toDouble)//分鐘成交額
        ps.setInt(8,in.size_1Min)//分鐘交易筆數
        ps.setDouble(9,avg_volume_1Min.toDouble)//分鐘平均交易量
        ps.setInt(10,in.sum_size)//累計交易筆數
        ps.setDouble(11,in.volume_super_1Min.toDouble)//分鐘-成交量(特大戶)
        ps.setDouble(12,in.volume_big_1Min)//分鐘-成交量(大戶)
        ps.setDouble(13,in.volume_middle_1Min.toDouble)//分鐘-成交量(中戶)
        ps.setDouble(14,in.volume_small_1Min.toDouble)//分鐘-成交量(散戶)
        ps.setDouble(15,in.volume_super_sum.toDouble)//累計成交量(特大戶)
        ps.setDouble(16,in.volume_big_sum.toDouble)//累計成交量(大戶)
        ps.setDouble(17,in.volume_middle_sum.toDouble)//累計成交量(中戶)
        ps.setDouble(18,in.volume_small_sum.toDouble)//累計成交量(散戶)
        ps.setDouble(19,in.vwap_1Min.setScale(4,BigDecimal.RoundingMode.HALF_UP).toDouble)//分鐘VWAP
        ps.setDouble(20,in.vwap_sum.setScale(4,BigDecimal.RoundingMode.HALF_UP).toDouble)//累計VWAP
        ps.setDouble(21,in.vwap_sd.setScale(4,BigDecimal.RoundingMode.HALF_UP).toDouble)//VWAP標準差

        ps.executeUpdate()

      }catch{
        case e : Exception => println(e.getMessage)
      }
    }

    override def close(): Unit = {
      if (ps != null) {
        ps.close()
      }
      if(conn != null){
        conn.close()
      }
    }

  }
}

關於Flink的addSink,我們可以看到原始碼如下:
\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\functions\sink\下,有RichSinkFunction.java與SinkFunction.java。

@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    public abstract void invoke(IN value) throws Exception;

}

5、引用