Flink 周練(一)
阿新 • • 發佈:2022-05-05
1、自定義Flink資料來源,按照如下要求造資料 資料格式{ "adsId": 1, "userId": 1, "provinceName":"山西" "timestamp": 1636690000 } adsId是廣告id,取值範圍為1-10 userId是用id,取值1-50000 provinceName為省份,取值範圍為 北京,山西,山東,河南,河北,上海,福建,廣州 timestamp秒時間戳 2、建立Flink程式讀取自定義資料來源。 3、將讀取到的資料封裝成樣例類。 4、設定時間時間語義,使用timestamp作為時間參考。 5、通過側流收集遲到資料。(注意在造資料時製造一些遲到資料6、設定1分鐘的滾動視窗,水印為5秒。 7、1分鐘內的資料列印到控制檯,每條資料包含視窗的開始時間和視窗結束時間。 8、統計每分鐘,每個廣告的點選次數。 9、統計每分鐘,廣告點選排名前3的廣告資訊。 10、將遲到資料儲存到kafka。
import com.alibaba.fastjson.JSON import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy} import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema import org.apache.kafka.clients.producer.ProducerRecord import java.nio.charset.StandardCharsets import java.time.Duration import java.util.{Date, Properties} import scala.beans.BeanProperty import scala.collection.JavaConverters.iterableAsScalaIterableConverter import scala.util.Randomobject Test01 { def main(args: Array[String]): Unit = { //流處理的上下文環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //設定並行度為1 env.setParallelism(1) //2、建立Flink程式讀取自定義資料來源。 獲取到自定義資料來源的資料 val streamDS: DataStream[ADS] = env.addSource(new MySource) //3、將讀取到的資料封裝成樣例類。 將資料進行還原為ADS實體類.map(JSON.parseObject(_, classOf[ADS])) //5、通過側流收集遲到資料。(注意在造資料時製造一些遲到資料) val tag: OutputTag[ADS] = new OutputTag[ADS]("tag") //4、設定時間時間語義,使用timestamp作為時間參考。 //6、設定1分鐘的滾動視窗,水印為5秒。 val dataDS: DataStream[ADS] = streamDS .assignTimestampsAndWatermarks(WatermarkStrategy .forBoundedOutOfOrderness[ADS](Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner[ADS] { override def extractTimestamp(element: ADS, recordTimestamp: Long): Long = element.timestamp })) //7、1分鐘內的資料列印到控制檯,每條資料包含視窗的開始時間和視窗結束時間。 val windowDS: WindowedStream[ADS, Int, TimeWindow] = dataDS .keyBy(_.adsId) //根據id進行分組 .window(TumblingEventTimeWindows.of(Time.minutes(1))) //定義滾動視窗大小 .sideOutputLateData(tag) //收集側流資料 //8、統計每分鐘,每個廣告的點選次數。 統計每個廣告的點選次數 以及視窗資訊 val resDS: DataStream[ADSCount] = windowDS.aggregate(new MyAggreate, new MyWindow) //8、統計每分鐘,每個廣告的點選次數。 resDS.print("主流:") val tagDS: DataStream[ADS] = windowDS .aggregate(new MyAggreate, new MyWindow) .getSideOutput(tag) tagDS.print("測流:") //9、統計每分鐘,廣告點選排名前3的廣告資訊。 resDS .keyBy(_.end) .process(new MyProcess) .print("前三:") //10、將遲到資料儲存到kafka。 val properties = new Properties() properties.setProperty("bootstrap.servers", "hdp1:9092,hdp2:9092,hdp3:9092") val serializationSchema = new KafkaSerializationSchema[String] { override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = new ProducerRecord[Array[Byte], Array[Byte]]( "test", // target topic element.getBytes(StandardCharsets.UTF_8)) // record contents } val myProducer = new FlinkKafkaProducer[String]( "test", // target topic serializationSchema, // serialization schema properties, // producer config FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance tagDS.map(_.toString).addSink(myProducer) //執行流處理 env.execute() } } //廣告點選排名前3的廣告資訊 class MyProcess extends KeyedProcessFunction[Long,ADSCount,String] { //定義儲存狀態 建立list集合 儲存廣告資訊 val list = new ListStateDescriptor[ADSCount]("buffered-elements", TypeInformation.of(new TypeHint[ADSCount]() {})) lazy val listState: ListState[ADSCount] = getRuntimeContext.getListState(list) override def processElement(i: ADSCount, context: KeyedProcessFunction[Long, ADSCount, String]#Context, collector: Collector[String]): Unit = { //將資料放入list listState.add(i) //建立定時器 在一定時間段內對廣告點選次數進行區分 context.timerService().registerEventTimeTimer(i.end) } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ADSCount, String]#OnTimerContext, out: Collector[String]): Unit = { //對資料進行排序 獲取前三 out.collect(listState.get().asScala.toList.sortBy(-_.count).take(3).toString()) } } //輸出格式的樣例類 case class ADSCount(start:Long,end:Long,key:Int,count:Int) //輸出格式 //Int Out Key W class MyWindow extends WindowFunction[Int,ADSCount,Int,TimeWindow] { override def apply(key: Int, window: TimeWindow, input: Iterable[Int], out: Collector[ADSCount]): Unit = { for (elem <- input) { out.collect(ADSCount(window.getStart,window.getEnd,key,elem)) } } } //求廣告點選次數 //IN ACC OUT class MyAggreate extends AggregateFunction[ADS,Int,Int] { //初始化 override def createAccumulator(): Int = 0 //各分支統計個數 override def add(in: ADS, acc: Int): Int = acc + 1 //合併分支 override def merge(acc: Int, acc1: Int): Int = acc + acc1 //返回結果 override def getResult(acc: Int): Int = acc } //建立樣例類 case class ADS(@BeanProperty adsId:Int,@BeanProperty userId:Long,@BeanProperty provinceName:String,@BeanProperty timestamp:Long) //1、自定義Flink資料來源,按照如下要求造資料。(10分) class MySource extends SourceFunction[String] { override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { val arr: Array[String] = Array("北京", "山西", "山東", "河南", "河北", "上海", "福建", "廣州") var count = 0 while (true){ count = count + 1 val adsId = Random.nextInt(10) + 1 //adsId是廣告id,取值範圍為1-10 val userId = Random.nextInt(50000) + 1 //userId是用id,取值1-50000 val provinceName = arr(Random.nextInt(8))//provinceName為省份,取值範圍為 北京,山西,山東,河南,河北,上海,福建,廣州 //判斷髮送5條資料時 if (count % 5 == 0){ val data: String = JSON.toJSON(ADS(adsId, userId, provinceName, new Date().getTime - 20000)).toString //注意在造資料時製造一些遲到資料 sourceContext.collect(data) }else{ val data: String = JSON.toJSON(ADS(adsId, userId, provinceName, new Date().getTime)).toString //普通資料 sourceContext.collect(data) } //睡眠 Thread.sleep(1000) } } override def cancel(): Unit = ??? }