
Getting Started with Flink: WordCount and a SQL Implementation

I. WordCount

1. First, create the project and add the relevant dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
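The snippet references the flink.version and scala.binary.version Maven properties without defining them; a minimal properties block, assuming the Flink 1.10.0 / Scala 2.11 combination used in the SQL section below, might look like this:

<properties>
    <!-- Assumed versions; match them to your cluster -->
    <flink.version>1.10.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>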

2. DataSet WordCount

WordCount is the canonical introductory program for big data processing frameworks: it counts how many times each word appears in a piece of text. The program has two parts: one splits the text into words; the other groups the words, counts them, and prints the result.
The complete code is as follows:

public static void main(String[] args) throws Exception {

    // Create the Flink execution context
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Create a DataSet; here the input is text, one line per element
    DataSet<String> text = env.fromElements(
            "Flink Spark Storm",
            "Flink Flink Flink",
            "Spark Spark Spark",
            "Storm Storm Storm"
    );

    // Compute with Flink's built-in transformation functions
    DataSet<Tuple2<String, Integer>> counts =
            text.flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

    // Print the result
    counts.printToErr();
}

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Split the text into tokens
        String[] tokens = value.toLowerCase().split("\\W+");

        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
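For comparison, the same pipeline can be written with a Java lambda instead of the LineSplitter class. Because the compiler erases the generic types, the DataSet API then needs an explicit hint via returns; a sketch using org.apache.flink.api.common.typeinfo.Types:

// Lambda variant of the flatMap; returns(...) restores the
// Tuple2<String, Integer> type information erased by the compiler.
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .groupBy(0)
        .sum(1);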

The implementation breaks down into the following steps.
(1) Create the Flink execution context:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

(2) Use the fromElements function to create a DataSet containing our input, then transform it with the FlatMap, GroupBy, and Sum functions (a file-based input variant is sketched below).
(3) Run the program and print the result.
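If you want to feed real data instead of hard-coded elements, the fromElements call in step (2) can be swapped for a file source; a minimal sketch (the path is a placeholder):

// Read the input line by line from a local text file
DataSet<String> text = env.readTextFile("/path/to/words.txt");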

3. DataStream WordCount

To simulate a streaming environment, we listen on a local socket port and use a Flink time window: every second the program emits the word counts of the last 5 seconds. The code is as follows:

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // Create the Flink streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Listen on local port 9000
        DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");

        // Split the incoming data, key it by word, window it, and aggregate
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        // Print the results
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

The streaming job consists of the following steps:
(1) Create a streaming execution environment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

(2) Listen on local port 9000, then split, key, window, and aggregate the incoming data. The code uses Flink's window functions, which will be covered in detail later; a tumbling-window variant is sketched at the end of this section.
(3) Start netcat locally in a terminal window:

nc -lk 9000

(4) Run the program to get the result.

Input:

$ nc -lk 9000
Flink Flink Flink
Flink Spark Storm

Result:

Flink : 4
Spark : 1
Storm : 1
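The timeWindow(Time.seconds(5), Time.seconds(1)) call above defines a sliding window: each result covers the last 5 seconds, and a new result is emitted every second. For a tumbling window that fires exactly once per 5 seconds with no overlap, pass a single size argument; a sketch of that variant, reusing the text stream and WordWithCount type from above:

// Tumbling-window variant: non-overlapping 5-second windows, so each
// word count is printed once per window instead of once per second.
DataStream<WordWithCount> tumblingCounts = text
        .flatMap((String value, Collector<WordWithCount> out) -> {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        })
        .returns(WordWithCount.class)
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count));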

4. Flink SQL

Flink SQL is a development language with standard SQL semantics, designed by Flink to simplify the real-time computation model and lower the barrier to using real-time processing.
A complete Flink SQL program consists of the following three parts (a streaming table-environment sketch follows the list):

  • Source Operator: an abstraction over external data sources; Flink ships with built-in implementations for many common sources, such as MySQL and Kafka
  • Transformation Operator: the operators that perform queries, aggregations, and so on; Flink SQL currently supports Union, Join, Projection, Difference, Intersection, and window operations
  • Sink Operator: an abstraction over external result tables, with built-in implementations for MySQL, Kafka, and others
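This example uses the batch BatchTableEnvironment; for streaming SQL jobs, the table environment is created from a StreamExecutionEnvironment instead. A minimal sketch for Flink 1.10 with the blink planner (which the dependencies below include):

// Streaming TableEnvironment on the blink planner (Flink 1.10 API)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);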

(1) First, add the dependencies to the pom:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

(2) Create the execution context:

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

(3) Read a line of text as input:

String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();

for (String word : split) {
    WC wc = new WC(word, 1);
    list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);

(4) Register the DataSet as a table, run the SQL, and print the output:

// Convert the DataSet to a Table, specifying the field names
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();

// Register it as a table
fbTableEnv.createTemporaryView("WordCount", table);

Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

// Convert the Table back to a DataSet
DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();

The complete code is as follows:

public class WordCountSQL {

    public static void main(String[] args) throws Exception {

        // Get the execution environment
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        // Create a TableEnvironment
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        String words = "hello flink hello lagou";

        String[] split = words.split("\\W+");
        ArrayList<WC> list = new ArrayList<>();

        for (String word : split) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }
        DataSet<WC> input = fbEnv.fromCollection(list);

        // Convert the DataSet to a Table, specifying the field names
        Table table = fbTableEnv.fromDataSet(input, "word,frequency");
        table.printSchema();

        // Register it as a table
        fbTableEnv.createTemporaryView("WordCount", table);

        Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");

        // Convert the Table back to a DataSet
        DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
        ds3.printToErr();
    }

    public static class WC {
        public String word;
        public long frequency;

        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return word + "," + frequency;
        }
    }
}
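Running the program prints the table schema and then the aggregated counts to stderr; given the input "hello flink hello lagou", the counts should look like this (row order may vary):

hello,2
flink,1
lagou,1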

5. Summary

This article used the WordCount scenario to demonstrate Flink, giving you a feel for the power of Flink SQL and laying the groundwork for what follows.