Apache Flink SQL示例
阿新 • • 發佈:2019-01-09
本章講解如何使用Flink SQL進行批處理操作。
這裡我提取了2016年中超聯賽射手榜的資料,通過Flink SQL進行簡單的彙總。
1、源資料
這裡儲存為csv格式:
2、在pom中新增Table API的依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>${flink.version}</version >
</dependency>
3、案例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class TableJob {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
//source,這裡讀取CSV檔案,並轉換為對應的Class
DataSet<TopScorers> csvInput = env
.readCsvFile("E://flink-training-exercises//src//main//resource//2016_Chinese_Super_League_Top_Scorers.csv")
.ignoreFirstLine() .pojoType(TopScorers.class,"rank","player","country","club","total_score","total_score_home","total_score_visit","point_kick");
//將DataSet轉換為Table
Table topScore = tableEnv.fromDataSet(csvInput);
//將topScore註冊為一個表
tableEnv.registerTable("topScore",topScore);
//查詢球員所在的國家,以及這些國家的球員(內援和外援)的總進球數
Table groupedByCountry = tableEnv.sql("select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc");
//轉換回dataset
DataSet<Result> result = tableEnv.toDataSet(groupedByCountry,Result.class);
//將dataset map成tuple輸出
result.map(new MapFunction<Result, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(Result result) throws Exception {
String country = result.country;
int sum_total_score = result.sum_total_score;
return Tuple2.of(country,sum_total_score);
}
}).print();
}
/**
* 源資料的對映類
*/
public static class TopScorers {
/**
* 排名,球員,國籍,俱樂部,總進球,主場進球數,客場進球數,點球進球數
*/
public int rank;
public String player;
public String country;
public String club;
public int total_score;
public int total_score_home;
public int total_score_visit;
public int point_kick;
public TopScorers() {
super();
}
}
/**
* 統計結果對應的類
*/
public static class Result {
public String country;
public int sum_total_score;
public Result() {}
}
}
4、說明
這裡注意一下csv的格式,由於第一行是個說明行,所以在處理時要將第一行去掉:
ignoreFirstLine()
另外,本統計是想統計在這份榜單中,以球員所在的國家進行分組,然後統計進球數。
通過sql:
"select country,sum(total_score) as sum_total_score from topScore group by country order by 2 desc"
來實現,並將結果按照進球數倒敘排列。
5、結果
從結果中可以看出,巴西球員最受各俱樂部歡迎,而且以巴西為主的南美洲以及非洲球員,都是各隊比較喜歡的。
6、結論:
首先,南美球員技術出眾,單兵作戰能力強,而且南美球員不覺得來中超踢球是件丟人的事,所以很適合中超;第二,非洲球員身體素質出色,對抗能力強,前場突擊能力也是各俱樂部看中的地方。第三,國內球員表現疲軟,排名前30名的國內球員,總共進球數才43,也能從側面反映出國足目前鋒無力的情況是多麼嚴重了。