1. 程式人生 > 其它 >flink讀取不到檔案_Flink流處理API——Source

flink讀取不到檔案_Flink流處理API——Source

技術標籤:flink讀取不到檔案

本文主要從以下幾個方面介紹Flink的流處理API——Source

一、從集合中讀取資料

二、從檔案中讀取資料

三、從Kafka中讀取資料

四、自定義Source

資料處理的過程基本可以分為三個階段,分別是:資料從哪裡來,做什麼業務邏輯,落地到哪裡去。

這三部分在Flink中分別被稱為Source、Transform和Sink

版本:

scala:2.11.12

Kafka:0.8.2.2

Flink:1.7.2

pom.xml依賴部分(log日誌的依賴一定要加上,否則當Flink從Kafka0.8中讀取資料報Failed to instantiate SLF4J LoggerFactory Reported exception)

<!-- 依賴的 XML 標籤在發佈時被剝除,以下按原有 groupId/artifactId/version 重建 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.7.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.22</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.22</version>
</dependency>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>

一、從集合中讀取資料

package xxximport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}// 樣例類,感測器ID,時間戳,溫度 (後面都使用這個樣例類作為資料的型別)case class SensorReading(id: String, timestamo: Long, temperature: Double){  override def toString: String = {    id+":"+ timestamo.toString + "," + temperature  }}/***從集合中讀取資料*/object Sensor {  def main(args: Array[String]): Unit = {    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment        import  org.apache.flink.api.scala._    val stream1: DataStream[SensorReading] = environment.fromCollection(List(      SensorReading("sensor_1", 1547718199, 35.80018327300259),      SensorReading("sensor_6", 1547718201, 15.402984393403084),      SensorReading("sensor_7", 1547718202, 6.720945201171228),      SensorReading("sensor_10", 1547718205, 38.101067604893444)    ))    stream1.print("Stream1:").setParallelism(1)    environment.execute()  }}

二、從檔案中讀取資料

b889b64fadaee58e68f9bcf9547143ed.png
package xxximport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}// 樣例類,感測器ID,時間戳,溫度case class SensorReading(id: String, timestamo: Long, temperature: Double){  override def toString: String = {    id+":"+ timestamo.toString + "," + temperature  }}/***從檔案中讀取資料*/object Sensor {  def main(args: Array[String]): Unit = {    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment     val stream2: DataStream[String] = environment.readTextFile(       "D:ScalaCodeFlinkTestsrcmainesourcessensor.txt")    stream2.print("Stream2:").setParallelism(1)    environment.execute()  }}

三、從Kafka中讀取資料

Kafka的brokerList:slave1:9092,slave2:9092,slave3:9092

zookeeper叢集:slave2:2181,slave3:2181,slave4:2181

package xxximport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08/** * 從kafka中讀取資料 */object ReadDataFromKafka {  def main(args: Array[String]): Unit = {    // 設定讀取的kafka引數    val properties = new Properties()    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")    properties.setProperty("group.id", "flink_group1")    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181.slave4:2181")    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key的反序列化    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value的反序列化    properties.setProperty("auto.offset.reset", "latest") // 偏移量    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 連結kafka讀取資料    val kafkaStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer08[String]("sensor",      new SimpleStringSchema(), properties))    kafkaStream.print().setParallelism(1)    environment.execute("readDataFromKafka")  }}

四、自定義Source

package xxximport org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala._import scala.util.Random/** * 自定義Source */object ReadDataFromMySource {  def main(args: Array[String]): Unit = {    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    val dataStream: DataStream[String] = environment.addSource(new MySource())    dataStream.print().setParallelism(1)    environment.execute("MySource")      }}class MySource extends  SourceFunction[String]{  // 表示資料來源是否正常執行  var running:Boolean = true  // 資料正常生成  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {    val random = new Random()    var temp = 1.to(10).map(      i => (i, 100 + random.nextGaussian() * 100)    )        while (running){      // 更新數值      temp = temp.map(        t=>(t._1, t._2 + random.nextGaussian())      )      // 當前時間      val curTime = System.currentTimeMillis()      temp.foreach(t=>{        sourceContext.collect(curTime+": "+ t._1 + "--> "+ t._2)      })      Thread.sleep(500)    }  }  // 取消資料生成  override def cancel(): Unit ={    running = false  }}