資料案例----資料寫入Kafka、flink來消費
阿新 • 發佈:2022-03-23
資料案例----資料寫入Kafka、flink消費
1、建立生產者,將資料寫入Kafka
package com.shujia.flink.dx

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.io.Source

/**
  * Replays the telecom data file into the "dianxin" Kafka topic,
  * one record every ~100 ms, to simulate a live stream.
  */
object Demo1DataToKafka {
  def main(args: Array[String]): Unit = {
    // 1. Build the Kafka producer configuration.
    val properties = new Properties()
    // Kafka broker list
    properties.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // Key/value serializer classes (records are plain strings)
    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val kafkaProducer = new KafkaProducer[String, String](properties)

    // FIX: the original never closed the Source (file-handle leak) and leaked
    // the producer if an exception was thrown mid-loop; both are now released
    // in a finally block. Iterating getLines() directly also avoids loading
    // the whole file into memory (the original called .toList).
    val source = Source.fromFile("data/dianxin_data")
    try {
      for (line <- source.getLines()) {
        val record = new ProducerRecord[String, String]("dianxin", line)
        kafkaProducer.send(record)
        // Flush after each send so the record is visible immediately.
        kafkaProducer.flush()
        // Pace the replay: roughly one record every 100 ms.
        Thread.sleep(100)
      }
    } finally {
      source.close()
      kafkaProducer.close()
    }
  }
}
2、建立消費者來消費kafka中的資料,並將需求的結果儲存到MySQL
package com.shujia.flink.dx

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.functions.{ReduceFunction, RuntimeContext}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

/**
  * Real-time distinct-visitor count per city.
  *
  * Consumes telecom records from the "dianxin" Kafka topic, deduplicates by
  * phone number within each city (keyed state), and upserts the running count
  * for each city into MySQL.
  */
object Demo2CityFlow {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // --- Kafka source -------------------------------------------------------
    val kafkaProps = new Properties()
    // Broker address list
    kafkaProps.setProperty("bootstrap.servers", "master:9092,node1:9092,node2:9092")
    // Consumer group: within one group, each record is processed only once
    kafkaProps.setProperty("group.id", "asdasdsa")

    val kafkaConsumer = new FlinkKafkaConsumer[String](
      "dianxin",                // topic
      new SimpleStringSchema(), // plain-string deserialization
      kafkaProps                // consumer configuration
    )
    // Read from the earliest available offset
    kafkaConsumer.setStartFromEarliest()

    val records: DataStream[String] = env.addSource(kafkaConsumer)

    // --- Extract (city code, phone number) pairs ---------------------------
    val cityMdn: DataStream[(String, String)] = records.map { line =>
      val fields: Array[String] = line.split(",")
      // field 0 = phone number (mdn), field 2 = city code
      (fields(2), fields(0))
    }

    // Partition the stream by city
    val byCity: KeyedStream[(String, String), String] = cityMdn.keyBy(_._1)

    // --- Deduplicate phone numbers and maintain a running count ------------
    val cityCounts: DataStream[(String, Int)] =
      byCity.process(new KeyedProcessFunction[String, (String, String), (String, Int)] {

        // Map state: keys are the phone numbers already seen for this city;
        // the Int value is unused (always 1).
        var seenMdns: MapState[String, Int] = _
        // Reducing state: running visitor count for this city.
        var countState: ReducingState[Int] = _

        override def open(parameters: Configuration): Unit = {
          val context: RuntimeContext = getRuntimeContext
          // State used for phone-number deduplication
          val seenDesc = new MapStateDescriptor[String, Int]("mdns", classOf[String], classOf[Int])
          seenMdns = context.getMapState(seenDesc)
          // State holding the per-city running total (summed via reduce)
          val countDesc = new ReducingStateDescriptor[Int]("count", new ReduceFunction[Int] {
            override def reduce(x: Int, y: Int): Int = x + y
          }, classOf[Int])
          countState = context.getReducingState(countDesc)
        }

        override def processElement(value: (String, String),
                                    ctx: KeyedProcessFunction[String, (String, String), (String, Int)]#Context,
                                    out: Collector[(String, Int)]): Unit = {
          val (city, mdn) = value
          // A phone number already in state has been counted — skip it.
          // A new one bumps the count and emits the fresh total downstream.
          if (!seenMdns.contains(mdn)) {
            seenMdns.put(mdn, 1)
            countState.add(1)
            out.collect((city, countState.get()))
          }
        }
      })

    // --- MySQL sink: upsert the latest count per city -----------------------
    cityCounts.addSink(new RichSinkFunction[(String, Int)] {
      var con: Connection = _
      var stat: PreparedStatement = _

      override def open(parameters: Configuration): Unit = {
        // Load the JDBC driver and prepare the upsert once per task instance
        Class.forName("com.mysql.jdbc.Driver")
        con = DriverManager.getConnection(
          "jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=utf-8",
          "root", "123456")
        stat = con.prepareStatement("replace into city_count(city,num) values(?,?)")
      }

      override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
        val (city, num) = value
        stat.setString(1, city)
        stat.setInt(2, num)
        stat.execute()
      }

      override def close(): Unit = {
        stat.close()
        con.close()
      }
    })

    env.execute()
  }
}