Flink的sink實戰之四:自定義
阿新 • • 發佈:2020-11-11
### 歡迎訪問我的GitHub
[https://github.com/zq2599/blog_demos](https://github.com/zq2599/blog_demos)
內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;
### 本篇概覽
Flink官方提供的sink服務可能滿足不了我們的需要,此時可以開發自定義的sink,文字就來一起實戰;
### 全系列連結
1. [《Flink的sink實戰之一:初探》](https://blog.csdn.net/boling_cavalry/article/details/105597628)
2. [《Flink的sink實戰之二:kafka》](https://blog.csdn.net/boling_cavalry/article/details/105598224)
3. [《Flink的sink實戰之三:cassandra3》](https://blog.csdn.net/boling_cavalry/article/details/105598968)
4. [《Flink的sink實戰之四:自定義》](https://blog.csdn.net/boling_cavalry/article/details/105599511)
### 繼承關係
1. 在正式編碼前,要先弄清楚對sink能力是如何實現的,前面我們實戰過的print、kafka、cassandra等sink操作,核心類的繼承關係如下圖所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201111103655583-1390712486.png)
2. 可見實現sink能力的關鍵,是實現RichFunction和SinkFunction介面,前者用於資源控制(如open、close等操作),後者負責sink的具體操作,來看看最簡單的PrintSinkFunction類是如何實現SinkFunction介面的invoke方法:
```java
@Override
public void invoke(IN record) {
writer.write(record);
}
```
3. 現在對sink的基本邏輯已經清楚了,可以開始編碼實戰了;
### 內容和版本
本次實戰很簡單:自定義sink,用於將資料寫入MySQL,涉及的版本資訊如下:
1. jdk:1.8.0_191
2. flink:1.9.2
3. maven:3.6.0
4. flink所在作業系統:CentOS Linux release 7.7.1908
5. MySQL:5.7.29
6. IDEA:2018.3.5 (Ultimate Edition)
### 原始碼下載
如果您不想寫程式碼,整個系列的原始碼可在GitHub下載到,地址和連結資訊如下表所示(https://github.com/zq2599/blog_demos):
| 名稱 | 連結 | 備註|
| :-------- | :----| :----|
| 專案主頁| https://github.com/zq2599/blog_demos | 該專案在GitHub上的主頁 |
| git倉庫地址(https)| https://github.com/zq2599/blog_demos.git | 該專案原始碼的倉庫地址,https協議 |
| git倉庫地址(ssh)| [email protected]:zq2599/blog_demos.git | 該專案原始碼的倉庫地址,ssh協議 |
這個git專案中有多個資料夾,本章的應用在flinksinkdemo資料夾下,如下圖紅框所示:
![在這裡插入圖片描述](https://img2020.cnblogs.com/other/485422/202011/485422-20201111103656928-1681737216.png)
### 資料庫準備
請您將MySQL準備好,並執行以下sql,用於建立資料庫flinkdemo和表student:
```sql
create database if not exists flinkdemo;
USE flinkdemo;
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
`age` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
```
### 編碼
1. 使用[《Flink的sink實戰之二:kafka》](https://xinchen.blog.csdn.net/article/details/105598224)中建立的flinksinkdemo工程;
2. 在pom.xml中增加mysql的依賴:
```xml
```
3. 建立和資料庫的student表對應的實體類Student.java:
```java
package com.bolingcavalry.customize;
public class Student {
private int id;
private String name;
private int age;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public Student(String name, int age) {
this.name = name;
this.age = age;
}
}
```
4. 建立自定義sink類MySQLSinkFunction.java,這是本文的核心,有關資料庫的連線、斷開、寫入資料都集中在此:
```java
package com.bolingcavalry.customize;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class MySQLSinkFunction extends RichSink