Flink Getting-Started Programs: WordCount and a SQL Implementation
Posted by 阿新 · 2020-09-22
I. WordCount
1. First, create the project and add the required dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
2. DataSet WordCount
The WordCount program is the classic introductory program of big data processing frameworks: it counts how many times each word appears in a piece of text. The program has two parts: one splits the text into words; the other groups the words, sums the counts, and prints the result.
The complete code:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatch {

    public static void main(String[] args) throws Exception {
        // Create the Flink execution context
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Create a DataSet; here the input is lines of text
        DataSet<String> text = env.fromElements(
                "Flink Spark Storm",
                "Flink Flink Flink",
                "Spark Spark Spark",
                "Storm Storm Storm"
        );

        // Compute with Flink's built-in transformation functions
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new LineSplitter())
                        .groupBy(0)
                        .sum(1);

        // Print the result
        counts.printToErr();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Split the text into tokens
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
The implementation breaks down into the following steps.
(1) Create the Flink execution context:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
(2) Use the fromElements function to create a DataSet containing our input, then transform it with the flatMap, groupBy, and sum functions.
(3) Execute the program and print the result.
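Independent of Flink, the split / group / sum logic of the steps above can be sketched in plain Java; the class and method names here are illustrative, not part of any Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LocalWordCount {

    // Plain-Java version of the same split / group / sum logic (no Flink):
    // lowercase each line, split on non-word characters, and tally tokens.
    static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same input as two of the DataSet elements above
        System.out.println(count("Flink Spark Storm", "Flink Flink Flink"));
        // → {flink=4, spark=1, storm=1}
    }
}
```

What Flink adds on top of this local sketch is distribution: flatMap, groupBy, and sum run the same logic in parallel across a cluster.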
3. DataStream WordCount
To simulate a streaming environment, we listen on a local socket port and use a tumbling window in Flink to print the computed result every 5 seconds. The code:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // Create the Flink streaming environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen on local port 9000
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // Split, group, window, and aggregate the received data
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // Print the result
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ":" + count;
        }
    }
}
The streaming computation proceeds in the following steps:
(1) First, create a streaming execution environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
(2) Listen on local port 9000; split, group, window-aggregate, and output the received data. The code uses Flink's window functions, which will be explained in detail later.
(3) Start a listener locally with the netcat command:
nc -lk 9000
(4) Run the program and observe the result.
Input:
$ nc -lk 9000
Flink Flink Flink
Flink Spark Storm
Result:
Flink:4
Spark:1
Storm:1
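For intuition about the 5-second window used above, here is a simplified plain-Java sketch (not the Flink API) of how a tumbling window buckets timestamped events; it assumes non-negative timestamps and a zero window offset, and the helper names are invented for illustration:

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    // Each event falls into exactly one window [start, start + size);
    // the window start is the timestamp rounded down to a multiple of the size.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    // Count words per window; input is parallel arrays of timestamps and words.
    static Map<Long, Map<String, Long>> countPerWindow(long[] ts, String[] words, long size) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < ts.length; i++) {
            result.computeIfAbsent(windowStart(ts[i], size), k -> new TreeMap<>())
                  .merge(words[i], 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 2_000L, 6_000L};
        String[] words = {"flink", "flink", "spark"};
        // The first two events land in window [0, 5000), the third in [5000, 10000)
        System.out.println(countPerWindow(ts, words, 5_000L));
        // → {0={flink=2}, 5000={spark=1}}
    }
}
```

Flink does this bucketing per key and emits one aggregated result per window when the window closes, which is why the socket example prints a count every 5 seconds.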
4. Flink Table & SQL WordCount
Flink SQL is a development language, conforming to standard SQL semantics, that Flink provides to simplify the real-time computation model and lower the barrier to entry for real-time computing.
A complete Flink SQL program consists of the following three parts:
- Source Operator: an abstraction over external data sources; Flink ships with many built-in source implementations, such as MySQL and Kafka.
- Transformation Operator: operators that perform queries, aggregations, and so on. Flink SQL currently supports Union, Join, Projection, Difference, Intersection, windows, and more.
- Sink Operator: an abstraction over external result tables, with built-in implementations such as MySQL and Kafka.
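The three-part structure can be illustrated with a minimal plain-Java pipeline sketch; the `run` helper and its signature are invented for illustration and are not part of the Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class PipelineSketch {

    // Minimal source -> transformation -> sink wiring (plain Java, not Flink):
    // pull records from the source, transform each one, and hand it to the sink.
    static <A, B> List<B> run(Supplier<List<A>> source,
                              Function<A, B> transform,
                              Consumer<B> sink) {
        List<B> out = new ArrayList<>();
        for (A record : source.get()) {
            B transformed = transform.apply(record);
            sink.accept(transformed);
            out.add(transformed);
        }
        return out;
    }

    public static void main(String[] args) {
        // Source: an in-memory list; Transformation: uppercase; Sink: stdout
        run(() -> List.of("flink", "spark"),
            String::toUpperCase,
            System.out::println);
    }
}
```

In real Flink SQL programs the source and sink are connector tables (Kafka, MySQL, ...) and the transformation is the SQL query; the wiring pattern is the same.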
(1) First, add the dependencies to the pom:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
(2) Create the execution context:
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
(3) Read a line of data as input:
String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();
for (String word : split) {
    WC wc = new WC(word, 1);
    list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);
(4) Register it as a table, run the SQL, and print the output:
// Convert the DataSet for SQL, specifying the field names
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();

// Register it as a table
fbTableEnv.createTemporaryView("WordCount", table);
Table table02 = fbTableEnv.sqlQuery(
        "select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

// Convert the table back to a DataSet
DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();
The complete code:
import java.util.ArrayList;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class WordCountSQL {

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        // Create a TableEnvironment
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        String words = "hello flink hello lagou";
        String[] split = words.split("\\W+");
        ArrayList<WC> list = new ArrayList<>();
        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }
        DataSet<WC> input = fbEnv.fromCollection(list);

        // Convert the DataSet for SQL, specifying the field names
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // Register it as a table
        fbTableEnv.createTemporaryView("WordCount", table);
        Table table02 = fbTableEnv.sqlQuery(
                "select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

        // Convert the table back to a DataSet
        DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
        ds3.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return word + "," + frequency;
        }
    }
}
5. Summary
This article used the WordCount scenario to demonstrate Flink and give you a feel for the power of Flink SQL, laying the groundwork for the content that follows.