
A First Look at the Flink SQL Client

### Welcome to my GitHub

[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)

Contents: a categorized index of all my original articles plus their companion source code, covering Java, Docker, Kubernetes, DevOps, and more.

### About the Flink SQL Client

With the Flink Table & SQL API you can express real-time processing logic in SQL, but you still have to write some Java (or Scala) code, and the program must be compiled and built before it can be submitted to a Flink runtime. That is not very friendly to developers unfamiliar with Java or Scala. The goal of the SQL Client is to remove exactly this barrier (in the official docs' words: "...with a build tool before being submitted to a cluster").

### Limitations

Unfortunately, in Flink 1.10.0 the SQL Client is still a beta feature (not suitable for production), and it can only connect to a local Flink instance; it cannot connect to a remote server the way client tools such as mysql or cassandra can. These limitations are expected to be resolved in future releases:

![SQL Client limitations](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090800553-861285938.png)

### Environment

Next we take a hands-on first look at the Flink SQL Client. The environment is:

1. Machine: 13-inch MacBook Pro (2018), macOS Catalina 10.15.3
2. Flink: 1.10.0
3. JDK: 1.8.0_211

### Start Flink locally

1. Download the Flink package from: http://ftp.kddilabs.jp/infosystems/apache/flink/flink-1.10.0/flink-1.10.0-bin-scala_2.11.tgz
2. Unpack it: tar -zxvf flink-1.10.0-bin-scala_2.11.tgz
3. Enter the flink-1.10.0/bin/ directory and run ./start-cluster.sh to start a local Flink cluster;
4. Open port 8081 of the machine in a browser to confirm that the local cluster is up:

![Flink web UI](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090801444-555461234.png)

### Start the SQL Client CLI

1. In the flink-1.10.0/bin/ directory, run ./sql-client.sh to start the SQL Client CLI. As shown below, the BETA label in the red box is a reminder to think twice before using this tool in production:

![SQL Client CLI banner](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090802511-256825979.png)

2. The first command worth learning is HELP:

![HELP output](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090803388-1579374.png)

3. Let's start with hello world: run select 'Hello world!'; and the console output is shown below; press Q to quit the result view:

![Hello world query](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090803910-293961450.png)

### Two result display modes

1. The first is table mode, which looks like querying an ordinary database table. It is enabled with:

```sql
SET execution.result-mode=table;
```

2. The second is changelog mode, which looks like a log of every data change. It is enabled with:

```sql
SET execution.result-mode=changelog;
```

3. After setting table mode, run the following simple grouped query:

```sql
SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
```

4. For easy comparison, the screenshot below shows the query results of both modes side by side; note that the green box indicates whether each row was added or removed:

![table mode vs changelog mode](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090804249-1508083570.png)

5. In either mode, the query results are kept in the heap memory of the SQL Client CLI process;
6. In changelog mode, to keep console input and output responsive, only the most recent 1000 results are displayed;
7. In table mode you can page through more results; the number of rows is limited by the max-table-result-rows setting and by the available heap memory;

### Going further

The few lines of SQL above give a first impression of the Flink SQL Client. Let's now go a bit further:

1. Create a CSV file: a minimal book-information table with just three fields (name, amount, category) and ten records;
2. Create the environment configuration file used by the SQL Client, which describes the data source and its corresponding table;
3. Start the SQL Client and query the CSV file with SQL;
4. The whole procedure is illustrated below:

![Procedure overview](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090804779-2060620239.png)

### Steps

1. First make sure Flink is running;
2. Create a file named book-store.csv with the content below (a small shell sketch that generates the file is given right after this listing):

```
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
```
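For convenience, here is a small, hedged shell sketch for generating and checking that sample file from the command line. The directory /Users/zhaoqin/temp/202004/26/ is simply the path used in the book-store.yaml configuration in the next step; adjust it to your own environment, but keep it consistent with the YAML file.

```shell
# Assumption: the target directory matches the connector path configured in book-store.yaml (next step).
mkdir -p /Users/zhaoqin/temp/202004/26

# Write the ten sample records into book-store.csv.
cat > /Users/zhaoqin/temp/202004/26/book-store.csv <<'EOF'
name001,1,aaa
name002,2,aaa
name003,3,bbb
name004,4,bbb
name005,5,bbb
name006,6,ccc
name007,7,ccc
name008,8,ccc
name009,9,ccc
name010,10,ccc
EOF

# Quick sanity check: the file should contain exactly 10 lines.
wc -l /Users/zhaoqin/temp/202004/26/book-store.csv
```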
3. In the flink-1.10.0/conf directory, create a file named book-store.yaml with the following content:

```yaml
tables:
  - name: BookStore
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/Users/zhaoqin/temp/202004/26/book-store.csv"
    format:
      type: csv
      fields:
        - name: BookName
          type: VARCHAR
        - name: BookAmount
          type: INT
        - name: BookCatalog
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: ","
    schema:
      - name: BookName
        type: VARCHAR
      - name: BookAmount
        type: INT
      - name: BookCatalog
        type: VARCHAR
  - name: MyBookView
    type: view
    query: "SELECT BookCatalog, SUM(BookAmount) AS Amount FROM BookStore GROUP BY BookCatalog"

execution:
  planner: blink                      # optional: either 'blink' (default) or 'old'
  type: streaming                     # required: execution mode either 'batch' or 'streaming'
  result-mode: table                  # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000      # optional: maximum number of maintained rows in
                                      #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time     # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                      # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200   # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16                 # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0         # optional: table program's minimum idle state time
  max-idle-state-retention: 0         # optional: table program's maximum idle state time
  restart-strategy:                   # optional: restart strategy
    type: fallback                    #   "fallback" to global restart strategy by default

# Configuration options for adjusting and tuning table programs.
# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

# Properties that describe the cluster to which table programs are submitted to.
deployment:
  response-timeout: 5000
```

4. A few things to note about book-store.yaml:
   a. tables.type is source-table, which marks this entry as a data source definition;
   b. tables.connector describes the data source in detail; path is the full path of the book-store.csv file;
   c. tables.format describes the file format;
   d. tables.schema defines the schema of the source table;
   e. type: view means MyBookView is a view (analogous to a view in a database);
5. From the flink-1.10.0 directory, run the following command to start the SQL Client with book-store.yaml as its environment configuration:

```shell
bin/sql-client.sh embedded -d conf/book-store.yaml
```

6. Query the whole table:

```sql
SELECT * FROM BookStore;
```

![Full table query](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090805372-483054357.png)

7. Count the records grouped by BookCatalog:

```sql
SELECT BookCatalog, COUNT(*) AS BookCount FROM BookStore GROUP BY BookCatalog;
```

![Grouped count query](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090805824-1084504178.png)

8. Query the view:

```sql
select * from MyBookView;
```

![View query](https://img2020.cnblogs.com/other/485422/202011/485422-20201113090806328-1704349438.png)

That completes our first look at the Flink SQL Client: we now have a basic understanding of the tool, and upcoming articles will use it for more complex operations.

### Welcome to follow my WeChat official account: 程式設計師欣宸

> Search WeChat for「程式設計師欣宸」(programmer Xinchen). I'm Xinchen, and I look forward to exploring the Java world with you...

[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)