Consuming Kafka data with Flink in Scala, filtering it through a connected stream, and writing it into PostgreSQL
1 Preface
Our company has a project whose overall architecture is to consume Kafka data and store it in a database. We previously used Spark Streaming for this; since Flink has become quite popular lately, I wanted to try consuming data with Flink and see how it differs from Spark Streaming. Briefly, Flink offers both stream processing and batch processing: it can handle bounded as well as unbounded data, i.e. data that is produced indefinitely. We will not go into the internals here, but instead build a working Flink job directly. The overall flow is source -> transform -> sink: obtain the raw data from a source, transform it from its messy original format into the format we need, and finally sink it, i.e. write it into a database or file for storage and presentation.
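Before diving into the Flink APIs, the source -> transform -> sink idea can be sketched with nothing but plain Scala collections. This is a toy model for illustration only; the records and field names are made up and have nothing to do with the real job:

```scala
// Toy model of source -> transform -> sink using plain Scala collections.
// Records are "key:value" lines separated by newlines, as in the real job below.
val source: List[String] = List("id:1\nmsg:hello", "id:2\nmsg:world") // source: raw records

// transform: parse each "key:value" line of a record into a Map
val transformed: List[Map[String, String]] = source.map { record =>
  record.split("\n").map { line =>
    val idx = line.indexOf(':')
    line.substring(0, idx) -> line.substring(idx + 1)
  }.toMap
}

// sink: here we just "store" the result in a buffer instead of a database
val sink = scala.collection.mutable.ListBuffer[Map[String, String]]()
transformed.foreach(sink += _)
```

The real job follows exactly this shape, with Kafka as the source, a `ProcessFunction` plus a broadcast filter as the transform, and PostgreSQL as the sink.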
2 Environment
JDK 1.8
Scala 2.11.8
Flink 1.10.1
3 Code
3.1 Add dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.10.1</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.11.8</version>
    </dependency>
</dependencies>
3.2 Program entry point
/**
 * Flink consumes Kafka data and writes the processed result into PostgreSQL
 */
object FlinkTest {
  def main(args: Array[String]): Unit = {
    // Create the broadcast-state descriptor
    val filterRules: MapStateDescriptor[String, String] =
      new MapStateDescriptor("filter_rule", BasicTypeInfo.STRING_TYPE_INFO, Types.STRING)

    // Read the command-line arguments
    val fromArgs = ParameterTool.fromArgs(args)
    // Read the configuration file
    val parameterTool = ParameterTool.fromPropertiesFile(fromArgs.get("properties"))

    // Checkpoint configuration
    val checkpointDirectory = parameterTool.getRequired("checkpointDirectory.test")
    val checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval")

    // Kafka settings
    val kafkaBootStrapServers = parameterTool.getRequired("kafka.bootstrap.servers.test")
    val kafkaGroupID = parameterTool.getRequired("kafka.group.id.test")
    val kafkaSourceTopic = parameterTool.getRequired("kafka.topic.test")

    // PostgreSQL settings
    val postgresqlHost = parameterTool.getRequired("postgresql.host")
    val postgresqlPort = parameterTool.getInt("postgresql.port")
    val postgresqlDB = parameterTool.getRequired("postgresql.db")
    val postgresqlUser = parameterTool.getRequired("postgresql.user")
    val postgresqlPassword = parameterTool.getRequired("postgresql.password")
    val postgresqlInterval = parameterTool.getInt("postgre.secondInterval")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing, one checkpoint every 60000 ms
    env.enableCheckpointing(60000)
    // Set the checkpoint semantics to EXACTLY_ONCE
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // Checkpoint timeout
    env.getCheckpointConfig.setCheckpointTimeout(120000)
    // Allow only one checkpoint to be in progress at a time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    // Minimum pause between two checkpoints: checkpointSecondInterval * 1000 ms
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(checkpointSecondInterval * 1000)
    // Retain externalized checkpoints when the Flink job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Tolerate at most 10 checkpoint failures
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)

    // Configure the state backend
    val backend = new RocksDBStateBackend(checkpointDirectory, true)
    env.setStateBackend(backend.asInstanceOf[StateBackend])

    /* Kafka consumer configuration */
    val kafkaSourceProps = new Properties()
    kafkaSourceProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapServers)
    kafkaSourceProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupID)

    // Add the Kafka source, i.e. the Kafka consumer
    val kafkaSource = env
      .addSource(new FlinkKafkaConsumer(kafkaSourceTopic, new SimpleStringSchema, kafkaSourceProps))
      .uid("kafkaSource").name("kafkaSource").setParallelism(3)

    import scala.collection.mutable.Map
    // Extract and parse the raw records
    val alarmEventStream = kafkaSource.process(new ProcessFunction[String, Map[String, String]] {
      override def processElement(value: String,
                                  ctx: ProcessFunction[String, Map[String, String]]#Context,
                                  out: Collector[Map[String, String]]): Unit = {
        if (value.contains("business-relevant marker")) {
          val arrs = value.split("\n")
          var map: Map[String, String] = Map()
          var key = ""
          for (a <- arrs) {
            key = a.split(":")(0)
            map.put(key, a.split(key + ":")(1))
          }
          out.collect(map)
        }
      }
    }).uid("test").name("test").setParallelism(2)

    // Custom PostgreSQL source: periodically reads the filter data from PostgreSQL and broadcasts it
    val pgsource = new PostgreSQLSource(postgresqlHost, postgresqlPort, postgresqlDB,
      postgresqlUser, postgresqlPassword, postgresqlInterval)
    val configStream: SingleOutputStreamOperator[ListBuffer[String]] = env.addSource(pgsource)
      .uid("DataListFromPostgreSQL")
      .name("DataListFromPostgreSQL").setParallelism(3)

    // Broadcast the filter stream, forming a BroadcastStream
    val filterRulesStream: BroadcastStream[ListBuffer[String]] = configStream.broadcast(filterRules)

    // Connect the real-time stream with the broadcast filter stream
    val connectedStream: BroadcastConnectedStream[Map[String, String], ListBuffer[String]] =
      alarmEventStream.connect(filterRulesStream)
    val filter = new FilterSpecificProblemIDProcessFunction()
    val filterStream: SingleOutputStreamOperator[Map[String, String]] = connectedStream.process(filter)

    // Write the filtered data into PostgreSQL
    filterStream.addSink(new ESBPostgreSQLSink(postgresqlHost, postgresqlPort, postgresqlDB,
        postgresqlUser, postgresqlPassword))
      .uid("PostgresSQLSink")
      .name("PostgresSQLSink")
      .setParallelism(1)

    // Run the job
    env.execute("ESBWirelessAlarmDataProcessing")
  }
}
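The job reads its settings from a properties file whose path is passed via the `properties` command-line argument. A minimal example of such a file, with the keys taken from the code above and all values being placeholders rather than real hosts or credentials, might look like:

```properties
checkpointDirectory.test=hdfs:///flink/checkpoints/test
checkpointSecondInterval=60
kafka.bootstrap.servers.test=kafka1:9092,kafka2:9092
kafka.group.id.test=flink-test-group
kafka.topic.test=test-topic
postgresql.host=127.0.0.1
postgresql.port=5432
postgresql.db=testdb
postgresql.user=test
postgresql.password=secret
postgre.secondInterval=60
```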
3.3 Custom source
Flink does not ship with a PostgreSQL source, so to fetch the filter data from the pg database we have to define a source ourselves.
class PostgreSQLSource(host: String, port: Int, db: String, username: String, pass: String,
                       secondInterval: Int) extends RichSourceFunction[ListBuffer[String]] {
  private var connection: Connection = null
  private var ps: PreparedStatement = null
  // Whether the source is still running
  @volatile var running: Boolean = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.postgresql.Driver")
    connection = DriverManager.getConnection(
      "jdbc:postgresql://" + host + ":" + port + "/" + db, username, pass)
    val sql = "select testData from testTable"
    ps = connection.prepareStatement(sql)
  }

  override def close(): Unit = {
    if (ps != null) {
      ps.close()
    }
    if (connection != null) {
      connection.close()
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  override def run(ctx: SourceFunction.SourceContext[ListBuffer[String]]): Unit = {
    try {
      while (running) {
        val listBuffer: ListBuffer[String] = ListBuffer()
        val res: ResultSet = ps.executeQuery
        while (res.next()) {
          listBuffer += res.getString("testData")
        }
        ctx.collect(listBuffer)
        // Wait secondInterval seconds before the next poll
        Thread.sleep(secondInterval * 1000L)
      }
    } catch {
      case e: Exception => println("exception while querying PostgreSQL: " + e.getMessage)
    }
  }
}
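The run/cancel contract above boils down to a poll loop guarded by a flag: `run()` keeps emitting until `cancel()` flips the flag. That pattern can be modelled without Flink or JDBC at all; the class and numbers below are made up purely to exercise the loop:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Stripped-down model of RichSourceFunction's run()/cancel() contract.
class PollingSource(maxPolls: Int) {
  private val running = new AtomicBoolean(true)
  val collected = scala.collection.mutable.ListBuffer[Int]()

  def run(): Unit = {
    var poll = 0
    while (running.get() && poll < maxPolls) {
      collected += poll // stands in for ctx.collect(queryResult)
      poll += 1
      if (poll == 2) cancel() // simulate an external cancel after two polls
    }
  }

  def cancel(): Unit = running.set(false)
}

val src = new PollingSource(maxPolls = 100)
src.run()
```

After `cancel()` the loop exits at the next check, which is why the real source also gets its `Thread.sleep` inside the loop rather than around it.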
3.4 Custom BroadcastProcessFunction
When processing the connected stream formed by the real-time stream and the filter stream, we have to pass in a BroadcastProcessFunction. Here we define a class extending BroadcastProcessFunction and implement our business logic in it.
class FilterSpecificProblemIDProcessFunction
  extends BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]] {

  /**
   * The MapStateDescriptor describing the broadcast state
   */
  val filterRules: MapStateDescriptor[String, String] =
    new MapStateDescriptor("filter_rule", BasicTypeInfo.STRING_TYPE_INFO, Types.STRING)

  override def processElement(value: Map[String, String],
                              readOnlyContext: BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]]#ReadOnlyContext,
                              collector: Collector[Map[String, String]]): Unit = {
    // id of the incoming real-time event
    val id: String = value.getOrElse("id", "NIL")
    // Read-only view of the broadcast state
    val broadcastState: ReadOnlyBroadcastState[String, String] = readOnlyContext.getBroadcastState(filterRules)
    if (broadcastState.contains(id)) {
      collector.collect(value)
    }
  }

  override def processBroadcastElement(value: ListBuffer[String],
                                       context: BroadcastProcessFunction[Map[String, String], ListBuffer[String], Map[String, String]]#Context,
                                       collector: Collector[Map[String, String]]): Unit = {
    if (value == null || value.isEmpty) {
      return
    }
    val broadcastState: BroadcastState[String, String] = context.getBroadcastState(filterRules)
    // Clear the old state
    broadcastState.clear()
    // Install the new filter rules
    for (i <- 0 until value.size) {
      broadcastState.put(value.apply(i), value.apply(i))
    }
  }
}
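The decision made in `processElement` is easy to check in isolation: an event passes only if its `id` is a key of the broadcast state. Modelling the broadcast state as a plain immutable Map, with hypothetical ids and events:

```scala
// Broadcast state modelled as a plain Map: rule id -> rule id,
// mirroring how processBroadcastElement stores each rule under itself.
val broadcastState: Map[String, String] = Map("42" -> "42", "99" -> "99")

// Same decision as processElement: forward the event only if its id is in the state
def passes(event: Map[String, String]): Boolean =
  broadcastState.contains(event.getOrElse("id", "NIL"))

val kept = List(
  Map("id" -> "42", "msg" -> "match"),
  Map("id" -> "7", "msg" -> "no match")
).filter(passes)
```

Only the event with id "42" survives the filter; an event without an "id" key falls back to "NIL" and is dropped as well.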
3.5 Custom sink
Likewise, when writing data into the pg database: Flink does not provide a PostgreSQL sink, so we have to define a sink ourselves.
class ESBPostgreSQLSink(postgresqlHost: String, postgresqlPort: Int, postgresqlDB: String,
                        postgresqlUser: String, postgresqlPassword: String)
  extends RichSinkFunction[Map[String, String]] {

  private var connection: Connection = null
  private var psInsert: PreparedStatement = null
  private var psUpdate: PreparedStatement = null
  private var psInsertUpdate: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    Class.forName("org.postgresql.Driver")
    connection = DriverManager.getConnection(
      "jdbc:postgresql://" + postgresqlHost + ":" + postgresqlPort + "/" + postgresqlDB,
      postgresqlUser, postgresqlPassword)
    // The concrete SQL statements are omitted here
    val insert = "insert statement"
    val update = "update statement"
    val insertUpdate = "update if the row exists, otherwise insert"
    psInsert = this.connection.prepareStatement(insert)
    psUpdate = this.connection.prepareStatement(update)
    psInsertUpdate = this.connection.prepareStatement(insertUpdate)
  }

  override def close(): Unit = {
    super.close()
    // Close the statements and the connection, releasing resources
    if (psInsert != null) psInsert.close()
    if (psUpdate != null) psUpdate.close()
    if (psInsertUpdate != null) psInsertUpdate.close()
    if (connection != null) connection.close()
  }

  override def invoke(value: Map[String, String], context: SinkFunction.Context[_]): Unit = {
    // The concrete insertion logic for the business data goes here
  }
}
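The "update if the row exists, otherwise insert" statement elided above is typically written in PostgreSQL as `INSERT ... ON CONFLICT`. As a sketch only, against a hypothetical table `test_table(id, msg)` that does not appear in the original post:

```scala
// Hypothetical upsert for a table test_table with primary key id;
// the real table and columns are not shown in the original post.
val insertUpdate: String =
  """INSERT INTO test_table (id, msg)
    |VALUES (?, ?)
    |ON CONFLICT (id) DO UPDATE SET msg = EXCLUDED.msg""".stripMargin
```

`EXCLUDED` refers to the row that was proposed for insertion, so the existing row's `msg` is overwritten with the incoming value whenever the `id` already exists.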
Notes
1 On every checkpoint, Flink flushes the buffered data and records the files with the Pending suffix (files that are already complete but not yet recorded by a checkpoint; the suffix can be set via sink.setPendingSuffix("xxx")).
2 Every 60 seconds (configurable via sink.setInactiveBucketCheckInterval(60 * 1000)) Flink runs a check: if a file's FSDataOutputStream has received no data for 60 seconds (configurable via sink.setInactiveBucketThreshold(60 * 1000)), Flink considers that file an inactive bucket, flushes it and closes it;
3 Internally Flink keeps a map, Map<String, BucketState<T>> bucketStates = new HashMap<>(), that records the files currently in use. The key is the file path; BucketState wraps all of the file's information: its creation time, its last write time (where "write" means writing into the buffer, not the time of the flush), whether the file is currently open or closed, and the method for writing the buffer. Whenever Flink needs to operate on a file, it fetches the file's wrapper object from this map;
4 Choosing a sensible parallelism in the code is a matter of long-term observation and tuning, and is probably where I need to study Flink more deeply. Higher parallelism is not always better: set it too high and you waste resources without gaining much efficiency; set it too low and the job runs inefficiently. It should mainly be chosen according to the concrete business logic.
Summary
Overall, compared with Spark Streaming, Flink code is somewhat more complex. It is, after all, still in a rising phase, and there is room for improvement in many areas. But for businesses with strict real-time requirements, Flink is definitely a good choice, and its processing speed is very fast. That is all I want to share in this post. If you run into any problems while studying this, you can send me an e-mail and I will do my best to answer; let's encourage each other!