1. 程式人生 > >Flink的sink實戰之四:自定義

Flink的sink實戰之四:自定義

### 歡迎訪問我的GitHub [https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos) 內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等; ### 本篇概覽 Flink官方提供的sink服務可能滿足不了我們的需要,此時可以開發自定義的sink,文字就來一起實戰; ### 全系列連結 1. [《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628) 2. [《Flink的sink實戰之二:kafka》](https://blog.csdn.net/boling_cavalry/article/details/105598224) 3. [《Flink的sink實戰之三:cassandra3》](https://blog.csdn.net/boling_cavalry/article/details/105598968) 4. [《Flink的sink實戰之四:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105599511) ### 繼承關係 1. 在正式編碼前,要先弄清楚對sink能力是如何實現的,前面我們實戰過的print、kafka、cassandra等sink操作,核心類的繼承關係如下圖所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201111103655583-1390712486.png) 2. 可見實現sink能力的關鍵,是實現RichFunction和SinkFunction介面,前者用於資源控制(如open、close等操作),後者負責sink的具體操作,來看看最簡單的PrintSinkFunction類是如何實現SinkFunction介面的invoke方法: ```java @Override public void invoke(IN record) { writer.write(record); } ``` 3. 現在對sink的基本邏輯已經清楚了,可以開始編碼實戰了; ### 內容和版本 本次實戰很簡單:自定義sink,用於將資料寫入MySQL,涉及的版本資訊如下: 1. jdk:1.8.0_191 2. flink:1.9.2 3. maven:3.6.0 4. flink所在作業系統:CentOS Linux release 7.7.1908 5. MySQL:5.7.29 6. IDEA:2018.3.5 (Ultimate Edition) ### 原始碼下載 如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos): | 名稱 | 連結 | 備註| | :-------- | :----| :----| | 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 | | git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 | | git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 | 這個git專案中有多個資料夾,本章的應用在flinksinkdemo資料夾下,如下圖紅框所示: ![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201111103656928-1681737216.png) ### 資料庫準備 請您將MySQL準備好,並執行以下sql,用於建立資料庫flinkdemo和表student: ```sql create database if not exists flinkdemo; USE flinkdemo; DROP TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(25) COLLATE utf8_bin DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; ``` ### 編碼 1. 使用[《Flink的sink實戰之二:kafka》](https://xinchen.blog.csdn.net/article/details/105598224)中建立的flinksinkdemo工程; 2. 在pom.xml中增加mysql的依賴: ```xml ``` 3. 建立和資料庫的student表對應的實體類Student.java: ```java package com.bolingcavalry.customize; public class Student { private int id; private String name; private int age; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Student(String name, int age) { this.name = name; this.age = age; } } ``` 4. 建立自定義sink類MySQLSinkFunction.java,這是本文的核心,有關資料庫的連線、斷開、寫入資料都集中在此: ```java package com.bolingcavalry.customize; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class MySQLSinkFunction extends RichSink