1. 程式人生 > 其它 >Flink 周練(一)

Flink 周練(一)

1、自定義Flink資料來源,按照如下要求造資料
資料格式{
    "adsId": 1,
    "userId": 1,
    "provinceName":"山西"
    "timestamp": 1636690000
}
adsId是廣告id,取值範圍為1-10
userId是用id,取值1-50000
provinceName為省份,取值範圍為 北京,山西,山東,河南,河北,上海,福建,廣州
timestamp秒時間戳
2、建立Flink程式讀取自定義資料來源。
3、將讀取到的資料封裝成樣例類。
4、設定時間時間語義,使用timestamp作為時間參考。
5、通過側流收集遲到資料。(注意在造資料時製造一些遲到資料
6、設定1分鐘的滾動視窗,水印為5秒。 7、1分鐘內的資料列印到控制檯,每條資料包含視窗的開始時間和視窗結束時間。 8、統計每分鐘,每個廣告的點選次數。 9、統計每分鐘,廣告點選排名前3的廣告資訊。 10、將遲到資料儲存到kafka。


import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.{Date, Properties}
import scala.beans.BeanProperty
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.util.Random

object Test01 { def main(args: Array[String]): Unit = { //流處理的上下文環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //設定並行度為1 env.setParallelism(1) //2、建立Flink程式讀取自定義資料來源。 獲取到自定義資料來源的資料 val streamDS: DataStream[ADS] = env.addSource(new MySource) //3、將讀取到的資料封裝成樣例類。 將資料進行還原為ADS實體類
.map(JSON.parseObject(_, classOf[ADS])) //5、通過側流收集遲到資料。(注意在造資料時製造一些遲到資料) val tag: OutputTag[ADS] = new OutputTag[ADS]("tag") //4、設定時間時間語義,使用timestamp作為時間參考。 //6、設定1分鐘的滾動視窗,水印為5秒。 val dataDS: DataStream[ADS] = streamDS .assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness[ADS](Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner[ADS] { override def extractTimestamp(element: ADS, recordTimestamp: Long): Long = element.timestamp })) //7、1分鐘內的資料列印到控制檯,每條資料包含視窗的開始時間和視窗結束時間。 val windowDS: WindowedStream[ADS, Int, TimeWindow] = dataDS .keyBy(_.adsId) //根據id進行分組 .window(TumblingEventTimeWindows.of(Time.minutes(1))) //定義滾動視窗大小 .sideOutputLateData(tag) //收集側流資料 //8、統計每分鐘,每個廣告的點選次數。 統計每個廣告的點選次數 以及視窗資訊 val resDS: DataStream[ADSCount] = windowDS.aggregate(new MyAggreate, new MyWindow) //8、統計每分鐘,每個廣告的點選次數。 resDS.print("主流:") val tagDS: DataStream[ADS] = windowDS .aggregate(new MyAggreate, new MyWindow) .getSideOutput(tag) tagDS.print("測流:") //9、統計每分鐘,廣告點選排名前3的廣告資訊。 resDS .keyBy(_.end) .process(new MyProcess) .print("前三:") //10、將遲到資料儲存到kafka。 val properties = new Properties() properties.setProperty("bootstrap.servers", "hdp1:9092,hdp2:9092,hdp3:9092") val serializationSchema = new KafkaSerializationSchema[String] { override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = new ProducerRecord[Array[Byte], Array[Byte]]( "test", // target topic element.getBytes(StandardCharsets.UTF_8)) // record contents } val myProducer = new FlinkKafkaProducer[String]( "test", // target topic serializationSchema, // serialization schema properties, // producer config FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance tagDS.map(_.toString).addSink(myProducer) //執行流處理 env.execute() } } //廣告點選排名前3的廣告資訊 class MyProcess extends KeyedProcessFunction[Long,ADSCount,String] { //定義儲存狀態 建立list集合 儲存廣告資訊 val list = new ListStateDescriptor[ADSCount]("buffered-elements", TypeInformation.of(new TypeHint[ADSCount]() {})) lazy val listState: ListState[ADSCount] = getRuntimeContext.getListState(list) override def processElement(i: ADSCount, context: KeyedProcessFunction[Long, ADSCount, String]#Context, collector: Collector[String]): Unit = { //將資料放入list listState.add(i) //建立定時器 在一定時間段內對廣告點選次數進行區分 context.timerService().registerEventTimeTimer(i.end) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ADSCount, String]#OnTimerContext, out: Collector[String]): Unit = { //對資料進行排序 獲取前三 out.collect(listState.get().asScala.toList.sortBy(-_.count).take(3).toString()) } } //輸出格式的樣例類 case class ADSCount(start:Long,end:Long,key:Int,count:Int) //輸出格式 //Int Out Key W class MyWindow extends WindowFunction[Int,ADSCount,Int,TimeWindow] { override def apply(key: Int, window: TimeWindow, input: Iterable[Int], out: Collector[ADSCount]): Unit = { for (elem <- input) { out.collect(ADSCount(window.getStart,window.getEnd,key,elem)) } } } //求廣告點選次數 //IN ACC OUT class MyAggreate extends AggregateFunction[ADS,Int,Int] { //初始化 override def createAccumulator(): Int = 0 //各分支統計個數 override def add(in: ADS, acc: Int): Int = acc + 1 //合併分支 override def merge(acc: Int, acc1: Int): Int = acc + acc1 //返回結果 override def getResult(acc: Int): Int = acc } //建立樣例類 case class ADS(@BeanProperty adsId:Int,@BeanProperty userId:Long,@BeanProperty provinceName:String,@BeanProperty timestamp:Long) //1、自定義Flink資料來源,按照如下要求造資料。(10分) class MySource extends SourceFunction[String] { override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { val arr: Array[String] = Array("北京", "山西", "山東", "河南", "河北", "上海", "福建", "廣州") var count = 0 while (true){ count = count + 1 val adsId = Random.nextInt(10) + 1 //adsId是廣告id,取值範圍為1-10 val userId = Random.nextInt(50000) + 1 //userId是用id,取值1-50000 val provinceName = arr(Random.nextInt(8))//provinceName為省份,取值範圍為 北京,山西,山東,河南,河北,上海,福建,廣州 //判斷髮送5條資料時 if (count % 5 == 0){ val data: String = JSON.toJSON(ADS(adsId, userId, provinceName, new Date().getTime - 20000)).toString //注意在造資料時製造一些遲到資料 sourceContext.collect(data) }else{ val data: String = JSON.toJSON(ADS(adsId, userId, provinceName, new Date().getTime)).toString //普通資料 sourceContext.collect(data) } //睡眠 Thread.sleep(1000) } } override def cancel(): Unit = ??? }