1. 程式人生 > 其它 > flink 自定義 filter

flink 自定義filter

資料

sensor_1,1547718101,35.8
sensor_1,1547718102,22.2
sensor_1,1547718101,55.3
sensor_1,1547718102,24.1
sensor_1,1547718103,57
sensor_1,1547718103,58
sensor_1,1547718103,59
sensor_6,1547718101,15.4
sensor_7,1547718102,6.7
sensor_10,1547718205,38.1

  

程式碼

import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

/**
 * Minimal Flink streaming job demonstrating a user-defined [[FilterFunction]].
 *
 * Each text line of the input file is expected to have the form
 * `id,timestamp,temperature` (for example `sensor_1,1547718101,35.8`).
 *
 * NOTE(review): `SensorReding` is not declared in this snippet; the code below
 * only relies on it being constructible from `(String, Long, Double)` and
 * exposing an `id: String` member -- confirm against its actual declaration.
 */
object TransformTest {

  /**
   * Builds and runs the pipeline: read text file -> parse -> filter -> print.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // 0. Read data from a file
    val inputPath = "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // 1. Convert each line to the case class type (simple transformation),
    //    then apply the custom filter.
    val dataStream = inputStream
      // Drop blank lines first: "".split(",") has a single element, so
      // accessing arr(1) below would otherwise fail the job.
      .filter(_.trim.nonEmpty)
      .map(data => {
        val arr = data.split(",")
        SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .filter(new MyFilter)

    // Attach a sink so the filtered records are actually observable; without
    // one the result of the pipeline is computed and then discarded.
    dataStream.print()

    env.execute()
  }

  /**
   * Custom function class: keeps only the readings whose `id` starts with
   * the prefix "sensor".
   */
  class MyFilter extends FilterFunction[SensorReding] {

    /**
     * @param t the reading to test
     * @return `true` if `t.id` starts with "sensor", `false` otherwise
     */
    override def filter(t: SensorReding): Boolean =
      t.id.startsWith("sensor")
  }
}