記一次flink入門學習筆記
團隊有幾個系統資料量偏大,且每天以幾萬條的數量累增。有一個系統每天需要定時讀取資料庫,並進行相關的業務邏輯計算,從而獲取最新的使用者資訊,定時任務的整個耗時需要4小時左右。由於定時任務是夜晚執行,目前看來,系統還能抗1年,每天晚上可以把資料處理結束,第二天上班期間可以展示最新的資料。隨著資料和業務的增加,亟需解決這個瓶頸。團隊架構師決定使用flink大資料技術解決該瓶頸,所以本週開始學習與探索flink,並編寫一些demo。
1.專案的搭建
flink專案依賴於jdk和maven,且要求jdk版本高於1.7,maven版本好像需要3.0以上。我使用的是jdk1.8,maven是3.6,如下圖檢視其版本
網上提供了3中方式搭建專案:
1.1.通過maven命令搭建
1 mvn archetype:generate \ 2 -DarchetypeGroupId=org.apache.flink \ 3 -DarchetypeArtifactId=flink-quickstart-java \ 4 -DarchetypeVersion=1.7.2 \ 5 -DgroupId=flink-project \ 6 -DartifactId=flink-project \ 7 -Dversion=0.1 \ 8 -Dpackage=myflink \9 -DinteractiveMode=false
1.2.通過flink提供的指令碼搭建
curl https://flink.apache.org/q/quickstart.sh | bash
1.3.通過idea建立maven專案搭建
可直接通過一鍵next方式建立maven專案,可自行谷歌。
我使用flink模板建立的,如下圖
這裡idea預設是沒有flink-quickstart-java的模板的,需要先自行建立一個模板,如下圖:其中flink的版本可自行選擇,我maven網站檢視flink的依賴熱度,發現1.11.1目前使用的最多,所以也使用此版本。
專案建立好後,編寫相關demo熟悉flink相關api
2.demo
demo主要分為獲取有界資料來源和無解資料來源2類demo。有界資料來源如檔案、資料庫等,無解資料來源如MQ,Socket等。每類我都嘗試了1個demo,同時利用flink的複雜api寫了對資料庫資料的聚合、過濾、聯合,因為我們的業務主要是對資料庫的讀取、寫入與計算。
2.1 Flink的Hello Word demo
這個demo是官網git提供的,類似一個hello word,入門flink應該首先看這個demo。可自行去官網git檢視程式碼:https://github.com/apache/flink
原生代碼和執行結果如下:
1 public class Demo1WordCount { 2 public static void main(String[] args) throws Exception { 3 4 // set up the execution environment 5 final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 6 7 // get input data 8 DataSet<String> text = env.fromElements( 9 "To be, or not to be,--that is the question:--", 10 "Whether 'tis nobler in the mind to suffer", 11 "The slings and arrows of outrageous fortune", 12 "Or to take arms against a sea of troubles," 13 ); 14 15 DataSet<Tuple2<String, Integer>> counts = 16 // split up the lines in pairs (2-tuples) containing: (word,1) 17 text.flatMap(new LineSplitter()) 18 // group by the tuple field "0" and sum up tuple field "1" 19 .groupBy(0) //(i,1) (am,1) (chinese,1) 20 .sum(1); 21 22 // execute and print result 23 counts.print(); 24 25 } 26 27 // 28 // User Functions 29 // 30 31 /** 32 * Implements the string tokenizer that splits sentences into words as a user-defined 33 * FlatMapFunction. The function takes a line (String) and splits it into 34 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 35 */ 36 public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 37 38 @Override 39 public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { 40 // normalize and split the line 41 String[] tokens = value.toLowerCase().split("\\W+"); 42 43 // emit the pairs 44 for (String token : tokens) { 45 if (token.length() > 0) { 46 out.collect(new Tuple2<String, Integer>(token, 1)); 47 } 48 } 49 } 50 } 51 }
2.2.讀取有界資料:對mysql進行讀取與寫入
1 public class ReadWriteByJdbc { 2 3 private static final Logger logger = LoggerFactory.getLogger(ReadWriteByJdbc.class); 4 private static final String INSERT = "insert into flink_demo (name, password) values (?,?)"; 5 private static final String SELECT = "select name,description from flink_demo where description is not null"; 6 7 public static void main(String[] args) throws Exception { 8 String driverClass = "com.mysql.jdbc.Driver"; 9 String dbUrl = "jdbc:mysql://xxxxxxxxxx:xxxxxxxxxx/xxxxxxxxxx"; 10 String userNmae = "xxxxxxxxxxxxxxx"; 11 String passWord = "xxxxxxxxxxxxxxxx"; 12 13 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 14 String filePath = "file:///E:/4_data/flink.txt"; 15 DataSet<Row> write = env.readCsvFile(filePath).fieldDelimiter(" ").types(String.class, String.class) 16 .map(new MapFunction<Tuple2<String, String>, Row>() { 17 @Override 18 public Row map(Tuple2<String, String> stringIntegerTuple2) throws Exception { 19 Row row = new Row(2); 20 row.setField(0, stringIntegerTuple2.f0.getBytes("UTF-8")); 21 row.setField(1, stringIntegerTuple2.f1.getBytes("UTF-8")); 22 return row; 23 } 24 }); 25 write.output( 26 JDBCOutputFormat.buildJDBCOutputFormat() 27 .setDrivername(driverClass) 28 .setDBUrl(dbUrl) 29 .setUsername(userNmae) 30 .setPassword(passWord) 31 .setQuery(INSERT) 32 .finish() 33 ); 34 env.execute(); 35 logger.error("------------------資料寫入mysql成功----------------"); 36 Thread.sleep(5000); 37 DataSource<Row> read = env.createInput(JDBCInputFormat.buildJDBCInputFormat() 38 .setDrivername(driverClass) 39 .setDBUrl(dbUrl) 40 .setUsername(userNmae) 41 .setPassword(passWord) 42 .setQuery(SELECT) 43 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) 44 .finish() 45 ); 46 logger.error("------------------讀取mysql資料如下---------------"); 47 read.map(new MapFunction<Row, Tuple2<String, String>>() { 48 @Override 49 public Tuple2<String, String> map(Row row) throws Exception { 50 return new Tuple2<>(row.getField(0).toString(), row.getField(1).toString()); 51 } 52 }).print(); 53 logger.error("===============Succeed=============="); 54 } 55 56 }
2.3.讀取無解資料,讀取socket資料
首先需要啟動一個socket,linux和蘋果系統自帶socket外掛。windows系統可以通過netcat模擬socket,netcat軟體可在網盤下載:
連結:https://pan.baidu.com/s/1oet8Vaza4D2JKdYBwwF7qA提取碼:wrx4
直接在window上啟動:nc -L -p 9000 -v ,如圖
表示socket已經正常啟動,監聽埠為9000,所以idea中執行java方法時,也需要設定埠為9000,設定方式如下:--port 9000
執行java方法,如下圖表示socket連線上了:
直接在dos中輸入字串,idea就會接受到這些字串,並通過flink對字串進行處理。
1 public class WordCount { 2 public static void main(String[] args) throws Exception { 3 //定義socket的埠號 4 int port; 5 try { 6 ParameterTool parameterTool = ParameterTool.fromArgs(args); 7 port = parameterTool.getInt("port"); 8 } catch (Exception e) { 9 System.err.println("沒有指定port引數,使用預設值9000"); 10 port = 9000; 11 } 12 13 //獲取執行環境 14 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 15 16 //連線socket獲取輸入的資料 17 DataStreamSource<String> text = env.socketTextStream("127.0.0.1", port, "\n"); 18 19 //計算資料 20 DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() { 21 public void flatMap(String value, Collector<WordWithCount> out) throws Exception { 22 String[] splits = value.split("\\s"); 23 for (String word : splits) { 24 out.collect(new WordWithCount(word, 1L)); 25 } 26 } 27 })//打平操作,把每行的單詞轉為<word,count>型別的資料 28 .keyBy("word")//針對相同的word資料進行分組 29 .timeWindow(Time.seconds(2), Time.seconds(1))//指定計算資料的視窗大小和滑動視窗大小 30 .sum("count"); 31 32 //把資料列印到控制檯 33 windowCount.print() 34 .setParallelism(1);//使用一個並行度 35 //注意:因為flink是懶載入的,所以必須呼叫execute方法,上面的程式碼才會執行 36 env.execute("streaming word count"); 37 38 } 39 40 /** 41 * 主要為了儲存單詞以及單詞出現的次數 42 */ 43 public static class WordWithCount { 44 public String word; 45 public long count; 46 47 public WordWithCount() { 48 } 49 50 public WordWithCount(String word, long count) { 51 this.word = word; 52 this.count = count; 53 } 54 55 @Override 56 public String toString() { 57 return "WordWithCount{" + 58 "word='" + word + '\'' + 59 ", count=" + count + 60 '}'; 61 } 62 } 63 }
2.4.讀取mysql資料,並進行聚合與過濾
1 @Slf4j 2 public class TableSqlDemo { 3 private static final String SELECT = "select cast(id as char) as id, name, password, description, cast(state as char ) as state, " + 4 "cast(create_time as char ) as create_time, cast(update_time as char ) as update_time from flink_demo;"; 5 // 資料庫相關配置 6 private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; 7 private static final String DBURL = "jdbc:mysql://ip:port/flink_data?characterEncoding=utf-8&useSSL=false"; 8 private static final String USERNMAE = "xxxxx"; 9 private static final String PASSWORD = "xxxxxx"; 10 11 public static void main(String[] args) throws Exception { 12 13 14 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 15 // 讀取資料庫資料 16 DataSource<Row> read = env.createInput(JDBCInputFormat.buildJDBCInputFormat() 17 .setDrivername(DRIVER_CLASS) 18 .setDBUrl(DBURL) 19 .setUsername(USERNMAE) 20 .setPassword(PASSWORD) 21 .setQuery(SELECT) 22 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 23 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) 24 .finish() 25 ); 26 System.out.println(System.getProperty("line.separator")); 27 log.error("-------------讀取mysql資料如下------------------"); 28 // 將row當做字串輸出 29 // read.map((MapFunction<Row, String>) row -> row.toString()).print(); 30 31 // 通過map方法對映到bean實體,並輸出 32 read.map((MapFunction<Row, Bean>) row -> new Bean( 33 row.getField(0).toString(), 34 row.getField(1).toString(), 35 row.getField(2).toString(), 36 row.getField(3).toString(), 37 switchState(row.getField(4).toString()), 38 row.getField(5).toString(), 39 row.getField(6).toString())).print(); 40 System.out.println(System.getProperty("line.separator")); 41 log.error("-------------對資料進行聚合,計算在職、離職、轉正人數------------------"); 42 // 對讀取到的資料做聚合 43 DataSet<Tuple2<String, Integer>> counts = 44 read.flatMap(new StateCount()) 45 .groupBy(0) 46 .sum(1); 47 48 // execute and print result 49 counts.print(); 50 51 // 維表關聯 52 // TableConfig tableConfig = TableConfig.DEFAULT(); 53 BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); 54 tableEnv.registerDataSet("t1", read, "id, name, password, description, state, create_time, update_time"); 55 /*System.out.println(System.getProperty("line.separator")); 56 log.error("-------------列印資料型別(這塊有點坑)------------------"); 57 tableEnv.sqlQuery("select id, name, password, description, state, create_time, update_time from t1").printSchema();*/ 58 System.out.println(System.getProperty("line.separator")); 59 log.error("-------------過濾,檢索正式員工------------------"); 60 Table t2 = tableEnv.sqlQuery("select id, name, password, description, state, create_time, update_time from t1").where("state = '0'"); 61 DataSet<BeanExData> beanExDataDataSet = tableEnv.toDataSet(t2, BeanExData.class); 62 beanExDataDataSet.print(); 63 } 64 65 // 聚合計算,通過實現FlatMapFunction介面,重寫flatMap,來實現自己的邏輯 66 public static final class StateCount implements FlatMapFunction<Row, Tuple2<String, Integer>> { 67 @Override 68 public void flatMap(Row in, Collector<Tuple2<String, Integer>> out) throws Exception { 69 // pos 4 為狀態欄位 70 out.collect(new Tuple2<>(switchState(in.getField(4).toString()), 1)); 71 } 72 } 73 74 // 轉成中文,視覺化 75 private static String switchState(String state) { 76 String _state = ""; 77 if (StringUtils.isBlank(state)) { 78 return _state; 79 } 80 switch (state) { 81 case "0": 82 _state = "正式員工"; 83 break; 84 case "1": 85 _state = "試用期"; 86 break; 87 case "2": 88 _state = "離職"; 89 break; 90 } 91 return _state; 92 } 93 }
結果如下:
2.5.讀取mysql多個table資料,並進行關聯
1 @Slf4j 2 public class TableSqlJoinDemo { 3 private static final String SELECT = "select cast(id as char) as id, name, description, " + 4 "cast(create_time as char) as create_time, cast(update_time as char) as update_time from user_info;"; 5 6 private static final String USER_INFO = "select cast(id as char) as id, name, password, description, cast(state as char ) as state, " + 7 "cast(create_time as char ) as create_time, cast(update_time as char ) as update_time from flink_demo;"; 8 // 資料庫相關配置 9 private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; 10 private static final String DBURL = "jdbc:mysql://ip:port/db?characterEncoding=utf-8&useSSL=false"; 11 private static final String USERNMAE = "username"; 12 private static final String PASSWORD = "password"; 13 14 public static void main(String[] args) throws Exception { 15 16 17 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 18 // 讀取資料庫資料 19 DataSource<Row> user = env.createInput(JDBCInputFormat.buildJDBCInputFormat().setDrivername(DRIVER_CLASS) 20 .setDBUrl(DBURL).setUsername(USERNMAE).setPassword(PASSWORD).setQuery(SELECT) 21 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 22 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) 23 .finish() 24 ); 25 System.out.println(System.getProperty("line.separator")); 26 log.error("-------------讀取mysql使用者資料------------------"); 27 // 將row當做字串輸出 28 user.map((MapFunction<Row, String>) row -> row.toString()).print(); 29 // 使用者資訊資料註冊到flink 30 BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT()); 31 // 第三個引數表示欄位,如果不填,表示註冊所有的資料 32 tableEnv.registerDataSet("user", user,"id, name, description, create_time, update_time"); 33 Table userTable = tableEnv.scan("user"); 34 tableEnv.registerTable("userTable", userTable); 35 // 輸出欄位的型別,保證資料的正確性 36 // System.out.println(System.getProperty("line.separator")); 37 /*log.error("-------------檢視資料型別------------------"); 38 39 userTable.printSchema();*/ 40 41 // 讀取資料庫資料 42 DataSource<Row> userInfo = env.createInput(JDBCInputFormat.buildJDBCInputFormat() 43 .setDrivername(DRIVER_CLASS).setDBUrl(DBURL).setUsername(USERNMAE).setPassword(PASSWORD).setQuery(USER_INFO) 44 .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 45 BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) 46 .finish() 47 ); 48 System.out.println(System.getProperty("line.separator")); 49 log.error("-------------讀取mysql使用者描述資料------------------"); 50 // 將row當做字串輸出 51 userInfo.map((MapFunction<Row, String>) row -> row.toString()).print(); 52 // 使用者資訊資料註冊到flink 53 tableEnv.registerDataSet("userInfo", userInfo, "id, name, password, description, state, create_time, update_time"); 54 Table userInfoTable = tableEnv.scan("userInfo"); 55 tableEnv.registerTable("userInfoTable", userInfoTable); 56 // 輸出欄位的型別,保證資料的正確性 57 // System.out.println(System.getProperty("line.separator")); 58 /*log.error("-------------檢視資料型別------------------"); 59 userInfoTable.printSchema();*/ 60 // 關聯操作 61 System.out.println(System.getProperty("line.separator")); 62 log.error("-------------關聯操作後的結果------------------"); 63 Table result = tableEnv.sqlQuery("select a.name, a.description, b.description as description1 from userTable a,userInfoTable b where a.name = b.name"); 64 DataSet<Row> beanExDataDataSet = tableEnv.toDataSet(result, Row.class); 65 beanExDataDataSet.map((MapFunction<Row, String>) row -> row.toString()).print(); 66 } 67 68 76 96 }
執行結果如下:
3.windows搭建flink服務
官網下載flink軟體:https://flink.apache.org/downloads.html,版本可自行選擇
dos上啟動.bat檔案即可,埠號預設為8081,可通過瀏覽器訪問
通過idea將java檔案打包成jar,並上傳到flink服務端上,如圖
執行submit,既可執行一個任務