Disclaimer: This series of blog posts is original work, first published at Lagou Education, with part of it available as free-to-read content. Readers have copied it to various sites; all other sources are unauthorized copies.
Getting Started with Flink: WordCount and a SQL Implementation
I. WordCount
1. First, create the project and add the relevant dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!--<scope>provided</scope>-->
</dependency>
2. DataSet WordCount
The WordCount program is the introductory example of big data processing frameworks: it counts how many times each word appears in a piece of text. The program has two main parts: one splits the text into words; the other groups the words, counts them, and prints the result.
The complete code is as follows:
public static void main(String[] args) throws Exception {
    // Create the Flink execution context
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Create the DataSet; our input here is lines of text
    DataSet<String> text = env.fromElements(
            "Flink Spark Storm",
            "Flink Flink Flink",
            "Spark Spark Spark",
            "Storm Storm Storm"
    );
    // Compute using Flink's built-in transformation functions
    DataSet<Tuple2<String, Integer>> counts =
            text.flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
    // Print the result
    counts.printToErr();
}

public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Split the text into tokens
        String[] tokens = value.toLowerCase().split("\\W+");
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }
        }
    }
}
The implementation breaks down into the following steps.
(1) Create the Flink execution context:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
(2) Use the fromElements function to create a DataSet object holding our input, then transform it with the flatMap, groupBy, and sum functions.
(3) Execute the job and print the result.
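To make the roles of flatMap, groupBy, and sum concrete, here is a minimal plain-Java sketch (no Flink involved, class name is hypothetical) of the same split-then-count logic that the operators above parallelize:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PlainWordCount {
    // Same tokenization as LineSplitter: lowercase, split on non-word characters
    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {                       // "flatMap": one line -> many tokens
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum); // "groupBy(0).sum(1)" in one step
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(new String[]{
                "Flink Spark Storm", "Flink Flink Flink",
                "Spark Spark Spark", "Storm Storm Storm"});
        System.out.println(counts); // {flink=4, spark=4, storm=4}
    }
}
```

The difference is that Flink distributes the same logic across parallel tasks, which is why the grouping step exists as a separate operator.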
3. DataStream WordCount
To simulate a streaming environment, we listen on a local socket port and use a Flink sliding time window of 5 seconds, evaluated every second, to print the running counts. The code is as follows:
public class StreamingJob {
public static void main(String[] args) throws Exception {
// Create the Flink streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Listen on local port 9000
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000, "\n");
// Split, key, window, and aggregate the incoming data, then emit the result
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// Print the results
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
The streaming computation breaks down into the following steps:
(1) Create a streaming execution environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
(2) Listen on local port 9000, then split, key, window, and aggregate the incoming data before emitting the result. The code uses Flink's window functions, which are covered in detail later.
(3) Start a listener locally with the netcat command:
nc -lk 9000
(4) Run the program to get the result.
Input:
$ nc -lk 9000
Flink Flink Flink
Flink Spark Storm
Output:
Flink : 4
Spark : 1
Storm : 1
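The timeWindow(Time.seconds(5), Time.seconds(1)) call above defines a sliding window: every second, Flink emits counts over the last five seconds, so a single input line can contribute to several successive results. Here is a minimal plain-Java sketch of that windowing idea (the class name and the event timestamps are hypothetical; this is not Flink's implementation):

```java
import java.util.List;

public class SlidingWindowSketch {
    // Count how many event timestamps fall inside the half-open window [windowEnd - size, windowEnd)
    static long countInWindow(List<Long> eventTimes, long windowEnd, long size) {
        return eventTimes.stream()
                .filter(t -> t >= windowEnd - size && t < windowEnd)
                .count();
    }

    public static void main(String[] args) {
        // Hypothetical arrival times (in seconds) of "Flink" tokens on the socket
        List<Long> arrivals = List.of(0L, 1L, 1L, 3L);
        // A 5-second window sliding by 1 second fires at t = 1, 2, 3, ...
        for (long end = 1; end <= 8; end++) {
            System.out.println("window ending at " + end + "s: "
                    + countInWindow(arrivals, end, 5));
        }
    }
}
```

Note how an event at t = 3s is counted in every window that fires between t = 4s and t = 8s, and then ages out.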
4. Flink Table & SQL WordCount
Flink SQL is a development language conforming to standard SQL semantics, designed to simplify Flink's real-time computation model and lower the barrier to using real-time computation.
A complete Flink SQL program consists of the following three parts:
- Source Operator: an abstraction over external data sources. Flink ships with many common source implementations, such as MySQL and Kafka.
- Transformation Operator: operators that perform queries, aggregations, and so on. Flink SQL currently supports Union, Join, Projection, Difference, Intersection, window operations, and more.
- Sink Operator: an abstraction over external result tables, with built-in implementations such as MySQL and Kafka.
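The three-part structure above can be sketched with a toy pipeline in plain Java (class and method names are hypothetical; this only illustrates the source → transformation → sink shape, not Flink's actual API):

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PipelineSketch {
    // A toy pipeline: the source supplies rows, the transformation reshapes them,
    // and the sink consumes the result.
    static <I, O> List<O> run(List<I> source, Function<I, O> transform, Consumer<List<O>> sink) {
        List<O> out = source.stream().map(transform).collect(Collectors.toList());
        sink.accept(out);
        return out;
    }

    public static void main(String[] args) {
        run(List.of("hello", "flink"),          // Source: in Flink, e.g. a Kafka topic
            String::toUpperCase,                // Transformation: projection, aggregation, ...
            rows -> System.out.println(rows));  // Sink: e.g. a MySQL result table
    }
}
```

In real Flink SQL, all three parts are declared as tables and queries, and the planner wires them together.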
(1) First, add the dependencies to the pom:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
(2) Create the execution context:
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
(3) Read a line of data as input:
String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();
for(String word : split){
WC wc = new WC(word,1);
list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);
(4) Register the input as a table, run the SQL, and print the output:
// Convert the DataSet to a Table, specifying the field names
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();
// Register it as a table
fbTableEnv.createTemporaryView("WordCount", table);
Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");
// Convert the Table back into a DataSet
DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();
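The sqlQuery above groups rows by word and sums frequency. As a sanity check of what that SQL computes, here is the equivalent aggregation written with plain Java streams (the class name is hypothetical; this is a sketch, not Flink's execution path):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupBySketch {
    record WC(String word, long frequency) {}

    // Equivalent of: SELECT word, SUM(frequency) FROM WordCount GROUP BY word
    static Map<String, Long> aggregate(List<WC> rows) {
        return rows.stream().collect(
                Collectors.groupingBy(WC::word, Collectors.summingLong(WC::frequency)));
    }

    public static void main(String[] args) {
        List<WC> input = List.of(new WC("hello", 1), new WC("flink", 1),
                new WC("hello", 1), new WC("lagou", 1));
        // Prints the aggregated counts, e.g. hello=2, flink=1, lagou=1 (map iteration order may vary)
        System.out.println(aggregate(input));
    }
}
```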
The complete code is as follows:
public class WordCountSQL {
public static void main(String[] args) throws Exception{
// Get the execution environment
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
// Create a table environment
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
String words = "hello flink hello lagou";
String[] split = words.split("\\W+");
ArrayList<WC> list = new ArrayList<>();
for(String word : split){
WC wc = new WC(word,1);
list.add(wc);
}
DataSet<WC> input = fbEnv.fromCollection(list);
// Convert the DataSet to a Table, specifying the field names
Table table = fbTableEnv.fromDataSet(input, "word,frequency");
table.printSchema();
// Register it as a table
fbTableEnv.createTemporaryView("WordCount", table);
Table table02 = fbTableEnv.sqlQuery("select word as word, sum(frequency) as frequency from WordCount GROUP BY word");
// Convert the Table back into a DataSet
DataSet<WC> ds3 = fbTableEnv.toDataSet(table02, WC.class);
ds3.printToErr();
}
public static class WC {
public String word;
public long frequency;
public WC() {}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return word + ", " + frequency;
}
}
}
5. Summary
This article used the WordCount scenario to demonstrate Flink, giving you a taste of the power of Flink SQL and laying the groundwork for the content that follows.