Flink讀寫系列之-讀mysql並寫入mysql
阿新 • • 發佈:2019-01-01
在Flink文件中,提供connector讀取源資料和把處理結果儲存到外部系統中。但是沒有提供資料庫的connector,如果要讀寫資料庫,官網給出了非同步IO(Asynchronous I/O)專門用於訪問外部資料,詳細可看:
還有一種方法是繼承RichSourceFunction,重寫裡面的方法,具體如下:
讀取mysql的類:
package com.my.flink.utils.streaming.mysql; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import com.my.flink.utils.config.ConfigKeys; /** * @Description mysql source * @Author jiangxiaozhi * @Date 2018/10/15 17:05 **/ public class JdbcReader extends RichSourceFunction<Tuple2<String,String>> { private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class); private Connection connection = null; private PreparedStatement ps = null; //該方法主要用於開啟資料庫連線,下面的ConfigKeys類是獲取配置的類 @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName(ConfigKeys.DRIVER_CLASS());//載入資料庫驅動 connection = DriverManager.getConnection(ConfigKeys.SOURCE_DRIVER_URL(), ConfigKeys.SOURCE_USER(), ConfigKeys.SOURCE_PASSWORD());//獲取連線 ps = connection.prepareStatement(ConfigKeys.SOURCE_SQL()); } //執行查詢並獲取結果 @Override public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception { try { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { String name = resultSet.getString("nick"); String id = resultSet.getString("user_id"); logger.error("readJDBC name:{}", name); Tuple2<String,String> tuple2 = new Tuple2<>(); tuple2.setFields(id,name); ctx.collect(tuple2);//傳送結果,結果是tuple2型別,2表示兩個元素,可根據實際情況選擇 } } catch (Exception e) { logger.error("runException:{}", e); } } //關閉資料庫連線 @Override public void cancel() { try { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { logger.error("runException:{}", e); } } }
寫入mysql的類:
package com.my.flink.utils.streaming.mysql; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import scala.Tuple2; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import com.my.flink.utils.config.ConfigKeys; /** * @Description mysql sink * @Author jiangxiaozhi * @Date 2018/10/15 18:31 **/ public class JdbcWriter extends RichSinkFunction<Tuple2<String,String>> { private Connection connection; private PreparedStatement preparedStatement; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); useUnicode=true&characterEncoding=utf8"; // 載入JDBC驅動 Class.forName(ConfigKeys.DRIVER_CLASS()); // 獲取資料庫連線 connection = DriverManager.getConnection(ConfigKeys.SINK_DRIVER_URL(),ConfigKeys.SINK_USER(),ConfigKeys.SINK_PASSWORD());//寫入mysql資料庫 preparedStatement = connection.prepareStatement(ConfigKeys.SINK_SQL());//insert sql在配置檔案中 super.open(parameters); } @Override public void close() throws Exception { super.close(); if(preparedStatement != null){ preparedStatement.close(); } if(connection != null){ connection.close(); } super.close(); } @Override public void invoke(Tuple1<String,String> value, Context context) throws Exception { try { String name = value._1;//獲取JdbcReader傳送過來的結果 preparedStatement.setString(1,name); preparedStatement.executeUpdate(); }catch (Exception e){ e.printStackTrace(); } } }
程式入口核心程式碼:
//scala程式碼 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val dataStream = env.addSource(new JdbcReader())//,讀取mysql資料,獲取dataStream後可以做邏輯處理,這裡沒有 做 dataStream.addSink(new JdbcWriter())//寫入mysql env.execute("flink mysql demo")//執行程式
執行mysql就可以在資料庫表中看到寫入的資料了。