推薦兩個不錯的flink專案
最近flink真是風生水起,但是浪院長看來這不過是阿里錯過了創造spark影響力之後,想要在flink領域建立絕對的影響力。但是,不可否認flink在實時領域確實目前來看獨樹一幟,當然也有它不適合的地方,比如今天要推薦的第一個基於flink開發的專案,流表和維表的join,還有很多地方還是用spark streaming更合適,但是整體的流處理而言flink確實很優秀,雖然目前測出了一些bug,後面會發文說明一下flink開發時候常見的坑和已有的自身bug。接下來轉入正題。
flinkStreamSQL
熟悉flink的應該都瞭解,flink支援流表之間的join,但到1.6為止都不支援流表和維表的join。浪尖最近,也在開發流平臺,需要到flink流表和維表的join。那麼針對這個大家第一印象,可以寫個運算元去實現,比如map等。但是浪尖這裡開發的流平臺不是說自己寫api,而是使用者通過sql去實現建立source,sink,udf,sql等,這個時候要進行維表join,大家可能是想到了udf。是的對於只有一個維表的情況下使用udf比較方便,但是多個維表,相對就麻煩很多了。
而基於flink開發的flinkStreamSQL主要是實現了flink 流表和維表的join,其主要功能如下:
自定義create table 語法(包括源表,輸出表,維表)
自定義create function 語法
實現了流與維表的join
浪尖花了個把小時看了一下原始碼,原始碼思路很清晰,主要是兩個步驟:
用flink api實現維表的功能: 要實現維表功能就要用到 flink Aysnc I/O 這個功能,是由阿里巴巴貢獻給apache flink的。關於非同步IO的介紹,可以參考:https://yq.aliyun.com/articles/457385
解析流與維表join的sql語法轉化成底層的flinkAPI
原始碼下載地址:
https://github.com/DTStack/flinkStreamSQL
為了方便大家閱讀,這裡浪尖也把維錶轉化的過程主要函式貼出來吧:
主函式
Main#main
SQL解析
SqlTree sqlTree = SqlParser.parseSql(sql)
拆讀
SqlParser#parseSql
TableInfoParserFactory#parseWithTableType
登錄檔
registerTable
存在維表的話,維錶轉換與邏輯sql執行
SideSqlExec#exec
也即是
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
不存在維表的話
tableEnv.sqlUpdate(result.getExecSql());
SqlSession sqlSession=null;
List<User> userList=new ArrayList<User>(); try{
sqlSession=MyBatisUtil.createSqlSession();
User user=new User(www.furggw.com);
user.setUserName("趙");
user.setUserRole(www.mingrenf178.com);
userList=sqlSession.getMapper(UserMapper.class).getUserListByUser(user);
}catch (Exception ex){
ex.printStackTrace();
}finally {
MyBatisUtil.closeSqlSession(sqlSession);
}
for (User user:
userList) {
System.out.println(user.getUserName()+"\t"+user.getUserRole());
}
使用Map入參編寫介面
List<User> getUserListByMap(Map<www.ysyl157.com String,String> userMap);
編寫UserMapper.xml檔案
<select id="getUserListByMap" resultType="User" parameterType=www.mcyllpt.com"Map"> SELECT * FROM USER www.meiwanyule.cn WHERE userName LIKE concat('%',#{userName},'%') and userRole=#{userRole}
FlinkX
FlinkX主要是用來做資料同步的,實現了多種異構資料來源之間高效的資料遷移。
不同的資料來源頭被抽象成不同的Reader外掛,不同的資料目標被抽象成不同的Writer外掛。理論上,FlinkX框架可以支援任意資料來源型別的資料同步工作。作為一套生態系統,每接入一套新資料來源該新加入的資料來源即可實現和現有的資料來源互通。
在底層實現上,FlinkX依賴Flink,資料同步任務會被翻譯成StreamGraph在Flink上執行