1. 程式人生 > >Apache Flink SQL示例

Apache Flink SQL示例

本章講解如何使用Flink SQL進行批處理操作。

這裡我提取了2016年中超聯賽射手榜的資料,通過Flink SQL進行簡單的彙總。

1、源資料

這裡儲存為csv格式:

這裡寫圖片描述

2、在pom中新增Table API的依賴:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${flink.version}</version
>
</dependency>

3、案例


import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;


public
class TableJob { public static void main(String[] args) throws Exception{ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env); //source,這裡讀取CSV檔案,並轉換為對應的Class
DataSet<TopScorers> csvInput = env .readCsvFile("E://flink-training-exercises//src//main//resource//2016_Chinese_Super_League_Top_Scorers.csv") .ignoreFirstLine() .pojoType(TopScorers.class,"rank","player","country","club","total_score","total_score_home","total_score_visit","point_kick"); //將DataSet轉換為Table Table topScore = tableEnv.fromDataSet(csvInput); //將topScore註冊為一個表 tableEnv.registerTable("topScore",topScore); //查詢球員所在的國家,以及這些國家的球員(內援和外援)的總進球數 Table groupedByCountry = tableEnv.sql("select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc"); //轉換回dataset DataSet<Result> result = tableEnv.toDataSet(groupedByCountry,Result.class); //將dataset map成tuple輸出 result.map(new MapFunction<Result, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(Result result) throws Exception { String country = result.country; int sum_total_score = result.sum_total_score; return Tuple2.of(country,sum_total_score); } }).print(); } /** * 源資料的對映類 */ public static class TopScorers { /** * 排名,球員,國籍,俱樂部,總進球,主場進球數,客場進球數,點球進球數 */ public int rank; public String player; public String country; public String club; public int total_score; public int total_score_home; public int total_score_visit; public int point_kick; public TopScorers() { super(); } } /** * 統計結果對應的類 */ public static class Result { public String country; public int sum_total_score; public Result() {} } }

4、說明

這裡注意一下csv的格式,由於第一行是個說明行,所以在處理時要將第一行去掉:

ignoreFirstLine() 

另外,本統計是想統計在這份榜單中,以球員所在的國家進行分組,然後統計進球數。

通過sql:

"select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc"

來實現,並將結果按照進球數倒敘排列。

5、結果

這裡寫圖片描述

從結果中可以看出,巴西球員最受各俱樂部歡迎,而且以巴西為主的南美洲以及非洲球員,都是各隊比較喜歡的。

6、結論:

首先,南美球員技術出眾,單兵作戰能力強,而且南美球員不覺得來中超踢球是件丟人的事,所以很適合中超;第二,非洲球員身體素質出色,對抗能力強,前場突擊能力也是各俱樂部看中的地方。第三,國內球員表現疲軟,排名前30名的國內球員,總共進球數才43,也能從側面反映出國足目前鋒無力的情況是多麼嚴重了。