flink讀取不到檔案_Flink流處理API——Source
阿新 • • 發佈:2021-01-04
技術標籤:flink讀取不到檔案
本文主要從以下幾個方面介紹Flink的流處理API——Source
一、從集合中讀取資料
二、從檔案中讀取資料
三、從Kafka中讀取資料
四、自定義Source
資料處理的過程基本可以分為三個階段分別是,資料從來哪裡,做什麼業務邏輯,落地到哪裡去。
這三部分在Flink中分別被稱為Source、Transform和Sink
版本:
scala:2.11.12
Kafka:0.8.2.2
Flink:1.7.2
pom.xml依賴部分(log日誌的依賴一定要加上,否則當Flink從Kafka0.8中讀取資料報Failed to instantiate SLF4J LoggerFactory Reported exception)
Dependencies (groupId : artifactId : version):
- org.apache.flink : flink-scala_2.11 : 1.7.2
- org.apache.flink : flink-streaming-scala_2.11 : 1.7.2 (scope: provided)
- org.apache.flink : flink-clients_2.11 : 1.7.2
- org.apache.flink : flink-connector-kafka-0.8_2.11 : 1.7.2
- org.slf4j : slf4j-api : 1.7.22
- org.slf4j : slf4j-log4j12 : 1.7.22
- org.apache.bahir : flink-connector-redis_2.11 : 1.0
- mysql : mysql-connector-java : 5.1.38
一、從集合中讀取資料
package xxximport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}// 樣例類,感測器ID,時間戳,溫度 (後面都使用這個樣例類作為資料的型別)case class SensorReading(id: String, timestamo: Long, temperature: Double){ override def toString: String = { id+":"+ timestamo.toString + "," + temperature }}/***從集合中讀取資料*/object Sensor { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ val stream1: DataStream[SensorReading] = environment.fromCollection(List( SensorReading("sensor_1", 1547718199, 35.80018327300259), SensorReading("sensor_6", 1547718201, 15.402984393403084), SensorReading("sensor_7", 1547718202, 6.720945201171228), SensorReading("sensor_10", 1547718205, 38.101067604893444) )) stream1.print("Stream1:").setParallelism(1) environment.execute() }}
二、從檔案中讀取資料
package xxximport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}// 樣例類,感測器ID,時間戳,溫度case class SensorReading(id: String, timestamo: Long, temperature: Double){ override def toString: String = { id+":"+ timestamo.toString + "," + temperature }}/***從檔案中讀取資料*/object Sensor { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val stream2: DataStream[String] = environment.readTextFile( "D:ScalaCodeFlinkTestsrcmainesourcessensor.txt") stream2.print("Stream2:").setParallelism(1) environment.execute() }}
三、從Kafka中讀取資料
Kafka的brokerList:slave1:9092,slave2:9092,slave3:9092
zookeeper叢集:slave2:2181,slave3:2181,slave4:2181
package xxximport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08/** * 從kafka中讀取資料 */object ReadDataFromKafka { def main(args: Array[String]): Unit = { // 設定讀取的kafka引數 val properties = new Properties() properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092") properties.setProperty("group.id", "flink_group1") properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181.slave4:2181") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key的反序列化 properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value的反序列化 properties.setProperty("auto.offset.reset", "latest") // 偏移量 val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // 連結kafka讀取資料 val kafkaStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer08[String]("sensor", new SimpleStringSchema(), properties)) kafkaStream.print().setParallelism(1) environment.execute("readDataFromKafka") }}
四、自定義Source
package xxximport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala._import scala.util.Random/** * 自定義Source */object ReadDataFromMySource { def main(args: Array[String]): Unit = { val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val dataStream: DataStream[String] = environment.addSource(new MySource()) dataStream.print().setParallelism(1) environment.execute("MySource") }}class MySource extends SourceFunction[String]{ // 表示資料來源是否正常執行 var running:Boolean = true // 資料正常生成 override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { val random = new Random() var temp = 1.to(10).map( i => (i, 100 + random.nextGaussian() * 100) ) while (running){ // 更新數值 temp = temp.map( t=>(t._1, t._2 + random.nextGaussian()) ) // 當前時間 val curTime = System.currentTimeMillis() temp.foreach(t=>{ sourceContext.collect(curTime+": "+ t._1 + "--> "+ t._2) }) Thread.sleep(500) } } // 取消資料生成 override def cancel(): Unit ={ running = false }}