第一章 flink簡介與wordcount
一、簡介
flink是德國和歐洲的一些大學聯合發起的專案,14年進入了Apache基金會,不到一年的時間,flink就成為了Apache的頂級專案,flink的理念是“Apache Flink是為分散式、高效能、隨時可用以及準確的流處理應用打造的開源流處理框架,用於對無界和有界資料流進行有狀態計算。”。
flink有幾個重要的特點:
-
事件驅動型
應用是有狀態的,從事件流提取資料,並根據到來的事件觸發計算、狀態更新或者其他動作。以kafka為代表的訊息佇列幾乎都是事件驅動型應用。
與之不同的是SparkStreaming微批次,它將一批批資料看作連續的流,進行處理。
-
流與批的世界觀
批處理的特點是有界、持久、大量,非常適合全套記錄才能完成的計算,一般用於離線計算。流處理的特點是無界、實時,無需針對整個資料集操作,而是對通過系統傳輸的每個資料項執行操作,一般用於實時統計。在 spark 的世界觀中,一切都是由批次組成的,離線資料是一個大批次,而實時資料是由一個一個無限的小批次組成的。 而在 flink 的世界觀中,一切都是由流組成的,離線資料是有界限的流,實時資料是一個沒有界限的流,這就是所謂的有界流和無界流。
-
分層API
最底層的抽象僅僅提供了有狀態流,它將通過過程函式被嵌入到DataStream API中。實際上使用者只需要使用核心API程式設計計科,比如DataStream API(有界或者無界流資料)以及DataSet API(有界資料集)。Table API是以表為中心的宣告式程式設計,其中表有可能會動態變化(在表達流資料時)。
目前Flink作為批處理還不是主流,不如Spark成熟,所以DataSet使用的並不是很多。所以主要是學習DataStream API 的使用。Flink Table API和Flink SQL也並不完善,很多大廠都在自己定製。
Flink的幾大模組主要如下:
-
Flink Table&Flink SQL
-
Flink Gelly圖計算
-
Flink CEP複雜事件處理
二、Wordcount
Wordcount在大資料中有點像Hello World,當我們輸出Hello World的時候,就說名成功了,同樣在大資料專案中如果成功的統計出了文字或者socket流中的單詞數量,也相當於成功運行了第一個專案。flink是一個流批一體的計算引擎,所以wordcount分為兩種,從文字或者其它儲存中讀取的批處理和從socket讀取的流處理wordcount。
專案使用maven進行專案管理,首先編寫pom檔案,新增下面兩項依賴。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.10.1</version> </dependency> </dependencies>
新增依賴後發現在第二個依賴中有2.12的字尾,flink使用Java編寫,但是他依賴的底層使用的是akka進行通訊,akka使用Scala進行編寫,這裡的2.12是指Scala的版本。
Spark原先使用akka進行通訊,後來從Spark1.3.1版本開始,為了解決大塊資料(如Shuffle)的傳輸問題,Spark引入了Netty通訊框架,到了1.6.0版本,Netty完全取代了Akka,承擔Spark內部所有的RPC通訊以及資料流傳輸。
批處理WordCount
package flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* @Classname WordCount
* @Date 2021/10/4 17:20
* @Created by github.com/myxiaoxin
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//建立執行環境,每一個ExecutionEnvironment都是一個flink任務
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//從檔案
String inputPath = "README.md";
DataSet<String> inputDataset = env.readTextFile(inputPath);
DataSet<Tuple2<String,Integer>> wordCountDataSet = inputDataset
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split(" ");
for(String word : words){
collector.collect(new Tuple2<String,Integer>(word,1));
}
}
})
.groupBy(0)
.sum(1);
wordCountDataSet.print();
}
}
流處理WordCount
package flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Classname StreamWordCount
* @Date 2021/10/4 19:46
* @Created by github.com/myxiaoxin
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
int port = parameterTool.getInt("port");
DataStream<String> inputDStream = env.socketTextStream(host,port);
DataStream<Tuple2<String,Integer>> wordCountDataStream = inputDStream
.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] arr = s.split(" ");
for (int i = 0; i < arr.length; i++) {
collector.collect(new Tuple2<String,Integer>(arr[i],1));
}
}
})
.keyBy(0)
.sum(1);
wordCountDataStream.print().setParallelism(1);
env.execute();
}
}
測試——在Linux中通過netcat命令進行傳送測試。
nc -lk 7777