Flink的流處理API(二)
一、Environment
1,getExecutionEnvironment
getExecutionEnvironment會根據查詢執行的方式決定返回什麼樣的執行環境,是最常用的一種建立執行環境的方式。
2,createLocalEnvironment
返回本地執行環境,需要在呼叫時指定預設的並行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1) //parallelism
3,createRemoteEnvironment
返回叢集執行環境,將Jar提交到遠端伺服器。需要在呼叫時指定JobManager的IP和埠號,並指定要在叢集中執行的Jar包。
//hostname port jarFiles val env = ExecutionEnvironment.createRemoteEnvironment(host, port,"/flink/wc.jar")
4,maven依賴
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.7.0</version> </dependency> </dependencies> <build> <plugins> <!-- 該外掛用於將Scala程式碼編譯成class檔案 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.4.6</version> <executions> <execution> <!-- 宣告繫結到maven的compile階段 --> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
二、Source
1,基本讀取方式
//檔案中讀取 val fileDs = env.readTextFile("in/tbStock.txt") //埠讀取 val socketDs = env.socketTextStream("localhost",777) //集合中獲取 val collectDs = env.fromCollection(List("aaa","bbb","ccc","aaa"))
2,kafka source
//kafka配置檔案 val properties = new Properties() properties.setProperty("bootstrap.servers", "hadoop102:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest") //接收kafka的topic-demo這個topic發來的資料 val kafkaDataStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("topic-demo", new SimpleStringSchema(), properties))
3,Flink Kafa如何實現exactly-once
可參考: https://www.aboutyun.com/forum.php?mod=viewthread&tid=27395
Flink通過checkpoint來儲存資料是否處理完成的狀態
由JobManager協調各個TaskManager進行checkpoint儲存,checkpoint儲存在 StateBackend中,預設StateBackend是記憶體級的,也可以改為檔案級的進行持久化儲存。
執行過程實際上是一個兩段式提交,每個運算元執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
如果宕機需要通過StateBackend進行恢復,只能恢復所有確認提交的操作。
4,自定義source
env.addSource(new MySource) //自定義source class MySource extends SourceFunction[(String,Double)] { //flag: 表示資料來源是否還在正常執行 var running: Boolean = true override def cancel(): Unit = { running = false } override def run(ctx: SourceFunction.SourceContext[(String,Double)]): Unit = { //初始化一個隨機數發生器 val rand = new Random() var curTemp = 1.to(10).map( i => ("item_" + i, 65 + rand.nextGaussian() * 20) ) while (running) { curTemp.foreach( t => ctx.collect(t) ) Thread.sleep(5000) //每5秒鐘產生一組資料 } } }
三、Transform
1,基本轉換運算元
//map val streamMap = stream.map { x => x * 2 } //flatmap val streamFlatMap = stream.flatMap{ x => x.split(" ") } //filter val streamFilter = stream.filter{ x => x == 1 }
2,KeyBy與Reduce
keyBy(DataStream → KeyedStream):輸入必須是Tuple型別,邏輯地將一個流拆分成不相交的分割槽,每個分割槽包含具有相同key的元素,在內部以hash的形式實現的。
reduce(KeyedStream → DataStream):一個分組資料流的聚合操作,合併當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是隻返回最後一次聚合的最終結果。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0) //reduce //sum keyedStream.reduce{ (ch1,ch2)=> (ch1._1,ch1._2+ch2._2) }.print()
3,Split和Select
split(DataStream → SplitStream):根據某些特徵把一個DataStream拆分成兩個或者多個DataStream。
select(SplitStream→DataStream):從一個SplitStream中獲取一個或者多個DataStream。
//根據Item的id進行拆分 val splitStream:SplitStream[Item] = dStream.split { item => List(item.id) } //獲取標記為item_1的資料集 splitStream.select("item_1").print()
4,Connect和CoMap
connect(DataStream,DataStream → ConnectedStreams):連線兩個保持他們型別的資料流,兩個資料流被Connect之後,只是被放在了一個同一個流中,內部依然保持各自的資料和形式不發生任何變化,兩個流相互獨立。
CoMap,CoFlatMap(ConnectedStreams → DataStream):作用於ConnectedStreams上,功能與map和flatMap一樣,對ConnectedStreams中的每一個Stream分別進行map和flatMap處理。
val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream) val allStream: DataStream[String] = connStream.map( (log1: StartUpLog) => log1.ch, (log2: StartUpLog) => log2.ch )
4,Union
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream) unionStream.print("union:::")
5,Connect與 Union 區別:
1)Union之前兩個流的型別必須是一樣,Connect可以不一樣,在之後的coMap中再去調整成為一樣的。
2)Connect只能操作兩個流,Union可以操作多個
四、實現UDF函式
1,函式類(Function Classes)
Flink暴露了所有udf函式的介面(實現方式為介面或者抽象類)。例如:MapFunction, FilterFunction, ProcessFunction 等等。
val flinkTweets = tweets.filter(new FlinkFilter)
//自定義filter類 class FlinkFilter extends FilterFunction[String] { override def filter(value: String): Boolean = { value.contains("flink") } }
2,匿名函式(Lamda Functions)
val flinkTweets = tweets.filter(_.contains("flink"))
3,富含數(Rich Functions)
富函式是 DataStream API 提供的一個函式類的介面,所有 Flink 函式類都有其 Rich 版本。它與常規函式的不同在於,可以獲取執行環境的上下文,並擁有一些生命週期方法,所以可以實現更復雜的功能。
open()方法是 rich function 的初始化方法,當一個運算元例如map或者filter被呼叫之前open()會被呼叫。
close()方法是生命週期中的最後一個呼叫的方法,做一些清理工作。
getRuntimeContext()方法提供了函式的 RuntimeContext 的一些資訊,例如函式執行的並行度,任務的名字,以及 state 狀態。
五、Sink
Flink 沒有類似於spark中foreach方法,讓使用者進行迭代的操作。雖有對外的輸出操作都要利用Sink完成。最後通過類似如下方式完成整個任務最終輸出操作。
1,kafka
dstream.addSink(new FlinkKafkaProducer011[String]("linux01:9092","test", new SimpleStringSchema()))
2,redis
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
val config = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build() resultDStream.addSink(new RedisSink[Item](config,new MyRedisMapper)) //定義redisMapper class MyRedisMapper extends RedisMapper[Item] { override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET,"item_test") //hkey } override def getKeyFromData(data: Item): String = data.id override def getValueFromData(data: Item): String = data.toString }
3,Elasticsearch
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.7.2</version> </dependency>
//定義es的host集合 val list = new util.ArrayList[HttpHost]() list.add(new HttpHost("linux01", 9200)) //定義esBuilder val esBuilder = new ElasticsearchSink.Builder[Item](list,new ElasticsearchSinkFunction[Item] { override def process(element: Item, ctx: RuntimeContext, indexer: RequestIndexer): Unit = { //定義es資料儲存方式和儲存值 val json = new util.HashMap[String, String]() json.put("data", element.toString) //定義儲存索引 type 和資料來源 val indexRequest = Requests.indexRequest().index("indexName").`type`("_doc").source(json) indexer.add(indexRequest) } }) resultDStream.addSink(esBuilder.build())
4,自定義sink(JDBC)
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency>
resultDStream.addSink(new MyJDBCSink) //自定義jdbcsink class MyJDBCSink extends RichSinkFunction[Sensor]{ var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ //open 簡歷連線 override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456") insertStmt = conn.prepareStatement("INSERT INTO item_test (id, num) VALUES (?, ?)") updateStmt = conn.prepareStatement("UPDATE item_test SET num = ? WHERE id = ?") } //呼叫執行 override def invoke(value: Sensor, context: SinkFunction.Context[_]): Unit = { updateStmt.setDouble(1, value.temp) updateStmt.setString(2, value.id) updateStmt.execute() if (updateStmt.getUpdateCount == 0) { insertStmt.setString(1, value.id) insertStmt.setDouble(2, value.temp) insertStmt.execute() } } //關閉資源 override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } }
&n