1. 程式人生 > >Flink讀寫系列之-讀mysql並寫入mysql

Flink讀寫系列之-讀mysql並寫入mysql

在Flink文件中,提供connector讀取源資料和把處理結果儲存到外部系統中。但是沒有提供資料庫的connector,如果要讀寫資料庫,官網給出了非同步IO(Asynchronous I/O)專門用於訪問外部資料,詳細可看:

還有一種方法是繼承RichSourceFunction,重寫裡面的方法,具體如下:

讀取mysql的類

package com.my.flink.utils.streaming.mysql;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.my.flink.utils.config.ConfigKeys;

/**
 * @Description mysql source
 * @Author jiangxiaozhi
 * @Date 2018/10/15 17:05
 **/
public class JdbcReader extends RichSourceFunction<Tuple2<String,String>> {
    private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class);

    private Connection connection = null;
    private PreparedStatement ps = null;

    //該方法主要用於開啟資料庫連線,下面的ConfigKeys類是獲取配置的類
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName(ConfigKeys.DRIVER_CLASS());//載入資料庫驅動
        connection = DriverManager.getConnection(ConfigKeys.SOURCE_DRIVER_URL(), ConfigKeys.SOURCE_USER(), ConfigKeys.SOURCE_PASSWORD());//獲取連線
        ps = connection.prepareStatement(ConfigKeys.SOURCE_SQL());
    }

    //執行查詢並獲取結果
    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        try {
            ResultSet resultSet = ps.executeQuery();
            while (resultSet.next()) {
                String name = resultSet.getString("nick");
                String id = resultSet.getString("user_id");
                logger.error("readJDBC name:{}", name);
                Tuple2<String,String> tuple2 = new Tuple2<>();
                tuple2.setFields(id,name);
                ctx.collect(tuple2);//傳送結果,結果是tuple2型別,2表示兩個元素,可根據實際情況選擇
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }

    }
     
    //關閉資料庫連線
    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }
}

 寫入mysql的類:

package com.my.flink.utils.streaming.mysql;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import com.my.flink.utils.config.ConfigKeys;

/**
 * @Description mysql sink
 * @Author jiangxiaozhi
 * @Date 2018/10/15 18:31
 **/
public class JdbcWriter extends RichSinkFunction<Tuple2<String,String>> {
    private Connection connection;
    private PreparedStatement preparedStatement;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
useUnicode=true&characterEncoding=utf8";
        // 載入JDBC驅動
        Class.forName(ConfigKeys.DRIVER_CLASS());
        // 獲取資料庫連線
        connection = DriverManager.getConnection(ConfigKeys.SINK_DRIVER_URL(),ConfigKeys.SINK_USER(),ConfigKeys.SINK_PASSWORD());//寫入mysql資料庫
        preparedStatement = connection.prepareStatement(ConfigKeys.SINK_SQL());//insert sql在配置檔案中
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if(preparedStatement != null){
            preparedStatement.close();
        }
        if(connection != null){
            connection.close();
        }
        super.close();
    }

    @Override
    public void invoke(Tuple1<String,String> value, Context context) throws Exception {
        try {
            String name = value._1;//獲取JdbcReader傳送過來的結果
            preparedStatement.setString(1,name);
            preparedStatement.executeUpdate();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

程式入口核心程式碼:

  //scala程式碼
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  val dataStream =  env.addSource(new JdbcReader())//,讀取mysql資料,獲取dataStream後可以做邏輯處理,這裡沒有
做
  dataStream.addSink(new JdbcWriter())//寫入mysql
  env.execute("flink mysql demo")//執行程式
  

執行mysql就可以在資料庫表中看到寫入的資料了。