Flink流計算程式設計--Flink sink to Oracle
1、Flink connectors
關於Flink connectors,Flink 1.1提供了許多內建的第三方聯結器,這些connectors包括:
Apache Kafka (sink/source)
Elasticsearch (sink)
Elasticsearch 2x (sink)
Hadoop FileSystem (sink)
RabbitMQ (sink/source)
Amazon Kinesis Streams (sink/source)
Twitter Streaming API (source)
Apache NiFi (sink/source)
Apache Cassandra (sink)
Redis (sink)
可以看到,第三方軟體中,可以作為source的軟體有:
Apache Kafka、RabbitMQ、Twitter Streaming API和Apache NiFi。
可以作為sink的軟體包括Apache Kafka、Apache Cassandra、Redis等。
除了Flink內建支援的這些第三方軟體之外,Flink也提供了自定義的source以及自定義的Sink。
2、關於Sink to JDBC
Flink的DataStream在計算完成後,就要將結果輸出,目前除了上述提到的Kafka、Redis等之外,Flink也提供了其他幾種方式:
writeAsText() / TextOutputFormat: 將元素按照行輸出,每行當做一個字串
writeAsCsv(... ) / CsvOutputFormat: 將每行的元組按照特定的格式劃分,然後輸出到csv
print() / printToErr() :標準輸出,錯誤輸出。也是把每行按照字串方式輸出到taskmanager的out檔案
writeUsingOutputFormat() / FileOutputFormat:自定義的檔案輸出
writeToSocket:根據序列化的sckame將元素寫入socket
addSink:通過invoke方法自定義sink
其中,addSink就是我們這裡要說的“自定義Flink sink”。既然是自定義,我們就可以將DataStream輸出到JDBC,例如Mysql、Oracle。這在很多時候都很用。
一般情況下,我們通常會將DataStream sink到類似於Redis這種記憶體資料庫,同時也會將結果入庫,作為以後分析使用,例如sink到Mysql或oracle等JDBC。
Flink內建並沒有支援JDBC,除了要覆寫addSink方法外,我們還需要匯入JDBC相應的依賴。
在大多數Maven管理的專案中,我們通常需要手動匯入JDBC的依賴包。
3、Maven匯入Oracle的依賴包
(1)獲取oracle的lib包
這裡要根據目標資料庫的版本下載,也可以去資料庫伺服器中的“{ORACLE_HOME}\jdbc\lib\ojdbc.jar”,例如oracle12.1.0.2.0,此時對應的jar包是ojdbc6.jar。
(2)手動安裝ojdbc6.jar到本地maven的repository
mvn install:install-file -DgroupId=com.oracle -DartifactId=ojdbc6 -Dversion=12.1.0.2.0 -Dpackaging=jar -Dfile=ojdbc6.jar
通常,cmd後先cd到剛才我們獲得的ojdbc6.jar所在的路徑,然後執行上邊的命令即可。
(3)在pom.xml中新增引用
<!-- 新增oracle jdbc driver -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>12.1.0.2.0</version>
</dependency>
4、Flink自定義Sink to Oracle
這裡Flink對DataStrem提供了一個addSink方法,我們自定義一個類實現SinkFunction或者RichSinkFunction即可,例如:
txSumNew.addSink(new TX1MinuteSinkToOracle)
這個新的物件TX1MinuteSinkToOracle要繼承RichSinkFunction,因為對資料庫的insert操作,我們只需要一次性建立1個session即可,不需要每次insert都建立一個連線,因此在RichSinkFunction的open方法中建立連線。這樣,就可以open一次,insert多次了。
這裡採用java.sql類庫建立連線,但是在連線之前,我們要告訴Flink我們準備使用的JdbcDriver是oracle Driver,如下:
Class.forName ("oracle.jdbc.OracleDriver")
之後,就可以建立連線(這裡的IP要自己連線自己的伺服器,s1是服務名,使用者名稱和密碼自己設定):
conn = DriverManager.getConnection("jdbc:oracle:thin:@<IP>:1521:s1", "s1", "s1")
val sql = "insert into flink_tx(tx_date,minute,window_start_time, window_end_time,code,minute_volume,minute_turnover,minute_size,minute_avg_volume,sum_size,minute_volume_super,minute_volume_big,minute_volume_middle,minute_volume_small," +
"sum_volume_super,sum_volume_big,sum_volume_middle,sum_volume_small,minute_vwap,sum_vwap,vwap_sd) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
ps = conn.prepareStatement(sql)
完整的程式碼如下:
object TX1MinuteSinkToOracle {
// *****************************************************************************
// open()中只執行一次,開始時執行;invoke根據input進行sql執行;close()最後時關閉
// *****************************************************************************
class TX1MinuteSinkToOracle extends RichSinkFunction[TX1MinSliding]{
var conn : Connection = null
var ps: PreparedStatement = null
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def open(parameters: Configuration): Unit = {
Class.forName ("oracle.jdbc.OracleDriver")
conn = DriverManager.getConnection("jdbc:oracle:thin:@<IP>:1521:s1", "s1", "s1")
val sql = "insert into flink_tx(tx_date,minute,window_start_time, window_end_time,code,minute_volume,minute_turnover,minute_size,minute_avg_volume,sum_size,minute_volume_super,minute_volume_big,minute_volume_middle,minute_volume_small," +
"sum_volume_super,sum_volume_big,sum_volume_middle,sum_volume_small,minute_vwap,sum_vwap,vwap_sd) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
ps = conn.prepareStatement(sql)
}
override def invoke(in: TX1MinSliding): Unit = {
try {
val avg_volume_1Min = BigDecimal.valueOf(in.volume_1Min)./(BigDecimal.valueOf(in.size_1Min.toDouble)).setScale(4,BigDecimal.RoundingMode.HALF_UP)
ps.setString(1,in.date)//時間
ps.setString(2,in.minute)//分鐘
ps.setString(3,in.window_start_time)//視窗開始範圍
ps.setString(4,in.window_end_time)//視窗結束範圍
ps.setString(5,in.code)//股票程式碼
ps.setLong(6,in.volume_1Min)//分鐘成交量
ps.setDouble(7,BigDecimal.valueOf(in.turnover_1Min).setScale(1,BigDecimal.RoundingMode.HALF_UP).toDouble)//分鐘成交額
ps.setInt(8,in.size_1Min)//分鐘交易筆數
ps.setDouble(9,avg_volume_1Min.toDouble)//分鐘平均交易量
ps.setInt(10,in.sum_size)//累計交易筆數
ps.setDouble(11,in.volume_super_1Min.toDouble)//分鐘-成交量(特大戶)
ps.setDouble(12,in.volume_big_1Min)//分鐘-成交量(大戶)
ps.setDouble(13,in.volume_middle_1Min.toDouble)//分鐘-成交量(中戶)
ps.setDouble(14,in.volume_small_1Min.toDouble)//分鐘-成交量(散戶)
ps.setDouble(15,in.volume_super_sum.toDouble)//累計成交量(特大戶)
ps.setDouble(16,in.volume_big_sum.toDouble)//累計成交量(大戶)
ps.setDouble(17,in.volume_middle_sum.toDouble)//累計成交量(中戶)
ps.setDouble(18,in.volume_small_sum.toDouble)//累計成交量(散戶)
ps.setDouble(19,in.vwap_1Min.setScale(4,BigDecimal.RoundingMode.HALF_UP).toDouble)//分鐘VWAP
ps.setDouble(20,in.vwap_sum.setScale(4,BigDecimal.RoundingMode.HALF_UP).toDouble)//累計VWAP
ps.setDouble(21,in.vwap_sd.setScale(4,BigDecimal.RoundingMode.HALF_UP).toDouble)//VWAP標準差
ps.executeUpdate()
}catch{
case e : Exception => println(e.getMessage)
}
}
override def close(): Unit = {
if (ps != null) {
ps.close()
}
if(conn != null){
conn.close()
}
}
}
}
關於Flink的addSink,我們可以看到原始碼如下:
\flink-streaming-java\src\main\java\org\apache\flink\streaming\api\functions\sink\下,有RichSinkFunction.java與SinkFunction.java。
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
private static final long serialVersionUID = 1L;
public abstract void invoke(IN value) throws Exception;
}
5、引用