1. 程式人生 > 實用技巧 >[DB] Flink 讀 MySQL

[DB] Flink 讀 MySQL

思路

在 Flink 中建立一張表有兩種方法:

  • 從一個檔案中匯入表結構(Structure)(常用於批計算)(靜態)

  • 從 DataStream 或者 DataSet 轉換成 Table (動態)

package com.kaikeba.mysql.demo

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object Flink2Mysql {
  def main(args: Array[String]): Unit = {
    //設定執行環境
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    //通過建立JDBCInputFormat讀取JDBC資料來源
    val jdbcDataSet: DataSet[Row] =
      env.createInput(JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.cj.jdbc.Driver")
        .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false")
        .setUsername("root")
        .setPassword("Chen1227+")
        .setQuery("select * from filter")
        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
        .finish()
      )

    //將DataSet註冊為表
    tEnv.registerDataSet("tb", jdbcDataSet)
    //執行查詢操作
    val table = tEnv.sqlQuery("select * from tb")
    //把table轉為DataSet
    tEnv.toDataSet[Row](table).print()
  }
}  

參考

Flink 讀寫 Mysql

https://blog.csdn.net/Android_xue/article/details/102705711

https://blog.csdn.net/ranyizhang/article/details/103759251

https://www.cnblogs.com/Gxiaobai/p/12645497.html

Flink流處理訪問MySQL

https://blog.csdn.net/u012447842/article/details/89175772

Flink例項

https://blog.csdn.net/xianpanjia4616/article/details/98318750