
Notes from getting started with Flink

Several of our team's systems hold fairly large volumes of data, growing by tens of thousands of rows every day. One of them runs a scheduled job that reads the database and performs the related business-logic computations to produce up-to-date user information; the whole job takes about 4 hours. Since it runs at night, the system can probably hold out for another year: processing finishes overnight and the latest data can be shown during working hours the next day. But as the data and the business grow, this bottleneck urgently needs solving. Our team's architect decided to tackle it with Flink, so this week I started studying and exploring Flink and writing some demos.

1. Project setup

A Flink project depends on the JDK and Maven; the JDK must be newer than 1.7, and Maven apparently needs to be 3.0 or above. I am using JDK 1.8 and Maven 3.6; the versions can be checked as shown below.
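You can also confirm the running JDK from code. A minimal sketch (the version-string parsing rule here is my own assumption for illustration, not anything Flink ships):

```java
public class JdkVersionCheck {
    // JDK 8 reports spec version "1.8"; newer JDKs report "9", "11", "17", ...
    public static boolean isSupported(String specVersion) {
        if (specVersion.startsWith("1.")) {
            return specVersion.compareTo("1.8") >= 0; // "1.8" and up
        }
        return true; // single-number specs are all newer than 8
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        System.out.println("java.specification.version = " + spec
                + (isSupported(spec) ? " (OK for Flink)" : " (too old for Flink)"));
    }
}
```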

Three ways of setting up the project are commonly described online:

1.1. Via the Maven archetype command

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.7.2 \
    -DgroupId=flink-project \
    -DartifactId=flink-project \
    -Dversion=0.1 \
    -Dpackage=myflink \
    -DinteractiveMode=false

1.2. Via the quickstart script provided by Flink

curl https://flink.apache.org/q/quickstart.sh | bash

1.3. Via a Maven project created in IDEA

You can create a Maven project by simply clicking through Next; search online for the details.

I created mine from a Flink archetype, as shown below.

IDEA does not ship with the flink-quickstart-java archetype by default, so you first need to add it yourself, as shown below. The Flink version is up to you; checking dependency usage on the Maven repository site, 1.11.1 was the most widely used at the time, so I chose that version.

With the project created, I wrote some demos to get familiar with the Flink APIs.

2. Demos

The demos fall into two categories: bounded data sources and unbounded data sources. Bounded sources include files and databases; unbounded sources include message queues and sockets. I tried one demo of each kind, and also used Flink's richer APIs to aggregate, filter, and join database data, since our business mainly reads, writes, and computes over a database.

2.1. Flink's Hello World demo

This demo comes from the official repository and is essentially a hello world; it should be the first demo you look at when starting with Flink. The code is available in the official repository: https://github.com/apache/flink

The local code and its output are as follows:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Demo1WordCount {
    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0) // e.g. (i,1) (am,1) (chinese,1)
                        .sum(1);

        // execute and print result
        counts.print();
    }

    //
    //     User Functions
    //

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
     */
    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
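The LineSplitter tokenization can be exercised on its own. A minimal plain-Java sketch of the same split-and-count logic, using the demo's `\W+` regex (no Flink runtime needed):

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of what LineSplitter plus groupBy(0).sum(1) compute
// for a single line, using the same "\\W+" tokenization as the Flink demo.
public class WordCountSketch {

    // Lower-case the line, split on runs of non-word characters, count each token.
    public static Map<String, Integer> count(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // "to" and "be" each occur twice in this line
        System.out.println(count("To be, or not to be,--that is the question:--"));
    }
}
```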

2.2. Reading bounded data: reading from and writing to MySQL

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReadWriteByJdbc {

    private static final Logger logger = LoggerFactory.getLogger(ReadWriteByJdbc.class);
    private static final String INSERT = "insert into flink_demo (name, password) values (?,?)";
    private static final String SELECT = "select name, description from flink_demo where description is not null";

    public static void main(String[] args) throws Exception {
        String driverClass = "com.mysql.jdbc.Driver";
        String dbUrl = "jdbc:mysql://xxxxxxxxxx:xxxxxxxxxx/xxxxxxxxxx";
        String userName = "xxxxxxxxxxxxxxx";
        String password = "xxxxxxxxxxxxxxxx";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String filePath = "file:///E:/4_data/flink.txt";
        // read a two-column file (delimited by two spaces) and map each line to a Row
        DataSet<Row> write = env.readCsvFile(filePath).fieldDelimiter("  ").types(String.class, String.class)
                .map(new MapFunction<Tuple2<String, String>, Row>() {
                    @Override
                    public Row map(Tuple2<String, String> tuple) throws Exception {
                        Row row = new Row(2);
                        row.setField(0, tuple.f0.getBytes("UTF-8"));
                        row.setField(1, tuple.f1.getBytes("UTF-8"));
                        return row;
                    }
                });
        write.output(
                JDBCOutputFormat.buildJDBCOutputFormat()
                        .setDrivername(driverClass)
                        .setDBUrl(dbUrl)
                        .setUsername(userName)
                        .setPassword(password)
                        .setQuery(INSERT)
                        .finish()
        );
        env.execute();
        logger.info("------------------ data written to MySQL ----------------");
        Thread.sleep(5000);
        DataSource<Row> read = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverClass)
                .setDBUrl(dbUrl)
                .setUsername(userName)
                .setPassword(password)
                .setQuery(SELECT)
                // both columns of the SELECT are strings
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                .finish()
        );
        logger.info("------------------ data read from MySQL ---------------");
        read.map(new MapFunction<Row, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(Row row) throws Exception {
                return new Tuple2<>(row.getField(0).toString(), row.getField(1).toString());
            }
        }).print();
        logger.info("=============== Succeed ==============");
    }

}

2.3. Reading unbounded data: reading from a socket

First you need to start a socket server. Linux and macOS ship with suitable tools out of the box; on Windows you can simulate a socket with netcat, which can be downloaded from the net-disk link below:

Link: https://pan.baidu.com/s/1oet8Vaza4D2JKdYBwwF7qA  extraction code: wrx4

Start it directly on Windows with: nc -L -p 9000 -v, as shown below.

This shows that the socket is up and listening on port 9000, so when running the Java program from IDEA the same port must be set via the program argument: --port 9000

Run the Java program; the figure below shows the socket connection being established:

Type strings into the command prompt and IDEA will receive them, with Flink processing each string.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // socket port
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("No --port argument given, falling back to 9000");
            port = 9000;
        }

        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // connect to the socket and read its input
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n");

        // process the data
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }) // flatten each line into <word, count> records
                .keyBy("word") // group records with the same word
                .timeWindow(Time.seconds(2), Time.seconds(1)) // window size and slide interval
                .sum("count");

        // print the result to the console
        windowCount.print()
                .setParallelism(1); // use a parallelism of one
        // note: Flink is lazily evaluated, so nothing above runs until execute() is called
        env.execute("streaming word count");

    }

    /**
     * Holds a word and the number of times it occurred.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
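The `timeWindow(Time.seconds(2), Time.seconds(1))` call creates sliding windows, so each incoming word is counted in two overlapping windows and a result is emitted every second. A plain-Java sketch of that window-assignment rule (this mirrors the semantics, not Flink's internal code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how a sliding window of size 2000 ms / slide 1000 ms assigns a
// timestamped element to windows -- the semantics behind
// timeWindow(Time.seconds(2), Time.seconds(1)).
public class SlidingWindowSketch {

    // Returns the start timestamps of every window [start, start + size) containing ts.
    public static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - ((ts % slide) + slide) % slide; // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // an element at t = 5000 ms falls into [5000, 7000) and [4000, 6000)
        System.out.println(windowStarts(5000, 2000, 1000));
    }
}
```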

2.4. Reading MySQL data, then aggregating and filtering

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

@Slf4j
public class TableSqlDemo {
    private static final String SELECT = "select cast(id as char) as id, name, password, description, cast(state as char) as state, " +
            "cast(create_time as char) as create_time, cast(update_time as char) as update_time from flink_demo;";
    // database configuration
    private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    private static final String DBURL = "jdbc:mysql://ip:port/flink_data?characterEncoding=utf-8&useSSL=false";
    private static final String USERNAME = "xxxxx";
    private static final String PASSWORD = "xxxxxx";

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read from the database
        DataSource<Row> read = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(DRIVER_CLASS)
                .setDBUrl(DBURL)
                .setUsername(USERNAME)
                .setPassword(PASSWORD)
                .setQuery(SELECT)
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                .finish()
        );
        System.out.println(System.getProperty("line.separator"));
        log.info("------------- data read from MySQL ------------------");
        // print each row as a plain string
//        read.map((MapFunction<Row, String>) row -> row.toString()).print();

        // map each row to a bean and print it (Bean is a simple POJO, not shown here)
        read.map((MapFunction<Row, Bean>) row -> new Bean(
                row.getField(0).toString(),
                row.getField(1).toString(),
                row.getField(2).toString(),
                row.getField(3).toString(),
                switchState(row.getField(4).toString()),
                row.getField(5).toString(),
                row.getField(6).toString())).print();
        System.out.println(System.getProperty("line.separator"));
        log.info("------------- aggregate: count full-time, probation, and resigned employees ------------------");
        // aggregate the rows we read
        DataSet<Tuple2<String, Integer>> counts =
                read.flatMap(new StateCount())
                        .groupBy(0)
                        .sum(1);

        // execute and print result
        counts.print();

        // Table API operations on the same data
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
        tableEnv.registerDataSet("t1", read, "id, name, password, description, state, create_time, update_time");
        /*System.out.println(System.getProperty("line.separator"));
        log.info("------------- print the schema (this part is a bit tricky) ------------------");
        tableEnv.sqlQuery("select id, name, password, description, state, create_time, update_time from t1").printSchema();*/
        System.out.println(System.getProperty("line.separator"));
        log.info("------------- filter: select full-time employees ------------------");
        Table t2 = tableEnv.sqlQuery("select id, name, password, description, state, create_time, update_time from t1").where("state = '0'");
        DataSet<BeanExData> beanExDataDataSet = tableEnv.toDataSet(t2, BeanExData.class);
        beanExDataDataSet.print();
    }

    // aggregation: implement FlatMapFunction and override flatMap with our own logic
    public static final class StateCount implements FlatMapFunction<Row, Tuple2<String, Integer>> {
        @Override
        public void flatMap(Row in, Collector<Tuple2<String, Integer>> out) throws Exception {
            // field 4 is the state column
            out.collect(new Tuple2<>(switchState(in.getField(4).toString()), 1));
        }
    }

    // translate state codes into readable labels
    private static String switchState(String state) {
        String label = "";
        if (StringUtils.isBlank(state)) {
            return label;
        }
        switch (state) {
            case "0":
                label = "full-time";
                break;
            case "1":
                label = "probation";
                break;
            case "2":
                label = "resigned";
                break;
        }
        return label;
    }
}

The results are as follows:
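The switch in switchState can equally be written as a lookup table. A plain-Java sketch (the code-to-label mapping follows this post's state codes, with the labels translated to English here):

```java
import java.util.HashMap;
import java.util.Map;

// Lookup-table alternative to the switch statement in switchState.
public class StateLabels {
    private static final Map<String, String> LABELS = new HashMap<>();
    static {
        LABELS.put("0", "full-time");
        LABELS.put("1", "probation");
        LABELS.put("2", "resigned");
    }

    // Unknown or blank state codes map to an empty label, as in the original.
    public static String label(String state) {
        if (state == null || state.trim().isEmpty()) {
            return "";
        }
        return LABELS.getOrDefault(state, "");
    }

    public static void main(String[] args) {
        System.out.println(label("0")); // full-time
    }
}
```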

2.5. Reading multiple MySQL tables and joining them

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

@Slf4j
public class TableSqlJoinDemo {
    // rows from the user_info table
    private static final String SELECT = "select cast(id as char) as id, name, description, " +
            "cast(create_time as char) as create_time, cast(update_time as char) as update_time from user_info;";

    // rows from the flink_demo table
    private static final String USER_INFO = "select cast(id as char) as id, name, password, description, cast(state as char) as state, " +
            "cast(create_time as char) as create_time, cast(update_time as char) as update_time from flink_demo;";
    // database configuration
    private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
    private static final String DBURL = "jdbc:mysql://ip:port/db?characterEncoding=utf-8&useSSL=false";
    private static final String USERNAME = "username";
    private static final String PASSWORD = "password";

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // read the user rows from the database
        DataSource<Row> user = env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDrivername(DRIVER_CLASS)
                .setDBUrl(DBURL).setUsername(USERNAME).setPassword(PASSWORD).setQuery(SELECT)
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                .finish()
        );
        System.out.println(System.getProperty("line.separator"));
        log.info("------------- user rows read from MySQL ------------------");
        // print each row as a plain string
        user.map((MapFunction<Row, String>) row -> row.toString()).print();
        // register the user data set with Flink
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
        // the third argument names the fields; if omitted, all fields are registered
        tableEnv.registerDataSet("user", user, "id, name, description, create_time, update_time");
        Table userTable = tableEnv.scan("user");
        tableEnv.registerTable("userTable", userTable);
        // print the field types to double-check the data
        /*log.info("------------- check the schema ------------------");
        userTable.printSchema();*/

        // read the user-description rows from the database
        DataSource<Row> userInfo = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(DRIVER_CLASS).setDBUrl(DBURL).setUsername(USERNAME).setPassword(PASSWORD).setQuery(USER_INFO)
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
                .finish()
        );
        System.out.println(System.getProperty("line.separator"));
        log.info("------------- user description rows read from MySQL ------------------");
        // print each row as a plain string
        userInfo.map((MapFunction<Row, String>) row -> row.toString()).print();
        // register the user-description data set with Flink
        tableEnv.registerDataSet("userInfo", userInfo, "id, name, password, description, state, create_time, update_time");
        Table userInfoTable = tableEnv.scan("userInfo");
        tableEnv.registerTable("userInfoTable", userInfoTable);
        // print the field types to double-check the data
        /*log.info("------------- check the schema ------------------");
        userInfoTable.printSchema();*/
        // join the two tables
        System.out.println(System.getProperty("line.separator"));
        log.info("------------- join result ------------------");
        Table result = tableEnv.sqlQuery("select a.name, a.description, b.description as description1 from userTable a, userInfoTable b where a.name = b.name");
        DataSet<Row> joined = tableEnv.toDataSet(result, Row.class);
        joined.map((MapFunction<Row, String>) row -> row.toString()).print();
    }
}

The run results are as follows:
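The comma-separated FROM clause with a WHERE equality is an inner join on name. Its semantics can be checked with a tiny in-memory sketch over hypothetical sample rows (the names and descriptions below are made up for illustration):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// In-memory sketch of the inner-join semantics of
// "from userTable a, userInfoTable b where a.name = b.name".
public class JoinSketch {

    // Each String[] is a (name, description) row; emit "name,descA,descB" per match.
    public static List<String> innerJoinOnName(List<String[]> left, List<String[]> right) {
        List<String> result = new ArrayList<>();
        for (String[] a : left) {
            for (String[] b : right) {
                if (a[0].equals(b[0])) { // a.name = b.name
                    result.add(a[0] + "," + a[1] + "," + b[1]);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> user = Arrays.asList(
                new String[]{"alice", "dev"},
                new String[]{"bob", "ops"});
        List<String[]> userInfo = Arrays.asList(
                new String[]{"alice", "full-time"});
        // only alice appears on both sides, so only her row survives the join
        System.out.println(innerJoinOnName(user, userInfo));
    }
}
```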

3. Setting up a Flink service on Windows

Download Flink from the official site: https://flink.apache.org/downloads.html; choose whichever version you need.

Start the .bat file from the command prompt; the web UI listens on port 8081 by default and can be opened in a browser.

Package the Java code into a jar with IDEA and upload it to the Flink server, as shown:

Click Submit to run the job.