1. 程式人生 > 實用技巧 >Flink例項(三): connectors(一)MySQL讀寫

Flink例項(三): connectors(一)MySQL讀寫

1 工程目錄結構

pom.xml

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.9.2</version>
        </dependency>
        <!-- NOTE: the code below loads com.mysql.jdbc.Driver, so a MySQL JDBC driver
             (e.g. mysql:mysql-connector-java) must also be on the classpath;
             flink-jdbc does not bundle one. -->

2 flink 讀取MySQL

1)通過自定義 Source 實現

MySQLSource

package com.atguigu.flink.source

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
 * Bounded Flink source that runs `select * from sensor limit 5` against the local
 * `gmall1122` MySQL database and emits each row as a [[SensorReading]].
 *
 * JDBC resources are acquired in `open()` and released in `close()`; `cancel()`
 * only signals `run()` to stop emitting.
 */
class MySQLSource extends RichSourceFunction[SensorReading] {
  var conn: Connection = _
  var ps: PreparedStatement = _

  // Cleared by cancel() so run() stops emitting promptly; read from the task thread.
  @volatile private var running: Boolean = true

  /** Loads the MySQL driver, opens the connection and prepares the query. */
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")
    ps = conn.prepareStatement("select * from sensor limit 5")
  }

  /**
   * Executes the query and emits one reading per row until the result set is
   * exhausted or the source is cancelled. SQL errors propagate to the Flink
   * runtime instead of being silently swallowed.
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val resultSet: ResultSet = ps.executeQuery()
    try {
      while (running && resultSet.next()) {
        val id: String = resultSet.getString("id")
        val curTime: Long = resultSet.getLong("timestamp")
        // Column name spelled as in the table schema.
        val temperature: Double = resultSet.getDouble("timepreture")
        sourceContext.collect(SensorReading(id, curTime, temperature))
      }
    } finally {
      resultSet.close()
    }
  }

  /** Asks run() to stop; resources are released in close(). */
  override def cancel(): Unit = {
    running = false
  }

  /** Closes the statement, then the connection (reverse order of acquisition). */
  override def close(): Unit = {
    try {
      if (ps != null) ps.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}

主程式入口 MySQLSourceSinkApp

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MySQLSink
import com.atguigu.flink.source.MySQLSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._


/**
 * Streaming demo entry point: reads rows from MySQL with `MySQLSource`, writes
 * them back with `MySQLSink`, and prints each record to stdout.
 */
object MySQLSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run every operator with a single parallel instance.
    env.setParallelism(1)

    // Source: MySQL rows as SensorReading records.
    val readings = env.addSource(new MySQLSource)

    // The same stream feeds two consumers: the MySQL sink and stdout.
    readings.addSink(new MySQLSink())
    readings.print()

    // Nothing runs until the job graph is submitted.
    env.execute()
  }
}

2) 通過 JDBCInputFormat方式

    
  // Query for the demo read: at most five rows of the sensor table.
  val sql_read: String = "select * from sensor limit 5"

/**
 * Reads sensor rows from a JDBC source as a DataSet of [[SensorReading]].
 *
 * @param env    batch execution environment the input is registered on
 * @param url    JDBC URL of the database
 * @param driver JDBC driver class name
 * @param user   database user
 * @param pwd    database password
 * @param sql    query; assumed to return (id, timestamp, timepreture) in that column order
 * @return the rows mapped to SensorReading
 */
def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
  // Fetch the rows; the declared field types are (String, Long, Double).
  val dataResult: DataSet[Row] = env.createInput(
    JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername(driver)
      .setDBUrl(url)
      .setUsername(user)
      .setPassword(pwd)
      .setQuery(sql)
      .setRowTypeInfo(new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.DOUBLE_TYPE_INFO))
      .finish())

  // JDBCInputFormat fills fields via ResultSet.getObject, so the boxed numeric type
  // depends on the column type (e.g. Integer for INT, Float for FLOAT). Convert through
  // Number instead of a direct Long/Double cast. A SQL NULL in these columns now
  // fails fast rather than silently becoming 0.
  dataResult.map { row =>
    SensorReading(
      row.getField(0).asInstanceOf[String],
      row.getField(1).asInstanceOf[Number].longValue(),
      row.getField(2).asInstanceOf[Number].doubleValue())
  }
}

主程式入口MySQLSourceSinkApp2

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/** Batch demo: reads sensor rows from MySQL with JDBCInputFormat and prints them. */
object MySQLSourceSinkApp2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"

    // Read the MySQL data.
    val readStream = readMysql(env, url, driver, username, password, sql_read)

    // A DataSet program only runs once a sink triggers execution. Without this the
    // read above was never executed. print() is an eager sink: it runs the plan and
    // writes the rows to stdout.
    readStream.print()
  }

  /**
   * Reads sensor rows from a JDBC source as a DataSet of SensorReading.
   *
   * @param sql query; assumed to return (id, timestamp, timepreture) in that column order
   */
  private def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
    val dataResult: DataSet[Row] = env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

    // Fields come from ResultSet.getObject, so the boxed numeric type depends on the
    // column type; convert through Number rather than casting to Long/Double directly.
    dataResult.map { row =>
      SensorReading(
        row.getField(0).asInstanceOf[String],
        row.getField(1).asInstanceOf[Number].longValue(),
        row.getField(2).asInstanceOf[Number].doubleValue())
    }
  }
}

3 flink 寫入 MySQL

1)通過自定義 Sink 實現

MySQLSink

package com.atguigu.flink.sink

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

/**
 * Flink sink that inserts each [[SensorReading]] into the `sensor` table of the
 * local `gmall1122` MySQL database.
 */
class MySQLSink extends RichSinkFunction[SensorReading] {
  var conn: Connection = _
  var ps: PreparedStatement = _
  val INSERT_CASE: String = "INSERT INTO sensor (id, timestamp,timepreture) " + "VALUES (?, ?, ?) "

  /** Loads the MySQL driver, opens the connection and prepares the insert statement. */
  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")
    ps = conn.prepareStatement(INSERT_CASE)
  }

  /**
   * Inserts one reading. SQL errors propagate to the Flink runtime instead of
   * being swallowed, which previously dropped the record silently.
   */
  override def invoke(value: SensorReading): Unit = {
    ps.setString(1, value.id)
    ps.setLong(2, value.timestamp)
    ps.setDouble(3, value.timepreture)
    // One row per call: execute directly instead of building a batch of one.
    ps.executeUpdate()
  }

  /** Releases the statement and then the connection opened in open(). */
  override def close(): Unit = {
    try {
      if (ps != null) ps.close()
    } finally {
      if (conn != null) conn.close()
    }
  }
}

主程式入口MySQLSourceSinkApp

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import com.atguigu.flink.sink.MySQLSink
import com.atguigu.flink.source.MySQLSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.scala
import org.apache.flink.streaming.api.scala._


/**
 * Streaming demo entry point: reads rows from MySQL with `MySQLSource`, writes
 * them back with `MySQLSink`, and prints each record to stdout.
 */
object MySQLSourceSinkApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run every operator with a single parallel instance.
    env.setParallelism(1)

    // Source: MySQL rows as SensorReading records.
    val readings = env.addSource(new MySQLSource)

    // The same stream feeds two consumers: the MySQL sink and stdout.
    readings.addSink(new MySQLSink())
    readings.print()

    // Nothing runs until the job graph is submitted.
    env.execute()
  }
}

2)通過JDBCOutputFormat

在 Flink(1.9)的 DataStream API 中沒有現成、專門用來寫入 MySQL 的 sink。不過 flink-jdbc 提供了 JDBCOutputFormat 這個類,只要提供 JDBC driver,就可以把它當作 sink 使用。

JDBCOutputFormat 其實是 Flink batch(DataSet)API 的 OutputFormat,但也可以在 stream API 中使用,例如 `stream.writeUsingOutputFormat(format)`。社群也推薦通過這種方式來進行。

    // Positional INSERT: placeholders 1..3 bind (id, timestamp, timepreture) in that order.
    val sql_write: String = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"


    /**
     * Writes `outputData` to a JDBC sink and runs the batch job.
     *
     * @param env        environment whose job is executed after the sink is attached
     * @param outputData rows whose fields are bound positionally to the `?` placeholders of `sql`
     * @param url        JDBC URL of the database
     * @param user       database user
     * @param pwd        database password
     * @param sql        parameterized INSERT statement
     * @param driver     JDBC driver class name; defaults to the MySQL driver used elsewhere in the demo
     */
    def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String,
                   driver: String = "com.mysql.jdbc.Driver"): Unit = {
      outputData.output(
        JDBCOutputFormat.buildJDBCOutputFormat()
          .setDrivername(driver)
          .setDBUrl(url)
          .setUsername(user)
          .setPassword(pwd)
          .setQuery(sql)
          .finish())
      // Attaching the output only builds the plan; execute() actually runs the insert.
      env.execute("insert data to mysql")
      println("data write successfully")
    }

主程式入口MySQLSourceSinkApp2

package com.atguigu.flink.app

import com.atguigu.flink.bean.SensorReading
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.types.Row

/**
 * Batch demo: reads sensor rows from MySQL with JDBCInputFormat and inserts them
 * back with JDBCOutputFormat.
 */
object MySQLSourceSinkApp2 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/gmall1122?useSSL=false"
    val username = "root"
    val password = "123456"
    val sql_read = "select * from sensor limit 5"
    val sql_write = "insert into sensor (id, timestamp,timepreture) values(?,?,?)"

    // Read the MySQL data.
    val readStream = readMysql(env, url, driver, username, password, sql_read)

    // Convert to Row, the record type JDBCOutputFormat consumes. Field order must
    // match the placeholders in sql_write.
    val outputData: DataSet[Row] = readStream.map { x =>
      val row = new Row(3)
      row.setField(0, x.id)
      row.setField(1, x.timestamp)
      row.setField(2, x.timepreture)
      row
    }

    // Insert the data into MySQL (this also executes the job).
    writeMysql(env, outputData, url, username, password, sql_write, driver)
  }

  /**
   * Reads sensor rows from a JDBC source as a DataSet of SensorReading.
   *
   * @param sql query; assumed to return (id, timestamp, timepreture) in that column order
   */
  private def readMysql(env: ExecutionEnvironment, url: String, driver: String, user: String, pwd: String, sql: String): DataSet[SensorReading] = {
    val dataResult: DataSet[Row] = env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .setRowTypeInfo(new RowTypeInfo(
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO,
          BasicTypeInfo.DOUBLE_TYPE_INFO))
        .finish())

    // Fields come from ResultSet.getObject, so the boxed numeric type depends on the
    // column type; convert through Number rather than casting to Long/Double directly.
    dataResult.map { row =>
      SensorReading(
        row.getField(0).asInstanceOf[String],
        row.getField(1).asInstanceOf[Number].longValue(),
        row.getField(2).asInstanceOf[Number].doubleValue())
    }
  }

  /**
   * Writes rows to a JDBC sink and executes the job.
   *
   * @param sql    parameterized INSERT; row fields bind positionally to its placeholders
   * @param driver JDBC driver class name (defaults to the MySQL driver)
   */
  private def writeMysql(env: ExecutionEnvironment, outputData: DataSet[Row], url: String, user: String, pwd: String, sql: String,
                         driver: String = "com.mysql.jdbc.Driver"): Unit = {
    outputData.output(
      JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(pwd)
        .setQuery(sql)
        .finish())
    env.execute("insert data to mysql")
    println("data write successfully")
  }
}

4 在 Scala 中直接用 JDBC 讀取 MySQL(不經過 Flink)

MysqlUtil

package com.atguigu.flink.utils

import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, Statement}

import com.alibaba.fastjson.JSONObject

import scala.collection.mutable.ListBuffer

/** Plain-JDBC helper (no Flink) for querying the local `gmall1122` MySQL database. */
object MysqlUtil {
  def main(args: Array[String]): Unit = {
    val list: List[JSONObject] = queryList("select * from sensor limit 5")
    println(list)
  }

  /**
   * Executes `sql` and returns every row as a JSONObject keyed by column label.
   * The label is the alias if the query uses `AS`, otherwise the column name.
   *
   * The connection, statement and result set are always closed, even when the
   * query fails.
   *
   * @param sql query executed verbatim (not parameterized), so never build it from untrusted input
   * @return all rows, in result-set order
   */
  def queryList(sql: String): List[JSONObject] = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/gmall1122?useSSL=false", "root", "123456")
    try {
      val stat: Statement = conn.createStatement()
      try {
        val rs: ResultSet = stat.executeQuery(sql)
        try {
          val md: ResultSetMetaData = rs.getMetaData
          val columnCount = md.getColumnCount
          val resultList = ListBuffer.empty[JSONObject]
          while (rs.next()) {
            val rowData = new JSONObject()
            for (i <- 1 to columnCount) {
              rowData.put(md.getColumnLabel(i), rs.getObject(i))
            }
            resultList += rowData
          }
          resultList.toList
        } finally rs.close()
      } finally stat.close()
    } finally conn.close()
  }
}